29.Redisson的分布式锁剖析

Redission的底层原理是一段Lua脚本

evalWriteAsync(getName(), LongCodec.INSTANCE, command,

“if (redis.call(‘exists’, KEYS[1]) == 0) then ” +

“redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); ” +

“redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” +

“return nil; ” +

“end; ” +

“if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then ” +

“redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); ” +

“redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” +

“return nil; ” +

“end; ” +

“return redis.call(‘pttl’, KEYS[1]);”,

Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

上面一段Lua脚本中KEYS[1]表示加锁的key,而ARGV[1]是锁的过期时间,默认30s

ARGV[2]表示是锁的ID,是一个随机字符串

最后的1,是为了做重入锁是的统计

整体就是先判断key存不存在,不存在就加锁,设置一个hash,然后执行设置过期的命令

如果存在,那么就获取这个key的剩余生存时间

然后是上层代码

上面代码中,首先执行Lua脚本,然后获取到过期时间

Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);

// lock acquired

if (ttl == null) {

return true;

}

因为ttl不为null,所以订阅锁释放通知

current = System.currentTimeMillis();

RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);

if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {

if (!subscribeFuture.cancel(false)) {

subscribeFuture.onComplete((res, e) -> {

if (e == null) {

unsubscribe(subscribeFuture, threadId);

}

});

}

acquireFailed(waitTime, unit, threadId);

return false;

}

最后同步的等待锁

while (true) {

long currentTime = System.currentTimeMillis();

ttl = tryAcquire(waitTime, leaseTime, unit, threadId);

// lock acquired

if (ttl == null) {

return true;

}

time -= System.currentTimeMillis() – currentTime;

if (time <= 0) {

acquireFailed(waitTime, unit, threadId);

return false;

}

// waiting for message

currentTime = System.currentTimeMillis();

if (ttl >= 0 && ttl < time) {

subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

} else {

subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);

}

time -= System.currentTimeMillis() – currentTime;

if (time <= 0) {

acquireFailed(waitTime, unit, threadId);

return false;

}

}

整体下来

就是尝试获取到锁,返回null说明成功

失败了就订阅锁释放的时间,如果一直没等到锁的释放事件且超时了,就返回false

等待了通知,就不断的重试获取锁

重试中,每次都获得存在的锁的剩余时间没如果获取到了直接返回,不行就阻塞线程

尝试获取

接下来说一下Redisson的可重入机制

在上面的LUA代码中

“if (redis.call(‘exists’, KEYS[1]) == 0) then ” +

“redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); ” +

“redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” +

“return nil; ” +

“end; ” +

“if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then ” +

“redis.call(‘hincrby’, KEYS[1], ARGV[2], 1); ” +

“redis.call(‘pexpire’, KEYS[1], ARGV[1]); ” +

“return nil; ” +

“end; ” +

“return redis.call(‘pttl’, KEYS[1]);”,

上面第一个if必然不成立,而第二if成立之后,会判断是否是传入的客户端ID,执行incr命令,给其增加1

于是乎就变成了

mylock:xxxxx:2

那么我们顺便说一下释放锁的流程,主要还是Lua脚本

return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

“if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0) then ” +

“return nil;” +

“end; ” +

“local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1); ” +

“if (counter > 0) then ” +

“redis.call(‘pexpire’, KEYS[1], ARGV[2]); ” +

“return 0; ” +

“else ” +

“redis.call(‘del’, KEYS[1]); ” +

“redis.call(‘publish’, KEYS[2], ARGV[1]); ” +

“return 1; ” +

“end; ” +

“return nil;”,

Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

整体将就是判断锁是否存在

然后尝试减一,如果已经到达了0

那么就删除这个key

整体来看Redission支持了可重入锁,而且加锁的时候利用了Redis的pubsub机制,相对高效,是一个不错的实现

发表评论

邮箱地址不会被公开。 必填项已用*标注