Redisson之分布式锁原理全面分析

 更新时间:2024年03月09日 16:15:38   作者:Charge8  
这篇文章主要介绍了Redisson分布式锁原理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

Redisson是一个 Redis的开源客户端,也提供了分布式锁的实现。

Redisson官网:

Redisson 分布式锁使用

Redisson分布式锁使用起来还是蛮简单的。

1、添加 Redisson 配置类

引入依赖:

		<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.16.5</version>
        </dependency>

创建 Redisson 配置类,注入 RedissonClient客户端。

@Configuration
public class RedissonConfig {

	@Value("${spring.redis.host}")
	private String host;

	@Value("${spring.redis.port}")
	private String port;

	@Value("${spring.redis.password}")
	private String password;

	@Bean
	public RedissonClient getRedisson() {
		Config config = new Config();
		/**
		 * reids配置,支持单机、主从、哨兵、集群等配置。这里使用单机配置
		 */
		config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
		return Redisson.create(config);
	}
}

2、使用 Redisson分布式锁

代码如下:

	@Autowired
	private RedisTemplate<String, String> redisTemplate;

	@Autowired
	private RedissonClient redissonClient;

	public void disLockDemo(long productId) {
		String lockKey = "DISTRIBUTE_LOCK:redissonLock:product_" + productId;

		//设置锁定资源名称,并获取分布式锁对象。
		RLock redissonLock = redissonClient.getLock(lockKey);
		//1.加锁
		redissonLock.lock();
		//boolean isLock = disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
		try {
			//2.执行业务代码
			// TODO
            
            //if (isLock) {
				// TODO
			//}
		} finally {
			//3.解锁
			redissonLock.unlock();
		}
	}

Redisson 分布式锁源码分析

使用分布式锁必须要考虑的一些问题:

  • 互斥性:在任意时刻,只能有一个进程持有锁。
  • 防死锁:即使有一个进程在持有锁的期间崩溃而未能主动释放锁,要有其他方式去释放锁从而保证其他进程能获取到锁。
  • 不能释放别人的锁:加锁和解锁的必须是同一个进程。
  • 锁的续期问题:业务执行时间超过锁的过期时间时,需要提前给锁的续期。

Redisson 是 Redis 官方推荐分布式锁实现方案,它采用 Watch Dog机制能够很好的解决锁续期的问题。

执行 lua脚本保证了多条命令执行的原子性操作。

带着上面分布式锁的一些问题查看源码。

1、获取分布式锁对象

简单了解一下。

1.1 创建 RedissonClient

我们在配置类中通过 Redisson.create(config)方法创建了 RedissonClient对象,并注入到 IOC容器中。

1.2 获取分布式锁对象

使用 Redisson 客户端来 获取分布式锁对象。

2、加锁代码

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //线程id
        long threadId = Thread.currentThread().getId();
        // 1.尝试获取锁
        Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
        if (ttl != null) {
            RFuture<RedissonLockEntry> future = this.subscribe(threadId);
            if (interruptibly) {
                this.commandExecutor.syncSubscriptionInterrupted(future);
            } else {
                this.commandExecutor.syncSubscription(future);
            }

            try {
                //2.死循环,反复去调用tryAcquire尝试获取锁
                while(true) {
                    // 再次尝试获取锁
                    ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
                    // ttl为null时表示别的线程已经unlock了,自己加锁成功
                    if (ttl == null) {
                        return;
                    }
                    // 3.锁互斥:通过 JDK的信号量 Semaphore来阻塞线程
                    if (ttl >= 0L) {
                        try {                                                  
                            ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException var13) {
                            if (interruptibly) {
                                throw var13;
                            }

                            ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else if (interruptibly) {
                        ((RedissonLockEntry)future.getNow()).getLatch().acquire();
                    } else {
                        ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
                    }
                }
            } finally {
                // 4.无论是否获得锁,都要取消订阅解锁消息
                this.unsubscribe(future, threadId);
            }
        }
    }

2.1 异步加锁机制

查看 tryAcquire()加锁方法。

通过源码,看到加锁其实是通过一段 lua 脚本实现的,如下:

if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end ;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end ;
return redis.call('pttl', KEYS[1]);
  • KEYS[1] 代表的是你加锁的 key。
  • ARGV[1] 代表的是锁 key 的默认生存时间,默认 30 秒。
  • ARGV[2] 代表的是加锁的客户端的线程 ID。通过 getLockName方法组装了一下。
  • ARGV[2] 后面的 1:为了支持可重入锁做的计数统计。

Redisson 实现分布式锁的共享资源的存储结构是 hash数据结构:

key 是锁的名称,field 是客户端 ID,value 是该客户端加锁(可重入)的次数。

假设此时,客户端 1 来尝试加锁,查看加锁的 lua 脚本:

第一段 if 判断语句,如果你要加锁的那个锁 key 不存在的话,进行加锁。此时锁 key不存在,向Redis中设置一个 hash 结构的数据,则客户端 1加锁成功,返回 null。

2.1.1 锁的续期机制

客户端 1 加锁的那个锁 key 默认生存时间才 30 秒,如果超过了 30 秒,客户端 1 还想一直持有这把锁,就需要提前进行锁的续期操作。

Redisson 提供了一个 Watch dog 机制来解决锁的续期问题, 只要客户端 1 一旦加锁成功,就会启动一个 Watch Dog。

进入 scheduleExpirationRenewal方法,重点查看 renewExpiration方法。

锁续期的 lua 脚本如下:

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end ;
return 0;

从源码我们看到 leaseTime 必须是 -1 才会开启 Watch Dog 机制,我们发现:

  • 如果想开启 Watch Dog 机制必须使用默认的加锁时间为 30s。
  • 如果自己自定义时间,即使用 tryLock,锁并不会延长,不会触发Watch Dog 机制。

Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,

然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 1 还持有锁 key(判断客户端是否还持有 key,

其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,

如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。

如果服务宕机了,Watch Dog 机制线程也就没有了,此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。

2.1.2 可重入加锁机制

Redisson 也是支持可重入锁的,比如:客户端 1 加锁代码:

@Override
public void demo() {
    RLock lock = redissonSingle.getLock("myLock");
    try {
        lock.lock();
        // TODO 执行业务
        //锁重入
        lock.lock();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // 释放锁
        lock.unlock();
        lock.unlock();
    }
}

此时,如果客户端 1 又来尝试加锁,继续分析加锁的 lua 脚本。

  • 首先,第一个 if 判断,你要加锁的那个锁 key 已经存在了。
  • 然后,第二个 if 判断,判断一下,加锁的那个锁 key的 hash 数据结构中,是否包含客户端 1 的 ID,
  • 此时数据结构的是客户端 1 的 ID,即包含客户端 1的 ID,然后就执行行可重入锁的命令,将 hash 结构的 value数据 + 1,返回 null。

2.2 锁互斥机制

上面客户端 1加锁成功,此时,如果客户端 2 来尝试加锁,继续分析加锁的 lua 脚本:

  • 首先,第一个 if 判断,你要加锁的那个锁 key 已经存在了。
  • 然后,第二个 if 判断,判断一下,加锁的那个锁 key的 hash 数据结构中,是否包含客户端 2 的 ID,如果包含就是执行可重入锁的赋值,此时 hash数据结构是客户端 1 的 ID,不包含客户端 2的 ID,所以,返回加锁的那个锁 key的剩余存活时间。

接着查看 lock方法中的 死循环部分。

流程大致如下:

  • 尝试获取锁,返回 null 则说明加锁成功,返回一个ttl,则说明已经存在该锁,ttl 为锁的剩余存活时间。
  • 如果此时客户端 2 进程获取锁失败,那么使用客户端 2 的线程 id,通过 Redis 的 channel 订阅锁释放的事件。
  • 进入死循环中,尝试重新获取锁。
  • 如果在重试中拿到了锁,则直接返回。
  • 如果锁当前还是被占用的,那么等待释放锁的消息。通过使用了 JDK 的信号量 Semaphore 来阻塞线程,当 ttl 为锁的剩余存活时间为0后,信号量的 release() 方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。

注意:

当锁正在被占用时,等待获取锁的进程并不是真正通过一个 while(true) 死循环去获取锁(占 CPU资源),而时使用 JDK 的信号量 Semaphore 来阻塞线程(间断性的不断尝试获取锁),是会释放 CPU资源的。

3、锁释放代码

    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise();
        // 1. 异步释放锁
        RFuture<Boolean> future = this.unlockInnerAsync(threadId);
        // 2. 取消 Watch Dog 机制
        future.onComplete((opStatus, e) -> {
            this.cancelExpirationRenewal(threadId);
            if (e != null) {
                result.tryFailure(e);
            } else if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
                result.tryFailure(cause);
            } else {
                result.trySuccess((Object)null);
            }
        });
        return result;
    }

3.1 异步释放锁机制

查看unlockInnerAsync方法。

释放锁也是执行的 lua 脚本:

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end ;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end ;
return nil;

首先,第一段 if 判断语句,判断 key 是否存在的话,进行加锁。此时锁 key不存在,则客户端 1加锁成功,向Redis中设置一个 hash 结构的数据。返回 null。

然后,第二个 if 判断,判断一下该客户端对应的锁的 hash 结构的 value 值是否递减为 0,

  • 如果递减不为 0,则重入锁的解锁,返回0。
  • 如果递减为 0,则进行删除,返回1。

3.2 取消 Watch Dog机制

查看 cancelExpirationRenewal方法。

取消 Watch Dog 机制,即将 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的线程 id 删除。

3.3 通知阻塞等待的进程

利用 Redis 的发布订阅机制,广播释放锁的消息,通知阻塞等待的进程(向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息)。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Netty启动流程服务端channel初始化源码分析

    Netty启动流程服务端channel初始化源码分析

    这篇文章主要为大家介绍了Netty启动流程服务端channel初始化源码分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-03-03
  • java设计模式(实战)-责任链模式

    java设计模式(实战)-责任链模式

    这篇文章主要介绍了java设计模式(实战)-责任链模式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • 浅谈JAVA8给我带了什么——流的概念和收集器

    浅谈JAVA8给我带了什么——流的概念和收集器

    这篇文章主要介绍了JAVA8流的概念和收集器,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-04-04
  • 实例解析Java中的构造器初始化

    实例解析Java中的构造器初始化

    这篇文章主要通过实例解析Java中的构造器初始化,代码很简单,叙述很明确,需要的朋友可以了解下。
    2017-09-09
  • SpringBoot项目中配置application.yml中server.port不生效的问题

    SpringBoot项目中配置application.yml中server.port不生效的问题

    这篇文章主要介绍了SpringBoot项目中配置application.yml中server.port不生效的问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • 使用ftpClient下载ftp上所有文件解析

    使用ftpClient下载ftp上所有文件解析

    最近项目需要写个小功能,需求就是实时下载ftp指定文件夹下的所有文件(包括子目录)到本地文件夹中,保留文件到目录路径不变。今天小编给大家分享使用ftpClient下载ftp上所有文件的方法,需要的的朋友参考下吧
    2017-04-04
  • Java通过反射机制动态设置对象属性值的方法

    Java通过反射机制动态设置对象属性值的方法

    下面小编就为大家带来一篇Java通过反射机制动态设置对象属性值的方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-07-07
  • 浅析Java BigDecimal为什么可以不丢失精度

    浅析Java BigDecimal为什么可以不丢失精度

    在金融领域,为了保证数据的精度,往往会使用BigDecimal,所以这篇文章主要来和大家探讨下为什么BigDecimal可以保证精度不丢失,感兴趣的可以了解下
    2024-03-03
  • Sentinel流控规则实现限流保护详解

    Sentinel流控规则实现限流保护详解

    这篇文章主要介绍了Sentinel流控规则实现限流保护,Sentinel是一个分布式系统的流量控制组件,它可以实现限流,流控,降级等功能,提高系统的稳定性和可靠性,感兴趣想要详细了解可以参考下文
    2023-05-05
  • Java中Calendar类用法实例详解

    Java中Calendar类用法实例详解

    这篇文章主要给大家介绍了关于Java中Calendar类用法的相关资料,Calendar类是Java.util包中提供的一个抽象类,该类从JDK1.1开始出现,作为Date类的替代方案,Calendar类中包含了对不同国家地区日历的处理,需要的朋友可以参考下
    2023-09-09

最新评论