yeskery

基于 Redis 的分布式锁 Redlock

怎么在单节点上实现分布式锁

说道Redis分布式锁大部分人都会想到:setnx+lua,或者知道 set key value px milliseconds nx 。后一种方式的核心实现命令如下:

该命令仅当 Key 不存在时(NX保证)set 值,并且设置过期时间 3000ms (PX保证),值 unique_value 必须是所有 client 和所有锁请求发生期间唯一的,释放锁的逻辑是:

  1. - 获取锁(unique_value可以是UUID等)
  2. SET resource_name unique_value NX PX 30000
  3. - 释放锁(lua脚本中,一定要比较value,防止误解锁)
  4. if redis.call("get",KEYS[1]) == ARGV[1] then
  5. return redis.call("del",KEYS[1])
  6. else
  7. return 0
  8. end

上述实现可以避免释放另一个client创建的锁,如果只有 del 命令的话,那么如果 client1 拿到 lock1 之后因为某些操作阻塞了很长时间,此时 Redis 端 lock1 已经过期了并且已经被重新分配给了 client2,那么 client1 此时再去释放这把锁就会造成 client2 原本获取到的锁被 client1 无故释放了,但现在为每个 client 分配一个 unique 的 string 值可以避免这个问题。至于如何去生成这个 unique string,方法很多随意选择一种就行了。

这种实现方式有3大要点(也是面试概率非常高的地方):

  1. set命令要用set key value px milliseconds nx;
  2. value要具有唯一性;
  3. 释放锁时要验证value值,不能误解锁;

缺点

事实上这类琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

  1. 在Redis的master节点上拿到了锁;
  2. 但是这个加锁的key还没有同步到slave节点;
  3. master故障,发生故障转移,slave节点升级为master节点;
  4. 导致锁丢失。

什么是 RedLock

Redis 官方站这篇文章提出了一种权威的基于 Redis 实现分布式锁的方式名叫 Redlock,此种方式比原先的单节点的方法更安全。它可以保证以下特性:

  1. 安全特性:互斥访问,即永远只有一个 client 能拿到锁
  2. 避免死锁:最终 client 都可能拿到锁,不会出现死锁的情况,即使原本锁住某资源的 client crash 了或者出现了网络分区
  3. 容错性:只要大部分 Redis 节点存活就可以正常提供服务

Redlock 算法

算法很易懂,起 5 个 master 节点,分布在不同的机房尽量保证可用性。为了获得锁,client 会进行如下操作:

  1. 得到当前的时间,微秒单位
  2. 尝试顺序地在 5 个实例上申请锁,当然需要使用相同的 key 和 random value,这里一个 client 需要合理设置与 master 节点沟通的 timeout 大小,避免长时间和一个 fail 了的节点浪费时间
  3. 当 client 在大于等于 3 个 master 上成功申请到锁的时候,且它会计算申请锁消耗了多少时间,这部分消耗的时间采用获得锁的当下时间减去第一步获得的时间戳得到,如果锁的持续时长(lock validity time)比流逝的时间多的话,那么锁就真正获取到了。
  4. 如果锁申请到了,那么锁真正的 lock validity time 应该是 origin(lock validity time) - 申请锁期间流逝的时间
  5. 如果 client 申请锁失败了,那么它就会在少部分申请成功锁的 master 节点上执行释放锁的操作,重置状态
  6. 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)

失败重试

如果一个 client 申请锁失败了,那么它需要稍等一会在重试避免多个 client 同时申请锁的情况,最好的情况是一个 client 需要几乎同时向 5 个 master 发起锁申请。另外就是如果 client 申请锁失败了它需要尽快在它曾经申请到锁的 master 上执行 unlock 操作,便于其他 client 获得这把锁,避免这些锁过期造成的时间浪费,当然如果这时候网络分区使得 client 无法联系上这些 master,那么这种浪费就是不得不付出的代价了。

放锁

放锁操作很简单,就是依次释放所有节点上的锁就行了

性能、崩溃恢复和 fsync

如果我们的节点没有持久化机制,client 从 5 个 master 中的 3 个处获得了锁,然后其中一个重启了,这是注意 整个环境中又出现了 3 个 master 可供另一个 client 申请同一把锁! 违反了互斥性。如果我们开启了 AOF 持久化那么情况会稍微好转一些,因为 Redis 的过期机制是语义层面实现的,所以在 server 挂了的时候时间依旧在流逝,重启之后锁状态不会受到污染。但是考虑断电之后呢,AOF部分命令没来得及刷回磁盘直接丢失了,除非我们配置刷回策略为 fsnyc = always,但这会损伤性能。解决这个问题的方法是,当一个节点重启之后,我们规定在 max TTL 期间它是不可用的,这样它就不会干扰原本已经申请到的锁,等到它 crash 前的那部分锁都过期了,环境不存在历史锁了,那么再把这个节点加进来正常工作。

Redlock源码

redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。

POM依赖

  1. <!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
  2. <dependency>
  3. <groupId>org.redisson</groupId>
  4. <artifactId>redisson</artifactId>
  5. <version>3.3.2</version>
  6. </dependency>

用法

首先,我们来看一下redission封装的redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似:

  1. Config config1 = new Config();
  2. config1.useSingleServer().setAddress("redis://192.168.0.1:5378")
  3. .setPassword("a123456").setDatabase(0);
  4. RedissonClient redissonClient1 = Redisson.create(config1);
  5. Config config2 = new Config();
  6. config2.useSingleServer().setAddress("redis://192.168.0.1:5379")
  7. .setPassword("a123456").setDatabase(0);
  8. RedissonClient redissonClient2 = Redisson.create(config2);
  9. Config config3 = new Config();
  10. config3.useSingleServer().setAddress("redis://192.168.0.1:5380")
  11. .setPassword("a123456").setDatabase(0);
  12. RedissonClient redissonClient3 = Redisson.create(config3);
  13. String resourceName = "REDLOCK_KEY";
  14. RLock lock1 = redissonClient1.getLock(resourceName);
  15. RLock lock2 = redissonClient2.getLock(resourceName);
  16. RLock lock3 = redissonClient3.getLock(resourceName);
  17. // 向3个redis实例尝试加锁
  18. RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
  19. boolean isLock;
  20. try {
  21. // isLock = redLock.tryLock();
  22. // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
  23. isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
  24. System.out.println("isLock = "+isLock);
  25. if (isLock) {
  26. //TODO if get lock success, do something;
  27. }
  28. } catch (Exception e) {
  29. } finally {
  30. // 无论如何, 最后都要解锁
  31. redLock.unlock();
  32. }

唯一ID

实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源码在Redisson.java和RedissonLock.java中:

  1. protected final UUID id = UUID.randomUUID();
  2. String getLockName(long threadId) {
  3. return id + ":" + threadId;
  4. }

获取锁

获取锁的代码为 redLock.tryLock() 或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s

  1. <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  2. internalLockLeaseTime = unit.toMillis(leaseTime);
  3. // 获取锁时需要在redis实例上执行的lua命令
  4. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
  5. // 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
  6. "if (redis.call('exists', KEYS[1]) == 0) then " +
  7. "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  8. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  9. "return nil; " +
  10. "end; " +
  11. // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
  12. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  13. "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  14. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  15. "return nil; " +
  16. "end; " +
  17. // 获取分布式锁的KEY的失效时间毫秒数
  18. "return redis.call('pttl', KEYS[1]);",
  19. // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
  20. Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
  21. }

获取锁的命令中,

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;

  • ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s;

  • ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId:

释放锁

释放锁的代码为 redLock.unlock(),核心源码如下:

  1. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  2. // 释放锁时需要在redis实例上执行的lua命令
  3. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  4. // 如果分布式锁KEY不存在,那么向channel发布一条消息
  5. "if (redis.call('exists', KEYS[1]) == 0) then " +
  6. "redis.call('publish', KEYS[2], ARGV[1]); " +
  7. "return 1; " +
  8. "end;" +
  9. // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
  10. "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  11. "return nil;" +
  12. "end; " +
  13. // 如果就是当前线程占有分布式锁,那么将重入次数减1
  14. "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  15. // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
  16. "if (counter > 0) then " +
  17. "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  18. "return 0; " +
  19. "else " +
  20. // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
  21. "redis.call('del', KEYS[1]); " +
  22. "redis.call('publish', KEYS[2], ARGV[1]); " +
  23. "return 1; "+
  24. "end; " +
  25. "return nil;",
  26. // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
  27. Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
  28. }

参考:https://redis.io/topics/distlock

本文内容来自:

  1. https://www.jianshu.com/p/7e47a4503b87
  2. https://blog.csdn.net/chen_kkw/article/details/81276068

评论

发表评论 点击刷新验证码

提示

该功能暂未开放