29.Redisson的分布式锁剖析
Redission的底层原理是一段Lua脚本
evalWriteAsync(getName(), LongCodec.INSTANCE, command,
“if (redis.call(‘exists’, KEYS[1]) == 0) then ” + “redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); ” + “redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” + “return nil; ” + “end; ” + “if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then ” + “redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); ” + “redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” + “return nil; ” + “end; ” + “return redis.call(‘pttl’, KEYS[1]);”, Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); |
上面一段Lua脚本中KEYS[1]表示加锁的key,而ARGV[1]是锁的过期时间,默认30s
ARGV[2]表示是锁的ID,是一个随机字符串
最后的1,是为了做重入锁是的统计
整体就是先判断key存不存在,不存在就加锁,设置一个hash,然后执行设置过期的命令
如果存在,那么就获取这个key的剩余生存时间
然后是上层代码
上面代码中,首先执行Lua脚本,然后获取到过期时间
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired if (ttl == null) { return true; } |
因为ttl不为null,所以订阅锁释放通知
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } |
最后同步的等待锁
while (true) {
long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() – currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() – currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } |
整体下来
就是尝试获取到锁,返回null说明成功
失败了就订阅锁释放的时间,如果一直没等到锁的释放事件且超时了,就返回false
等待了通知,就不断的重试获取锁
重试中,每次都获得存在的锁的剩余时间没如果获取到了直接返回,不行就阻塞线程
尝试获取
接下来说一下Redisson的可重入机制
在上面的LUA代码中
“if (redis.call(‘exists’, KEYS[1]) == 0) then ” +
“redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); ” + “redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” + “return nil; ” + “end; ” + “if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then ” + “redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); ” + “redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” + “return nil; ” + “end; ” + “return redis.call(‘pttl’, KEYS[1]);”, |
上面第一个if必然不成立,而第二if成立之后,会判断是否是传入的客户端ID,执行incr命令,给其增加1
于是乎就变成了
mylock:xxxxx:2
那么我们顺便说一下释放锁的流程,主要还是Lua脚本
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
“if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0) then ” + “return nil;” + “end; ” + “local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1); ” + “if (counter > 0) then ” + “redis.call(‘pexpire’, KEYS[1], ARGV[2]); ” + “return 0; ” + “else ” + “redis.call(‘del’, KEYS[1]); ” + “redis.call(‘publish’, KEYS[2], ARGV[1]); ” + “return 1; ” + “end; ” + “return nil;”, Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); |
整体将就是判断锁是否存在
然后尝试减一,如果已经到达了0
那么就删除这个key
整体来看Redission支持了可重入锁,而且加锁的时候利用了Redis的pubsub机制,相对高效,是一个不错的实现