redis cluster集群模式下实现批量可重入锁

 更新时间:2024年02月28日 09:32:17   作者:RachelHwang  
本文主要介绍了使用redis cluster集群版所遇到的问题解决方案及redis可重入锁是否会有死锁的问题等,具有一定的参考价值,感兴趣的可以了解一下

一、redis cluster 集群版

在Redis 3.0版本以后,Redis发布了Redis Cluster。该集群主要支持搞并发和海量数据处理等优势,当 Redis 在集群模式下运行时,它处理数据存储的方式与作为单个实例运行时不同。这是因为它应该准备好跨多个节点分发数据,从而实现水平可扩展性。具体能力表现为:

  • 自动分割数据到不同的节点上
  • 整个集群的部分节点失败或者不可达的情况下能够继续处理命令

Redis没有使用一致性hash,而是引入哈希槽的概念,也就是 Hash Slot。Redis集群由16384个哈希槽slot,每个key通过CRC16校验后对16384取模来决定放置那个槽,集群的每个节点负责一部分hash槽,也就是说数据存放在hash槽里,而每个节点只负责部分hash槽(这样数据就存放在不同的节点)。

例如:node1、node2、node3三个节点,node1节点负责0到5500号hash槽,node2节点负责5501到11000号hash槽,node3节点负责11001到16384号hash槽。这种结构很容易添加或者删除节点,比如如果我想新添加个节点node4, 我需要从节点 node1, node2, node3中得部分槽到node4上. 如果我想移除节点node1,需要将node1中的槽移到node2和node3节点上,然后将没有任何槽的node1节点从集群中移除即可. 由于从一个节点将哈希槽移动到另一个节点并不会停止服务,所以无论添加删除或者改变某个节点的哈希槽的数量都不会造成集群不可用的状态。

在这里插入图片描述

在某些集群方案中,涉及多个key的操作会被限制在一个slot中,如Redis Cluster中的mget/mset操作。这里就会涉及到 哈希标签 Hash Tag 的概念。

Hash Tag是用于计算哈希槽时的一个特殊场景,是一种确保多个键分配到同一个哈希槽中的方法。这是为了在Redis集群中实现多键操作而使用的。为了实现Hash Tag,在某些情况下,会以稍微不同的方式计算key的哈希槽。如果key包含"{…}"模式,则仅对{和}之间的子字符串进行散列以获取哈希槽。但由于可能存在多个{或}出现,因此该算法遵循以下规则:

  • 如果key包含字符 {
  • 并且如果 } 字符位于 { 的右侧
  • 并且在第一个 { 和第一个 } 之间存在一个或多个字符

对于符合上述规则的key,则不会对整个key进行散列处理,而只会对第一次出现 { 和随后第一次出现 } 之间的内容进行散列。否则,对整个key进行散列处理。

在这里插入图片描述

不使用hash tag批量设置不同名称的key:

127.0.0.1:6379> mset name name1 name2 name3
(error) CROSSSLOT Keys in request don't hash to the same slot

显示错误信息:CROSSSLOT 请求中的key没有哈希到同一个插槽。这个问题是因为多键操作的时候每个键对应的slot可能不是一个,客户端没法做move操作。

解决思路就是采用redis cluster的hashTag,当redis的key加上hashTag时,集群算key的slot是按照hashTag进行计算,即可保证hashTag一致的key能分配到相同的stlot中。:

127.0.0.1:6379> mset name {name} {name}1 {name}2 {name}3

二、redis 分布式锁

Redis锁使用起来比较简单,既可以锁定单个键,也可以批量锁定多个键,以实现更大规模的操作。它也是分布式应用中使用最广泛的分布式锁实施方式,可以有效解决单点故障、死锁和负载失衡等问题。

大规模锁定Redis,实现批量操作,一般通过以下实现:

  • 使用Redis的消息订阅机制,创建消息频道,用于锁定指定键之间的多个键。消息频道的名字称为锁名,它代表锁定的范围和跨度。
  • 然后,通过Redis的SUBSCRIBE命令订阅消息频道名字,比如“ lock_key”,并调用Redis BLPOP,将锁定的键占据,以实现批量锁定。
  • 此外,也可以使用Redis的Lua脚本实现批量锁定。获取带锁的Key数组,这里以数组形式表示。同时,以原子的形式执行多个SETNX命令,一旦全部执行成功,则实现批量锁定:
local locks = red:lrange("lock_keys", 1, -1)
for i, v in iprs(locks) do
    if redis.call("setnx", v, field) == 1 then
        red.lpush("locked_keys", v)
    end
end

释放锁定的键,实现批量解锁,语句如下:

local unlocked_locks = red:lrange("locked_keys",1, -1)
for i, v in iprs(unlocked_locks) do
    red.del(v)
end
red.del("locked_keys")

使用Redis的WATCH功能,防止多个客户端同时更新同一键,即如果更新发生乐观锁的冲突的情况下,返回失败给客户端,从而保证了锁定的原子性:

-- 使用Redis watch,开始监听
red.watch("lock_keys")
-- 进行具体操作
-- …
-- 解锁操作
red.unwatch()

Redis锁使用起来非常简单,可以用于单个键锁定和大规模锁定,从而实现批量操作,有效解决分布式应用中的死锁、负载失衡、单点故障等问题。

三、如何使用 redis 实现批量可重入锁?

1、方案一:Lua脚本批量加锁

Lua加锁脚本处理:

	/**
	 * 加锁脚本
	 * KEYS[1] key
	 * ARGV[1] value
	 * ARGV[2] expire
	 * 判断key是否存在,不存在则加锁,并记录加锁次数+1;若存在,则判断value是否相等,相等则记录加锁次数+1,不相等则返回0
	 */
	private static final String REENTRANT_LOCK_SCRIPT = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
        "    redis.call('SET', KEYS[1], ARGV[1]) " +
        "    redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
        "    redis.call('INCR', 'lockCount:' .. KEYS[1]) " +
        "    return 1 " +
        "else " +
        "    if redis.call('GET', KEYS[1]) == ARGV[1] then " +
        "        redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
        "        redis.call('INCR', 'lockCount:' .. KEYS[1]) " +
        "        return 1 " +
        "    else " +
        "        return 0 " +
        "    end " +
        "end";
        
    @Bean
    public DefaultRedisScript<Long> reentrantLockRedisScript(){
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(REENTRANT_LOCK_SCRIPT, Long.class);
        try {
            List<Boolean> results= redisTemplate.getConnectionFactory().getConnection().scriptExists(redisScript.getSha1());
            if(Boolean.FALSE.equals(results.get(0)))
            {
                redisTemplate.getConnectionFactory().getConnection().scriptLoad(redisScript.getScriptAsString().getBytes());
                log.info("redis reentrantLockRedisScript load success");
            }

        }catch (Exception ex){
            log.error("redis reentrantLockRedisScript load error",ex);
        }

        return redisScript;
    }

Lua解锁脚本处理:

    private static final String RELEASE_REENTRANT_LOCK_SCRIPT = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
            "    return 0 " +
            "else " +
            "    if redis.call('GET', KEYS[1]) == ARGV[1] then " +
            "        redis.call('DECR', 'lockCount:' .. KEYS[1]) " +
            "        if redis.call('GET', 'lockCount:' .. KEYS[1]) == '0' then " +
            "            redis.call('DEL', KEYS[1]) " +
            "        end " +
            "        return 1 " +
            "    else " +
            "        return 0 " +
            "    end " +
            "end";
    @Bean
    public DefaultRedisScript<Long> releaseReentrantLockRedisScript(){
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_REENTRANT_LOCK_SCRIPT, Long.class);
        try {
            List<Boolean> results= redisTemplate.getConnectionFactory().getConnection().scriptExists(redisScript.getSha1());
            if(Boolean.FALSE.equals(results.get(0)))
            {
                redisTemplate.getConnectionFactory().getConnection().scriptLoad(redisScript.getScriptAsString().getBytes());
                log.info("redis releaseReentrantLockRedisScript load success");
            }

        }catch (Exception ex){
            log.error("redis releaseReentrantLockRedisScript load error",ex);
        }

        return redisScript;
    }

redis set 多个key场景需要hash tag处理:

private static final String KEY_FORMAT = "%s{%s}";
String slotKey = String.format(KEY_FORMAT, key, key);

结合以上封装,批量可重入锁的方法处理如下:

/**
     * 可重入锁,value相同的情况下,可重复加锁,当所有加锁方都解锁后才会释放锁
     *
     * @param key           锁的key
     * @param value         锁的value
     * @param expireSeconds 锁过期时间
     * @param waitSeconds   等待时间
     * @param process
     * @return
     */
    public boolean reentrantLock(String key, String value, long expireSeconds, long waitSeconds, Runnable process) {
        boolean lock = false;
        try {
            //特殊处理key,为了保证原key和计数key落在同一个slot,将原key拼装成: key{key}
            String slotKey = String.format(KEY_FORMAT, key, key);
            long start = System.currentTimeMillis();
            while (!(lock = redisUtil.execute(reentrantLockRedisScript, Collections.singletonList(slotKey), value, expireSeconds) == 1)) {
                Thread.sleep(100);
                if ((System.currentTimeMillis() - start) / 1000 > waitSeconds) {
                    break;
                }
            }
            //加锁成功,执行传入的方法,最后用lua脚本判断锁的value是否还是当前的value,是则执行解锁
            if (lock) {
                try {
                    process.run();
                } catch (Exception ex) {
                    throw ex instanceof BusinessException ? ex : new BusinessException(ex.getMessage());
                } finally {
                    redisUtil.execute(releaseReentrantLockRedisScript, Collections.singletonList(slotKey), value);
                }
            }

        } catch (BusinessException businessException) {
            throw businessException;
        } catch (Exception e) {
            log.error("redis lockAndRun error!lockKey=" + key, e);
        }
        return lock;
    }

    /**
     * 先获取可重入锁,获取成功后批量加锁,执行传入的方法
     *
     * @param keys
     * @param reentrantKey
     * @param value
     * @param expireSeconds
     * @param waitSeconds
     * @param process
     * @return
     */
    public boolean batchReentrantLock(Set<String> keys, String reentrantKey, String value, long expireSeconds, long waitSeconds, Runnable process) {
        List<Boolean> result = new ArrayList<>(1);
        boolean reentrantLock = reentrantLock(reentrantKey, value, expireSeconds, waitSeconds, () -> {
            result.add(batchLockAndRun(keys, expireSeconds, waitSeconds, process));
        });
        return reentrantLock && result.get(0);
    }

2、方案二:pipeline批量加锁

  • 不用lua以避免cross slot error
  • 批量加锁失败后立即全部解锁,防止死锁
	/**
	 * 使用redisTemplate
	 * @param script
	 * @param keys
	 * @param args
	 * @param <T>
	 */
	public List<Object> executePipelined(String[] keys, String[] values, long time, TimeUnit timeUnit) {
	      return redisTemplate.executePipelined(new SessionCallback<Object>() {
	           @Override
	           public Object execute(RedisOperations operations) throws DataAccessException {
	               for (int i = 0; i < keys.length; i++) {
	                   operations.opsForValue().setIfAbsent(keys[i], values[i], time, timeUnit);
	               }
	               return null;
	           }
	       });
	}
	
	/**
	 * pipeline批量加锁
	 * @param keys          需要加锁的key
	 * @param values        锁的value,用于解锁时判断是否是当前线程加的锁
	 * @param expireSeconds 锁过期时间
	 * @return boolean 是否加锁成功
	 */
	private boolean tryBatchLock(String[] keys, String[] values, long expireSeconds) {
	    List<Object> results = redisUtil.executePipelined(keys, values, expireSeconds, TimeUnit.SECONDS);
	    if (results == null || results.size() != keys.length || results.contains(false)) {
	        //加锁失败,立即解锁
	        redisUtil.executePipelined(releaseReentrantLockRedisScript, keys, values);
	        return false;
	    }
	    return true;
	}

四、总结

以上是使用redis cluster集群版所遇到的问题以及解决方案,主要在业务实现过程中,需要注意redis cluster key会被划分到不同的槽中的问题,以及redis可重入锁是否会有死锁的问题等,更多相关redis cluster 批量可重入锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Redis 2.8-4.0过期键优化过程全纪录

    Redis 2.8-4.0过期键优化过程全纪录

    这篇文章主要给大家介绍了关于Redis 2.8-4.0过期键优化的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Redis具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-04-04
  • 基于Redis无序集合如何实现禁止多端登录功能

    基于Redis无序集合如何实现禁止多端登录功能

    这篇文章主要给你大家介绍了关于基于Redis无序集合如何实现禁止多端登录功能的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2018-12-12
  • Redis中的BigKey问题排查与解决思路详解

    Redis中的BigKey问题排查与解决思路详解

    Redis是一款性能强劲的内存数据库,但是在使用过程中,我们可能会遇到Big Key问题,这个问题就是Redis中某个key的value过大,所以Big Key问题本质是Big Value问题,这篇文章主要介绍了Redis中的BigKey问题:排查与解决思路,需要的朋友可以参考下
    2023-03-03
  • 从原理到实践分析 Redis 分布式锁的多种实现方案

    从原理到实践分析 Redis 分布式锁的多种实现方案

    在分布式系统中,为了保证多个进程或线程之间的数据一致性和正确性,需要使用锁来实现互斥访问共享资源,然而,使用本地锁在分布式系统中存在问题,这篇文章主要介绍了从原理到实践分析 Redis 分布式锁的多种实现方案,需要的朋友可以参考下
    2024-07-07
  • 详解Redis中的List是如何实现的

    详解Redis中的List是如何实现的

    List 的 Redis 中的 5 种主要数据结构之一,它是一种序列集合,可以存储一个有序的字符串列表,顺序是插入的顺序,本文将给大家介绍了一下Redis中的List是如何实现的,需要的朋友可以参考下
    2024-05-05
  • Linux下Redis安装配置教程

    Linux下Redis安装配置教程

    这篇文章主要为大家详细介绍了Linux下Redis安装配置教程,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-11-11
  • 详解Redis中key的命名规范和值的命名规范

    详解Redis中key的命名规范和值的命名规范

    这篇文章主要介绍了详解Redis中key的命名规范和值的命名规范,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • Redis数据库分布式设计方案介绍

    Redis数据库分布式设计方案介绍

    大家好,本篇文章主要讲的是Redis数据库分布式设计方案介绍,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下
    2022-01-01
  • Redis 键值对(key-value)数据库实现方法

    Redis 键值对(key-value)数据库实现方法

    Redis 的键值对中的 key 就是字符串对象,而 value 可以是字符串对象,也可以是集合数据类型的对象,比如 List 对象,Hash 对象、Set 对象和 Zset 对象,这篇文章主要介绍了Redis 键值对数据库是怎么实现的,需要的朋友可以参考下
    2024-05-05
  • 在Redis中如何保存时间序列数据详解

    在Redis中如何保存时间序列数据详解

    与发生时间相关的一组数据,就是时间序列数据,这些数据的特点是没有严格的关系模型,记录的信息可以表示成键和值的关系,这篇文章主要给大家介绍了关于在Redis中如何保存时间序列数据的相关资料,需要的朋友可以参考下
    2021-10-10

最新评论