redis作为当今市场上常用的组件,在分布式应用系统中有着比较常见的应用,例如缓存,简单队列,分布式锁等。本文对Redis分布式锁做出简单实践。

实现锁的核心技术点

  • 互斥性

    保证同一时刻只能有一个客户端可以成功拿到锁。

  • 安全性

    只有加锁的客户端才有权限释放锁,也就是不能让A加锁,B或者C可以解锁。

  • 避免死锁

    死锁在多线程中变成非常常见,要破会产生死锁(互斥性,请求保持,不可剥夺,循环等待)必要条件,互斥性是在多线程中不可破坏,破会其余三个条件也是可以有效避免死锁问题。

  • 原子操作

    redis的每个指令都是一个原子操作,而且由于redis的单进程单线程特点,指令在后台队列中逐个等待执行,同一时刻只能有一个指令被成功执行;好在redis支持lua脚本将多个指令合并成一个原子指令执行

上锁

1
SET key value NX EX timeout

set指令 用于设置 KV

key value是存入redis键值对信息

NX 只有这个可以存在时才会进行操作

EX 设置这个键的过期时间,单位为秒;具体是多少由 timeout 数值决定

timeout 过期时间,这是避免死锁的关键,防止以外导致客户端断开,导致 锁(key)没有被及时释放出现的死锁。

除了上述的指令还可以使用

SETEX | Redis官方原文介绍 setex是一个时间复杂度 O(1) 且是原子操作的指令

SETEX is atomic, and can be reproduced by using the previous two commands inside an MULTI / EXEC block. It is provided as a faster alternative to the given sequence of operations, because this operation is very common when Redis is used as a cache.

先看反例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest
public class RedisTest {

@Autowired
RedisTemplate<String, Object> redisTemplate;

@Test
public void testSetValue() throws Exception {
ValueOperations<String, Object> ops = redisTemplate.opsForValue();

ops.set("distributeLock","test"); // 没有容错
ops.setIfAbsent("distributeLock","test"); // key有容错,但是不能避免死锁
}
}

我们翻开setIfAbsent源码,正是使用setnx指令,并不是setex

1
2
3
4
5
6
public Boolean setIfAbsent(K key, V value) {

byte[] rawKey = rawKey(key);
byte[] rawValue = rawValue(value);
return execute(connection -> connection.setNX(rawKey, rawValue));
}

这就存在一个问题:加入客户端由于某种原因导致连接断开,key没有被及时删除,redis中就会一直存在;这就造成了死锁问题;正确的做法就是使用setex指令设置一个带有过期的key

1
2
3
4
5
6
7
8
9
@Autowired
RedisTemplate<String, Object> redisTemplate;

@Test
public void testSetValue() throws Exception {
ValueOperations<String, Object> ops = redisTemplate.opsForValue();
// 这里先留疑问,过期时间到底设置多少合适
ops.setIfAbsent("distributeLock","test",15, TimeUnit.SECONDS);
}

解锁

redis所有指令中只有一个del指令用于删除key,他的时间复杂度是O(n),其中n是key的数量,如果只有一个key,name时间复杂度就是O(1),Java客户端中也对应这样的操作方式

1
2
3
4
5
6
@Test
public void testdDleteValue() throws Exception {
if (redisTemplate.hasKey("distributeLock")) {
redisTemplate.delete("distributeLock");
}
}

然而者并非原子操作,redis指令执行队列中 判断key知否存在之后的命令并非一定是本客户端的删除指令,删除key时刻是指令在执行是,key也不一定存在。正确的做法应该使用lua脚本执行判断并删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private final static String UNLOCK_LUA =
"if redis.call(\"get\",KEYS[1]) == ARGV[1] " +
"then " +
" return redis.call(\"del\",KEYS[1]) " +
"else " +
" return 0 " +
"end ";

@Test
public void testUnlock() throws Exception {
try {
// 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
// spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本
redisTemplate.execute((RedisCallback<Boolean>) connection -> {
byte[] scriptByte = redisTemplate.getStringSerializer().serialize(UNLOCK_LUA);
return connection.eval(scriptByte, ReturnType.BOOLEAN, 1
, redisTemplate.getStringSerializer().serialize("distributeLock"),
redisTemplate.getStringSerializer().serialize("distributeLock"));
});
} catch (Exception e) {
e.printStackTrace();
}
}

操作到此,基本实现上锁与解锁,在redis上的原子操作;整体代码如下

然而,并不能保证所对象是同一把,及所本身的原子;在实际生产过程中请参考key的合适表达,例如:在支付操作中请求连接url作为锁等,最小共享单元作为锁的唯一对象;在SAAS系统中一般采用 tenantid:business:k 组合主键作为锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
    @Autowired
RedisTemplate<String, Object> redisTemplate;

@Test
public void testSetValue() throws Exception {
ValueOperations<String, Object> ops = redisTemplate.opsForValue();
// 这里先留疑问,过期时间到底设置多少合适
ops.setIfAbsent("distributeLock", "test", 15, TimeUnit.SECONDS);
}

private final static String UNLOCK_LUA =
"if redis.call(\"get\",KEYS[1]) == ARGV[1] " +
"then " +
" return redis.call(\"del\",KEYS[1]) " +
"else " +
" return 0 " +
"end ";

@Test
public void testUnlock() throws Exception {
try {
// 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
// spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本
redisTemplate.execute((RedisCallback<Boolean>) connection -> {
byte[] scriptByte = redisTemplate.getStringSerializer().serialize(UNLOCK_LUA);
return connection.eval(scriptByte, ReturnType.BOOLEAN, 1
, redisTemplate.getStringSerializer().serialize("distributeLock"),
redisTemplate.getStringSerializer().serialize("distributeLock"));
});
} catch (Exception e) {
e.printStackTrace();
}
}

安全性

在上锁过程中我们遗留一个问题,过期时间设置多少合适。

过长会出现获取到锁的对象出现意外,导致不能主动释放锁;过短也会存在问题,加入设置15s,但是我们无法保证在15s过期时间内一定处理完业务可是锁已经自动过期释放了。这就出现了安全问题。不设置容易出现死锁问题。分布式高并发系统中分布式锁应该及早发现主动释放,过期时间应该尽可能短;同时也应该保证在持有锁的这段时间内锁不能被释放。解决问题的唯一有效手段–子线程续命;以下截取redisson源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

如上述,在redis上锁的这段时间 fork一个子线程不停通过 lua脚本重置过期时间,如果业务还在正常走,最终由获取锁的客户端主动释放锁;如果出现异常,主线程停止,子线程也会直接停止,不再续命,过期自动释放

Redisson分布式锁最佳实践

  • 首先redisson中的所有上锁、解锁和续命操作都是采用lua脚本实现原子操作

  • redisson中完美结合spring-redis,不需要额外配置

上锁操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 获取锁超时等待时间
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
// 当前线程id
long threadId = Thread.currentThread().getId();
// 上锁
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}

time -= System.currentTimeMillis() - current;
if (time <= 0) {
// 获取失败一定要及时释放资源
acquireFailed(waitTime, unit, threadId);
return false;
}

current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}

try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

while (true) {
// 如果首次获取失败,在这里循环重试,直至获取到,或者超时获取失败
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}

// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}

解锁操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}

public RFuture<Void> unlockAsync(long threadId) {
// 释放锁
RFuture<Boolean> future = unlockInnerAsync(threadId);

CompletionStage<Void> f = future.handle((opStatus, e) -> {
// 使用异步操作,先停止续命
cancelExpirationRenewal(threadId);

if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
throw new CompletionException(cause);
}

return null;
});

return new CompletableFutureWrapper<>(f);
}

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 执行删除 锁 key的原子操作
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

实践应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@SneakyThrows
@Override
@Transactional(rollbackFor = Exception.class)
public void loadFlow() {
Date date = new Date();
long endTime = DateUtils.addMinutes(date, 30).getTime();
long now = date.getTime();

// 开启刷新数据,条件 数据库中有数据需要刷,且开启刷新时间在30分钟内
while (needLoadBagFlow() && now < endTime) {
//分布式锁控制刷新,查询接口中有并发限制
String lockKey = "loadingBagFlow:"+ContextUtil.getTenantId;
RLock rLock = redissonClient.getLock(lockKey);
if (rLock.tryLock(30, 30, TimeUnit.SECONDS)) {
//查询35 * 50 条最大数据容量
// 处理业务逻辑
rLock.unlock();
} else {
log.info("there is a job was running");
}
//刷新时间
now = System.currentTimeMillis();
}
}

总结

实际生产中不一定非得要全部写出所有的操作,我们可以借助许多优秀的实践组件完成必要业务处理,但是对于原理一定要有深度理解学习;出现问题从原理上能够帮助我们快速定位问题

参考

Redis分布式事务锁的原理(上) - 腾讯云开发者社区-腾讯云

DEL | Redis

SETEX | Redis

8. 分布式锁和同步器 · redisson/redisson Wiki (github.com) 推荐阅读,详细介绍redisson实现的java公平锁和非公平锁实现和应用