[转]程序员如何 Get 分布式锁的正确姿势
在很多互联网产品应用中,有些场景需要加锁处理,比如秒杀、全局递增ID、楼层生成等等,大部分的解决方案是基于DB实现的,Redis也是较为常见的方案之一。
Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。
1. Redis命令介绍
使用Redis实现分布式锁,有两个重要函数需要介绍。
SETNX命令(SET if Not Exists)
语法:
1 |
SETNX key <span class="">value</span> |
功能:
当且仅当 key 不存在,将 key 的值设为 value ,并返回1; 若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。
GETSET命令
语法:
1 |
GETSET key value |
功能:
将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。
GET命令
语法:
1 |
GET key |
功能:
返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。
DEL命令
语法:
1 |
DEL key [KEY …] |
功能:
删除给定的一个或多个 key,不存在的 key 会被忽略。
兵贵精,不在多。分布式锁,我们就依靠这四个命令。但在具体实现,还有很多细节,需要仔细斟酌,因为在分布式并发多进程中,任何一点出现差错,都会导致死锁,hold住所有进程。
2. 加锁实现
SETNX 可以直接加锁操作,比如说对某个关键词foo加锁,客户端可以尝试 SETNX foo.lock <current unix time>。
- 如果返回1,表示客户端已经获取锁,可以往下操作,操作完成后,通过 DEL foo.lock 命令来释放锁。
- 如果返回0,说明foo已经被其他客户端上锁,如果锁是非堵塞的,可以选择返回调用。如果是堵塞调用,就需要进入下一个重试循环,直至成功获得锁或者重试超时。
理想是美好的,现实是残酷的。仅仅使用SETNX加锁带有竞争条件的,在某些特定的情况会造成死锁错误。
3. 处理死锁
在上面的处理方式中,如果获取锁的客户端执行时间过长,进程被kill掉,或者因为其他异常崩溃,导致无法释放锁,就会造成死锁。所以,需要对加锁要做时效性检测。
因此,我们在加锁时,把当前时间戳作为value存入此锁中,通过当前时间戳和redis中的时间戳进行对比,如果超过一定差值,认为锁已经时效,防止锁无限期的锁下去。
但是,在大并发情况,如果同时检测锁失效,并简单粗暴地删除死锁,再通过SETNX上锁,可能会导致竞争条件的产生,即多个客户端同时获取锁。
情景描述如下:
- C1获取锁,并崩溃。C2和C3调用SETNX上锁返回0后,获得foo.lock的时间戳,通过比对时间戳,发现锁超时。
- C2 向foo.lock发送DEL命令。
- C2 向foo.lock发送SETNX获取锁。
- C3 向foo.lock发送DEL命令,此时C3发送DEL时,其实DEL掉的是C2的锁。
- C3 向foo.lock发送SETNX获取锁。
此时C2和C3都获取了锁,产生竞争条件,如果在更高并发的情况,可能会有更多客户端获取锁。
所以,DEL锁的操作,不能直接使用在锁超时的情况下,幸好我们有GETSET方法,假设我们现在有另外一个客户端C4,看看如何使用GETSET方式,避免这种情况产生。
- C1获取锁,并崩溃。C2和C3调用SETNX上锁返回0后,调用GET命令获得foo.lock的时间戳T1,通过比对时间戳,发现锁超时。
- C4(调用SETNX上锁返回0后,调用GET命令获得foo.lock的时间戳T1,通过比对时间戳,发现锁超时)向foo.lock发送GESET命令,GETSET foo.lock 并得到foo.lock中老的时间戳T2。
- 如果T1=T2,说明C4获得锁。
- 如果T1!=T2,说明C4之前有另外一个客户端C5通过调用GETSET方式获取并更改了时间戳,C4未获得锁。只能进入下次循环中。
4. 时间戳问题
我们看到foo.lock的value值为时间戳,所以要在多客户端情况下,保证锁有效,一定要同步各服务器的时间。
如果各服务器间,时间有差异,时间不一致的客户端,在判断锁超时,就会出现偏差,从而产生竞争条件。锁的超时与否,严格依赖时间戳。
5. 锁覆盖问题
现在唯一的问题是,C4设置foo.lock的新时间戳,是否会对C5获取得锁产生影响?
其实我们可以看到C4和C5只有在调用GET命令获得foo.lock的时间戳,通过比对时间戳,发现锁超时后,几乎同时调用GETSET方式获取锁,执行的时间差值极小,并且写入foo.lock中的都是有效时间戳,所以对锁并没有影响。
为了让这个锁更加强壮,获取锁的客户端应该在调用关键业务时,再次调用GET方法获取T1,和写入的T0时间戳进行对比,以免锁因其他情况被执行DEL意外解开而不知。但是如果遇到上面描述得问题,则T0则会与T1不一致,当然差别一般会很小。这就是锁覆盖问题。
锁覆盖会导致什么问题呢?
当客户端的锁过期时间被覆盖,会造成锁不具有标识性,会造成客户端无法释放锁(客户端只能释放明确自己持有的锁)。
6. nil 问题
GET返回nil时应该走哪种逻辑?
1、第一种走循环走setnx逻辑
- C1客户端获取锁,并且处理完后,DEL掉锁。
- 在DEL锁之前,C2通过SETNX向foo.lock设置时间戳T0失败,发现有客户端获取锁,进入GET操作。C2 向foo.lock发送GET命令,获取返回值T1(nil)(因为此时C1执行DEL删除锁)。
- C2 循环,进入下一次SETNX逻辑。
2、第二种走超时逻辑
- C1客户端获取锁,并且处理完后,DEL掉锁。
- 在DEL锁之前,C2通过SETNX向foo.lock设置时间戳T0发现有客户端获取锁,进入GET操作。C2 向foo.lock发送GET命令,获取返回值T1(nil)(因为此时C1执行DEL删除锁)。
- C2 通过
T0 > T1 + expire
对比,进入GETSET流程。 - C2调用GETSET向foo.lock发送T0时间戳,返回foo.lock的原值T2,C2判断如果T2=T1相等,获得锁,如果T2!=T1,未获得锁。
两种逻辑貌似都是OK,但是从逻辑处理上来说,当GET返回nil,表示锁是被删除的,而不是超时,应该走SETNX逻辑加锁。
对于”第二种走超时逻辑”是否会造成死锁,尚不清楚,不过推荐采用第一种方式。
GETSET返回nil时应该怎么处理?
前提:假设C4客户端获取锁后由于异常退出等原因未正常释放锁,导致锁超时。此时,C1、C2和C3客户端同时请求获取锁。C1、C2和C3客户端调用GET接口,C1返回T1,此时C3网络情况更好,快速进入获取锁,并执行DEL删除锁,C2返回T2(nil)。C1进入超时处理逻辑。C2面临上面提到「GET返回nil时应该走哪种逻辑?」的两种选择:1. 也进入超时处理逻辑;2. 继续循环走setnx逻辑(推荐)。
- C1向foo.lock发送GETSET命令,获取返回值T11(nil)。C1比对C1和C11发现两者不同,处理逻辑认为未获取锁,然后继续循环走setnx逻辑。
- C2有两种选择:
- 进入超时处理逻辑。C2向foo.lock发送GETSET命令,获取返回值T22(C1写入的时间戳)。C2比对T2和T22发现两者不同,处理逻辑认为未获取锁,然后继续循环走setnx逻辑。
- 继续循环走setnx逻辑;
- 很明显,C1和C2最终都会继续循环走setnx逻辑,然后通过SETNX向foo.lock设置时间戳T0会失败,这其实是因为在步骤1中C1执行GETSET命令导致的。此时C1和C2都认为未获取锁,其实C1是已经获取锁了,但是他的处理逻辑没有考虑GETSET返回nil的情况,只是单纯的用GET和GETSET值进行对比。
至于为什么会出现这种情况?就如上面设想的场景那样,多客户端时,每个客户端连接Redis后,发出的命令并不是连续的,导致从单客户端看到的好像连续的命令,到rRedis Server后,这两条命令之间可能已经插入大量的其他客户端发出的命令,比如DEL、SETNX等。
正确的处理方式就是GETSET返回nil时,获取锁成功。
总结
- 必要的超时机制:获取锁的客户端一旦崩溃,一定要有过期机制,否则其他客户端都降无法获取锁,造成死锁问题。
- 分布式锁,多客户端的时间戳不能保证严格意义的一致性,所以在某些特定因素下,有可能存在问题。要适度的机制,可以承受小概率的事件产生。
- 只对关键处理节点加锁,良好的习惯是,把相关的资源准备好,比如连接数据库后,调用加锁机制获取锁,直接进行操作,然后释放,尽量减少持有锁的时间。
- 在持有锁期间要不要CHECK锁,如果需要严格依赖锁的状态,最好在关键步骤中做锁的CHECK检查机制,但是根据我们的测试发现,在大并发时,每一次CHECK锁操作,都要消耗掉几个毫秒,而我们的整个持锁处理逻辑才不到10毫秒,玩客没有选择做锁的检查。
- sleep学问,为了减少对Redis的压力,获取锁尝试时,循环之间一定要做sleep操作。但是sleep时间是多少是门学问。需要根据自己的Redis的QPS,加上持锁处理时间等进行合理计算。如果Redis的QPS足够高,也可以考虑循环之间不sleep,循环一定次数/时间执行yeild,提高响应速度。
- 至于为什么不使用Redis的muti、expire、watch等机制,可以查下参考资料,找下原因。
7. 代码实现
代码库
https://github.com/HuTu92/distributed-lock
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
package com.github.hutu92.concurrent.locks; import com.alibaba.fastjson.JSON; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * Created by liuchunlong on 2018/8/31. * <p> * 基于redis的分布式锁 v1 * * 需要客户端时间同步 */ public class DistributedLock { private static final long RETRY_BARRIER = 3 * 1000; // 请求锁重试屏障,单位毫秒 private final JedisPool jedisPool; // redis连接池 private final String lockKey; // lock Key private final long lockExpiryInNanos; // 锁的过期时长,单位纳秒 private static final ThreadLocal<Lock> lockThreadLocal = new ThreadLocal<Lock>(); /** * 构造方法 * * @param jedisPool redis连接池 * @param lockKey 锁的Key * @param lockExpiryInMillis 锁的过期时长,单位毫秒 */ public DistributedLock(JedisPool jedisPool, String lockKey, long lockExpiryInMillis) { this.jedisPool = jedisPool; this.lockKey = lockKey; this.lockExpiryInNanos = lockExpiryInMillis * 1000; } /** * 构造方法 * <p> * 使用锁默认的过期时长Integer.MAX_VALUE,即锁永远不会过期 * * @param jedisPool redis连接池 * @param lockKey 锁的Key */ public DistributedLock(JedisPool jedisPool, String lockKey) { this(jedisPool, lockKey, Integer.MAX_VALUE); } /** * 获取锁在redis中的Key标记 * * @return locks key */ public String getLockKey() { return this.lockKey; } /** * 锁的过期时长 * * @return */ public long getLockExpiryInNanos() { return lockExpiryInNanos; } /** * 请求分布式锁,不会阻塞,直接返回 * * @param jedis redis 连接 * @return 成功获取锁返回true, 否则返回false */ private boolean tryAcquire(Jedis jedis) { final Lock newLock = new Lock(System.nanoTime() + this.lockExpiryInNanos); /** * 将新锁(newLock)写入redis中。如果成功写入,redis中不存在锁,获取锁成功;否则,redis中已存在锁,获取锁失败; */ if (jedis.setnx(this.lockKey, newLock.toString()) == 1) { lockThreadLocal.set(newLock); return true; } /** * 至此,说明redis中已存在锁,获取锁失败,则需要进行如下操作: * 1. 判断redis中已存在的锁是否过期,如果过期则直接获取锁; * 2. 否则,获取锁失败; */ final String currentLockValue = jedis.get(lockKey); // 特别的,当jedis.get()获取已存在的锁currentLockValue为空时,应该重新SETNX if (currentLockValue == null || currentLockValue.length() == 0) { tryAcquire(jedis); } final Lock currentLock = Lock.fromJson(currentLockValue); // redis中已存在的锁 // 如果redis中已存在的锁已超时,则重新获取锁 if (isExpired(currentLock)) { String originLockValue = jedis.getSet(lockKey, newLock.toString()); /** * 这里还有个前置条件: * 会对已存在的锁进行校验,jedis.get()和jedis.getSet()获取的锁必须是同一锁,重新获取锁才成功 */ // 特别的,当jedis.getSet()获取已存在的锁originLockValue为空时,则认定获取锁成功 if (originLockValue == null || originLockValue.length() == 0) { lockThreadLocal.set(newLock); return true; } if (originLockValue.equals(currentLockValue)) { lockThreadLocal.set(newLock); return true; } } return false; } /** * 请求分布式锁,不会阻塞,直接返回 * * @return 成功获取锁返回true, 否则返回false */ public boolean tryAcquire() { Jedis jedis = null; try { jedis = jedisPool.getResource(); return tryAcquire(jedis); } finally { if (jedis != null) { jedis.close(); } } } /** * 超时请求分布式锁,会阻塞 * * 采用"自旋获取锁"的方式,直至获取锁成功或者请求锁超时 * * @param acquireTimeoutInMillis 锁的请求超时时长 * @return */ public boolean acquire(long acquireTimeoutInMillis) { Jedis jedis = null; try { jedis = jedisPool.getResource(); long acquireTime = System.currentTimeMillis(); // 锁的请求到期时间 long expiryTime = System.currentTimeMillis() + acquireTimeoutInMillis; while (expiryTime >= System.currentTimeMillis()) { boolean result = tryAcquire(jedis); if (result) { // 获取锁成功直接返回,否则循环重试 return true; } if ((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) { Thread.yield(); } } } finally { if (jedis != null) { jedis.close(); } } return false; } /** * 释放锁 */ public void release() { Jedis jedis = null; try { jedis = jedisPool.getResource(); release(jedis); } finally { if (jedis != null) { jedis.close(); } } } /** * 释放锁 * * @param jedis */ private void release(Jedis jedis) { Lock currlock = lockThreadLocal.get(); if (currlock != null) { final String currentLockValue = jedis.get(lockKey); if (currentLockValue != null && currentLockValue.length() != 0) { final Lock currentLock = Lock.fromJson(currentLockValue); // redis中已存在的锁 if (currlock.equals(currentLock)) { lockThreadLocal.remove(); jedis.del(lockKey); } } } } /** * 判断当前线程是否持有锁 * * 未持有锁或者锁超时,返回false * * @return */ public boolean isLocked() { Lock currlock = lockThreadLocal.get(); // 如果当前线程保存的lock不为null,并且未超时,则当前线程必然持有锁,锁未被意外释放 return currlock != null && !currlock.isExpired(); } /** * 判断指定的lock是否是当前线程持有的锁 * * @return */ boolean isMine(final Lock lock) { Lock currlock = lockThreadLocal.get(); return currlock != null && currlock.equals(lock); } /** * 判断锁是否超时 * * @param lock * @return */ boolean isExpired(final Lock lock) { return lock.isExpired(); } /** * 锁 */ protected static class Lock { private long expiryTime; // 锁的过期时间,注意,不是过期时长,单位纳秒 Lock(long expiryTime) { this.expiryTime = expiryTime; } /** * 解析字符串,根据解析出的过期时间构造Lock * * @param json * @return */ static Lock fromJson(String json) { return JSON.parseObject(json, Lock.class); } @Override public String toString() { return JSON.toJSONString(this, false); } public long getExpiryTime() { return expiryTime; } /** * 判断锁是否超时,如果锁的过期时间小于当前系统时间,则判定锁超时 * * @return */ boolean isExpired() { return this.expiryTime < System.nanoTime(); } @Override public boolean equals(Object obj) { return obj != null && obj instanceof Lock && this.expiryTime == ((Lock) obj).getExpiryTime(); } } }<strong><img class="" src="" crossorigin="anonymous" data-copyright="0" data-ratio="0.1625" data-s="300,640" data-src="https://mmbiz.qpic.cn/mmbiz_png/Pn4Sm0RsAuhSvZMAt2zKcxGQN3l1NV4Lf6AGBpotDb1DGltQgly1vKzgCCOtT3OQn43luu8r1JxUV1PmiaSAViaw/640?wx_fmt=png" data-type="png" data-w="160" /></strong> |
8. 优化
上面存在的锁覆盖问题是不可避免的,还有就是要求客户端时间同步。下面我们进一步优化这一问题。
Redis命令介绍:SET
1、语法:
1 |
SET key value [EX seconds] [PX milliseconds] [NX|XX] |
2、功能:
- 将字符串值 value 关联到 key 。
- 如果 key 已经持有其他值, SET 就覆写旧值,无视类型。
- 对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时,这个键原有的 TTL 将被清除。
3、可选参数:
从 Redis 2.6.12 版本开始,SET 命令的行为可以通过一系列参数来修改:
- EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
- PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
- NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
- XX :只在键已经存在时,才对键进行设置操作。
因为 SET 命令可以通过参数来实现和 SETNX 、SETEX 和 PSETEX 三个命令的效果,所以将来的 Redis 版本可能会废弃并最终移除 SETNX 、 SETEX 和 PSETEX 这三个命令。
4、返回值:
- 在 Redis 2.6.12 版本以前, SET 命令总是返回 OK 。
- 从 Redis 2.6.12 版本开始, SET 在设置操作成功完成时,才返回 OK。
- 如果设置了 NX 或者 XX ,但因为条件没达到而造成设置操作未执行,那么命令返回空批量回复(NULL Bulk Reply)。
5、使用模式:
命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。客户端执行以上的命令:
- 如果服务器返回 OK ,那么这个客户端获得锁。
- 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
设置的过期时间到达之后,锁将自动释放。可以通过以下修改,让这个锁实现更健壮:
- 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。
- 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。
这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。
以下是一个简单的解锁脚本示例:
1 2 3 4 5 6 |
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end |
6、源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
package com.github.hutu92; import com.alibaba.fastjson.JSON; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import java.util.Collections; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; /** * Created by liuchunlong on 2018/9/4. * <p> * 基于redis的分布式锁 v2 * <p> * 不需要客户端时间同步 */ public class DistributedLock { private static final long RETRY_BARRIER = 600; // 重试屏障,单位毫秒 private static final long INTERVAL_TIMES = 200; // 下一次重试等待,单位毫秒 private final JedisPool jedisPool; // redis连接池 private final String lockKey; // lock Key private final long lockExpiryInMillis; // 锁的过期时长,单位纳秒 private final ThreadLocal<Lock> lockThreadLocal = new ThreadLocal<Lock>(); /** * 构造方法 * * @param jedisPool redis连接池 * @param lockKey 锁的Key * @param lockExpiryInMillis 锁的过期时长,单位毫秒 */ public DistributedLock(JedisPool jedisPool, String lockKey, long lockExpiryInMillis) { this.jedisPool = jedisPool; this.lockKey = lockKey; this.lockExpiryInMillis = lockExpiryInMillis; } /** * 构造方法 * <p> * 使用锁默认的过期时长Integer.MAX_VALUE,即锁永远不会过期 * * @param jedisPool redis连接池 * @param lockKey 锁的Key */ public DistributedLock(JedisPool jedisPool, String lockKey) { this(jedisPool, lockKey, Integer.MAX_VALUE); } /** * 获取锁在redis中的Key标记 * * @return locks key */ public String getLockKey() { return this.lockKey; } /** * 锁的过期时长 * * @return */ public long getLockExpiryInMillis() { return lockExpiryInMillis; } /** * can override * * @param jedis * @return */ private String nextUid(Jedis jedis) { // 可以考虑雪花算法.. return UUID.randomUUID().toString(); } private synchronized Jedis getClient() { return jedisPool.getResource(); } private synchronized void closeClient(Jedis jedis) { jedis.close(); } /** * 请求分布式锁,不会阻塞,直接返回 * * @param jedis redis 连接 * @return 成功获取锁返回true, 否则返回false */ private boolean tryAcquire(Jedis jedis) { final Lock nLock = new Lock(nextUid(jedis)); String result = jedis.set(this.lockKey, nLock.toString(), "NX", "PX", this.lockExpiryInMillis); if ("OK".equals(result)) { lockThreadLocal.set(nLock); return true; } return false; } /** * 请求分布式锁,不会阻塞,直接返回 * * @return 成功获取锁返回true, 否则返回false */ public boolean tryAcquire() { Jedis jedis = null; try { jedis = getClient(); return tryAcquire(jedis); } finally { if (jedis != null) { closeClient(jedis); } } } /** * 超时请求分布式锁,会阻塞 * * 采用"自旋获取锁"的方式,直至获取锁成功或者请求锁超时 * * @param acquireTimeoutInMillis 锁的请求超时时长 * @return */ public boolean acquire(long acquireTimeoutInMillis) throws InterruptedException { Jedis jedis = null; try { jedis = getClient(); long acquireTime = System.currentTimeMillis(); long expiryTime = System.currentTimeMillis() + acquireTimeoutInMillis; // 锁的请求到期时间 while (expiryTime >= System.currentTimeMillis()) { boolean result = tryAcquire(jedis); if (result) { // 获取锁成功直接返回,否则循环重试 return true; } Thread.sleep(INTERVAL_TIMES); } } finally { if (jedis != null) { closeClient(jedis); } } return false; } /** * 释放锁 * * @return */ public boolean release() throws InterruptedException { return release(Integer.MAX_VALUE); } /** * 释放锁 * * @return */ public boolean release(long releaseTimeoutInMillis) throws InterruptedException { Jedis jedis = null; try { jedis = getClient(); return release(jedis, releaseTimeoutInMillis); } finally { if (jedis != null) { closeClient(jedis); } } } /** * 释放锁 * * @param jedis * @param releaseTimeoutInMillis * @return */ private boolean release(Jedis jedis, long releaseTimeoutInMillis) throws InterruptedException { Lock cLock = lockThreadLocal.get(); if (cLock == null) { System.out.println("lock is null!"); } if (cLock != null) { String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; long releaseTime = System.currentTimeMillis(); long expiryTime = System.currentTimeMillis() + releaseTimeoutInMillis; // 锁的释放到期时间 while (expiryTime >= System.currentTimeMillis()) { Object result = jedis.eval(luaScript, Collections.singletonList(this.lockKey), Collections.singletonList(cLock.toString())); if (((Long) result) == 1L) { lockThreadLocal.remove(); return true; } Thread.sleep(INTERVAL_TIMES); } } return false; } /** * 锁 */ protected static class Lock { private String uid; // lock 唯一标识 Lock(String uid) { this.uid = uid; } public String getUid() { return uid; } @Override public String toString() { return JSON.toJSONString(this, false); } } } |
9. 性能调优
这里我们使用ab性能测试工具来模拟测试。
由于没有使用队列,对高并发请求进行削峰,所以所有的压力都会被打到redis上。为了测试方便我这里只是本地启动了单机redis,没有做其它的调优配置。
我们并发测试场景是1000个并发请求,总共2000个请求。
1 |
ab -n 2000 -c 1000 "localhost:8080/lock/v2/seckill" |
上述的地址是一个接口,接口代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
@RestController @RequestMapping("/lock") public class LockController { private static LongAdder longAdder = new LongAdder(); private static Long ACQUIRE_TIMEOUT_IN_MILLIS = (long) Integer.MAX_VALUE; private static Long stock = 100000L; private static DistributedLock lock; static { longAdder.add(stock); } private final JedisPool jedisPool; @Autowired public LockController(JedisPool jedisPool) { this.jedisPool = jedisPool; lock = new DistributedLock(jedisPool, "seckillV2_" + UUID.randomUUID().toString()); } @GetMapping("/v2/seckill") public String seckillV2() throws InterruptedException { boolean acquireResult = false; try { acquireResult = lock.acquire(ACQUIRE_TIMEOUT_IN_MILLIS); if (!acquireResult) { return "人太多了,换个姿势操作一下!"; } if (longAdder.longValue() == 0L) { return "已抢光!"; } doSomeThing(jedisPool); longAdder.decrement(); System.out.println("已抢: " + (stock - longAdder.longValue()) + ", 还剩下: " + longAdder.longValue()); } finally { if (acquireResult) { boolean releaseResult = lock.release(); if (!releaseResult) { System.out.println("释放锁失败!"); } } } return "OK"; } private void doSomeThing(JedisPool jedisPool) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.incr("already_bought"); } finally { if (jedis != null) { jedis.close(); } } } } |
那么我们这里说的性能调优指的是什么呢?
仔细分析上面的源码你会发现,获取锁的逻辑是循环获取的,再每次循环之间,应该怎么去处理?如果不做任何处理,直接继续下一个循环,表面上看能够及时的获取锁,但这会给Redis更大的压力,如果Redis扛不住,到最后只会适得其反;而如果sleep等待,那么等待多久呢?等待久了,锁的获取和释放就会不及时;使用yield如何?等等……
1、
1 2 3 |
if ((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) { Thread.yield(); } |
请求获取锁的前600毫秒内直接循环重试,如果超过600毫秒还未获取到锁则每次循环都将线程推迟到下一个时间片执行。
主要参数说明:
- Failed requests:失败的请求;
- Time per request:每个请求的平均耗时。
2、
1 2 3 4 5 |
if ((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) { Thread.sleep(INTERVAL_TIMES); } else { Thread.yield(); } |
请求获取锁的前600毫秒内每次循环重试都先将线程推迟到下一个时间片,如果超过600毫秒还未获取到锁则每次循环都将线程休眠200毫秒。
很明显,出错率降低了很多,每个请求的耗时也减少了一半。这是因为No1中在600毫秒内的直接循环重试,会产生很多意义的请求,给Redis造成了巨大的压力,无法响应请求。
3、
1 |
Thread.sleep(INTERVAL_TIMES); |
请求获取锁的每次循环重试都将线程休眠200毫秒。
4、
1 |
Thread.sleep(INTERVAL_TIMES * 10); |
请求获取锁的每次循环重试都将线程休眠2秒。
很明显,休眠时间过长,会使部分线程请求锁的时间变长,不能够及时获取到锁。
5、
1 |
Thread.yield(); |
请求获取锁的每次循环重试都将线程推迟到下一个时间片执行。
总结
总的来说,No2与No3表现得都还可以。但是No2使用了Thread.yield();也会给Redis造成压力,我们对比下两者的 Percentage of the requests served within a certain time (ms) 数据。可以看到No3的90%以下请求的用户平均时间要明显低于No2的。所以最终我们选择No3策略。
当然你也可以根据你Redis的QPS自行调整策略。
作者:刘春龙,曾就职国美、优信,现就职于金山,担任消息队列服务开发。
[resource]程序员如何 Get 分布式锁的正确姿势