Skip to content
DAILY QUOTE

“ ”

需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题。

注意项:这个互斥锁不能使用synchronized和Lock这样的本地单机锁,如果是分布式服务集群就会锁不住,如果要解决这个问题,需要使用分布式锁(后面会具体讲)。并且这两个单机锁加锁了只会阻塞等待,无法进行后面的休眠重试逻辑。互斥锁要求当有一个线程能获取到锁,其他线程都获取失败,因此这里可以使用Redis中String类型的setnx操作实现互斥锁

提示:setnx的功能是如果不存在这个key,则可以set,如果存在了这个key,则无法set。 在StringRedisTemplate中一般使用setIfAbsent()方法,相当于setnx命令,并且在setIfAbsent()中设置有效时间,底层是set key value [EX seconds] [PX milliseconds] [NX|XX],保证命令的原子性。

使用setnx命令模拟加锁,即使是分布式服务的多线程环境下,由于Redis是单线程的,也只会允许一个线程去执行setnx加锁操作,所以不用担心多个线程同时setnx。而且在设置完锁后为了防止意外情况不释放锁,一般我们还会在setnx的时候加一个TTL有效期,避免锁无法释放产生死锁。这就是利用互斥锁保证只有一个线程去执行操作数据库,防止高并发环境下多个线程同时访问失效热点key造成缓存击穿。

  • 使用互斥锁改造queryById()方法,解决缓存击穿
java
/**
 * 根据id查询数据。
 *
 * 作用:
 * 1.互斥锁解决缓存击穿;
 *
 * @param id业务id
 * @return处理结果
 */
@Override
public Result queryById(Long id) {
    //互斥锁解决缓存击穿
    Shop shop = queryWithMute(id);

    if(shop == null){
        return Result.fail("店铺不存在");
    }
    return Result.ok(shop);
}

//封装互斥锁方案
/**
 * 使用互斥锁解决缓存击穿查询。
 *
 * 作用:
 * 1.从Redis查询商铺缓存;
 * 2.判断是否存在;
 * 3.存在,直接返回;
 * 4.判断命中的是否是空值;
 * 5.返回一个错误信息;
 *
 * @param id业务id
 * @return处理结果
 */
public Shop queryWithMute(Long id) {
    String key= CACHE_SHOP_KEY+id;
    //1.从Redis查询商铺缓存
    String shopJson=stringRedisTemplate.opsForValue().get(key);
    //2.判断是否存在
    if(StrUtil.isNotBlank(shopJson)){
        //3.存在,直接返回
        Shop shop= JSONUtil.toBean(shopJson,Shop.class);
        return shop;
    }
    //判断命中的是否是空值
    if(shopJson!=null){
        //返回一个错误信息
        return null;
    }
    //4.实现缓存重建
    //4.1.获取互斥锁
    String lockKey="lock:shop:"+id;
    Shop shop= null;
    try {
        boolean isLock=tryLock(lockKey);
        //4.2.判断是否获取成功
        if (!isLock) {
            //4.3。失败,则休眠并重试
            Thread.sleep(50); //可读性差,不如TimeUtil
            return queryWithMute(id);
        }
        //4.4。成功,根据id查数据库
        shop = getById(id);

        //模拟重建的延迟
        Thread.sleep(200);
        //5.不存在,返回错误
        if(shop==null){
            //将空值写入Redis
            stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);
            return null;
        }
        //6.存在,写入Redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL,TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        //7.释放互斥锁
        unlock(lockKey);  //确保即使遇到异常也能释放锁
    }
    //8.返回
    return shop;
}

/**
 * 尝试获取Redis互斥锁。
 *
 * @param key Redis中的key
 * @return处理结果
 */
private boolean tryLock(String key){
    Boolean flag=stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS);
    return BooleanUtil.isTrue(flag);
}

/**
 * 释放Redis互斥锁。
 *
 * @param key Redis中的key
 * @return无返回值
 */
private void unlock(String key){
    stringRedisTemplate.delete(key);
}

测试: 使用JMeter模拟多机同时请求,设置5秒内发送1000个请求,预计QPS将达到200左右。 配置请求接口参数

绿色表示线程成功执行

吞压测结果显示,吞吐量为207.3/sec,由于这个请求业务中不涉及事务,即多个业务表的操作,所以这里的吞吐量Throughput可以理解为QPS,也就是每秒钟发送200个请求,每秒钟处理207个请求。

TPS(Transactions Per Second):每秒传输的事物处理个数。即服务器每秒处理的事务数。 QPS(Queries Per Second):每秒查询率。即服务器每秒能够处理的查询请求次数。 那么我们对于一个页面做一次访问,就会形成一个TPS;但一次页面访问,可能产生多次对服务器的请求,服务器对这些请求,计为QPS。