I. 分布式锁
分布式锁的主要目的是:在分布式系统多进程下,保证只有一个进程能够执行。
参考单机的锁特点来说,我们实现的分布式锁需要拥有以下特性:
- 互斥性
- 不发生死锁
- 正确释放锁
- 高性能
- 高可用
最好还能支持:
- 可重入
- 阻塞获取/超时获取/中断获取
- 公平/非公平
II. Redis分布式锁
Redis获取分布式锁主要是靠 setnx
命令来实现的,该命令表示当 key 存在时,set 失败,返回为0,key 不存在 set 成功返回 1。
单机Redis分布式锁实现
分布式锁的主要关注点在于获取锁、释放锁以及避免死锁,在实现一个锁工具支持的功能时可以考虑的文章开始的8个点。Redis分布式锁的实现,其中获取锁主要依靠 setnx
命令,释放锁删除对应的 key 即可,避免死锁则依靠 key 的自动失效来保证。
目前看到其他的文章主要的实现方式以下几种:
分布式锁看这篇就够了
获取锁:当一个进程前来请求分布式锁,首先通过setnx
来尝试获取锁,如果获取锁成功,OK,设置好过期时间,可以返回去干自己事情了。如果setnx
没获取成功,事情麻烦了,说明肯定有人拿到锁了。这个时候就要看看拿锁的进程是不是该让让位了——比较当前时间和锁里设置的过期时间(通过get
查看),如果没到过期时间,好吧,自己就只好放弃或者继续等待了。如果发现当前时间已经超过过期时间了,不好意思,自己获取锁咯,利用getset
设置一个新的过期时间,拿出旧的过期时间。看看旧的过期时间是不是和上一次看到的一样,卧槽,竟然不一样说明被哪个进程先下手给改了新的过期时间,那自己又没获取锁成功。
释放锁:直接删除key
这种实现思路乍一看没问题,其实问题很多,首先没有正确的释放锁。比如:当A进程获取到锁,由于某些原因执行时间较久,到了锁过期时间自动释放锁。这时,B进程过来获取锁,也执行一段代码,正在执行着,A进程释放锁(直接删除key),结果把B的锁反而给释放了。这出大事情了!!C进程可以获取到锁了…
这种方式的改进在于释放锁时,不要直接释放,而是判断只有 key 未过期才进行释放。这样A就不会删除B设置的 key 了。也就是说,只有未超时就结束业务的进程才能释放锁,锁的互斥性简介保证了正确的释放锁——解铃还须系铃人。但还是没有完全解决问题,因为可能刚判断完key未过期,然后当前线程停顿了一会,在这过程中发生了锁到了失效时间被其他进程获取,然后原来线程释放锁,GG,还是出现之前的莫名其妙释放锁问题。
其次,该方案利用时间戳,决定了该方法强依赖分布式系统的时间同步。如果不同步,必然会给算法带来影响。
同时,getset
是一个原子操作,但是每一个尝试获取锁的进程都会进行 set
,也就是说这一步其实是会发生覆盖操作的。比如AB两进程同时获取锁,A getset
成功获取锁,B也进行getset
获取,B会把新的过期时间设置进去,A的过期时间返回出来。但B获取锁是失败的,却莫名其妙更新的A的过期时间,这是不合理的。
另一种极为常见的实现方案:
setnx
和expire
两步操作非原子、释放锁检查和释放动作两步操作也非原子
首先我们分析setnx
和expire
两步操作非原子带来的影响,如果先setnx
成功,如果此时因为某种原因,当前进程挂了,则无法expire
,最终导致死锁。这其实问题还是不可忽略的。
其次释放锁时讲究解铃还须系铃人,那么就需要进行锁的检查,是否是自己的。当锁检查完毕时自己加上的,那么便可以释放锁。这两步动作如果不是原子,出现了胡乱释放锁的问题前文已经讨论过。
那么考虑到以上细节,这里实现一个单机版的Redis锁实现。
@Slf4j
public class RedisLock {
private RedisTemplate<String, Serializable> redisTemplate;
private Integer sleepInterval;
public RedisLock(RedisTemplate<String, Serializable> redisTemplate) {
this(redisTemplate, 5);
}
public RedisLock(RedisTemplate<String, Serializable> redisTemplate, Integer sleepInterval) {
this.redisTemplate = redisTemplate;
this.sleepInterval = sleepInterval;
}
/**
* 阻塞获取锁
*/
public void lock(String key, String lockId, long expireTime, TimeUnit timeUnit) {
for (;;) {
if (tryLock(key, lockId, expireTime, timeUnit)) {
return;
}
try {
TimeUnit.MILLISECONDS.sleep(sleepInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 快速尝试获取锁
* set nx ex 整体原子性
* @param key 锁key
* @param lockId 锁value(区分锁的id号)
* @param expireTime 锁的过期时间
* @param timeUnit 时间单位
* @return
*/
public boolean tryLock(String key, String lockId, long expireTime, TimeUnit timeUnit) {
Boolean res = redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(key.getBytes(), lockId.getBytes(), Expiration.from(expireTime, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT));
if (Boolean.TRUE.equals(res)) {
log.info("Thread {} get lock success, key:{} lockId:{} expire:{} timeUnit:{}.", Thread.currentThread(), key, lockId, expireTime, timeUnit);
return true;
}
return false;
}
/**
* 超时获取锁
* @param key 锁key
* @param lockId 锁value(区分锁的id号)
* @param expireTime 锁的过期时间
* @param time 获取锁超时时间
* @param unit 时间单位
* @return
* @throws InterruptedException
*/
public boolean tryLock(String key, String lockId, long expireTime, long time, TimeUnit unit) {
long timeout = System.currentTimeMillis() + unit.toMillis(time);
while (System.currentTimeMillis() <= timeout) {
if (tryLock(key, lockId, expireTime, unit)) {
return true;
}
try {
TimeUnit.MILLISECONDS.sleep(sleepInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return false;
}
/**
* 释放锁
* 如果锁还未释放,则释放自己的锁(原子操作,且保证了解铃还须系铃人)
* @param key 锁的key
* @param lockId 锁id
* @return
*/
public boolean unlock(String key, String lockId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Long res = redisTemplate.execute((RedisCallback<Long>) connection -> connection.eval(script.getBytes(), ReturnType.INTEGER, 1, key.getBytes(), lockId.getBytes()));
if (Long.valueOf(1).equals(res)) {
log.info("Thread {} release lock success, key:{} lockId:{}.", Thread.currentThread(), key, lockId)