上锁 1 SET key value NX EX timeout
set指令 用于设置 KV
key value是存入redis键值对信息
NX 只有这个可以存在时才会进行操作
EX 设置这个键的过期时间,单位为秒;具体是多少由 timeout 数值决定
timeout 过期时间,这是避免死锁的关键,防止以外导致客户端断开,导致 锁(key)没有被及时释放出现的死锁。
SETEX | Redis 官方原文介绍 setex是一个时间复杂度 O(1) 且是原子操作的指令
is atomic, and can be reproduced by using the previous two commands inside an MULTI
block. It is provided as a faster alternative to the given sequence of operations, because this operation is very common when Redis is used as a cache.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @SpringBootTest public class RedisTest { @Autowired RedisTemplate<String, Object> redisTemplate; @Test public void testSetValue () throws Exception { ValueOperations<String, Object> ops = redisTemplate.opsForValue(); ops.set("distributeLock" ,"test" ); ops.setIfAbsent("distributeLock" ,"test" ); } }
1 2 3 4 5 6 public Boolean setIfAbsent (K key, V value) { byte [] rawKey = rawKey(key); byte [] rawValue = rawValue(value); return execute(connection -> connection.setNX(rawKey, rawValue)); }
1 2 3 4 5 6 7 8 9 @Autowired RedisTemplate<String, Object> redisTemplate; @Test public void testSetValue () throws Exception { ValueOperations<String, Object> ops = redisTemplate.opsForValue(); ops.setIfAbsent("distributeLock" ,"test" ,15 , TimeUnit.SECONDS); }
解锁 redis所有指令中只有一个del指令用于删除key,他的时间复杂度是O(n),其中n是key的数量,如果只有一个key,name时间复杂度就是O(1),Java客户端中也对应这样的操作方式
1 2 3 4 5 6 @Test public void testdDleteValue () throws Exception { if (redisTemplate.hasKey("distributeLock" )) { redisTemplate.delete("distributeLock" ); } }
然而者并非原子操作,redis指令执行队列中 判断key知否存在之后的命令并非一定是本客户端的删除指令,删除key时刻是指令在执行是,key也不一定存在。正确的做法应该使用lua脚本执行判断并删除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private final static String UNLOCK_LUA = "if\"get\",KEYS[1]) == ARGV[1] " + "then " + " return\"del\",KEYS[1]) " + "else " + " return 0 " + "end " ; @Test public void testUnlock () throws Exception { try { redisTemplate.execute((RedisCallback<Boolean>) connection -> { byte [] scriptByte = redisTemplate.getStringSerializer().serialize(UNLOCK_LUA); return connection.eval(scriptByte, ReturnType.BOOLEAN, 1 , redisTemplate.getStringSerializer().serialize("distributeLock" ), redisTemplate.getStringSerializer().serialize("distributeLock" )); }); } catch (Exception e) { e.printStackTrace(); } }
然而,并不能保证所对象是同一把,及所本身的原子;在实际生产过程中请参考key的合适表达,例如:在支付操作中请求连接url作为锁等,最小共享单元作为锁的唯一对象;在SAAS系统中一般采用 tenantid:business:k 组合主键作为锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Autowired RedisTemplate<String, Object> redisTemplate; @Test public void testSetValue () throws Exception { ValueOperations<String, Object> ops = redisTemplate.opsForValue(); ops.setIfAbsent("distributeLock" , "test" , 15 , TimeUnit.SECONDS); } private final static String UNLOCK_LUA = "if\"get\",KEYS[1]) == ARGV[1] " + "then " + " return\"del\",KEYS[1]) " + "else " + " return 0 " + "end " ; @Test public void testUnlock () throws Exception { try { redisTemplate.execute((RedisCallback<Boolean>) connection -> { byte [] scriptByte = redisTemplate.getStringSerializer().serialize(UNLOCK_LUA); return connection.eval(scriptByte, ReturnType.BOOLEAN, 1 , redisTemplate.getStringSerializer().serialize("distributeLock" ), redisTemplate.getStringSerializer().serialize("distributeLock" )); }); } catch (Exception e) { e.printStackTrace(); } }
安全性 在上锁过程中我们遗留一个问题,过期时间设置多少合适。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <T> RFuture<T> tryLockInnerAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if ('exists', KEYS[1]) == 0) then " + "'hincrby', KEYS[1], ARGV[2], 1); " + "'pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if ('hexists', KEYS[1], ARGV[2]) == 1) then " + "'hincrby', KEYS[1], ARGV[2], 1); " + "'pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return'pttl', KEYS[1]);" , Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
如上述,在redis上锁的这段时间 fork一个子线程不停通过 lua脚本重置过期时间,如果业务还在正常走,最终由获取锁的客户端主动释放锁;如果出现异常,主线程停止,子线程也会直接停止,不再续命,过期自动释放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } current = System.currentTimeMillis(); CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); try { subscribeFuture.get(time, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { if (!subscribeFuture.cancel(false )) { subscribeFuture.whenComplete((res, ex) -> { if (ex == null ) { unsubscribe(res, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false ; } catch (ExecutionException e) { acquireFailed(waitTime, unit, threadId); return false ; } try { time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } while (true ) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } } } finally { unsubscribe(commandExecutor.getNow(subscribeFuture), threadId); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 @Override public void unlock () { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } public RFuture<Void> unlockAsync (long threadId) { RFuture<Boolean> future = unlockInnerAsync(threadId); CompletionStage<Void> f = future.handle((opStatus, e) -> { cancelExpirationRenewal(threadId); if (e != null ) { throw new CompletionException (e); } if (opStatus == null ) { IllegalMonitorStateException cause = new IllegalMonitorStateException ("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); throw new CompletionException (cause); } return null ; }); return new CompletableFutureWrapper <>(f); } protected RFuture<Boolean> unlockInnerAsync (long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if ('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter ='hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "'pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "'del', KEYS[1]); " + "'publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;" , Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
实践应用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @SneakyThrows @Override @Transactional(rollbackFor = Exception.class) public void loadFlow () { Date date = new Date (); long endTime = DateUtils.addMinutes(date, 30 ).getTime(); long now = date.getTime(); while (needLoadBagFlow() && now < endTime) { String lockKey = "loadingBagFlow:" +ContextUtil.getTenantId; RLock rLock = redissonClient.getLock(lockKey); if (rLock.tryLock(30 , 30 , TimeUnit.SECONDS)) { rLock.unlock(); } else {"there is a job was running" ); } now = System.currentTimeMillis(); } }
总结 实际生产中不一定非得要全部写出所有的操作,我们可以借助许多优秀的实践组件完成必要业务处理,但是对于原理一定要有深度理解学习;出现问题从原理上能够帮助我们快速定位问题
Redis分布式事务锁的原理(上) - 腾讯云开发者社区-腾讯云
DEL | Redis
SETEX | Redis
8. 分布式锁和同步器 · redisson/redisson Wiki ( 推荐阅读,详细介绍redisson实现的java公平锁和非公平锁实现和应用