Redis分布式锁解决超卖问题
更新时间:2023年12月11日 10:51:03 作者:Eliauk-_-
超卖问题是典型的多线程安全问题,本文就来介绍一下Redis分布式锁解决超卖问题,具有一定的参考价值,感兴趣的可以了解一下
一、使用redisTemplate中的setIfAbsent方法。
// setIfAbsent就是对应redis的setnx Boolean setIfAbsent = redisTemplate.opsForValue().setIfAbsent(lockKey, lockKey, 10, TimeUnit.SECONDS); if (Boolean.TRUE.equals(setIfAbsent)) { LOG.info("恭喜,抢到锁了!lockKey:{}", lockKey); } else { LOG.info("很遗憾,没抢到锁!lockKey:{}", lockKey); }
缺点:
- 性能不高,可能会导致少买。
- 假如业务执行时间长,设置的锁的过期时间短的话,可能出现超卖问题。
二、使用Redisson解决(看门狗方式)
2.1、实现原理
redisson在获取锁之后,会开启一个守护线程(看门狗线程),当锁即将过期还没有释放时,不断的延长锁key的生存时间
2.2、SpringBoot集成Redisson
2.2.1、添加pom.xml依赖
<!--至少3.18.0版本,才支持spring boot 3--> <!--升级到3.20.0,否则打包生产会报错:Could not initialize class org.redisson.spring.data.connection.RedissonConnection--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.21.0</version> </dependency>
2.2.2、注入RedissonClient对象
@Autowired private RedissonClient redissonClient;
2.2.3、使用Redisson
RLock lock = null; try { // 使用redisson,自带看门狗 lock = redissonClient.getLock(lockKey); /** waitTime – the maximum time to acquire the lock 等待获取锁时间(最大尝试获得锁的时间),超时返回false leaseTime – lease time 锁时长,即n秒后自动释放锁 time unit – time unit 时间单位 */ // boolean tryLock = lock.tryLock(30, 10, TimeUnit.SECONDS); // 不带看门狗 boolean tryLock = lock.tryLock(0, TimeUnit.SECONDS); // 带看门狗 if (tryLock) { LOG.info("恭喜,抢到锁了!"); } else { LOG.info("很遗憾,没抢到锁"); } } catch (InterruptedException e) { LOG.error("发生异常", e); } finally { LOG.info("释放锁!"); //当lock不等于null并且lock是当前线程的时候去释放锁 if (null != lock && lock.isHeldByCurrentThread()) { lock.unlock(); } }
缺点:
- 看门狗启动后,对整体性能也会有一定影响
- 当redis宕机后,获取不到锁,业务中断。
- 正常情况下,如果加锁成功了,那么master节点会异步复制给对应的slave节点。但是如果在这个过程中发生master节点宕机,主备切换,slave节点从变为了 master节点,而锁还没从旧master节点同步过来,这就发生了锁丢失,会导致多个客户端可以同时持有同一把锁的问题
三、Redis红锁
2.1、Redis红锁名称来源
Redis 红锁的名称来源于 Redis 的logo,Redis 的 logo 是一个红色热气球,而红色的热气球上有一把锁的图案,因此这种分布式锁解决方案也被称为"Redlock",中文翻译为"红锁"。
2.2、原理
现在假设有5个Redis主节点(大于3的奇数个),这样基本保证他们不会同时都宕掉,获取锁和释放锁的过程中,客户端会执行以下操作:
- 获取当前Unix时间,以毫秒为单位,并设置超时时间过期时间(过期时间要大于正常业务执行的时间 + 获取所有redis服务消耗时间 + 时钟漂移)
- 依次尝试从5个实例,使用相同的key和具有唯一性的value获取锁,当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间TTL,这样可以避免客户端死等。比如:TTL为5s,设置获取锁最多用1s,所以如果一秒内无法获取锁,就放弃获取这个锁,从而尝试获取下个锁
- 客户端 获取所有能获取的锁后的时间 减去 第(1)步的时间,就得到锁的获取时间。锁的获取时间要小于锁失效时间TTL,并且至少从半数以上的Redis节点取到锁,才算获取成功锁
- 如果成功获得锁,key的真正有效时间 = TTL - 锁的获取时间 - 时钟漂移。比如:TTL 是5s,获取所有锁用了2s,则真正锁有效时间为3s
- 如果因为某些原因,获取锁失败(没有在半数以上实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁,无论Redis实例是否加锁成功,因为可能服务端响应消息丢失了但是实际成功了。
2.3、代码实现
2.3.1、注册红锁的RedissonClient
@Component public class RedisConfig { @Bean(name = "redissonClient1") @Primary public RedissonClient redissonRed1(){ Config config = new Config(); config.useSingleServer().setAddress("127.0.0.1:6379").setDatabase(0); return Redisson.create(config); } @Bean(name = "redissonClient2") public RedissonClient redissonRed2(){ Config config = new Config(); config.useSingleServer().setAddress("127.0.0.1:6380").setDatabase(0); return Redisson.create(config); } @Bean(name = "redissonClient3") public RedissonClient redissonRed3(){ Config config = new Config(); config.useSingleServer().setAddress("127.0.0.1:6381").setDatabase(0); return Redisson.create(config); } @Bean(name = "redissonClient4") public RedissonClient redissonRed4(){ Config config = new Config(); config.useSingleServer().setAddress("127.0.0.1:6382").setDatabase(0); return Redisson.create(config); } @Bean(name = "redissonClient5") public RedissonClient redissonRed5(){ Config config = new Config(); config.useSingleServer().setAddress("127.0.0.1:6383").setDatabase(0); return Redisson.create(config); } }
2.3.2、注入Redis红锁对象
// 红锁 @Autowired @Qualifier("redissonClient1") private RedissonClient redissonClient1; @Autowired @Qualifier("redissonClient2") private RedissonClient redissonClient2; @Autowired @Qualifier("redissonClient3") private RedissonClient redissonClient3; @Autowired @Qualifier("redissonClient4") private RedissonClient redissonClient4; @Autowired @Qualifier("redissonClient5") private RedissonClient redissonClient5;
2.3.3、使用Redis红锁
/* 假设有五台redis机器, A B C D E 线程1: A B C D E(获取到锁) 线程2: C D E(获取到锁) 线程3: C(未获取到锁) */ RLock lock1 = null; RLock lock2 = null; RLock lock3 = null; RLock lock4 = null; RLock lock5 = null; try { lock1 = redissonClient1.getLock(lockKey); lock2 = redissonClient2.getLock(lockKey); lock3 = redissonClient3.getLock(lockKey); lock4 = redissonClient4.getLock(lockKey); lock5 = redissonClient5.getLock(lockKey); // 红锁的写法 RedissonRedLock redissonRedLock = new RedissonRedLock(lock1, lock2, lock3, lock4, lock5); boolean tryLock = redissonRedLock.tryLock(0, TimeUnit.SECONDS); if (tryLock) { LOG.info("恭喜,抢到锁了!"); } else { LOG.info("很遗憾,没抢到锁"); } } catch (InterruptedException e) { LOG.error("发生异常", e); } finally { LOG.info("释放锁!"); //当lock不等于null并且lock是当前线程的时候去释放锁 if (null != lock && lock.isHeldByCurrentThread()) { lock.unlock(); } }
注意:
- 按照顺序获取锁、不然会出现每个线程都拿不到锁的情况。
- 当redis宕机后,切换主备或者重启时间需大于锁的时间,不然会有线程同时获取到锁(比如线程1获取到a,b,c,获得了锁后c宕机重启了,如果数据没有备份,c中没有key,线程2有可能获取到了c,d,e,也获取到了锁)。
- 尽可能的获取到更多redis实例的锁。
- 获取锁的时间要注意,需要设置超时时间,如果超时时间内拿不到锁,结束线程
到此这篇关于Redis分布式锁解决超卖问题的文章就介绍到这了,更多相关Redis分布式锁超卖内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
最新评论