Redis消息队列
约 3595 字大约 12 分钟
2025-07-07
7.1 认识消息队列
消息队列是一种用于存储和管理消息的机制,它在生产者和消费者之间起到了桥梁的作用,也被称为消息代理(Message Broker)。消息队列包含以下三个核心角色:
- 消息队列(Message Queue): 负责存储和管理消息。
- 生产者(Producer): 将消息发送到消息队列。
- 消费者(Consumer): 从消息队列获取消息并处理消息。
消息队列的核心优势在于解耦。解耦是指将系统的不同组件分离,使得它们之间的依赖性降低。例如,在快递场景中,快递员(生产者)将快递放入快递柜(Message Queue),用户(消费者)再从快递柜取件。这种模式实现了异步处理,避免了快递员必须等待用户在家才能交付快递的情况,从而提高了效率。
在秒杀场景中,用户下单后,系统可以使用 Redis 快速校验下单条件,然后将消息发送到消息队列,再由独立的线程异步消费消息,完成后续处理。这样可以加快响应速度,提升用户体验。

虽然可以使用 Kafka、RabbitMQ 等专业的 MQ 中间件,但如果没有安装这些组件,可以直接使用 Redis 提供的 MQ 方案,以降低部署和学习成本。
7.2 基于 List 实现消息队列
Redis 的 List 数据结构是一个双向链表,可以方便地模拟队列。队列的特点是入口和出口不在同一侧,因此可以使用 LPUSH
结合 RPOP
或 RPUSH
结合 LPOP
来实现队列效果。

需要注意的是,当队列为空时,RPOP
或 LPOP
操作会返回 null
,而不会阻塞等待消息。为了实现阻塞效果,可以使用 BRPOP
或 BLPOP
命令。
基于 List 的消息队列的优缺点如下:
优点:
- 利用 Redis 存储,不受限于 JVM 内存上限。
- 基于 Redis 的持久化机制,数据安全性有保证。
- 可以满足消息的有序性。
缺点:
- 无法避免消息丢失。
- 只支持单消费者。
7.3 基于 PubSub 的消息队列
PubSub(发布订阅)是 Redis 2.0 版本引入的消息传递模型。消费者可以订阅一个或多个 channel,生产者向 channel 发送消息后,所有订阅者都能收到消息。
常用的 PubSub 命令包括:
SUBSCRIBE channel [channel]
:订阅一个或多个频道。PUBLISH channel msg
:向一个频道发送消息。PSUBSCRIBE pattern [pattern]
:订阅与 pattern 格式匹配的所有频道。基于 PubSub 的消息队列的优缺点如下:
优点:
- 采用发布订阅模型,支持多生产、多消费。
缺点:
- 不支持数据持久化。
- 无法避免消息丢失。
- 消息堆积有上限,超出时数据丢失。
7.4 基于 Stream 的消息队列
Stream 是 Redis 5.0 引入的一种新型数据类型,它实现了一个功能完善的消息队列。
7.4.1 发送消息:XADD
命令
XADD
命令用于向 Stream 中追加新的消息。
XADD key [NOMKSTREAM] [MAXLEN|MINID [~|=] threshold [LIMIT count]] *|ID field value [field value ...]
key
: Stream 的名称。NOMKSTREAM
: 如果 Stream 不存在,则自动创建 Stream。MAXLEN|MINID
: 设置消息队列的最大消息数量。*
: 代表由 Redis 自动生成唯一 ID,格式是 " 时间戳 - 递增数字 ",例如 "1644804662707-0"。ID
: 消息的唯一 ID。field value
: 消息的内容,格式就是 key-value 键值对。
例如,创建一个名为 users
的 Stream,并向其中发送了一条消息,内容是 {name=jack, age=21}
,并且使用 Redis 自动生成的 ID。
XADD users * name jack age 21
7.4.2 读取消息:XREAD
命令
XREAD 命令用于从 Stream 中读取消息。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
COUNT
: 每次读取消息的最大数量。BLOCK
: 当没有消息可读时,阻塞等待的时长,单位是毫秒。STREAMS
: 要从哪个队列读取消息,key 是队列名。ID
: 起始 ID,只返回大于该 ID 的消息。0
代表从第一条消息开始,$
代表从最新的消息开始。
例如,使用 XREAD
从名为 users
的 Stream 中读取第一条消息:
XREAD COUNT 1 STREAMS users 0
以阻塞方式读取 users
Stream 中最新的消息,最多阻塞 1000 毫秒:
XREAD COUNT 1 BLOCK 1000 STREAMS users $
7.4.3 循环 XREAD
阻塞方式的应用
在业务开发中,可以循环调用 XREAD 阻塞方式来查询最新消息,从而实现持续监听队列的效果。
以下是示例代码:
while (true) {
// 尝试读取队列中的消息,最多阻塞 2 秒
Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
if (msg == null) {
continue;
}
// 处理消息
handleMessage(msg);
}
这段代码展示了一个简单的消息队列监听器的实现。它通过一个无限循环不断地尝试从 users
Stream 中读取最新的消息。
XREAD
命令使用BLOCK 2000
参数,这意味着如果当前 Stream 中没有新的消息,该命令将阻塞 2 秒钟,避免了不必要的 CPU 资源占用。- 如果
XREAD
命令返回null
,则表示在阻塞时间内没有新的消息到达,此时continue
语句会跳过本次循环,并进入下一次循环尝试读取消息。 - 如果
XREAD
命令成功读取到消息,则调用handleMessage
方法来处理该消息。
警告
在使用消息队列时,如果指定起始 ID 为 $
,表示从最新的消息开始读取。但这种方式在处理消息时,可能会出现漏读消息的问题。具体来说,如果在处理当前消息的过程中,有超过一条的新消息到达队列,那么下次读取时,仍然只能获取到最新的消息,导致中间的消息被跳过。
在实际应用中,需要根据具体的业务场景来选择合适的起始 ID 和消息处理策略,以确保消息的可靠性和完整性。
STREAM 类型消息队列的 XREAD
命令的特点:
- 消息可回溯。
- 一个消息可以被多个消费者读取。
- 可以阻塞读取。
- 有消息漏读的风险。
7.5 基于 Stream 的消息队列 - 消费者组
消费者组 (Consumer Group) 将多个消费者划分到一个组中,监听同一个队列。其特点如下:
- 消息分流: 队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。
- 消息标示: 消费者组会维护一个标示,记录最后一条被处理的消息,哪个消费者宕机重启后,会从标示之后读取消息,确保每一个消息都会被消费。
- 消息确认: 消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list,当处理完成后需要通过
XACK
来确认消息已处理,消息才会从 pending-list 移除。
7.5.1 创建消费者组
XGROUP CREATE key group ID [MKSTREAM]
key
:队列名称group
:消费者组名称ID
:起始 ID 标识,$
代表队列中最后一条消息,0
代表队列中第一条消息MKSTREAM
:队列不存在时自动创建队列
其他常用命令如下所示:
功能 | 命令 |
---|---|
删除指定的消费者组 | XGROUP DESTROY key group |
给指定的消费者组添加消费者 | XGROUP CREATECONSUMER key group consumer |
删除消费者组中的指定消费者 | XGROUP DELCONSUMER key group consumer |
从消费者组读取消息 | XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] |
consumer
:消费者名称,如果消费者不存在,会自动创建一个消费者COUNT
:本次查询的最大数量BLOCK milliseconds
:当没有消息时最长等待时间NOACK
:无需手动 ACK,获取到消息后自动确认STREAMS key
:指定队列名称ID
:获取消息的起始 ID;>
: 从下一个未消费的消息开始- 其他:根据指定 id 从 pending-list 中获取已消费但未确认的消息,例如 0,是从 pending-list 中的第一个消息开始
7.5.2 消费者监听队列的基本思路
- 循环监听: 通过
while(true)
循环不断尝试从队列中读取消息。 - 尝试读取消息: 使用
redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >")
命令,以阻塞模式尝试从名为s1
的 Stream 队列的g1
消费者组中读取一条消息。BLOCK 2000
表示最长等待 2000 毫秒。 - 处理消息: 如果成功读取到消息(
msg != null
),则调用handleMessage(msg)
方法处理消息,并在try...catch
块中捕获处理过程中的异常。处理完成后,需要手动ACK
确认消息已被消费。 - 异常处理和重试: 如果处理消息时出现异常,记录日志,并进入下一次循环重试。如果因为没有新消息而阻塞超时,也会继续循环尝试。
- pending 消息处理: 如果有异常消息,再次进行处理,结束循环。
while(true) {
// 尝试监听队列,使用阻塞模式, 最长等待2000 毫秒
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
if(msg == null) { // null说明没有消息,继续下一次
continue;
}
try {
// 处理消息,完成后一定要ACK
handleMessage(msg);
} catch (Exception e) {
while(true) {
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if(msg == null) { // null说明没有异常消息,所有消息都已确认, 结束循环
break;
}
try {
// 说明有异常消息,再次处理
handleMessage(msg);
} catch(Exception e) {
// 再次出现异常,记录日志,继续循环
continue;
}
}
}
}
7.5.3 XREADGROUP
命令特性总结
- 消息可回溯
- 可以多消费者竞争消息,加快消费速度
- 可以阻塞读取
- 没有消息遗漏的风险
- 有消息确认机制,保证消息至少被消费一次
List, Pub/Sub, Stream 对比如下:
特性 | List | Pub/Sub | Stream |
---|---|---|---|
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少堆积 |
消息确认机制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
Stream 在消息持久化、消息确认机制和消息回溯方面都优于 List 和 Pub/Sub。Stream 支持消息持久化,保证消息不丢失;支持消息确认机制,确保消息至少被消费一次;支持消息回溯,允许消费者重新消费历史消息。
7.6 基于 Redis 的 Stream 结构作为消息队列,实现异步秒杀下单
创建一个 Stream 类型的消息队列,名为
stream.orders
。XGROUP CREATE stream.orders g1 0 MKSTREAM
向
stream.orders
中添加订单消息,内容包含voucherId
、userId
、orderId
。-- 1. 参数列表 -- 1.1 优惠券id local voucherId = ARGV[1] -- 1.2 用户id local userId = ARGV[2] -- 1.3 订单id -- local orderId = ARGV[3] -- 2. Redis key -- 2.1 库存 keylocal stockKey = "seckill:stock:" .. voucherId -- 2.2 订单 keylocal orderKey = "seckill:order:" .. voucherId -- 3. 脚本业务 -- 3.1 判断库存是否充足 if (tonumber(redis.call("GET", stockKey)) <= 0) then return 1 end -- 3.2 判断是否重复下单 if (redis.call("SISMEMBER", orderKey, userId) == 1) then return 2 end -- 3.3 扣减库存 redis.call("INCRBY", stockKey, -1) -- 3.4 下单 redis.call("SADD", orderKey, userId) -- 3.5 发送信息到消息队列 -- redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0
修改秒杀流程。修改
seckillVoucher
方法,移除之前使用的阻塞队列,直接返回订单 ID。@Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); // 1. 执行 Lua 脚本 Long result = stringRedisTemplate.execute( SCIKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); // 2. 判断结果是否为 0 if (result != 0L) { return result == 1L ? Result.fail(MessageConstants.STOCK_OUT) : Result.fail(MessageConstants.SINGLE_ORDER_LIMIT_REACHED); } // 3. 保存订单到阻塞队列 VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setVoucherId(voucherId); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); orderTasks.add(voucherOrder); // 3. 获取代理对象 proxy = (IVoucherOrderService) AopContext.currentProxy(); return Result.ok(orderId); }
获取消息队列中的订单信息,完成下单。在线程池中提交
VoucherOrderHandler
任务。// 处理订单信息 private class VoucherOrderHandler implements Runnable { private String queueName = "stream.orders"; // 处理订单 @Override public void run() { while (true) { try { // 1. 获取队列中的订单信息 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); // 2. 判断消息是否获取成功 if (list == null || list.isEmpty()) { // 没有消息,继续循环 continue; } // 3. 获取成功,处理订单 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); // 4. ACK 确认 stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } catch (Exception e) { log.error("订单处理异常: {}", String.valueOf(e)); handlePendingList(); } } } private void handlePendingList() { while (true) { try { // 1. 获取 pending-list 队列中的订单信息 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create(queueName, ReadOffset.from("0")) ); // 2. 判断消息是否获取成功 if (list == null || list.isEmpty()) { // pending-list 队列中没有消息,结束循环 break; } // 3. 获取成功,处理订单 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); // 4. ACK 确认 stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId()); } catch (Exception e) { log.error("pending-list 处理异常: {}", String.valueOf(e)); try { Thread.sleep(20); } catch (InterruptedException ex) { throw new RuntimeException(ex); } } } } }
代码解释:
- 使用
stringRedisTemplate.opsForStream().read()
方法从 Stream 队列中读取消息。Consumer.from("g1", "c1")
指定了消费者组和消费者名称,StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2))
设置了每次读取一条消息,最多阻塞 2 秒,StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
指定从下一个未消费的消息开始。 - 如果读取到消息,则解析消息内容,创建订单,并使用
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId())
方法确认消息。 - 如果处理消息过程中发生异常,则调用
handlePendingList()
方法处理 pending-list 中的消息。 handlePendingList()
方法用于处理未被确认的消息,它会从 pending-list 中读取消息,重新处理,并确认消息。
- 使用