Skip to content
DAILY QUOTE

“ ”

消息对了(Message Queue):字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)

  • 生产者:发送消息到消息队列

  • 消费者:从消息队列获取并处理消息

  • 核心优点:解耦、异步、削峰。

  • 常见的消息队列:RabbitMQ、RocketMQ、Kaflka、ActiveMQ等、我们也可以直接使用Redis提供的MQ方案、降低我们的部署和学习成本。

Redis提供了三种不同的方式来实现消息队列

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

(1)基于List结构模拟消息队列

Redis的list数据结构是一个双向链表,很容易模拟队列效果。

队列是先进先出的管道,我们可以利用LPUSH和RPOP,或者RPUSH和LPOP实现。如果队列中没有消息时,RPOP和LPOP会返回null,并不会像JVM的阻塞队列一样阻塞并等待消息,所以应该使用BRPOP和BLPOP来实现阻塞效果。

  • 生产消息:BRPUSH key value [value ...]将一个或多个元素推入指定列表头部,如果列表不存在,BRPUSH命令会自动创建新列表
  • 消费消息:BRPOP key [key ...] timeout从指定的一个或多个列表中弹出最后一个元素,如果列表为空,BRPOP会使客户端阻塞,直到有数据可用或者超过指定时间。

案例:

基于List的消息队列优缺点:

  • 优点:
    • 利用Redis存储,不受限于JVM内存上限
    • 基于Redis的持久化机制,数据安全性有保证
    • 满足消息有序性
  • 缺点:
    • 无法避免消息丢失
    • 只支持单消费者

(2)基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的点对点消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者相对应channel发送消息后,所有订阅者都能收到相关消息。

  • 生产消息
shell
#向指定频道发送一条消息
PUBLIST channel message
  • 消费消息
shell
#订阅一个或多个频道
SUBSCRIBE channel [channel ...]
#取消订阅一个或多个频道
UNSUBSCRIBE channel [channel ...]
#订阅与pattern格式匹配的所有频道
PSUBSCRIBE pattern [pattern ...]
#取消订阅与pattern格式匹配的所有频道
PUNSUBSCRIBE pattern [pattern ...]

案例:

停止订阅 消费者停止订阅,消息丢失

基于PubSub的消息队列优缺点:

  • 优点:
    • 采用发布订阅模型,支持多生产、多消费
  • 缺点:
    • 不支持数据持久化
    • 无法避免消息丢失(发送到channel无消费者订阅、消息直接丢失、数据无法保证安全)
    • 无法避免消息丢失(无消费者订阅,消息直接丢失)
    • 消息堆积有上限,超出时数据丢失(发出消息如果有消费者监听,消息就会缓存到消费者缓存,如果消费者处理消息耗时较久,新接受到的消息就会堆积,超出缓存上限丢失,可靠性低)

(3)基于Stream的消息队列

Stream时Redis 5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列

① Stream的单消费模式

  • 生产消息

shell
#向指定的Stream中添加一个消息
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] THRESHOLD [LIMIT count]] *|ID
field value [field value ...]
#最简用法:XADD添加消息的队列名称 消息id 消息Entry
XADD key *|ID field value [field value ...]

#例如:创建名为users的队列,并向其中发送一个消息,内容是:{NAME=jack,age=21},并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1777040914882-0"
  • 消费消息

shell
#XREAD COUNT 读取消息数量 BLOCK 阻塞时常 STREAMS 要读取的阻塞队列名称 ID 起始id

#例如:使用XREAD读取第一条消息
XREAD COUNT 1STREAMS users 0
#XREAD阻塞方式,读取最新的消息(阻塞1秒种后读取最新消息)
XREAD COUNT 1 BLOCK 1000 STREAMS users $

在开发业务种,我们可以循环调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

当我们指定起始ID为$时,代表读取最后一条消息(读取最新的消息),ID为0时代表读最开始的一条消息(读取最旧的消息)。如果我们处理消息过程中,又有超过1条以上消息到达队列,则下次获取也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的单消费模式(XREAD命令)优缺点:

  • 优点:
    • 消息可回溯(重复读取)
    • 一个消息可以被多个消费者读取
    • 可以阻塞读取
  • 缺点:
    • 有消息漏读的风险

Stream的消费者组模式

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理速度
  • 消息标示:消费者组维护一个标识,记录最后一个被处理的消息,哪怕消费者组当即重启,还是会从标识之后读取消息。确保每一个消息都会被消费
  • 消息确认:消费者获取消息后,消息处于pending(待处理)状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。

消费者监听消息的基本思路,伪代码如下:

STREAM类型消息队列的XREADGROUP命令特点

  • 消息可回溯

  • 可以多消费者争抢消息,加快消费速度

  • 可以阻塞读取

  • 没有消息漏读的风险

  • 有消息确认机制,保证消息至少被消费一次

  • 三种Redis消息队列实现对比

(4)Stream消息队列优化异步秒杀

1.创建一个Stream类型消息队列,名为stream.orders

  • 方法一:在Redis-cli中直接使用命令创建
shell
#创建队列(消费者组模式)MKSTREAM:当创建消费者组时,若队列不存在,将自动创建队列和消费者组
XGROUP CREATE stream.orders g1 0 MKSTREAM
  • 方法二:在Java代码中创建
java
// Stream消息队列相关属性
private static final String GROUP_NAME = "g1";    // 消费者组 groupName
private static final String CONSUMER_NAME = "c1";    // 消费者名称 consumer,该项后期可以在yaml中配置多个消费者,并实现消费者组多消费模式
private static final String QUEUE_NAME = "stream.orders";    // 消息队列名称 key

/**
 * 初始化异步处理任务。
 *
 * 作用:
 * 1.创建消息队列;
 * 2.启动线程池,执行任务;
 *
 * @return无返回值
 */
@PostConstruct  // 在类初始化时执行该方法
private void init() {
    // 创建消息队列
    if (Boolean.FALSE.equals(stringRedisTemplate.hasKey(QUEUE_NAME))) {
        stringRedisTemplate.opsForStream().createGroup(QUEUE_NAME, ReadOffset.from("0"), GROUP_NAME);
        log.debug("Stream队列创建成功");
    }
    // 启动线程池,执行任务
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

2.修改之前的秒杀下单Lua脚本,再确认有抢购资格后,直接向stream.orders添加消息,内容包含voucherId,userId,orderId

  • 修改lua脚本,加入xadd发送消息命令,向名为queueName的Stream消息队列发送下单消息
lua
-- 参数列表
local voucherId = ARGV[1]   -- 优惠券id(用于判断库存是否充足)
local userId = ARGV[2]  -- 用户id(用于判断用户是否下过单)
local orderId = ARGV[3]  -- 订单id
local queueName = ARGV[4]   -- Stream消息队列名称
-- 构造缓存数据key
local stockKey = 'hmdp:seckill:stock:' .. voucherId -- 库存key
local orderKey = 'hmdp:seckill:order:' .. voucherId -- 订单key
-- 脚本业务
-- 判断库存是否充足
if tonumber(redis.call('get', stockKey)) <= 0 then
    -- 库存不足,返回1
    return 1
end
-- 判断用户是否下过单 SISMEMBER orderKey userId,SISMEMBER:判断Set集合中是否存在某个元素,存在返回1,不存在放回0
if redis.call('sismember', orderKey, userId) == 1 then
    -- 存在,说明用户已经下单,返回2
    return 2
end
-- 缓存中预先扣减库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 下单(保存用户) sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 发送订单消息到队列中 XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', queueName, '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
-- 有下单资格,允许下单,返回0
return 0
  • 在Java代码中修改lua脚本调用,将阻塞队列添加消息改为lua脚本操作stream发送消息
java
private IVoucherOrderService proxy;
/**
 * 处理优惠券秒杀下单请求。
 *
 * 作用:
 * 1.获取用户id;
 * 2.获取订单id;
 * 3.执行lua脚本;
 * 4.判断结果是否为0;
 * 5.不为0,代表没有购买资格;
 *
 * @param voucherId优惠券id
 * @return处理结果
 */
@Override
public Result seckillVoucher(Long voucherId) {
    //获取用户id
    Long userId=UserHolder.getUser().getId();
    //获取订单id
    long orderId=redisIdWorker.nextId("order");

    //1.执行lua脚本
    Long result=stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(),userId.toString(),String.valueOf(orderId)
    );
    //2.判断结果是否为0
    int r =result.intValue();
    if(r!=0){
        //2.1.不为0,代表没有购买资格
        return Result.fail(r==1?"库存不足":"不能重复下单");
    }
    //3.为0,有购买资格,lua脚本已经将订单相关消息发送到消息队列,待消费者读取
    proxy= (IVoucherOrderService) AopContext.currentProxy(); //获取当前对象的事务代理对象
    //4.返回订单id
    return Result.ok(orderId);

3.项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

java
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();

/**
 * 初始化异步处理任务。
 *
 * 作用:
 * 1.启动线程池,执行任务;
 *
 * @return无返回值
 */
@PostConstruct
public void init(){

    //启动线程池,执行任务
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private class VoucherOrderHandler implements Runnable {
    String queueName="stream.orders";
    /**
     * 循环处理异步任务。
     *
     * 作用:
     * 1.从Stream消息队列读取订单消息;
     * 2.判断消息获取是否成功;
     * 3.如果获取失败,说明没有消息,继续下一次循环;
     * 4.解析消息中的订单信息;
     * 5.如果获取成功,可以下单;
     *
     * @return无返回值
     */
    public void run() {
        while(true){
            try {
                //1.从Stream消息队列读取订单消息
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create(queueName, ReadOffset.lastConsumed())
                );
                //2.判断消息获取是否成功
                if(list==null||list.isEmpty()){
                    //2.1.如果获取失败,说明没有消息,继续下一次循环
                    continue;
                }
                //3.解析消息中的订单信息
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> values = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values,new VoucherOrder(),true);
                //4.如果获取成功,可以下单
                handleVoucherOrder(voucherOrder);
                //5.处理成功后ACK确认消息
                stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
            } catch (Exception e) {
                log.error("处理订单异常",e);
                try {
                    handlePendingList();
                } catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
    }

    //处理pending-list中的异常订单信息
    /**
     * 处理pending-list中的异常订单。
     *
     * 作用:
     * 1.从pending-list读取异常订单消息;
     * 2.判断消息获取是否成功;
     * 3.如果获取失败,说明pending-list没有异常消息,结束循环;
     * 4.如果获取成功,可以下单;
     * 5.处理成功后ACK确认消息;
     *
     * @return无返回值
     */
    private void handlePendingList() throws InterruptedException {
        while(true){
            try {
                //1.从pending-list读取异常订单消息
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"), //g1消费者组名,c1消费者名
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), //count每次最多拉取1条消息,block最多阻塞2秒钟
                        StreamOffset.create(queueName, ReadOffset.from("0")) //读取待确认列表里的异常消息
                );
                //2.判断消息获取是否成功
                if(list==null||list.isEmpty()){
                    //2.1.如果获取失败,说明pending-list没有异常消息,结束循环
                    break;
                }
                //3.解析消息中的订单信息 //MapRecord<消息id,消息key,消息value>
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> values = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values,new VoucherOrder(),true);
                //4.如果获取成功,可以下单
                handleVoucherOrder(voucherOrder);
                //5.处理成功后ACK确认消息
                stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
            } catch (Exception e) {
                log.error("处理pending-list异常",e);
                try{
                    // 防止处理频繁,下次循环休眠20毫秒
                    Thread.sleep(20);
                } catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
    }
}
  • 测试下单 恢复数据库和Redis,测试单人下单,正常下单

再进行压力性能测试,与之前的阻塞队列的吞吐量差不多,但是Stream更加灵活和可靠。