消息对了(Message Queue):字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取并处理消息

核心优点:解耦、异步、削峰。
常见的消息队列:RabbitMQ、RocketMQ、Kaflka、ActiveMQ等、我们也可以直接使用Redis提供的MQ方案、降低我们的部署和学习成本。
Redis提供了三种不同的方式来实现消息队列
- list结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
(1)基于List结构模拟消息队列
Redis的list数据结构是一个双向链表,很容易模拟队列效果。
队列是先进先出的管道,我们可以利用LPUSH和RPOP,或者RPUSH和LPOP实现。如果队列中没有消息时,RPOP和LPOP会返回null,并不会像JVM的阻塞队列一样阻塞并等待消息,所以应该使用BRPOP和BLPOP来实现阻塞效果。

- 生产消息:
BRPUSH key value [value ...]将一个或多个元素推入指定列表头部,如果列表不存在,BRPUSH命令会自动创建新列表 - 消费消息:
BRPOP key [key ...] timeout从指定的一个或多个列表中弹出最后一个元素,如果列表为空,BRPOP会使客户端阻塞,直到有数据可用或者超过指定时间。
案例:



基于List的消息队列优缺点:
- 优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 满足消息有序性
- 缺点:
- 无法避免消息丢失
- 只支持单消费者
(2)基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的点对点消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者相对应channel发送消息后,所有订阅者都能收到相关消息。

- 生产消息
#向指定频道发送一条消息
PUBLIST channel message- 消费消息
#订阅一个或多个频道
SUBSCRIBE channel [channel ...]
#取消订阅一个或多个频道
UNSUBSCRIBE channel [channel ...]
#订阅与pattern格式匹配的所有频道
PSUBSCRIBE pattern [pattern ...]
#取消订阅与pattern格式匹配的所有频道
PUNSUBSCRIBE pattern [pattern ...]案例:





停止订阅 消费者停止订阅,消息丢失

基于PubSub的消息队列优缺点:
- 优点:
- 采用发布订阅模型,支持多生产、多消费
- 缺点:
- 不支持数据持久化
- 无法避免消息丢失(发送到channel无消费者订阅、消息直接丢失、数据无法保证安全)
- 无法避免消息丢失(无消费者订阅,消息直接丢失)
- 消息堆积有上限,超出时数据丢失(发出消息如果有消费者监听,消息就会缓存到消费者缓存,如果消费者处理消息耗时较久,新接受到的消息就会堆积,超出缓存上限丢失,可靠性低)
(3)基于Stream的消息队列
Stream时Redis 5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列
① Stream的单消费模式
- 生产消息

#向指定的Stream中添加一个消息
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] THRESHOLD [LIMIT count]] *|ID
field value [field value ...]
#最简用法:XADD添加消息的队列名称 消息id 消息Entry
XADD key *|ID field value [field value ...]
#例如:创建名为users的队列,并向其中发送一个消息,内容是:{NAME=jack,age=21},并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1777040914882-0"- 消费消息

#XREAD COUNT 读取消息数量 BLOCK 阻塞时常 STREAMS 要读取的阻塞队列名称 ID 起始id
#例如:使用XREAD读取第一条消息
XREAD COUNT 1STREAMS users 0
#XREAD阻塞方式,读取最新的消息(阻塞1秒种后读取最新消息)
XREAD COUNT 1 BLOCK 1000 STREAMS users $在开发业务种,我们可以循环调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

当我们指定起始ID为
$时,代表读取最后一条消息(读取最新的消息),ID为0时代表读最开始的一条消息(读取最旧的消息)。如果我们处理消息过程中,又有超过1条以上消息到达队列,则下次获取也只能获取到最新的一条,会出现漏读消息的问题
STREAM类型消息队列的单消费模式(XREAD命令)优缺点:
- 优点:
- 消息可回溯(重复读取)
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 缺点:
- 有消息漏读的风险
Stream的消费者组模式
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点 
- 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理速度
- 消息标示:消费者组维护一个标识,记录最后一个被处理的消息,哪怕消费者组当即重启,还是会从标识之后读取消息。确保每一个消息都会被消费
- 消息确认:消费者获取消息后,消息处于
pending(待处理)状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。


消费者监听消息的基本思路,伪代码如下:

STREAM类型消息队列的XREADGROUP命令特点
消息可回溯
可以多消费者争抢消息,加快消费速度
可以阻塞读取
没有消息漏读的风险
有消息确认机制,保证消息至少被消费一次
三种Redis消息队列实现对比

(4)Stream消息队列优化异步秒杀

1.创建一个Stream类型消息队列,名为stream.orders
- 方法一:在Redis-cli中直接使用命令创建
#创建队列(消费者组模式)MKSTREAM:当创建消费者组时,若队列不存在,将自动创建队列和消费者组
XGROUP CREATE stream.orders g1 0 MKSTREAM- 方法二:在Java代码中创建
// Stream消息队列相关属性
private static final String GROUP_NAME = "g1"; // 消费者组 groupName
private static final String CONSUMER_NAME = "c1"; // 消费者名称 consumer,该项后期可以在yaml中配置多个消费者,并实现消费者组多消费模式
private static final String QUEUE_NAME = "stream.orders"; // 消息队列名称 key
/**
* 初始化异步处理任务。
*
* 作用:
* 1.创建消息队列;
* 2.启动线程池,执行任务;
*
* @return无返回值
*/
@PostConstruct // 在类初始化时执行该方法
private void init() {
// 创建消息队列
if (Boolean.FALSE.equals(stringRedisTemplate.hasKey(QUEUE_NAME))) {
stringRedisTemplate.opsForStream().createGroup(QUEUE_NAME, ReadOffset.from("0"), GROUP_NAME);
log.debug("Stream队列创建成功");
}
// 启动线程池,执行任务
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}2.修改之前的秒杀下单Lua脚本,再确认有抢购资格后,直接向stream.orders添加消息,内容包含voucherId,userId,orderId
- 修改lua脚本,加入xadd发送消息命令,向名为queueName的Stream消息队列发送下单消息
-- 参数列表
local voucherId = ARGV[1] -- 优惠券id(用于判断库存是否充足)
local userId = ARGV[2] -- 用户id(用于判断用户是否下过单)
local orderId = ARGV[3] -- 订单id
local queueName = ARGV[4] -- Stream消息队列名称
-- 构造缓存数据key
local stockKey = 'hmdp:seckill:stock:' .. voucherId -- 库存key
local orderKey = 'hmdp:seckill:order:' .. voucherId -- 订单key
-- 脚本业务
-- 判断库存是否充足
if tonumber(redis.call('get', stockKey)) <= 0 then
-- 库存不足,返回1
return 1
end
-- 判断用户是否下过单 SISMEMBER orderKey userId,SISMEMBER:判断Set集合中是否存在某个元素,存在返回1,不存在放回0
if redis.call('sismember', orderKey, userId) == 1 then
-- 存在,说明用户已经下单,返回2
return 2
end
-- 缓存中预先扣减库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 发送订单消息到队列中 XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', queueName, '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
-- 有下单资格,允许下单,返回0
return 0- 在Java代码中修改lua脚本调用,将阻塞队列添加消息改为lua脚本操作stream发送消息
private IVoucherOrderService proxy;
/**
* 处理优惠券秒杀下单请求。
*
* 作用:
* 1.获取用户id;
* 2.获取订单id;
* 3.执行lua脚本;
* 4.判断结果是否为0;
* 5.不为0,代表没有购买资格;
*
* @param voucherId优惠券id
* @return处理结果
*/
@Override
public Result seckillVoucher(Long voucherId) {
//获取用户id
Long userId=UserHolder.getUser().getId();
//获取订单id
long orderId=redisIdWorker.nextId("order");
//1.执行lua脚本
Long result=stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),userId.toString(),String.valueOf(orderId)
);
//2.判断结果是否为0
int r =result.intValue();
if(r!=0){
//2.1.不为0,代表没有购买资格
return Result.fail(r==1?"库存不足":"不能重复下单");
}
//3.为0,有购买资格,lua脚本已经将订单相关消息发送到消息队列,待消费者读取
proxy= (IVoucherOrderService) AopContext.currentProxy(); //获取当前对象的事务代理对象
//4.返回订单id
return Result.ok(orderId);3.项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();
/**
* 初始化异步处理任务。
*
* 作用:
* 1.启动线程池,执行任务;
*
* @return无返回值
*/
@PostConstruct
public void init(){
//启动线程池,执行任务
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
String queueName="stream.orders";
/**
* 循环处理异步任务。
*
* 作用:
* 1.从Stream消息队列读取订单消息;
* 2.判断消息获取是否成功;
* 3.如果获取失败,说明没有消息,继续下一次循环;
* 4.解析消息中的订单信息;
* 5.如果获取成功,可以下单;
*
* @return无返回值
*/
public void run() {
while(true){
try {
//1.从Stream消息队列读取订单消息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2.判断消息获取是否成功
if(list==null||list.isEmpty()){
//2.1.如果获取失败,说明没有消息,继续下一次循环
continue;
}
//3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values,new VoucherOrder(),true);
//4.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
//5.处理成功后ACK确认消息
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理订单异常",e);
try {
handlePendingList();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
//处理pending-list中的异常订单信息
/**
* 处理pending-list中的异常订单。
*
* 作用:
* 1.从pending-list读取异常订单消息;
* 2.判断消息获取是否成功;
* 3.如果获取失败,说明pending-list没有异常消息,结束循环;
* 4.如果获取成功,可以下单;
* 5.处理成功后ACK确认消息;
*
* @return无返回值
*/
private void handlePendingList() throws InterruptedException {
while(true){
try {
//1.从pending-list读取异常订单消息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"), //g1消费者组名,c1消费者名
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), //count每次最多拉取1条消息,block最多阻塞2秒钟
StreamOffset.create(queueName, ReadOffset.from("0")) //读取待确认列表里的异常消息
);
//2.判断消息获取是否成功
if(list==null||list.isEmpty()){
//2.1.如果获取失败,说明pending-list没有异常消息,结束循环
break;
}
//3.解析消息中的订单信息 //MapRecord<消息id,消息key,消息value>
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> values = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values,new VoucherOrder(),true);
//4.如果获取成功,可以下单
handleVoucherOrder(voucherOrder);
//5.处理成功后ACK确认消息
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理pending-list异常",e);
try{
// 防止处理频繁,下次循环休眠20毫秒
Thread.sleep(20);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
}- 测试下单 恢复数据库和Redis,测试单人下单,正常下单

再进行压力性能测试,与之前的阻塞队列的吞吐量差不多,但是Stream更加灵活和可靠。
