Spark删除redis千万级别set集合数据实现分析

 更新时间:2023年06月20日 10:08:24   作者:spark打酱油  
这篇文章主要为大家介绍了Spark删除redis千万级别set集合数据实现过程分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

1.使用pipline的原因

Redis 使用的是客户端-服务器(CS)模型和请求/响应协议的 TCP 服务器。

这意味着通常情况下一个请求会遵循以下步骤:

  • 客户端向服务端发送一个查询请求,并监听 Socket 返回,通常是以阻塞模式,等待服务端响应。
  • 服务端处理命令,并将结果返回给客户端。
  • 管道(pipeline)可以一次性发送多条命令并在执行完后一次性将结果返回,pipeline 通过减少客户端与 redis 的通信次数来实现降低往返延时时间,而且 Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性。

通俗点:

  • pipeline就是把一组命令进行打包,然后一次性通过网络发送到Redis。同时将执行的结果批量的返回回来
  • pipelined.sync()表示我一次性的异步发送到redis,不关注执行结果。
  • pipeline.syncAndReturnAll ();将返回执行过的命令结果返回到List列表中

2.方法

2.1写入redis的方法

2.1.1参数说明

sc:SparkContext Spark上下文
spark:SparkSession 使用Dataset和DataFrame API编程Spark的入口点

def writeRedis(sc: SparkContext,spark: SparkSession): Unit ={
    // spark读取数据集
     val df: DataFrame = spark.read.parquet("file:///F://delRedisData//1//delData.snappy.parquet")
    df.show(1,false)
    val rdd: RDD[String] = df.rdd.map(x=>x.getAs[String]("r"))
       // 这个集合写的是2000多万的数据
    sc.toRedisSET(rdd,"test:task:deplicate")
  }

2.2读取本地待删除数据的方法

2.2.1参数说明

sc:SparkContext Spark上下文

spark:SparkSession 使用Dataset和DataFrame API编程Spark的入口点

def readParquet(spark: SparkSession,path:String): RDD[String] ={
    val df: DataFrame = spark.read.parquet(path)
    val strRDD: RDD[String] = df.rdd.map(_.getAs[String]("r"))
    // 返回String类型的RDD
    strRDD
  }

2.3调用pipline删除的方法

2.3.1参数说明

collectionName 其中redis set集合的名称

num是要删除的数据量是多少

arr是要删除的数据存放的是set集合的key

jedis是redis的客户端

def delPipleine(collectionName:String,num:Int,arr:Array[String],jedis:Jedis):Unit = {
    try{
        val pipeline: Pipeline = jedis.pipelined()
        // 选择数据库  默认为 0
        pipeline.select(1)
        for(i <- 0 to (num - 1) ){
          pipeline.srem(collectionName,arr(i))
        }
        //表示我一次性的异步发送到redis,不关注执行结果
        pipeline.sync()
    }catch {
      case e : JedisException => e.printStackTrace()
    }finally if(jedis !=null) jedis.close()
  }

3.完整代码

import com.redislabs.provider.redis._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.exceptions.JedisException
import redis.clients.jedis.{Jedis, Pipeline}
/**
  * Date 2022/5/25 17:57
  */
object DelRedis {
def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // 驱动进程使用的内核数,仅在集群模式下使用。
      .set("spark.driver.cores","5")
      /**
        * 驱动进程使用的内存数量,也就是SparkContext初始化的地方,
        * 其格式与JVM内存字符串具有大小单位后缀(“k”,“m”,“g”或“t”)(例如512m, 2g)相同。
        * 注意:在客户端模式下,不能直接在应用程序中通过SparkConf设置此配置,因为此时驱动程
        * 序JVM已经启动。相反,请通过——driver-memory命令行选项或在默认属性文件中设置。
        */
      .set("spark.driver.memory","5g")
      /**
        * 限制每个Spark操作(例如collect)的所有分区的序列化结果的总大小(以字节为单位)。
        * 应该至少是1M,或者0表示无限制。如果总大小超过此限制,则作业将被终止。
        * 过高的限制可能会导致驱动程序内存不足错误(取决于spark.driver.memory和JVM中对象的内存开销)。
        * 设置适当的限制可以防止驱动程序出现内存不足的错误。
        */
      .set("spark.driver.maxResultSize","10g")
      /**
        * 每个执行程序进程使用的内存数量,
        * 格式与带有大小单位后缀(“k”,“m”,“g”或“t”)的JVM内存字符串相同(例如512m, 2g)。
        *
        */
      .set("spark.executor.memory","5g")
      /**
        * 默认 1在YARN模式下,worker上所有可用的内核在standalone和Mesos粗粒度模式下。
        */
      .set("spark.executor.cores","5")
    val spark: SparkSession = SparkSession.builder().appName("DelRedis").master("local[*]")
      .config("spark.redis.host","192.168.100.201")
      .config("spark.redis.port","6379")
      .config("spark.redis.db","1")     // 可选的数据库编号。避免使用它,尤其是在集群模式下,redisRedis默认支持16个数据库,默认是选择数据库0,这里设置为1。
      .config("spark.redis.timeout","2000000")   // 连接超时,以毫秒为单位,默认为 2000 毫秒
      .config(conf)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
     //1.写入数据集
   writeRedis(sc,spark)
    // 2.读取待删除的数据key
      val path = "file:///F://delRedisData//test.parquet"
    val rdd: RDD[String] = readParquet(spark,path)
    //3.使用redis 中的 pipeline 方法 进行删除操作
    rdd.foreachPartition(iter=>{
      // 连接redis客户端
      val jedis = new Jedis("192.168.100.201",6379)
      val array: Array[String] = iter.toArray
      val length: Int = array.length
      val beginTime: Long = System.currentTimeMillis()
      delPipleine(collectionName,length,array,jedis)
      val endTime: Long = System.currentTimeMillis()
      println("删除:"+length+"条数据,耗时:"+(endTime-beginTime)/1000+"秒")
    })
    sc.stop()
    spark.stop()
  }
def delPipleine(collectionName:String,num:Int,arr:Array[String],jedis:Jedis):Unit = {
    try{
        val pipeline: Pipeline = jedis.pipelined()
        // 选择数据库  默认为 0
        pipeline.select(1)
        for(i <- 0 to (num - 1) ){
          pipeline.srem(collectionName,arr(i))
        }
        //表示我一次性的异步发送到redis,不关注执行结果
        pipeline.sync()
    }catch {
      case e : JedisException => e.printStackTrace()
    }finally if(jedis !=null) jedis.close()
  }
def writeRedis(sc: SparkContext,spark: SparkSession): Unit ={
    // spark读取数据集
    val df: DataFrame = spark.read.parquet("file:///F://delRedisData//1//delData.snappy.parquet")
    df.show(1,false)
    val rdd: RDD[String] = df.rdd.map(x=>x.getAs[String]("r"))
       // 这个集合写的是2000多万的数据
    sc.toRedisSET(rdd,"test:task:deplicate")
  }
def readParquet(spark: SparkSession,path:String): RDD[String] ={
    val df: DataFrame = spark.read.parquet(path)
    val strRDD: RDD[String] = df.rdd.map(_.getAs[String]("r"))
    // 返回String类型的RDD
    strRDD
  }
  }

4.总结

经检测:redis 的 pipeline(管道)方法 ,经单机版的redis测试 ,百万级别数据删除仅需要1分钟左右与硬件有关,还包括读取数据的时长等方面原因

以上就是Spark删除redis千万级别set集合数据实现分析的详细内容,更多关于Spark删除redis set集合的资料请关注脚本之家其它相关文章!

相关文章

  • 编译安装redisd的方法示例详解

    编译安装redisd的方法示例详解

    这篇文章主要介绍了编译安装redisd的方法示例详解,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-02-02
  • Java实现多级缓存的方法详解

    Java实现多级缓存的方法详解

    对于高并发系统来说,有三个重要的机制来保障其高效运行,它们分别是:缓存、限流和熔断,所以本文就来和大家探讨一下多级缓存的实现方法,希望对大家有所帮助
    2024-02-02
  • 一文详解如何停止/重启/启动Redis服务

    一文详解如何停止/重启/启动Redis服务

    Redis是当前比较热门的NOSQL系统之一,它是一个key-value存储系统,这篇文章主要给大家介绍了关于如何停止/重启/启动Redis服务的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-03-03
  • Redis如何存储对象与集合示例详解

    Redis如何存储对象与集合示例详解

    redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、 zset(sorted set --有序集合)和hash(哈希类型)本文介绍了关于Redis是如何存储对象与集合的相关资料,需要的朋友可以参考下
    2018-05-05
  • Redis设置Hash数据类型的过期时间

    Redis设置Hash数据类型的过期时间

    在Redis中,我们可以使用Hash数据结构来存储一组键值对,而有时候,我们可能需要设置这些键值对的过期时间,本文主要介绍了Redis设置Hash数据类型的过期时间,具有一定的参考价值,感兴趣的可以了解一下
    2024-01-01
  • Redis面试必备之缓存设计规范与性能优化详解

    Redis面试必备之缓存设计规范与性能优化详解

    你是否在使用Redis时,不清楚Redis应该遵循的设计规范而苦恼,你是否在Redis出现性能问题时,不知道该如何优化而发愁,快跟随小编一起学习起来吧
    2024-03-03
  • Redis+Caffeine两级缓存的实现

    Redis+Caffeine两级缓存的实现

    本文主要介绍了Redis+Caffeine两级缓存的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-06-06
  • 关于Redis中bitmap的原理和使用详解

    关于Redis中bitmap的原理和使用详解

    这篇文章主要介绍了关于Redis中bitmap的原理和使用详解,BitMap即位图,使用每个位表示某种状态,适合处理整型的海量数据,本质上是哈希表的一种应用实现,需要的朋友可以参考下
    2023-05-05
  • 使用Redis实现向量相似度搜索

    使用Redis实现向量相似度搜索

    在自然语言处理领域,有一个常见且重要的任务就是文本相似度搜索,所以本文为大家介绍一下如何利用Redis实现向量相似度搜索,解决文本、图像和音频之间的相似度匹配问题,需要的可以了解下
    2023-07-07
  • 详解redis端口号

    详解redis端口号

    在本篇内容中我们给大家整理了关于redis端口号的相关知识点内容,有兴趣的朋友们学习下。
    2019-06-06

最新评论