(1)Redis分布式锁超时误删问题
上一节,我们实现了一个简单的分布式锁,但是会存在一个问题:

线程1获取到锁,但由于业务阻塞,锁的过期时间超过了阻塞时间,锁自动释放,此时线程2尝试获取锁,线程2拿到锁后,线程1继续执行直到业务完成,准备释放锁,此时删除了本属于线程2的锁,这就误删了其他线程锁。
解决方案:
- 在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则进行锁的删除;如果不属于自己,则不进行释放锁逻辑。我们之前是使用当前获取锁的线程id作为锁的标识,但是在多个JVM内部,因为线程id是递增分配的,可能会出现线程id重复的情况。因此我们在线程id前面添加一个UUID,用于区分不同的JVM,而线程id用于区分同一个JVM内部的不同请求。这样就保证了分布式锁的标识唯一。
(2)解决Redis分布式锁超时误删问题

- SimpleRedisLock
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
/**
* 执行SimpleRedisLock相关业务逻辑。
*
* @param name名称
* @param stringRedisTemplate stringRedisTemplate参数
* @return处理结果
*/
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String KEY_PREFIX = "lock:";
// 锁value = "UUID-ThreadId",ID_PREFIX用于区分不同JVM,线程唯一标识用于区分不同服务
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
/**
* 尝试获取Redis互斥锁。
*
* 作用:
* 1.获取线程标识;
* 2.获取锁;
*
* @param timeoutSec timeoutSec参数
* @return处理结果
*/
public boolean tryLock(long timeoutSec) {
//获取线程标识
String threadId=ID_PREFIX+Thread.currentThread().getId();
//获取锁
Boolean success=stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX+name,threadId+"",timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 释放Redis互斥锁。
*
* 作用:
* 1.获取线程标识;
* 2.获取锁中的标识;
* 3.判断标识是否一致;
* 4.释放锁;
*
* @return无返回值
*/
public void unlock() {
//获取线程标识
String threadId=ID_PREFIX+Thread.currentThread().getId();
//获取锁中的标识
String id=stringRedisTemplate.opsForValue().get(KEY_PREFIX+name);
//判断标识是否一致
if(threadId.equals(id)){
//释放锁
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
}(3)Redis分布式锁释放锁原子性问题
在上一节中,我们通过给锁添加一个线程唯一标识,并且在释放锁时添加一个判断,从而防止锁超时自动释放的问题,但是仍然存在超卖问题:

当线程1获取到锁执行完业务,判断完当前的锁是自己的锁并准备释放,由于JVM垃圾回收机制导致阻塞,切好阻塞期间锁超时释放。线程2获得锁执行业务,但此时线程1阻塞完成,由于已经判断过锁标识,确认是自己的锁了,于是直接删除锁。由于锁是线程2的,没有了锁,线程3再来就会发生超卖。
- 原因分析 因为判断锁标识和释放锁的这两个操作不是真的原子性,而是在java代码中判断的,在这两个操作之前虽然没有任何Java代码,但是由于JVM中的垃圾回收机制Full GC的存在,就有可能出现阻塞问题
所以为了解决这个问题,必须要保证判断锁标识和释放锁这两个动作是一个原子性操作。因此我们需要使用Lua脚本。
(4)解决Redis分布式锁释放锁原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,基本语法参考菜鸟教程:https://www.runoob.com/lua/lua-tutorial.html
这里重点介绍Redis提供的调用函数,语法如下:
#执行Redis命令
redis.call('命令名称','key','其他参数',...)例如,我们要执行set name jack,则脚本是这样:
# 执行 set name jack
redis.call('set','name','jack')例如,我们要先执行set name jack,再执行get name,则脚本如下:
#先执行set name jack
redis.call('set','name','jack')
#再执行get name
local name=redis.call('get','name')
#返回
return name写好脚本以后,需要用Redis命令来调用脚本,调用脚本的命令用法如下:
例如,我们要执行Redis.call('set', 'name', 'jack')这个脚本,语法如下:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数: 
- 优化分布式锁

释放锁的业务流程是这样的:
- 获取锁中的线程标示
- 判断是否与指定的标示(当前线程标示)一致
- 如果一致则释放锁(删除)
- 如果不一致则什么都不做
- 编写释放锁的Lua脚本:unlock.lua
if(redis.call('get',KEYS[1])==ARGV[1]) then
return redis.call('del',KEYS[1])
end
return 0- SimpleRedisLock
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
/**
* 执行SimpleRedisLock相关业务逻辑。
*
* @param name名称
* @param stringRedisTemplate stringRedisTemplate参数
* @return处理结果
*/
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String KEY_PREFIX = "lock:";
// 锁value = "UUID-ThreadId",ID_PREFIX用于区分不同JVM,线程唯一标识用于区分不同服务
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
// RedisScript接口实现类
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
//伴随着类的加载而执行,且只执行一次
static {
//装载Lua脚本的“容器”对象
UNLOCK_SCRIPT = new DefaultRedisScript<>();
//去当前项目的classpath(通常也就是resources文件夹)的根目录下找文件
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
//告诉Spring框架,等Redis执行完这段Lua脚本后,把Redis返回的结果转换成Java里的什么数据类型
UNLOCK_SCRIPT.setResultType(Long.class);
}
/**
* 尝试获取Redis互斥锁。
*
* 作用:
* 1.获取线程标识;
* 2.获取锁;
*
* @param timeoutSec timeoutSec参数
* @return处理结果
*/
public boolean tryLock(long timeoutSec) {
//获取线程标识
String threadId=ID_PREFIX+Thread.currentThread().getId();
//获取锁
Boolean success=stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX+name,threadId+"",timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 释放Redis互斥锁。
*
* 作用:
* 1.调用lua脚本;
*
* @return无返回值
*/
public void unlock() {
//调用lua脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX+name),ID_PREFIX+Thread.currentThread().getId());
}
}- 测试 同时启动两个tomcat服务,postman模拟同一用户同时重复下单两次请求 线程1先获得锁
Redis种的线程标识,此时删除线程1的锁,模拟锁超时释放 
由于线程1的锁已经被超时释放,线程2现在也可以获取到锁
线程2锁线程标识
此时线程1执行完业务,尝试释放锁,由于现在锁是线程2的,线程1判断线程标识不同后,不会进行锁误删。线程2仍然可以在锁内正常执行业务。
当线程2执行完业务并释放锁后,锁才会正常被删除,并且通过Lua脚本保证了判断锁标识和删除锁两个操作的原子性,不再受到JVM垃圾回收机制的干扰,进一步避免了锁超时后的误删问题。
现在我们的分布式锁满足了以下特性:
- 多线程可见,将锁放到Redis中,相当于全局的锁监视器,所有的JVM都可以同时看到
- 互斥,set ex nx指令互斥
- 高可用,层层优化,即使是特别极端的情况下照样可以防止超卖,后期Redis还可以扩展搭建主从集群
- 高性能,Redis的IO速度很快,Lua脚本的性能也很快
- 安全性,通过给锁设置当前线程标识+Lua封装Redis指令,充分保障了线程安全,同时采用超时释放避免死锁 基于Redis的分布式锁实现思路:
- 利用set ex nx获取锁,并设置过期时间,保存线程标识
- 释放锁时先判断线程标识是否与自己一致,一致则删除锁