Redis 实现分布式锁 && Redisson
孙玉超
2021-04-08 09:31:07
0 评论
2084 浏览
0 收藏
0 赞
既然 Java 已经提供了 synchronized 关键字和 Lock 实现类,为什么还需要分布式锁?因为他们都是本地锁,或者说是进程锁,在 web 环境中,对于核心业务通常有大量的并发请求,采用 synchronized 和 Lock 如下图,假如四台订单服务负载均衡 10000 并发请求:
本地锁只能锁住自己当前服务,如果部署单节点,那么没有什么问题。以下面代码为例
@Service public class TestService { @Transactional public void test(){ synchronized (this){ //执行业务... } } }
让我们看下这段代码,synchronized 代码块锁的是 this ,this 是当前 Service 的实例,在一个服务中,只有一个 Spring 容器,默认 Bean 是单例的,所以这个 synchronized 能锁住当前这个服务的所有请求。然而生产环境都是服务集群,基本不可能有单服务节点的情况。这样一来,本地锁就有了一些局限,它无法保证同一时间只能有一个用户请求执行业务代码。如果是为了减少数据库的并发压力,那其实本地锁没有问题。如上图所示,即使本地锁无法锁住其他实例的请求,顶多并发请求和实例个数一样多,MySQL 还是能顶得住这些压力。只是特定对于某些业务场景必须同一时间只有一个请求抢占到锁,那么就要用分布式锁了。比如博主公司的 APP,会员提交订单,由于我们自己以前技术实现我们需要加分布式锁来确保同一个用户同一时间只能提交一个订单,避免重复使用优惠券。
使用分布式锁之后图示:
实现分布式锁其核心就是我们要把锁放在一个公共访问的地方,这样才能真正锁住所有请求。Redis 这种基于内存的中间件数据库简直是量身打造。Redis 提供了 SET EX 命令可以用来实现分布式锁。在 SpringBoot 中 spring-boot-starter-data-redis 给我们提供了相关 API。
@Transactional public void test(){ Boolean flag = stringRedisTemplate.opsForValue(). setIfAbsent("lock", "value", 5, TimeUnit.SECONDS); //抢占锁成功 if(flag != null && flag){ //执行业务... //释放锁 stringRedisTemplate.delete("lock"); }else { //自旋获取锁,也可以睡 100ms 来降低自旋频率 test(); } }
假设我们线程 A 业务代码执行的时间超过五秒钟,那么还没删除锁的时候其实这个锁就过期了。那么其他线程 B 就抢到锁了,其他线程 B 正在执行业务,线程 A 把锁删掉了,但是 A 删掉的其实是线程 B 设置的锁。线程 C 又抢到锁,线程 C 又执行业务,这样就乱掉了。
+
所以这里为了保证每一个线程删除的是自己的锁,我们可以在设置锁的时候给一个仅属于当前线程的 UUID,代码做如下改进
@Transactional public void test(){ String uuid = UUID.randomUUID().toString().replace("-",""); Boolean flag = stringRedisTemplate.opsForValue(). setIfAbsent("lock", uuid, 5, TimeUnit.SECONDS); //抢占锁成功 if(flag != null && flag){ //执行业务... //释放锁(先判断锁是自己的,再去删除锁) if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){ stringRedisTemplate.delete("lock"); } else { //自旋获取锁,也可以睡 100ms 来降低自旋频率 test(); } } }
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
@Transactional public void test(){ String uuid = UUID.randomUUID().toString().replace("-",""); Boolean flag = stringRedisTemplate.opsForValue() .setIfAbsent("lock", uuid, 5, TimeUnit.SECONDS); //抢占锁成功 if(flag != null && flag){ //执行业务... //释放锁(先判断锁是自己加的,再去删除锁) String lua = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; Long lock = stringRedisTemplate.execute(new DefaultRedisScript<>(lua, Long.class), Collections.singletonList("lock"), uuid); } else { //自旋获取锁,也可以睡 100ms 来降低自旋频率 test(); } }
这段代码就真正的实现了可靠的分布式锁,但是参考 Redis 中文网 它告诉我们这种方式并不推荐用来做分布式锁,分布式锁有更专业的框架 —— Redisson 。
分布式锁 Redisson
RLock lock = redissonClient.getLock("lock");//可重入锁 try{ lock.lock(); }finally { lock.unlock(); }
为什么这里不用自旋呢?因为 lock.lock() 或者 lock.tryLock() 这些加锁的方法都是阻塞方法,拿不到锁就会阻塞在这一直等,所以这里不需要自旋。
其他类型的分布式锁:
RLock lock = redissonClient.getLock("lock");//可重入锁 RLock fairLock = redissonClient.getFairLock("fairLock");//公平锁 RLock multiLock = redissonClient.getMultiLock(lock, fairLock);//联锁 RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");//读写锁 RLock readLock = readWriteLock.readLock();//读锁 RLock writeLock = readWriteLock.writeLock();//写锁 RSemaphore semaphore = redissonClient.getSemaphore("semaphore");//信号量 RPermitExpirableSemaphore mySemaphore = redissonClient.getPermitExpirableSemaphore("mySemaphore");//可过期信号量 RCountDownLatch latch = redissonClient.getCountDownLatch("anyCountDownLatch");//闭锁