Redis分布式可重入锁实现方案

 更新时间:2024年02月19日 09:40:03   作者:程序员小潘  
在单进程环境下,要保证一个代码块的同步执行,直接用synchronized 关键字或ReetrantLock 即可,在分布式环境下,要保证多个节点的线程对代码块的同步访问,就必须要用到分布式锁方案,本文介绍一下基于 Redis实现的分布式锁方案,感兴趣的朋友一起看看吧

前言

在单进程环境下,要保证一个代码块的同步执行,直接用synchronized 关键字或ReetrantLock 即可。在分布式环境下,要保证多个节点的线程对代码块的同步访问,就必须要用到分布式锁方案。
分布式锁实现方案有很多,有基于关系型数据库行锁实现的;有基于ZooKeeper临时顺序节点实现的;还有基于 Redis setnx 命令实现的。本文介绍一下基于 Redis 实现的分布式锁方案。

理解分布式锁

实现分布式锁有几个要求

  • 互斥性:任意时刻,最多只会有一个客户端线程可以获得锁
  • 可重入:同一客户端的同一线程,获得锁后能够再次获得锁
  • 避免死锁:客户端获得锁后即使宕机,后续客户端也可以获得锁
  • 避免误解锁:客户端A加的锁只能由A自己释放
  • 释放锁通知:持有锁的客户端释放锁后,最好可以通知其它客户端继续抢锁
  • 高性能和高可用

Redis 服务端命令是单线程串行执行的,天生就是原子的,并且支持执行自定义的 lua 脚本,功能上更加强大。
关于互斥性,我们可以用 setnx 命令实现,Redis 可以保证只会有一个客户端 set 成功。但是由于我们要实现的是一个分布式的可重入锁,数据结构得用 hash,用客户端ID+线程ID作为 field,value 记作锁的重入次数即可。
关于死锁,代码里建议把锁的释放写在 finally 里面确保一定执行,针对客户端抢到锁后宕机的场景,可以给 redis key 设置一个超时时间来解决。
关于误解锁,客户端在释放锁时,必须判断 field 是否当前客户端ID以及线程ID一致,不一致就不执行删除,这里需要用到 lua 脚本判断。
关于释放锁通知,可以利用 Redis 发布订阅模式,给每个锁创建一个频道,释放锁的客户端负责往频道里发送消息通知等待抢锁的客户端。
最后关于高性能和高可用,因为 Redis 是基于内存的,天生就是高性能的。但是 Redis 服务本身一旦出现问题,分布式锁也就不可用了,此时可以多部署几台独立的示例,使用 RedLock 算法来解决高可用的问题。

设计实现

首先我们定义一个 RedisLock 锁对象的抽象接口,只有尝试加锁和释放锁方法

public interface RedisLock {
    boolean tryLock();
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);
    void unlock();
}

然后提供一个默认实现 DefaultRedisLock

public class DefaultRedisLock implements RedisLock {
    // 客户端ID UUID
    private final String clientId;
    private final StringRedisTemplate redisTemplate;
    // 锁频道订阅器 接收释放锁通知
    private final LockSubscriber lockSubscriber;
    // 加锁的key
    private final String lockKey;
}

关于tryLock() ,首先执行lua脚本尝试获取锁,如果加锁失败则返回其它客户端持有锁的过期时间,客户端订阅锁对应的频道,然后sleep,直到收到锁释放的通知再继续抢锁。最终不管有没有抢到锁,都会在 finally 取消频道订阅。

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
    final long timeout = System.currentTimeMillis() + unit.toMillis(waitTime);
    final long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
        return true;
    }
    if (System.currentTimeMillis() >= timeout) {
        return false;
    }
    final Semaphore semaphore = lockSubscriber.subscribe(getChannel(lockKey), threadId);
    try {
        while (true) {
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                return true;
            }
            if (System.currentTimeMillis() >= timeout) {
                return false;
            }
            semaphore.tryAcquire(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lockSubscriber.unsubscribe(getChannel(lockKey), threadId);
    }
    return false;
}

tryAcquire() 就是执行lua脚本来加锁,解释一下这段脚本的逻辑:首先判断 lockKey 是否存在,不存在则直接设置 lockKey并且设置过期时间,返回空,表示加锁成功。存在则判断 field 是否和当前客户端ID+线程ID一致,一致则代表锁重入,递增一下value即可,不一致代表加锁失败,返回锁的过期时间

private Long tryAcquire(long leaseTime, TimeUnit timeUnit, long threadId) {
    return redisTemplate.execute(RedisScript.of(
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; end;" +
                            "return redis.call('pttl', KEYS[1]);", Long.class), Collections.singletonList(lockKey),
            String.valueOf(timeUnit.toMillis(leaseTime)), getLockName(threadId));
}

lockName是由客户端ID和线程ID组成的:

private String getLockName(long threadId) {
    return clientId + ":" + threadId;
}

如果加锁失败,客户端会尝试订阅对应的频道,名称规则是:

private String getChannel(String lockKey) {
    return "__lock_channel__:" + lockKey;
}

订阅方法是LockSubscriber#subscribe ,同一个频道无需订阅多个监听器,所以用一个 Map 记录。订阅成功以后,会返回当前线程对应的一个 Semaphore 对象,默认许可数是0,当前线程会调用Semaphore#tryAcquire 等待许可数,监听器在收到锁释放消息后会给 Semaphore 对象增加许可数,唤醒线程继续抢锁。

@Component
public class LockSubscriber {
    @Autowired
    private RedisMessageListenerContainer messageListenerContainer;
    private final Map<String, Map<Long, Semaphore>> channelSemaphores = new HashMap<>();
    private final Map<String, MessageListener> listeners = new HashMap<>();
    private final StringRedisSerializer serializer = new StringRedisSerializer();
    public synchronized Semaphore subscribe(String channelName, long threadId) {
        MessageListener old = listeners.put(channelName, new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                String channel = serializer.deserialize(message.getChannel());
                String ignore = serializer.deserialize(message.getBody());
                Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channel);
                if (semaphoreMap != null && !semaphoreMap.isEmpty()) {
                    semaphoreMap.values().stream().findFirst().ifPresent(Semaphore::release);
                }
            }
        });
        if (old == null) {
            messageListenerContainer.addMessageListener(listeners.get(channelName), new ChannelTopic(channelName));
        }
        Semaphore semaphore = new Semaphore(0);
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.getOrDefault(channelName, new HashMap<>());
        semaphoreMap.put(threadId, semaphore);
        channelSemaphores.put(channelName, semaphoreMap);
        return semaphore;
    }
    public synchronized void unsubscribe(String channelName, long threadId) {
        Map<Long, Semaphore> semaphoreMap = channelSemaphores.get(channelName);
        if (semaphoreMap != null) {
            semaphoreMap.remove(threadId);
            if (semaphoreMap.isEmpty()) {
                MessageListener listener = listeners.remove(channelName);
                if (listener != null) {
                    messageListenerContainer.removeMessageListener(listener);
                }
            }
        }
    }
}

对于 unlock,就只是一段 lua 脚本,这里解释一下:判断当前客户端ID+线程ID 这个 field 是否存在,存在说明是自己加的锁,可以释放。不存在说明不是自己加的锁,无需做任何处理。因为是可重入锁,每次 unlock 都只是递减一下 value,只有当 value 等于0时才是真正的释放锁。释放锁的时候会 del lockKey,再 publish 发送锁释放通知,让其他客户端可以继续抢锁。

@Override
public void unlock() {
    long threadId = Thread.currentThread().getId();
    redisTemplate.execute(RedisScript.of(
                    "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
                            "return nil;end;" +
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); " +
                            "if (counter > 0) then " +
                            "return 0; " +
                            "else " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], 1); " +
                            "return 1; " +
                            "end; " +
                            "return nil;", Long.class), Arrays.asList(lockKey, getChannel(lockKey)),
            getLockName(threadId));
}

最后,我们需要一个 RedisLockFactory 来创建锁对象,它同时会生成客户端ID

@Component
public class RedisLockFactory {
    private static final String CLIENT_ID = UUID.randomUUID().toString();
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private LockSubscriber lockSubscriber;
    public RedisLock getLock(String lockKey) {
        return new DefaultRedisLock(CLIENT_ID, redisTemplate, lockSubscriber, lockKey);
    }
}

至此,一个基于 Redis 实现的分布式可重入锁就完成了。

尾巴

目前这个版本的分布式锁,保证了互斥性、可重入、避免死锁和误解锁、实现了释放锁通知,但是并没有高可用的保证。如果 Redis 是单实例部署,就会存在单点问题,Redis 一旦故障,整个分布式锁将不可用。如果 Redis 是主从集群模式部署,虽然有主从自动切换,但是 Master 和 Slave 之间的数据同步是存在延迟的,分布式锁可能会出现问题。比如:客户端A加锁成功,lockKey 写入了 Master,此时 Master 宕机,其它 Slave 升级成了 Master,但是还没有同步到 lockKey,客户端B来加锁也会成功,这就没有保证互斥性。针对这个问题,可以参考 RedLock 算法,部署多个单独的 Redis 示例,只要一半以上的Redis节点加锁成功就算成功,来尽可能的保证服务高可用。

到此这篇关于Redis分布式可重入锁实现方案的文章就介绍到这了,更多相关Redis重入锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Redis三种特殊数据类型的具体使用

    Redis三种特殊数据类型的具体使用

    本文主要介绍了Redis三种特殊数据类型的具体使用,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-02-02
  • CentOS系统下Redis安装和自启动配置的步骤

    CentOS系统下Redis安装和自启动配置的步骤

    相信大家都知道Redis是一个C实现的基于内存、可持久化的键值对数据库,在分布式服务中常作为缓存服务。所以这篇文章将详细介绍在CentOS系统下如何从零开始安装到配置启动服务。有需要的可以参考借鉴。
    2016-09-09
  • redis计数器与数量控制的实现

    redis计数器与数量控制的实现

    使用Redis计数器可以轻松地解决数量控制的问题,同时还能有效地提高应用的性能,本文主要介绍了redis计数器与数量控制的实现,具有一定的参考价值,感兴趣的可以了解一下
    2023-12-12
  • 浅谈Redis缓存有哪些淘汰策略

    浅谈Redis缓存有哪些淘汰策略

    redis用做缓存是一种非常常见的手段,然而由于内存大小的限制,会导致redis在内存空间满了以后需要处理继续存入的数据,所以就需要淘汰策略,本文就详细的介绍一下
    2021-08-08
  • redis实现分布式session的解决方案

    redis实现分布式session的解决方案

    session存放在服务器,关闭浏览器不会失效,本文主要介绍了redis实现分布式session的解决方案,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-03-03
  • redis数据倾斜处理方法

    redis数据倾斜处理方法

    我们在使用Redis分片集群时,集群最好的状态就是每个实例可以处理相同或相近比例的请求,但如果不是这样,则会出现某些实例压力特别大,而某些实例特别空闲的情况发生,本文就一起来看下这种情况是如何发生的以及如何处理
    2022-12-12
  • Redis分布式锁实例分析讲解

    Redis分布式锁实例分析讲解

    分布式锁是控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性
    2022-12-12
  • 基于Redis Set轻松实现简单的抽奖系统

    基于Redis Set轻松实现简单的抽奖系统

    Redis Set集合是无序且元素唯一的String类型数据结构,支持高效增删查改操作,包括获取所有值、判断包含关系、计算交并差集等,底层基于Hash表实现O(1)时间复杂度,这篇文章主要介绍了基于Redis Set轻松实现简单的抽奖系统的相关资料,需要的朋友可以参考下
    2026-03-03
  • redis由于目标计算机积极拒绝,无法连接的解决

    redis由于目标计算机积极拒绝,无法连接的解决

    这篇文章主要介绍了redis由于目标计算机积极拒绝,无法连接的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-07-07
  • Redis延迟双删的具体使用

    Redis延迟双删的具体使用

    本文主要讨论了延时双删策略,用于解决缓存与数据库数据不一致的问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-08-08

最新评论