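Redis is one of the most widely used components today. Distributed systems commonly use it as a cache, a simple queue or a distributed lock. This article is a short hands-on look at distributed locks built on Redis.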
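Core requirements for a lock
Mutual exclusion
At any given moment, only one client can hold the lock.
Safety
Only the client that acquired the lock may release it. If A holds the lock, B and C must not be able to unlock it.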
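Deadlock avoidance
Deadlock is a common problem in multithreaded programs. It requires four conditions to hold at once: mutual exclusion, hold and wait, no preemption, and circular wait. Preventing any one of them prevents deadlock. Mutual exclusion cannot be given up, because it is the whole point of a lock. Breaking any of the other three is effective. In a Redis lock, the expiration time is what breaks "no preemption": if a lock's holder disappears, the lock is eventually reclaimed.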
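Atomic operations
Every individual Redis command is atomic. Redis executes commands on a single thread, so commands wait in a queue and run one at a time, and only one command executes at any given moment. A sequence of several commands, however, is not atomic as a whole. Fortunately, Redis supports Lua scripts, which run multiple commands as a single atomic unit.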
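Locking

```
SET key value NX EX timeout
```

SET: sets a key-value pair.
key value: the key-value pair stored in Redis.
NX: only set the key if it does not already exist.
EX: set the key's expiration time in seconds. The exact value is given by timeout.
timeout: the expiration time. This is the key to avoiding deadlock. If a client disconnects unexpectedly, the lock (the key) is still released when it expires instead of staying locked forever.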
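Besides SET with NX and EX, Redis also offers SETEX. The official Redis documentation describes SETEX as an O(1), atomic command:
"SETEX is atomic, and can be reproduced by using the previous two commands inside an MULTI / EXEC block. It is provided as a faster alternative to the given sequence of operations, because this operation is very common when Redis is used as a cache."
(The "previous two commands" on that page are SET and EXPIRE.) Note that SETEX always overwrites the key. Unlike SET ... NX EX, it does not check whether the key already exists, so on its own it cannot provide mutual exclusion.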
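A minimal in-memory sketch makes the difference between the two commands concrete. SetNxExModel is a hypothetical class, not a Redis client. A HashMap stands in for the server, and `synchronized` stands in for Redis executing one command at a time. The current time is passed in explicitly so that expiry is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical single-process model of two Redis string commands, for illustration only.
 * Time is passed in explicitly; a real Redis server tracks expiry itself.
 */
public class SetNxExModel {
    private record Entry(String value, long expiresAtMillis) {}

    private final Map<String, Entry> store = new HashMap<>();

    /** GET: returns the live value, treating an expired entry as absent. */
    public synchronized String get(String key, long nowMillis) {
        Entry e = store.get(key);
        if (e == null || e.expiresAtMillis() <= nowMillis) {
            store.remove(key);
            return null;
        }
        return e.value();
    }

    /** SET key value NX EX seconds: set only if absent, with an expiry, in one step. */
    public synchronized boolean setNxEx(String key, String value, long seconds, long nowMillis) {
        if (get(key, nowMillis) != null) {
            return false; // someone else holds the lock
        }
        store.put(key, new Entry(value, nowMillis + seconds * 1000));
        return true;
    }

    /** SETEX key seconds value: always overwrites, so it cannot provide mutual exclusion. */
    public synchronized void setex(String key, long seconds, String value, long nowMillis) {
        store.put(key, new Entry(value, nowMillis + seconds * 1000));
    }
}
```

The expiry is what lets a lock be reclaimed when its holder never releases it. After 15 s, the next `setNxEx` succeeds even though nobody deleted the key. `setex`, by contrast, would happily replace a lock that another client still holds.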
Let's start with a counterexample:
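```java
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;

@SpringBootTest
public class RedisTest {

    @Autowired
    RedisTemplate<String, Object> redisTemplate;

    @Test
    public void testSetValue() throws Exception {
        ValueOperations<String, Object> ops = redisTemplate.opsForValue();
        // Plain SET: no NX and no expiry, so it overwrites whatever is already there
        ops.set("distributeLock", "test");
        // SETNX without an expiry: if this client dies before deleting the key, the lock is never released
        ops.setIfAbsent("distributeLock", "test");
    }
}
```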
The source of setIfAbsent shows that it issues SETNX, which sets no expiration:
```java
public Boolean setIfAbsent(K key, V value) {
    byte[] rawKey = rawKey(key);
    byte[] rawValue = rawValue(value);
    return execute(connection -> connection.setNX(rawKey, rawValue));
}
```
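This leaves a problem. Suppose the client disconnects for some reason before deleting the key. The key then stays in Redis forever, and no other client can ever acquire the lock, which is a deadlock. The fix is to set the value and its expiration in one atomic command, SET key value NX EX timeout. In Spring Data Redis, the setIfAbsent overload that takes a timeout issues exactly that: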
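```java
@Autowired
RedisTemplate<String, Object> redisTemplate;

@Test
public void testSetValue() throws Exception {
    ValueOperations<String, Object> ops = redisTemplate.opsForValue();
    // SET distributeLock test NX with a 15-second expiry, in a single command;
    // true only if this call created the key, i.e. this client now holds the lock
    Boolean acquired = ops.setIfAbsent("distributeLock", "test", 15, TimeUnit.SECONDS);
}
```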
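Unlocking
Redis deletes keys with DEL. Its time complexity is O(N), where N is the number of keys removed, so deleting a single string key such as a lock is O(1). The Java client provides a corresponding operation: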
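```java
@Test
public void testDeleteValue() throws Exception {
    // Counterexample: EXISTS and DEL are two separate commands, and neither checks who owns the lock
    if (Boolean.TRUE.equals(redisTemplate.hasKey("distributeLock"))) {
        redisTemplate.delete("distributeLock");
    }
}
```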
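However, this is not atomic. In Redis's command queue, the command that runs right after the existence check is not necessarily this client's DEL. By the time the DEL executes, the key may already be gone. It may also have expired and been acquired by another client, in which case the DEL removes someone else's lock. The check also ignores who owns the lock. The correct approach is a Lua script that compares the stored value with the value this client wrote and deletes the key only if they match, all in one atomic step: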
```java
// Delete the key only if it still holds this client's value; the script runs atomically on the server
private static final String UNLOCK_LUA =
        "if redis.call(\"get\",KEYS[1]) == ARGV[1] " +
        "then " +
        "    return redis.call(\"del\",KEYS[1]) " +
        "else " +
        "    return 0 " +
        "end ";

@Test
public void testUnlock() throws Exception {
    // Assumes values are written with a String serializer (e.g. StringRedisTemplate);
    // with the default JDK serializer the stored bytes would never equal ARGV[1]
    Boolean released = redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.eval(
            redisTemplate.getStringSerializer().serialize(UNLOCK_LUA),
            ReturnType.BOOLEAN, 1,
            redisTemplate.getStringSerializer().serialize("distributeLock"), // KEYS[1]: the lock key
            redisTemplate.getStringSerializer().serialize("test")));          // ARGV[1]: the value written when locking
}
```
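The race described above can be replayed step by step with a small in-memory model. UnlockModel is hypothetical. It ignores real TTLs, and `expire()` simulates a lease running out. `compareAndDelete()` plays the role of the Lua script, with `synchronized` standing in for the script's atomicity. The model also assumes that each client locks with its own random token rather than a shared constant value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical in-memory model contrasting a non-atomic "EXISTS, then DEL" unlock
 * with an atomic compare-and-delete (what the unlock Lua script does on the server).
 */
public class UnlockModel {
    private final Map<String, String> store = new HashMap<>();

    public synchronized boolean tryLock(String key, String token) {
        return store.putIfAbsent(key, token) == null;
    }

    /** Simulates the lock's TTL running out. */
    public synchronized void expire(String key) { store.remove(key); }

    public synchronized boolean exists(String key) { return store.containsKey(key); }

    public synchronized void del(String key) { store.remove(key); }

    public synchronized String owner(String key) { return store.get(key); }

    /** Models: if GET key == token then DEL key. */
    public synchronized boolean compareAndDelete(String key, String token) {
        if (token.equals(store.get(key))) {
            store.remove(key);
            return true;
        }
        return false;
    }

    public static String newToken() { return UUID.randomUUID().toString(); }

    public static void main(String[] args) {
        UnlockModel redis = new UnlockModel();
        String a = newToken();
        String b = newToken();

        // Naive unlock interleaved with another client
        redis.tryLock("distributeLock", a);               // A acquires
        boolean aSawKey = redis.exists("distributeLock"); // A: EXISTS -> true
        redis.expire("distributeLock");                   // A's lease runs out before its DEL arrives
        redis.tryLock("distributeLock", b);               // B acquires
        if (aSawKey) {
            redis.del("distributeLock");                  // A's DEL removes B's lock
        }
        System.out.println("B still holds the lock: " + b.equals(redis.owner("distributeLock")));

        // Atomic compare-and-delete
        redis.tryLock("distributeLock", b);
        System.out.println("A can delete B's lock: " + redis.compareAndDelete("distributeLock", a));
        System.out.println("B can delete its own lock: " + redis.compareAndDelete("distributeLock", b));
    }
}
```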
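With that, both locking and unlocking are single atomic operations on Redis. The complete code:

```java
@Autowired
RedisTemplate<String, Object> redisTemplate;

@Test
public void testSetValue() throws Exception {
    ValueOperations<String, Object> ops = redisTemplate.opsForValue();
    // Acquire: SET distributeLock test NX with a 15-second expiry
    ops.setIfAbsent("distributeLock", "test", 15, TimeUnit.SECONDS);
}

// Release: delete the key only if it still holds our value
private static final String UNLOCK_LUA =
        "if redis.call(\"get\",KEYS[1]) == ARGV[1] " +
        "then " +
        "    return redis.call(\"del\",KEYS[1]) " +
        "else " +
        "    return 0 " +
        "end ";

@Test
public void testUnlock() throws Exception {
    // Assumes values are written with a String serializer, so the stored bytes match ARGV[1]
    redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.eval(
            redisTemplate.getStringSerializer().serialize(UNLOCK_LUA),
            ReturnType.BOOLEAN, 1,
            redisTemplate.getStringSerializer().serialize("distributeLock"), // KEYS[1]
            redisTemplate.getStringSerializer().serialize("test")));          // ARGV[1]
}
```

This alone does not guarantee that everyone contends for the same lock under the right identity. Two things matter in production.

First, choose a key that matches the smallest shared unit being protected. In a payment operation, for example, that might be the request URL. In a SaaS system it is typically a composite key such as tenantid:business:k.

Second, the fixed value "test" above is shared by every client, so the unlock script cannot tell holders apart. Each acquisition should write a value unique to the holder (for example a UUID) and pass that same value when unlocking.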
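The key convention can be captured in a small helper. LockKeys is a hypothetical name, not part of any library. It joins tenantId:business:id with colons and rejects parts that themselves contain a colon, so two different triples cannot collapse into the same key. It also issues a random UUID per acquisition. Writing that UUID as the lock value, and passing it as ARGV[1] when unlocking, is what lets the unlock script distinguish holders.

```java
import java.util.UUID;

/** Hypothetical helper for lock keys of the form tenantId:business:id and per-acquisition values. */
public final class LockKeys {
    private LockKeys() {}

    /** Joins the parts with ':' after rejecting null, empty, or ':'-containing parts. */
    public static String of(String tenantId, String business, String id) {
        for (String part : new String[] {tenantId, business, id}) {
            if (part == null || part.isEmpty() || part.contains(":")) {
                throw new IllegalArgumentException("invalid lock key part: " + part);
            }
        }
        return tenantId + ":" + business + ":" + id;
    }

    /** A fresh random value per acquisition; pass the same value to the unlock script. */
    public static String newToken() {
        return UUID.randomUUID().toString();
    }
}
```

Without the colon check, tenant "a:b" with business "c" and tenant "a" with business "b:c" would produce the same key.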
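Safety
When locking, we left one question open: how long should the expiration be?

Too long: if the lock holder fails unexpectedly, it cannot release the lock, and everyone else waits until the lock expires.

Too short: suppose we set 15 s. Nothing guarantees the business logic finishes within 15 s, yet the lock expires and is released while the work is still running, so a second client can enter the critical section. That breaks safety.

Not setting an expiration at all invites deadlock.

In a high-concurrency distributed system, a lock should be released as soon as the work is done, and its expiration should be short. At the same time, the lock must not be released while it is still legitimately held. The usual solution is to keep renewing the lease from a background task for as long as the holder is alive. Redisson calls this the watchdog. The following excerpt from the Redisson source is the Lua script that acquires the lock and sets its lease: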
```java
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit,
                                 long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
```
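While the lock is held, Redisson periodically runs another Lua script. That script resets the key's expiration (PEXPIRE) as long as the current holder still owns the key.

If the business logic runs normally, the client that acquired the lock eventually releases it itself. If the client crashes or loses its connection, renewal stops with it, the lease is no longer extended, and the lock expires on its own.

Redisson only enables this renewal when no explicit leaseTime is passed. With an explicit leaseTime, the lock simply expires after that time.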
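The renewal idea can be sketched in plain Java. WatchdogSketch is hypothetical and runs in a single process:

- A map stands in for Redis.
- `renew()` stands in for a "still the owner, then PEXPIRE" script.
- A ScheduledExecutorService plays the watchdog.

Following Redisson, the sketch renews every third of the lease; Redisson's default lease of 30 s is renewed every 10 s. Cancelling the renewal without unlocking simulates a crashed client.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical single-process sketch of lease renewal; synchronized stands in for Lua atomicity. */
public class WatchdogSketch implements AutoCloseable {
    private record Lease(String token, long expiresAtMillis) {}

    private final Map<String, Lease> store = new HashMap<>();
    private final ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();

    /** Models SET key token NX PX ttl; on success schedules renewal every ttl/3, else returns null. */
    public synchronized ScheduledFuture<?> lock(String key, String token, long ttlMillis) {
        if (liveLease(key) != null) {
            return null;
        }
        store.put(key, new Lease(token, System.currentTimeMillis() + ttlMillis));
        long period = Math.max(1, ttlMillis / 3);
        return watchdog.scheduleAtFixedRate(
                () -> renew(key, token, ttlMillis), period, period, TimeUnit.MILLISECONDS);
    }

    /** Models "if token still owns key then PEXPIRE key ttl". */
    synchronized boolean renew(String key, String token, long ttlMillis) {
        Lease lease = liveLease(key);
        if (lease == null || !lease.token().equals(token)) {
            return false;
        }
        store.put(key, new Lease(token, System.currentTimeMillis() + ttlMillis));
        return true;
    }

    /** Stops renewal, then deletes the key only if this token still owns it. */
    public synchronized boolean unlock(String key, String token, ScheduledFuture<?> renewal) {
        renewal.cancel(false);
        Lease lease = liveLease(key);
        if (lease == null || !lease.token().equals(token)) {
            return false;
        }
        store.remove(key);
        return true;
    }

    public synchronized boolean isHeld(String key) {
        return liveLease(key) != null;
    }

    /** Returns the lease if it has not expired, dropping it otherwise; caller holds the monitor. */
    private Lease liveLease(String key) {
        Lease lease = store.get(key);
        if (lease != null && lease.expiresAtMillis() <= System.currentTimeMillis()) {
            store.remove(key);
            return null;
        }
        return lease;
    }

    @Override
    public void close() {
        watchdog.shutdownNow();
    }
}
```

While renewal runs, the lease outlives its nominal TTL. Once renewal stops, the lock lapses within one TTL, and no one has to delete it.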
Redisson distributed locks: best practices
Acquiring the lock
```java
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // null: the lock was acquired
    if (ttl == null) {
        return true;
    }

    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    current = System.currentTimeMillis();
    // subscribe to the lock's unlock channel so this thread is woken when the holder releases it
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.whenComplete((res, ex) -> {
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    } catch (ExecutionException e) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    try {
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            if (ttl == null) {
                return true;
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // wait for an unlock message, but no longer than the current lease (ttl) or the remaining budget
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
}
```
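Redisson's tryLock spends a wait budget. It retries the acquire script, and between attempts it blocks on a latch that an unlock pub/sub message releases. It never blocks longer than the smaller of the current lease's ttl and the remaining budget. The hypothetical sketch below keeps that budget logic but swaps pub/sub for plain sleeping. A waiter therefore only wakes when the current lease should have expired or its budget runs out. It also ignores reentrancy.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a tryLock wait budget, polling instead of Redisson's pub/sub latch. */
public class TryLockSketch {
    private record Lease(String owner, long expiresAtMillis) {}

    private final Map<String, Lease> store = new HashMap<>();

    /** Models the acquire script: null on success, otherwise the remaining TTL in ms. */
    synchronized Long tryAcquire(String key, String owner, long leaseMillis) {
        long now = System.currentTimeMillis();
        Lease lease = store.get(key);
        if (lease == null || lease.expiresAtMillis() <= now) {
            store.put(key, new Lease(owner, now + leaseMillis));
            return null;
        }
        return lease.expiresAtMillis() - now;
    }

    /** Retries until acquired or until waitMillis has been spent. */
    public boolean tryLock(String key, String owner, long waitMillis, long leaseMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + waitMillis;
        while (true) {
            Long ttl = tryAcquire(key, owner, leaseMillis);
            if (ttl == null) {
                return true;  // acquired
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // wait budget used up
            }
            // Never sleep past the current lease or the remaining budget
            Thread.sleep(Math.max(1, Math.min(ttl, remaining)));
        }
    }
}
```

Polling is simpler but slower to react than Redisson's approach. A waiter here only notices an early release when its sleep ends, whereas the unlock message wakes Redisson's waiters immediately.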
Releasing the lock
```java
@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

public RFuture<Void> unlockAsync(long threadId) {
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()),
            LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
```
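Together, the lock and unlock scripts implement a reentrant lock as a Redis hash. The hash field identifies the holder (in Redisson, `getLockName(threadId)`), and the value counts how many times that holder has locked. Below is a hypothetical in-memory model of just that bookkeeping. PEXPIRE, PTTL and PUBLISH are left out, and `synchronized` stands in for the atomicity of a script.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the hash bookkeeping in Redisson's lock/unlock scripts: key -> (holder -> count). */
public class ReentrantHashLockModel {
    private final Map<String, Map<String, Long>> hashes = new HashMap<>();

    /** Acquire script: true where the script returns nil (acquired), false where it returns the PTTL. */
    public synchronized boolean tryAcquire(String key, String holder) {
        Map<String, Long> hash = hashes.get(key);
        if (hash == null) {                           // EXISTS == 0: first acquisition
            hash = new HashMap<>();
            hashes.put(key, hash);
        } else if (!hash.containsKey(holder)) {       // held by someone else
            return false;
        }
        hash.merge(holder, 1L, Long::sum);            // HINCRBY key holder 1
        return true;
    }

    /** Release script: null if the caller is not the holder, 0 if still held, 1 if deleted. */
    public synchronized Long release(String key, String holder) {
        Map<String, Long> hash = hashes.get(key);
        if (hash == null || !hash.containsKey(holder)) { // HEXISTS == 0
            return null;                              // unlockAsync turns this into IllegalMonitorStateException
        }
        long counter = hash.merge(holder, -1L, Long::sum); // HINCRBY key holder -1
        if (counter > 0) {
            return 0L;                                // still held (Redisson also refreshes the TTL here)
        }
        hashes.remove(key);                           // DEL (Redisson then PUBLISHes an unlock message)
        return 1L;
    }
}
```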
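Practical application

```java
@SneakyThrows
@Override
@Transactional(rollbackFor = Exception.class)
public void loadFlow() {
    Date date = new Date();
    long endTime = DateUtils.addMinutes(date, 30).getTime();
    long now = date.getTime();
    // Keep working for at most 30 minutes while there is data to load
    while (needLoadBagFlow() && now < endTime) {
        // One lock per tenant: the tenant is the smallest shared unit here
        String lockKey = "loadingBagFlow:" + ContextUtil.getTenantId();
        RLock rLock = redissonClient.getLock(lockKey);
        // Wait up to 30 s for the lock; it is released automatically after 30 s
        if (rLock.tryLock(30, 30, TimeUnit.SECONDS)) {
            try {
                // ... load the bag flow (business logic elided in the original) ...
            } finally {
                // unlock() throws if the 30 s lease already expired, so check ownership first
                if (rLock.isHeldByCurrentThread()) {
                    rLock.unlock();
                }
            }
        } else {
            log.info("another job is already running");
        }
        now = System.currentTimeMillis();
    }
}
```

Two details are easy to miss here.

First, leaseTime is given explicitly (30 s), so the watchdog does not renew this lock, and the work inside must finish within 30 s.

Second, the method is @Transactional, so each lock is released before the surrounding transaction commits. Another worker may therefore acquire it before this transaction's changes are visible.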
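RLock extends the standard java.util.concurrent.locks.Lock interface. The acquire, work, release pattern in the example above can therefore be factored into a helper written against Lock, which releases in `finally` so that an exception in the work cannot leave the lock held. `Locked.runIfLocked` is a hypothetical name. The test uses a ReentrantLock so it runs without Redis. `Lock.tryLock(long, TimeUnit)` passes no lease time, so on an RLock Redisson's watchdog renewal applies.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/** Hypothetical helper: run a task only if the lock is obtained within the wait time. */
public final class Locked {
    private Locked() {}

    /** Returns true if the task ran, false if the lock could not be acquired in time. */
    public static boolean runIfLocked(Lock lock, long wait, TimeUnit unit, Runnable task)
            throws InterruptedException {
        if (!lock.tryLock(wait, unit)) {
            return false;       // someone else is running the job
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();      // released even if the task throws
        }
    }
}
```

A caller could then write `if (!Locked.runIfLocked(rLock, 30, TimeUnit.SECONDS, this::loadOnce)) log.info(...)`, where `loadOnce` is a hypothetical method holding the business logic.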
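Summary
In production you rarely need to write every one of these operations yourself, because mature components such as Redisson take care of them. You should still understand the underlying principles in depth. When something goes wrong, that understanding is what lets you locate the problem quickly.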
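References
Redis分布式事务锁的原理(上) (Principles of Redis distributed transaction locks, part 1), Tencent Cloud Developer Community
DEL, Redis documentation
SETEX, Redis documentation
8. Distributed locks and synchronizers, redisson/redisson Wiki (github.com). Recommended reading: it describes in detail how Redisson implements fair and non-fair locks in Java and how to use them.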