细说分布式锁实现

本文详细探讨了分布式锁的概念及其重要特性,并分别介绍了使用Redis和ZooKeeper实现分布式锁的机制。针对Redis,讨论了单机实现及RedLock算法,而ZooKeeper的实现则依赖于临时节点的创建与监听。文章分析了各种实现的优缺点,包括潜在的并发问题和死锁风险,并推荐了相关资源进行深入研究。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

I. 分布式锁

分布式锁的主要目的是:在分布式系统多进程下,保证只有一个进程能够执行。

参考单机的锁特点来说,我们实现的分布式锁需要拥有以下特性:

  • 互斥性
  • 不发生死锁
  • 正确释放锁
  • 高性能
  • 高可用

最好还能支持:

  • 可重入
  • 阻塞获取/超时获取/中断获取
  • 公平/非公平

II. Redis分布式锁

Redis获取分布式锁主要是靠 setnx 命令来实现的,该命令表示当 key 存在时,set 失败,返回为0,key 不存在 set 成功返回 1。

单机Redis分布式锁实现

分布式锁的主要关注点在于获取锁、释放锁以及避免死锁,在实现一个锁工具支持的功能时可以考虑的文章开始的8个点。Redis分布式锁的实现,其中获取锁主要依靠 setnx 命令,释放锁删除对应的 key 即可,避免死锁则依靠 key 的自动失效来保证。

目前看到其他的文章主要的实现方式以下几种:

分布式锁看这篇就够了
 
获取锁:当一个进程前来请求分布式锁,首先通过 setnx 来尝试获取锁,如果获取锁成功,OK,设置好过期时间,可以返回去干自己事情了。如果 setnx 没获取成功,事情麻烦了,说明肯定有人拿到锁了。这个时候就要看看拿锁的进程是不是该让让位了——比较当前时间和锁里设置的过期时间(通过 get 查看),如果没到过期时间,好吧,自己就只好放弃或者继续等待了。如果发现当前时间已经超过过期时间了,不好意思,自己获取锁咯,利用 getset 设置一个新的过期时间,拿出旧的过期时间。看看旧的过期时间是不是和上一次看到的一样,卧槽,竟然不一样说明被哪个进程先下手给改了新的过期时间,那自己又没获取锁成功。
释放锁:直接删除key

这种实现思路乍一看没问题,其实问题很多,首先没有正确的释放锁。比如:当A进程获取到锁,由于某些原因执行时间较久,到了锁过期时间自动释放锁。这时,B进程过来获取锁,也执行一段代码,正在执行着,A进程释放锁(直接删除key),结果把B的锁反而给释放了。这出大事情了!!C进程可以获取到锁了…

这种方式的改进在于释放锁时,不要直接释放,而是判断只有 key 未过期才进行释放。这样A就不会删除B设置的 key 了。也就是说,只有未超时就结束业务的进程才能释放锁,锁的互斥性简介保证了正确的释放锁——解铃还须系铃人。但还是没有完全解决问题,因为可能刚判断完key未过期,然后当前线程停顿了一会,在这过程中发生了锁到了失效时间被其他进程获取,然后原来线程释放锁,GG,还是出现之前的莫名其妙释放锁问题。

其次,该方案利用时间戳,决定了该方法强依赖分布式系统的时间同步。如果不同步,必然会给算法带来影响。

同时,getset 是一个原子操作,但是每一个尝试获取锁的进程都会进行 set,也就是说这一步其实是会发生覆盖操作的。比如AB两进程同时获取锁,A getset 成功获取锁,B也进行getset 获取,B会把新的过期时间设置进去,A的过期时间返回出来。但B获取锁是失败的,却莫名其妙更新的A的过期时间,这是不合理的。

另一种极为常见的实现方案:

setnxexpire 两步操作非原子、释放锁检查和释放动作两步操作也非原子
 
首先我们分析 setnxexpire 两步操作非原子带来的影响,如果先 setnx 成功,如果此时因为某种原因,当前进程挂了,则无法 expire,最终导致死锁。这其实问题还是不可忽略的。
其次释放锁时讲究解铃还须系铃人,那么就需要进行锁的检查,是否是自己的。当锁检查完毕时自己加上的,那么便可以释放锁。这两步动作如果不是原子,出现了胡乱释放锁的问题前文已经讨论过。

那么考虑到以上细节,这里实现一个单机版的Redis锁实现。

@Slf4j
public class RedisLock {
   

    private RedisTemplate<String, Serializable> redisTemplate;
    private Integer sleepInterval;

    public RedisLock(RedisTemplate<String, Serializable> redisTemplate) {
   
        this(redisTemplate, 5);
    }

    public RedisLock(RedisTemplate<String, Serializable> redisTemplate, Integer sleepInterval) {
   
        this.redisTemplate = redisTemplate;
        this.sleepInterval = sleepInterval;
    }

    /**
     * 阻塞获取锁
     */
    public void lock(String key, String lockId, long expireTime, TimeUnit timeUnit) {
   
        for (;;) {
   
            if (tryLock(key, lockId, expireTime, timeUnit)) {
   
                return;
            }
            try {
   
                TimeUnit.MILLISECONDS.sleep(sleepInterval);
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            }
        }
    }

    /**
     * 快速尝试获取锁
     * set nx ex 整体原子性
     * @param key 锁key
     * @param lockId 锁value(区分锁的id号)
     * @param expireTime 锁的过期时间
     * @param timeUnit 时间单位
     * @return
     */
    public boolean tryLock(String key, String lockId, long expireTime, TimeUnit timeUnit) {
   
        Boolean res = redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(key.getBytes(), lockId.getBytes(), Expiration.from(expireTime, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT));
        if (Boolean.TRUE.equals(res)) {
   
            log.info("Thread {} get lock success, key:{} lockId:{} expire:{} timeUnit:{}.", Thread.currentThread(), key, lockId, expireTime, timeUnit);
            return true;
        }
        return false;
    }

    /**
     * 超时获取锁
     * @param key 锁key
     * @param lockId 锁value(区分锁的id号)
     * @param expireTime 锁的过期时间
     * @param time 获取锁超时时间
     * @param unit 时间单位
     * @return
     * @throws InterruptedException
     */
    public boolean tryLock(String key, String lockId, long expireTime, long time, TimeUnit unit) {
   
        long timeout = System.currentTimeMillis() + unit.toMillis(time);
        while (System.currentTimeMillis() <= timeout) {
   
            if (tryLock(key, lockId, expireTime, unit)) {
   
                return true;
            }
            try {
   
                TimeUnit.MILLISECONDS.sleep(sleepInterval);
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            }
        }
        return false;
    }

    /**
     * 释放锁
     * 如果锁还未释放,则释放自己的锁(原子操作,且保证了解铃还须系铃人)
     * @param key 锁的key
     * @param lockId 锁id
     * @return
     */
    public boolean unlock(String key, String lockId) {
   
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Long res = redisTemplate.execute((RedisCallback<Long>) connection -> connection.eval(script.getBytes(), ReturnType.INTEGER, 1, key.getBytes(), lockId.getBytes()));
        if (Long.valueOf(1).equals(res)) {
   
            log.info("Thread {} release lock success, key:{} lockId:{}.", Thread.currentThread(), key, lockId)
评论 1
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值