Skip to content
DAILY QUOTE

“ ”

经过对我们自定义实现的Redis分布式锁的优化后,已经达到生产可用级别了,但是还不够完善,比如:

  • 不可重入:同一线程不能重复获取同一把锁。比如,方法A中调用方法B,在方法A中执行业务并获取锁,方法B需要获取同一把锁,如果锁是不可重入的,在方法A获取了锁,方法B无法再次获取这把锁,方法B此时就会等待方法A的锁释放,而方法A还没有执行完,因为还在调方法B,导致死锁。这种场景下要求锁是可重入的,可重入锁的意义在于防止死锁,我们的synchronized和Lock锁都是可重入的。
  • 不可重试:我们之前实现的锁是非阻塞式,获取锁只尝试一次就返回false,没有重试机制。有些业务场景在获取锁失败后,需要等待一小段时间,再次进行重试的(阻塞式、可重试的)。
  • 超时释放:虽然我们之前利用判断锁标识+Lua脚本解决了因为锁超时释放导致的误删问题,但是还是存在超时释放的时间问题。
    • 如果业务执行耗时过长,期间锁就释放了,这样存在安全隐患。
    • 如果锁的有效期过短,容易出现业务没执行完就被释放,这样存在并发安全问题。
    • 如果锁的有效期过长,容易出现锁的阻塞周期过长问题。
  • 主从一致性问题:如果Redis提供了主从集群(相当于读写分离,写操作访问主节点,读操作访问从节点),主节点需要把自己的数据同步给从节点,保证主从数据一致,如果主节点宕机,还可以选择一个从节点成为新的主节点。但是主从同步之间存在延迟,比如在极端情况下,线程在主节点获取了锁,写操作在主节点完成后尚未同步给从节点时,主节点宕机,此时会选一个新的从作为主,而从节点没有完成数据同步,没有锁的标识,此时多个从节点就会获取到锁,存在安全隐患。 我们如果想要更进一步优化分布式锁,当然是可以的,但是没必要,我们完全可以直接使用已经造好的轮子,比如:Redssion

(1)介绍

官方定义:Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。 简单来讲Redisson是一个在Redis的基础上实现的分布式工具的集合。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。 Redission提供了分布式锁的多种多样的功能

(2)Redisson实现分布式锁

  • 引入Redisson依赖
xml
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

此外还有一种引入方式,直接引入Redission的starter依赖,然后在yam|文件中配置Redisson,但是不推荐这种方式,因为他会替换掉Spring官方提供的这套对Redisson的配置。所以我们采用@Bean手动配置。

  • RedissonConfig:配置Redisson客户端
java
@Configuration
public class RedissonConfig {
    /**
     * 执行RedissonClient相关业务逻辑。
     *
     * 作用:
     * 1.配置;
     * 2.创建RedissonClient对象;
     *
     * @return处理结果
     */
    @Bean
    public RedissonClient redissonClient() {
        //配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://ip:36379").setPassword("123321");
        //创建RedissonClient对象
        return Redisson.create(config);
    }
}
  • VoucherOrderServiceImpl:只需要修改使用锁的地方,将我们自己实现的分布式锁SimpleRedisLock替换成Redisson,其它的业务代码无需修改
java
@Resource
private RedissonClient redissonClient;

/**
 * 处理优惠券秒杀下单请求。
 *
 * 作用:
 * 1.查询优惠卷;
 * 2.判断秒杀是否开始;
 * 3.尚未开始;
 * 4.判断秒杀是否已经结束;
 * 5.已经结束;
 *
 * @param voucherId优惠券id
 * @return处理结果
 */
@Override
public Result seckillVoucher(Long voucherId) {
    //1.查询优惠卷
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    //2.判断秒杀是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        //尚未开始
        return Result.fail("秒杀尚未开始");
    }
    //3.判断秒杀是否已经结束
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        //已经结束
        return Result.fail("秒杀已结束");
    }
    //4.判断库存是否充足
    if (voucher.getStock() < 1) {
        //库存不足
        return Result.fail("库存不足");
    }
    Long userId=UserHolder.getUser().getId();
    //创建锁对象
    //SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
    RLock lock = redissonClient.getLock("lock:order" + userId);
    //获取锁
    boolean isLock = lock.tryLock();
    //判断是否获取锁成功
    if (!isLock) {
        //获取锁失败,返回错误或重试
        return Result.fail("不允许重复下单");
    }
    try {
        //获取代理对象
        IVoucherOrderService proxy= (IVoucherOrderService) AopContext.currentProxy();
        return proxy.createVoucherOrder(voucherId);
    } catch (IllegalStateException e) {
        throw new RuntimeException(e);
    } finally {
        lock.unlock();
    }
}

tryLock方法介绍

  • tryLock():它会使用默认的超时时间(默认30秒)和等待机制。具体的超时时间是由Redisson配置文件或者自定义配置决定的。
  • tryLock(long time,TimeUnit unit):它会在指定的时间内尝试获取锁(等待time后重试),如果获取成功则返回true,表示获取到了锁;如果在指定时间内(Redisson内部默认指定的)未能获取到锁,则返回false。
  • tryLock(long waitTime,long leaseTime,TimeUnit unit):waitTime是获取锁失败后的重试等待时间,等待时间过了之后会接着重试,例如等待时间为一秒,那获取锁失败等一秒后再次尝试获取锁,在超时时间内反复重试,直到获取锁成功后返回true。leaseTime是锁的超时时间,如果超过leaseTime后还没有获取锁就直接返回false。

(3)Redisson分布式锁原理

  • 什么是重入锁和不可重入锁?
  1. 可重入锁:又称之为递归锁,也就是一个线程可以反复获取锁多次,一个线程获取到锁之后,内部如果还需要获取锁,可以直接再获取锁,前提是同一个对象或者class。可重入锁的最重要作用就是避免死锁的情况。
  2. 不可重入锁:又称之为自旋锁,底层是一个循环加上unsafe和cas机制,就是一直循环直到抢到锁,这个过程通过cas进行限制,如果一个线程获取到锁,cas会返回1,其它线程包括自己就不能再持有锁,需要等线程释放锁。

① 可重入锁的原理

ReentrantLock和synchronized都是可重入锁,在ReentrantLock锁种,底层借助一个volatile的state变量记录重入状态次数,第一次调用lock,计数设置为1,调用unlock解锁,计数减1,知道减为0,释放锁。在synchronized中,在C语言代码中有一个count,原理与state类似。

我们之前的自定义的分布式锁不具有可重入性的原因,是因为:重入锁的设计必须要求既记录线程标识,又要记录重入次数,而我们String数据类型的锁已经不够用了。因此,需要一个key里同时记录两个字段的情况,可以使用hash数据结构。

Redisson底层也是以hash数据结构的形式将锁存储Redis中,并且Redisson分布式锁也具有可重入性,每次获取锁,将value值加1,释放锁,value值减1,value值为0就释放锁,保证了锁的可重入性

  • 测试Redisson锁的可重入性
java
@SpringBootTest
@Slf4j
public class RedissonLockTest {
    @Resource
    private RedissonClient redissonClient;
    //创建锁对象
    private RLock lock;

    /*类的成员变量赋值操作,是和new对象绑死在一起的,它在Spring介入之前就已经发生了。*/

    /**
     * 执行setUp相关业务逻辑。
     *
     * @return无返回值
     */
    @BeforeEach
    void setUp() {
        lock = redissonClient.getLock("order");
    }

    /**
     * 执行method1相关业务逻辑。
     *
     * 作用:
     * 1.尝试获取锁;
     * 2.方法1内部调用方法2;
     *
     * @return无返回值
     */
    @Test
    void method1() {
        boolean isLock = false;
        try {
            // 尝试获取锁
            isLock = lock.tryLock();
            if (!isLock) {
                log.error("1...获取锁失败");
                return;
            }
            log.info("1...获取锁成功");
            // 方法1内部调用方法2
            log.info("1...开始执行并发业务");
            method2();
            log.info("1...继续执行剩余的并发业务");
        } finally {
            if (isLock) {
                log.warn("1...释放锁");
                lock.unlock();
            }
        }
    }

    /**
     * 执行method2相关业务逻辑。
     *
     * 作用:
     * 1.再次尝试获取锁;
     *
     * @return无返回值
     */
    void method2() {
        log.info("2...业务方法1调用执行业务方法2");
        boolean isLock = false;
        try {
            // 再次尝试获取锁
            isLock = lock.tryLock();
            if (!isLock) {
                log.error("2...获取锁失败");
                return;
            }
            log.info("2...获取锁成功");
            log.info("2...开始执行业务");
        } finally {
            if (isLock) {
                log.warn("2...释放锁");
                lock.unlock();
            }
        }
    }
}
  • 执行流程如下 第一次获取锁 步入方法2,第二次获得锁,value值加1

方法2释放锁,重入次数-1,并刷新有效期,最后方法1释放锁,value减1后重入计数器为0,由方法1释放删除锁

② Redisson源码流程原理解析

之前我们分析发现,自己实现的锁不够灵活,具有不可重入、不可重试、超时释放、主从一致性四大问题。虽然我们对自定义的Redis分布式锁进行了优化,但是不如直接使用别人造好的轮子,Redisson分布式锁不仅提供了多种锁实现,而且还解决了分布式锁不够灵活的这四大问题,下面我们通过深入剖析Redisson源码,看看底层是如何实现和解决:锁的可重入性、可重试机制、超时续约机制和主从一致性问题的。

(1)Redisson可重入锁原理

之前我们使用string类型的setnx命令可以保证获取锁的原子性,释放锁的原子性使用了lua脚本。但是我们为了保证锁的可重入性,需要使用Hash结构存储线程标识以及重入次数两个字段,而Hash类型没有这种组合命令保证原子性。所以下面分析Redisson的获取锁tryLock和释放锁unlock方法底层是如何实现这种重入锁,并保证命令原子性。

  • tryLock源码 空参调用的是Lock接口的空参方法,有参调用的是RLock接口的有参方法,不管是哪种调用,我们选择创建锁对象的实现类都是RedissonLock(Lock接口是通用标准,RLock接口是Redisson私有加强版,RedissonClient并不负责具体加锁逻辑,而是返回一个具备加锁能力的对象)

我们从空参和有参的方法同步进行分析,最后它们都会执行到同一个获取锁方法。

空参tryLock调用tryLockAsync方法内部传入的waitTime和leaseTime的默认值都是-1,调用tryAcquireOnceAsync方法作为参数。

有参tryLock经过传递调用,最后也同样会执行tryAcquireOnceAsync方法。 在tryAcquireOnceAsync方法内部,如果过期时间leaseTime > 0,说明调用trylockInnerAsync方法,该方法最终也是调用Lua脚本保证命令原子性,尝试获取锁的脚本逻辑如下。

leaseTime > 0时,执行的tryLockInnerAsync里的Lua脚本。 Lua脚本内部的具体动作如下:

  1. 判断锁是否存在
    • 如果不存在(exists key == 0):
      • 创建Hash。
      • 设置字段threadId,值设为1
      • 关键动作:给这个Key设置一个生存时间,时间就是你传进来的leaseTime
      • 返回nil(代表加锁成功)。
  2. 判断锁是否是自己的(重入判断)
    • 如果锁已存在,且hexists key threadId == 1
      • threadId对应的数值+1
      • 关键动作重新刷新这个Key的生存时间为leaseTime
      • 返回nil(代表加锁成功)。
  3. 锁被别人占了
    • 返回这个Key剩余的生存时间(TTL)。
  • unlock源码 总体逻辑:
  • 最外层(API层):调用的lock.unlock()。这只是个入口。
  • 中间层(异步处理):它会进入unlockAsync()。因为Redisson是基于Netty的,它是异步非阻塞的。
  • 逻辑层(状态检查):它要检查:你是当前线程吗?如果你不是拿锁的人,你凭什么放锁?(防止A线程释放了B线程的锁)。
  • 核心层(Lua脚本):最后跑到了RedissonLock类里去执行那段Lua脚本。

跟踪unlock接口的抽象实现类RedissonBaseLock

经过层层调用,最终释放锁的实现类还是RedissonLock

实现类RedissonLock对父类的unlockInnerAsync抽象方法进行了重写实现,可以看到,内部还是使用Lua脚本去构建和执行释放锁的逻辑。

简化版lua脚本

lua
-- 1. 判断锁是否存在
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;

-- 2. 重入计数器减 1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);

-- 3. 判断计数器是否大于 0
if (counter > 0) then
    -- 说明还有嵌套的方法没跑完,不能删锁,只更新过期时间
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- 4. 计数器归零,说明彻底释放,删锁并广播信号
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;

return nil;

第一步:身份校验

  • 指令hexists
  • 逻辑:脚本先去Redis的Hash结构里看:持有锁的threadId是不是当前来放锁的这个人? 第二步:重入递减
  • 指令hincrby...-1
  • 逻辑:如果是你持有的锁,那就把计数器(Counter减1。 第三步:状态判断
  • 逻辑
    • 如果减完后 > 0:说明你还处在“嵌套加锁”的内层。此时绝对不能删锁,否则外层业务还没跑完,锁就没了,会出并发事故。所以只通过pexpire刷新一下过期时间。
    • 如果减完后 == 0:说明这是最外层的方法,业务彻底搞定了。 第四步:彻底清理与通知
  • 指令delpublish
  • 逻辑
    1. 执行del:物理删除Redis中的Key。
    2. 执行publish:向特定的Channel发送一个消息,告诉那些在排队等这把锁的其他线程,保证锁被占有。

(2)Redisson的锁重试和WatchDog机制源码解析

刚刚我们分析了我们自定义的分布式锁不可重入的原因,并且剖析了Redisson的tryLock和unlock的源码,知道了它解决锁不可重入问题的实现原理是:通过hash类型的锁结构的设计,存储当前线程标识和重入计数器,来控制当前线程的再次重入和重入锁释放锁。这和JDK里的ReentrantLock重入锁的实现原理基本一致。

下面我们来看一下Redisson是如何解决不可重试、超时释放问题的。

  • 锁重试 Redisson在尝试获取锁的时候,如果传了时间参数,就不会在获取锁失败时立即返回失败,而是会进行重试。三个参数:最大重试时间,锁释放时间(默认为-1,会触发看门狗机制),时间单位。

源码分析: 在RLock接口中,找到三个参数的tryLock方法的实现,选择RedissonLock

tryLock(long waitTime, long leaseTime, TimeUnit unit)方法解读:

java
/**
 * 尝试获取Redis互斥锁。
 *
 * 作用:
 * 1.尝试获取锁,返回的ttl为null,表示获取成功;
 * 2.lock获取成功;
 * 3.获取失败,计算剩余时间;
 * 4.剩余时间小于等于零,不再重试,返回失败;
 * 5.剩余时间大于零,重新获取当前时间current,subscribe订阅拿到锁的线程;
 *
 * @param waitTime waitTime参数
 * @param leaseTime leaseTime参数
 * @param unit时间单位
 * @return处理结果
 */
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁,返回的ttl为null,表示获取成功
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock获取成功
    if (ttl == null) {
        return true;
    }
    // 获取失败,计算剩余时间
    time -= System.currentTimeMillis() - current;
    // 剩余时间小于等于零,不再重试,返回失败
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    // 剩余时间大于零,重新获取当前时间current,subscribe订阅拿到锁的线程,该线程释放锁后会发布通知,其余等待的线程可以继续争抢。
    current = System.currentTimeMillis();
    // 创建一个用于订阅锁释放通知的subscribeFuture,调用subscribe()方法进行订阅
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        // 等待time毫秒时间,等待订阅结果
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        // 如果在等待期间未收到订阅结果,表示等待超时。在等待超时后,代码会尝试取消订阅任务
        if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
                "Unable to acquire subscription lock after " + time + "ms. " +
                        "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
            // 判断是否需要取消订阅,并调用unsubscribe()方法进行处理
            subscribeFuture.whenComplete((res, ex) -> {
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        // 如果取消成功,则代码调用acquireFailed()方法进行处理,表示当前线程获取锁失败,最终返回false
        acquireFailed(waitTime, unit, threadId);
        return false;
    } catch (ExecutionException e) {
        LOGGER.error(e.getMessage(), e);
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    try {
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        while (true) {
            long currentTime = System.currentTimeMillis();
            // 仍有等待时间,则进行获取锁重试
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock获取成功
            if (ttl == null) {
                return true;
            }
            // // 剩余时间小于等于零,不再重试,返回失败
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // waiting for message
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {	// ttl < time(等待时间),代表在等待之间锁就已经释放了
                // 信号量形式订阅等待锁释放
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {	// ttl > time(等待时间),如果等了time的时间,经过time的时间,锁还没有被释放,也就没必要等了
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
            // 剩余时间小于等于零,不再重试,返回失败
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
            // 若time > 0,表示时间还很充足,仍可等待,继续执行while(true)循环
        }
    } finally {
        // 取消订阅
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
}

这段代码实现了在等待获取锁的过程中对剩余时间的动态调整,确保在等待过程中可以根据实际情况调整等待时间,这里设计的巧妙之处就在于利用了PubSub消息订阅、信号量的机制,它不是无休止的这种盲等机制,也避免了不断的重试,而是检测到锁被释放才去尝试重新获取,这对CPU十分的友好。同时,使用try-finally语句块确保在获取锁过程中发生异常时可以正确地取消订阅并释放资源。

订阅的通知就是释放锁Lua脚本当中发布的通知,然后等待订阅结果,等待的时间就是time(锁的最大剩余时间);

  • 看门狗机制 线程获取锁,由于业务阻塞导致锁超时释放了该如何解决呢?这就需要使用看门狗机制了,确保我们的业务是执行完释放,而不是阻塞释放。

源码分析:

调用无参的lock.lock()时,源码会一路向下走,进入tryAcquireAsync这个方法。这里藏着看门狗的触发条件: tryAcquireAsync()方法

看门狗不是随便启动的。只有加锁成功leaseTime == -1,才会执行scheduleExpirationRenewal

当我们没有设置leaseTime的时候,也就是内部leaseTime=-1的时候,过期时间为默认internalLockLeaseTime。查看如下代码可知internalLockLeaseTime调用getLockwatchdogTimeout()赋值默认时间是30s。

回到tryAcquireAsync()方法,接着看下面的续期方法

进入scheduleExpectationRenew(long threadId)方法中查看

EXPIRATION_RENEWAL_MAP中的key我们进去看一下,发现这是一个concurrentHashMap,并且entryNameidname两部分组成,id就是当前连接的id,name就是当前锁的名称。

renewExpiration()方法主要开启一段定时任务,不断的去更新有效期,定时任务的的时间就是internalLockLeaseTime / 3,默认也就是10s后刷新有效期

java
// 到期续约方法
/**
 * 执行renewExpiration相关业务逻辑。
 *
 * 作用:
 * 1.Timeout定时任务,或者叫周期任务;
 * 2.renewExpirationAsync()执行续命操作的方法,10s之后刷新有效期;
 * 3.rescheduleitself;
 * 4.刷新周期internalLockLeaseTime/3,默认释放时间是30秒,除以3;
 *
 * @return无返回值
 */
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {	// 从map中没有获取到,直接返回
        return;
    }
    // Timeout定时任务,或者叫周期任务
    Timeout task = getServiceManager().newTimeout(new TimerTask() {	// 开启异步定时任务
        /**
         * 循环处理异步任务。
         *
         * 作用:
         * 1.renewExpirationAsync()执行续命操作的方法,10s之后刷新有效期;
         * 2.rescheduleitself;
         * 3.刷新周期internalLockLeaseTime/3,默认释放时间是30秒,除以3就是每10秒更新一次;
         *
         * @param timeout timeout参数
         * @return无返回值
         */
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // renewExpirationAsync()执行续命操作的方法,10s之后刷新有效期
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    if (getServiceManager().isShuttingDown(e)) {
                        return;
                    }

                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();	// 递归调用自己,一直续期,直到锁释放
                } else {
                    cancelExpirationRenewal(null, null);
                }
            });
        }
        // 刷新周期internalLockLeaseTime / 3, 默认释放时间是30秒,除以3就是每10秒更新一次
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

因此,WatchDog机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个RedissonBaseLock.EXPIRATION_RENEWAL_MAP里面,然后每隔10秒(internalLockLeaseTime / 3)检查一下,如果客户端1还持有锁key(判断客户端是否还持有key,其实就是遍历EXPIRATION_RENEWAL_MAP里面线程id,然后根据线程id去Redis中查,如果存在就会延长key的时间),那么就会不断的延长锁key的生存时间,重置锁的超时时间。

renewExpirationAsync方法源码,其调用了Lua脚本执行续期操作的。代码如下图:

看完了刷新续约操作,我们知道了它会递归调用续约方法,一直刷新有效期,那么什么时候才不进行续命呢?当然是在释放锁的时候。我们在来看看释放锁操作。

释放锁操作,取消续期

在unlockAsync0方法中,当执行完lua脚本删除锁操作后,返回了个future,当这个future执行完成后,就执行取消续期操作。源码如下图:

跟进着看cancelExpirationRenewal(threadId)这个方法:

先从map中取出任务,先移除任务的线程Id,再取消这个任务,最后再移除entry,到这里看门狗的流程就结束了。

注意:如果服务宕机了,因为没有人再去调用renewExpiration这个方法,WatchDog机制线程也就没有了,所以等到时间之后自然就释放了。此时就不会延长key的过期时间,到了30s之后就会自动过期了,其他线程就可以获取到锁。如果调用带过期时间的lock方法,则不会启动看门狗任务去自动续期。

整体执行流程图

总结:看门狗机制是解决超时续期的问题,在获取锁成功以后,开启一个WatchDog定时任务,每隔一段时间(默认30秒)就会去重置锁的超时时间,以确保锁是在程序执行完unlock手动释放的,不会发生因为业务阻塞key超时,而导致锁自动释放的情况。只要任务在运行中,看门狗就会持续续期。

(3)Redisson的主从一致性问题解决方案(MultiLock原理)

下面我们来看一下Redisson是如何解决主从一致性问题的。 为了提高Redis的可用性,我们会搭建集群或者主从,现在以主从为例

此时我们进行写命令,写在主机上,主机将数据同步给从机,但是假设主机还没有来得及把数据写入从机,此时主机宕机,哨兵发现主机宕机,并选择一个从机作为主机,但此时新的主机没有锁信息,此时所信息就丢掉了。

针对这个问题,Redisson提出了MutiLock锁,我们让每个节点地位一样,加锁逻辑是写入到每个节点上,只有所有服务器写入成功才算加锁成功,现在某个节点挂了,那么在获得锁的时候,只要有一个节点拿不到,就不算加锁成功,保证了锁的可靠性。

那么MultiLock加锁原理是什么呢?下面这幅图来说明

当我们设置多个锁时,Redisson会将多个锁添加到一个集合中,然后用while循环不断尝试拿锁,这一步是为了凑齐所有锁,有了一个锁,就要去尝试获取其他锁,但是会有一个总的加锁时间,这个时间是加锁个数* 1500ms,假如有3个锁,那就是4500ms,假设在这4500ms内,所有锁都加锁成功,那么才算是加锁成功,时间主要是了限制Redisson长时间拿不到锁导致的死锁。

  • 测试主从节点锁的一致性 1)注入三个RedissonClient
java
/**
 * 执行RedissonClient相关业务逻辑。
 *
 * 作用:
 * 1.获取Redisson配置对象;
 * 2.添加Redis地址,这里添加的是单节点地址,除了单机,Redisson还支持useC;
 * 3.获取RedisClient对象,并交给IOC进行管理;
 *
 * @return处理结果
 */
@Bean
public RedissonClient redissonClient() {
    // 获取Redisson配置对象
    Config config = new Config();
    // 添加Redis地址,这里添加的是单节点地址,除了单机,Redisson还支持useClusterServers()(集群模式)、useSentinelServers()(哨兵模式)等。不同的方法会启用不同的底层连接策略
config.useSingleServer().setAddress("redis://192.168.8.100:6379").setPassword("123321");
    // 获取RedisClient对象,并交给IOC进行管理
    return Redisson.create(config);
}

/**
 * 执行RedissonClient相关业务逻辑。
 *
 * @return处理结果
 */
@Bean
public RedissonClient2 redissonClient() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://192.168.8.100:6380");
    return Redisson.create(config);
}

/**
 * 执行RedissonClient相关业务逻辑。
 *
 * @return处理结果
 */
@Bean
public RedissonClient3 redissonClient() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://192.168.8.100:6381");
    return Redisson.create(config);
}

2)编写测试类

java
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;

private RLock lock;

/**
 * 执行setUp相关业务逻辑。
 *
 * 作用:
 * 1.创建联锁MultiLock;
 *
 * @return无返回值
 */
@BeforeEach
void setUp(){
    RLock lock1 = redissonClient.getLock("lock");
    RLock lock2 = redissonClient2.getLock("lock");
    RLock lock3 = redissonClient3.getLock("lock");
    // 创建联锁MultiLock
    redissonClient.getMultiLock(lock1, lock2, lock3);
}
  • 源码分析: 1)先来看获取锁:RLock multiLock = Redisson.getMultiLock(lock1, lock2, lock3);实际上就是拿个一个数组来存放这些锁,可以像操作一把锁一样操作一堆锁
java
/**
 * 执行getMultiLock相关业务逻辑。
 *
 * @param locks locks参数
 * @return处理结果
 */
@Override
public RLock getMultiLock(RLock... locks) {
    return new RedissonMultiLock(locks);
}

// RedissonMultiLock.java
// 存放到一个列表中 //固定顺序防止死锁
final List<RLock> locks = new ArrayList<>();

/**
 * 执行RedissonMultiLock相关业务逻辑。
 *
 * 作用:
 * 1.创建空锁没有意义,报错;
 *
 * @param locks locks参数
 * @return处理结果
 */
public RedissonMultiLock(RLock... locks) {
    if (locks.length == 0) {
	    //创建空锁没有意义,报错
        throw new IllegalArgumentException("Lock objects are not defined");
    }
    this.locks.addAll(Arrays.asList(locks));
}

2)加锁

java
// RedissonMultiLock.java
/**
 * 执行lock相关业务逻辑。
 *
 * @return无返回值
 */
@Override
public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

/**
 * 执行lockInterruptibly相关业务逻辑。
 *
 * @return无返回值
 */
@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

/**
 * 执行lockInterruptibly相关业务逻辑。
 *
 * 作用:
 * 1.、计算等待时间leaseTime=-1unit=null;
 * 2.baseWaitTime等待时间=锁的数量(3个)*1500=4500毫秒;
 * 3.、等待全部加锁成功;
 * 4.basewaitime:保底总等待时间waittime:锁持有时间leasetime;
 * 5.情况A:用户没传waitTime(leaseTime<=0代表默认配置);
 *
 * @param leaseTime leaseTime参数
 * @param unit时间单位
 * @return无返回值
 */
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 1、计算等待时间leaseTime=-1 unit=null
    // baseWaitTime等待时间 = 锁的数量(3个) * 1500 = 4500毫秒
    long baseWaitTime = locks.size() * 1500;
    // 2、等待全部加锁成功
    while (true) {
	    //basewaitime:保底总等待时间waittime:锁持有时间leasetime:单轮尝试限额
        long waitTime;
        // 情况A:用户没传waitTime (leaseTime <= 0代表默认配置)
        if (leaseTime <= 0) {
            //使用默认的时间作为总等待限额
            waitTime = baseWaitTime;
        } else {	// 自定义等待时间
            waitTime = unit.toMillis(leaseTime);
            if (waitTime <= baseWaitTime) {
                waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
            } else {
                waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
            }
        }

        if (leaseTime > 0) {
            leaseTime = unit.toMillis(leaseTime);
        }
        // 不停的去获取锁,waitTime = 4500毫秒,leaseTime = -1
        if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
            return;
        }
    }
}

此过程,主要分为两部分:

  1. 计算等待时间:跟锁数量相关; 3个锁,等待时间 = 3 * 1500 = 4500
  2. 尝试加全部锁:无限循环,至全部锁加成功

3)进入tryLock(),查看其源码:

java
/**
 * 尝试获取Redis互斥锁。
 *
 * 作用:
 * 1.try{;
 * 2.}catch(ExecutionExceptione){;
 * 3.thrownewIllegalStateException(e);
 * 4.};
 * 5.awaitTime=-1,在tryLock中,-1代表了如果获取锁成功了,就会启动一;
 *
 * @param waitTime waitTime参数
 * @param leaseTime leaseTime参数
 * @param unit时间单位
 * @return处理结果
 */
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//        try {
//            return tryLockAsync(waitTime, leaseTime, unit).get();
//        } catch (ExecutionException e) {
//            throw new IllegalStateException(e);
//        }
    long newLeaseTime = -1;
    if (leaseTime > 0) {	// 给锁一个初始化的有效期,并且这个时间是大于我们需要的锁的有效期的
        if (waitTime > 0) {
            newLeaseTime = unit.toMillis(waitTime)*2;
        } else {
            newLeaseTime = unit.toMillis(leaseTime);
        }
    }

    long time = System.currentTimeMillis();	// 记录加锁的过程开始时间
    long remainTime = -1;	// 加锁剩余时间
    if (waitTime > 0) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);	// 官方弃用了RedissonRedLock, 就是剩余时间

    int failedLocksLimit = failedLocksLimit();	// 官方弃用了RedissonRedLock,failedLocksLimit()返回值是0
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());	// 需要加锁的lock集合
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {	// 拿到锁的迭代器
        RLock lock = iterator.next();	// 获取其中一个锁
        boolean lockAcquired;	// 是否加锁成功
        try {
            if (waitTime <= 0 && leaseTime <= 0) {	// 如果等待时间,有效时间都没有设置就使用默认的方式去获取锁
                lockAcquired = lock.tryLock();
            } else {
                // awaitTime=-1,在tryLock中,-1代表了如果获取锁成功了,就会启动一个lock watchDog,不停的刷新锁的生存时间
                long awaitTime = Math.min(lockWaitTime, remainTime);
                // 获取锁,等待awaitTime=4500毫秒,获取锁成功,启动一个watchDog
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {	// 任意一台redis服务器响应超时了,就应该释放所有锁
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            lockAcquired = false;
        }

        if (lockAcquired) {	// 加锁成功了就添加进去
            acquiredLocks.add(lock);
        } else {
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {	// 若是成功的锁达到了设定的值就不用再去获取锁了
                break;
            }
            // 失败机制,设置为0且还加锁失败了那就直接清空所有锁,本次获取锁失败了
            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime <= 0) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }
        // 计算本次加锁花费的时间 ,看看是否超时
        if (remainTime > 0) {
            // 如果获取锁成功,当前时间减去获取锁耗费的时间time
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                // 如果remainTime < 0说明获取锁超时,那么就释放掉这个锁
                unlockInner(acquiredLocks);
                return false;	// 返回false,加锁失败
            }
        }
    }
    // 若是没超时说明都加锁成功了,就添加锁的过期时间
    if (leaseTime > 0) {
        acquiredLocks.stream()
                .map(l -> (RedissonBaseLock) l)
                .map(l -> l.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS))
                .forEach(f -> f.toCompletableFuture().join());
    }

    return true;
}

4)unlock()释放锁,每个锁调用自己的unlock方法

java
// 在RedissonMultiLock中释放锁,就是依次调用所有的锁的释放的逻辑,lua脚本,同步等待所有的锁释放完毕,才会返回
/**
 * 释放Redis互斥锁。
 *
 * @return无返回值
 */
@Override
public void unlock() {
    locks.forEach(Lock::unlock);
}

调用的是之前分析过的RedissonBaseLock中unlock方法,最终指向的是各自重写的unlockInnerAsync(),即异步解锁方法。 1)不可重入Redis分布式锁:

  • 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示
  • 缺陷:不可重入、无法重试、锁超时失效 2)可重入的Redis分布式锁:
  • 原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
  • 缺陷:Redis宕机引起锁失效问题 3)Redisson的multiLock:
  • 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
  • 缺陷:运维成本高、实现复杂