Redis如何使用Pipeline实现批处理操作

 更新时间:2025年02月17日 10:03:55   作者:记得开心一点嘛  
Redis Pipeline 是一种优化 Redis 操作的机制,通过将多个命令打包发送到 Redis 服务器,减少客户端与服务器之间的网络往返时间,本文主要来聊聊Redis如何使用Pipeline实现批处理操作,需要的可以了解下

在正常情况下,我们每次发送 Redis 命令时,客户端会等待 Redis 服务器的响应,直到接收到结果后,才会发送下一个命令。这种方式虽然保证了操作的顺序性,但在执行大量命令时会产生很大的网络延迟。

通过 Pipeline 技术,我们的客户端可以将多个命令同时发送给 Redis 服务器,并且不需要等待每个命令的返回结果,直到所有命令都被执行完毕,客户端再一起获取返回值。这样能减少每个命令的等待时间,大幅提高执行效率。

Redis Pipeline 是一种优化 Redis 操作的机制,通过将多个命令打包发送到 Redis 服务器,减少客户端与服务器之间的网络往返时间(RTT),从而显著提升性能。 

在默认情况下,Redis 客户端与服务器之间的通信是请求-响应模式,即:

1客户端发送一个命令到服务器。

2.服务器执行命令并返回结果。

3.客户端等待响应后再发送下一个命令。

这种模式在命令数量较少时没有问题,但在需要执行大量命令时,网络往返时间(RTT)会成为性能瓶颈。所以我们需要实现下面目的:

1.将多个命令打包发送到服务器。

2.服务器依次执行这些命令,并将结果一次性返回给客户端。

3.减少网络开销,提升性能。

以下是一个简单的 Java 示例,展示了如何使用 Jedis(Redis 的一个 Java 客户端)执行 Pipeline:

注意:批处理时不建议一次携带太多命令,并且Pipeline的多个命令之间不具备原子性。

// 创建 Jedis 实例
Jedis jedis = new Jedis("localhost", 6379);
 
// 使用 pipelining 方式批量执行命令
Pipeline pipeline = jedis.pipelined();
 
// 批量操作:使用 pipeline 来缓存命令
for (int i = 0; i < 1000; i++) {
    pipeline.set("key" + i, "value" + i);
}
 
// 同步执行所有命令
pipeline.sync();

pipelined() 方法: 创建一个 Pipeline 对象,它缓存所有要执行的命令。

批量设置命令: 通过 pipeline.set() 将多个 SET 命令放入管道中,但命令并不会立即执行。

sync() 方法: 通过调用 sync() 方法,客户端将会把所有缓存的命令一次性发送给 Redis,并等待它们完成执行。

但是这些都是在单机模式下的批处理,那对于集群来说该如何使用呢?

向MSet或Pipeline这样的批处理需要在一次请求中携带多条命令,而此时如何Redis是一个集群,那批处理命令的多个key必须落在同一个插槽中,否则就会导致执行失败。

一般推荐使用并行插槽来解决,如果使用hash_tag,可能会出现大量的key分配同一插槽导致数据倾斜,而并行插槽不会。

那么这里我们模拟一下并行插槽实现:

将多个键值对按照Redis集群的槽位进行分组,然后分别使用jedisCluster.mset()方法按组设置键值对。

public class JedisClusterTest {
 
    // 声明一个JedisCluster对象,用于与Redis集群进行交互
    private JedisCluster jedisCluster;
 
    // 在每个测试方法执行之前,初始化JedisCluster连接
    @BeforeEach
    void setUp() {
        // 配置Jedis连接池
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(8);
        poolConfig.setMaxIdle(8);
        poolConfig.setMinIdle(0);
        poolConfig.setMaxWaitMillis(1000);
 
        // 创建一个HashSet,用于存储Redis集群的节点信息
        HashSet<HostAndPort> nodes = new HashSet<>();
        // 添加Redis集群的节点信息(IP和端口)
        nodes.add(new HostAndPort("192.168.150.101", 7001));
        nodes.add(new HostAndPort("192.168.150.101", 7002));
        nodes.add(new HostAndPort("192.168.150.101", 7003));
        nodes.add(new HostAndPort("192.168.150.101", 8001));
        nodes.add(new HostAndPort("192.168.150.101", 8002));
        nodes.add(new HostAndPort("192.168.150.101", 8003));
 
        // 使用配置的连接池和节点信息初始化JedisCluster对象
        jedisCluster = new JedisCluster(nodes, poolConfig);
    }
 
    // 测试方法:使用mset命令一次性设置多个键值对
    @Test
    void testMSet() {
        // 使用JedisCluster的mset方法,一次性设置多个键值对
        // 但是jedisCluster默认是无法解决批处理问题的,需要我们手动解决
        jedisCluster.mset("name", "Jack", "age", "21", "sex", "male");
    }
 
    // 测试方法:使用mset命令按槽位分组设置多个键值对
    @Test
    void testMSet2() {
        // 创建一个HashMap,用于存储多个键值对
        Map<String, String> map = new HashMap<>(3);
        map.put("name", "Jack");
        map.put("age", "21");
        map.put("sex", "Male");
 
        // 将map中的键值对按照Redis集群的槽位进行分组
        Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet()
                .stream()
                .collect(Collectors.groupingBy(
                        // 使用ClusterSlotHashUtil计算每个键对应的槽位
                        entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey()))
                );
 
        // 遍历按哈希槽分组后的结果
        for (List<Map.Entry<String, String>> list : result.values()) {
            // 创建一个数组用于批量设置Redis的键值对
            String[] arr = new String[list.size() * 2];  // 每个键值对包含两个元素
            int j = 0;  // 索引变量,用于在数组中定位位置
            for (int i = 0; i < list.size(); i++) {
                j = i << 1;  // 通过位移计算数组中的位置
                Map.Entry<String, String> e = list.get(i);  // 获取当前的键值对
                arr[j] = e.getKey();  // 将键放入数组中
                arr[j + 1] = e.getValue();  // 将值放入数组中
            }
            // 批量设置Redis集群中的键值对
            jedisCluster.mset(arr);
        }
    }
 
    // 在每个测试方法执行之后,关闭JedisCluster连接
    @AfterEach
    void tearDown() {
        // 如果JedisCluster对象不为空,则关闭连接
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}

而在Redis集群环境下,如果需要批量获取多个键的值,可以使用multiGet方法。multiGet是RedisTemplate提供的一个方法,用于一次性获取多个键的值。然而,需要注意的是,multiGet在集群环境下要求所有键必须位于同一个槽位(slot),否则会抛出异常。

@Service
public class RedisService {
 
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
 
    /**
     * 跨槽位批量获取多个键的值
     */
    public Map<String, Object> batchGetCrossSlot(List<String> keys) {
        // 按槽位分组
        Map<Integer, List<String>> slotKeyMap = keys.stream()
                .collect(Collectors.groupingBy(ClusterSlotHashUtil::calculateSlot));
 
        // 存储最终结果
        Map<String, Object> result = new HashMap<>();
 
        // 对每个槽位的键分别调用multiGet
        for (Map.Entry<Integer, List<String>> entry : slotKeyMap.entrySet()) {
            List<String> slotKeys = entry.getValue();
            List<Object> slotValues = redisTemplate.opsForValue().multiGet(slotKeys);
 
            // 将结果存入Map
            for (int i = 0; i < slotKeys.size(); i++) {
                result.put(slotKeys.get(i), slotValues.get(i));
            }
        }
 
        return result;
    }
 
    /**
     * 测试跨槽位批量获取方法
     */
    public void testBatchGetCrossSlot() {
        List<String> keys = Arrays.asList("name", "age", "sex");
        Map<String, Object> values = batchGetCrossSlot(keys);
 
        // 打印结果
        values.forEach((key, value) -> {
            System.out.println("Key: " + key + ", Value: " + value);
        });
    }
}

到此这篇关于Redis如何使用Pipeline实现批处理操作的文章就介绍到这了,更多相关Redis Pipeline批处理操作内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Linux安装单机版Redis的完整步骤

    Linux安装单机版Redis的完整步骤

    这篇文章主要给大家介绍了关于Linux安装单机版Redis的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-09-09
  • Redis 中的热点键和数据倾斜示例详解

    Redis 中的热点键和数据倾斜示例详解

    热点键是指在 Redis 中被频繁访问的特定键,这些键由于其高访问频率,可能导致 Redis 服务器的性能问题,尤其是在高并发场景下,本文给大家介绍Redis 中的热点键和数据倾斜,感兴趣的朋友一起看看吧
    2025-03-03
  • reids自定义RedisTemplate以及乱码问题解决

    reids自定义RedisTemplate以及乱码问题解决

    本文主要介绍了reids自定义RedisTemplate以及乱码问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-04-04
  • Redis集群的三种部署方式及三种应用问题的处理

    Redis集群的三种部署方式及三种应用问题的处理

    这篇文章主要介绍了Redis集群的三种部署方式及三种应用问题的处理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • Redis集群(cluster模式)搭建过程

    Redis集群(cluster模式)搭建过程

    文章介绍了Redis集群的概念、使用原因和搭建方法,Redis集群通过分区实现数据水平扩容,提供了一定的可用性,文章详细阐述了集群的连接方式,解释了如何分配节点,并提供了详细的集群搭建步骤,包括创建节点、清空数据、修改配置、启动节点、配置集群等
    2024-10-10
  • 使用JMeter插件Redis Data Set如何实现高性能数据驱动测试

    使用JMeter插件Redis Data Set如何实现高性能数据驱动测试

    RedisDataSet插件是JMeter的一个插件,可以实现从Redis中动态加载数据,并将其用作测试参数,本文详细介绍如何在JMeter中使用RedisDataSet插件,帮助你实现高效的数据驱动测试
    2025-01-01
  • Redis实现附近商铺的项目实战

    Redis实现附近商铺的项目实战

    本文主要介绍了Redis实现附近商铺的项目实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-01-01
  • windows下通过批处理脚本启动redis的操作

    windows下通过批处理脚本启动redis的操作

    本文主要给大家介绍了windows下通过批处理脚本启动redis的操作,windows下redis启动,需要进入redis安装目录,然后shift+右键,选择“在此处打开命令窗口”,然后输入redis-server.exe redis.conf,就可以启动redis了,文中有详细的图文参考,感兴趣的朋友可以参考下
    2023-12-12
  • redis使用watch秒杀抢购实现思路

    redis使用watch秒杀抢购实现思路

    这篇文章主要为大家详细介绍了redis使用watch秒杀抢购的实现思路,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-02-02
  • redis执行lua脚本的实现

    redis执行lua脚本的实现

    本文主要介绍了redis执行lua脚本的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-10-10

最新评论