Redis Stream 作为队列的定位

Redis 做「队列」常见几种方式:

  • List(LPUSH/BRPOP):简单 FIFO,但一条消息只能被一个消费者取走,无法多实例负载均衡,也没有「未 ACK 可重新投递」的语义。
  • Pub/Sub:广播,消息不落盘,订阅者不在就丢消息,不适合任务队列。
  • Stream:消息持久化、支持消费者组、每条消息有唯一 ID、支持 XACKPEL(Pending Entries List),可实现多消费者负载均衡与至少一次消费,适合作为「队列」使用。

下文先说明把 Redis Stream 当队列 的核心概念与 Spring 用法,再在延伸里讲 基于 ZSET + Stream 的延迟队列 实现要点。


Stream 核心概念与命令

基本结构

  • Stream:一个 key 对应一条日志流,每条记录有一个 消息 ID(默认毫秒时间戳-序号,如 1739123456789-0)和多个 field-value 对(类似 hash)。
  • 生产者XADD stream * field1 value1 field2 value2* 表示自动生成 ID。
  • 单消费者读XREAD BLOCK 5000 STREAMS stream 0,从 ID 0 起读,阻塞最多 5 秒。

消费者组(多消费者、负载均衡、至少一次)

  • XGROUP CREATE stream group-name 0 MKSTREAM:在 stream 上创建消费者组,从 0 开始消费;MKSTREAM 表示 stream 不存在时创建。
  • XREADGROUP GROUP group-name consumer-name STREAMS stream >:在组内用 consumer-name 领消息,> 表示「只领尚未分配给任何消费者的新消息」。
  • 被领走但未 ACK 的消息会进入该消费者在组内的 PEL(Pending Entries List);其他消费者不会再次领到同一条,除非用 XAUTOCLAIM 接管超时未 ACK 的。
  • XACK stream group-name message-id:确认某条消息已处理完成,从 PEL 移除。
  • XDEL stream message-id:从 stream 中删除该条记录(可选,看是否需要保留历史)。

因此:把 Stream 当队列 = 用 XADD 投递 → 用 XREADGROUP 在组内领消息 → 业务处理 → XACK(可选再 XDEL),即可实现多实例负载均衡与至少一次语义。


Spring 集成:监听容器与建组时机

StreamMessageListenerContainer

Spring Data Redis 提供 StreamMessageListenerContainer,内部用 XREADGROUP 轮询,收到消息后回调你注册的 StreamListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainerOptions.<String, MapRecord<String, String, String>>builder()
.batchSize(10)
.pollTimeout(Duration.ofSeconds(2))
.build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(factory, options);

container.receive(
Consumer.from(GROUP_NAME, CONSUMER_NAME),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
streamListener
);
container.start();
  • ReadOffset.lastConsumed():对应 Redis 的 >,只在组内领「尚未被分配过的」新消息,与 XACK 配合使用。
  • ReadOffset.latest():对应 $,只消费启动之后新到的消息,不涉及组内位点,一般不用于「队列」场景。

建组时机与 NOGROUP

使用 XREADGROUP 前,Stream 和消费者组必须已存在,否则会报 NOGROUP。因此要在 Stream 监听容器启动之前 先执行 XGROUP CREATE

做法:单独一个 Bean 在 @PostConstruct 里建组,Stream 容器的 @Bean@DependsOn 该 Bean,保证顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component("streamGroupInitializer")
public class StreamGroupInitializer {

private final RedisConnectionFactory connectionFactory;

@PostConstruct
public void createGroupIfNotExists() {
try (RedisConnection connection = connectionFactory.getConnection()) {
RedisStreamCommands streamCommands = connection.streamCommands();
byte[] streamKey = STREAM_KEY.getBytes(StandardCharsets.UTF_8);
try {
streamCommands.xGroupCreate(streamKey, GROUP_NAME, ReadOffset.from("0"), true);
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().contains("BUSYGROUP")) {
// 组已存在,忽略
} else {
throw e;
}
}
}
}
}
1
2
3
4
5
6
7
@Bean
@DependsOn("streamGroupInitializer")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
RedisConnectionFactory factory) {
// ... 同上 create + receive + start ...
return container;
}

消费语义:ACK 与可选 XDEL

  • 处理成功:应对该消息做 XACK,否则会一直留在 PEL,占位且可能被 XAUTOCLAIM 转给其他消费者。
  • 若希望 stream 不堆积已消费消息,可在 ACK 后 XDELXACK 与 XDEL 建议用 Lua 脚本 原子执行,避免只 ACK 不删或只删不 ACK 导致状态不一致。
1
2
3
redis.call('XACK', KEYS[1], ARGV[1], ARGV[2])
redis.call('XDEL', KEYS[1], ARGV[2])
return 1
1
2
3
4
5
// KEYS[1]=stream, ARGV[1]=group, ARGV[2]=messageId
redisTemplate.execute(SCRIPT_ACK_AND_DELETE,
List.of(STREAM_KEY),
GROUP_NAME,
messageId);
  • 处理失败:不要 ACK,可抛异常或记日志;消息会留在 PEL,可由同一或其它消费者通过 XAUTOCLAIM 或再次读取 PEL 做重试。

Listener 中拿到消息与业务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void onMessage(MapRecord<String, String, String> message) {
String messageId = message.getId().getValue();
Map<String, String> value = message.getValue();
String body = value.get("body");

try {
processTask(body);
redisTemplate.execute(SCRIPT_ACK_AND_DELETE, List.of(STREAM_KEY), GROUP_NAME, messageId);
} catch (Exception e) {
log.error("处理失败,消息留在 PEL", e);
throw e;
}
}

生产端用 XADD 或 Spring 的 StringRedisTemplate 写 stream(可封装为 opsForStream().add(StreamRecords.newRecord().in(STREAM_KEY).ofMap(map))即可完成「Stream 作为队列」的闭环。


延伸:基于 ZSET + Stream 的延迟队列

在「Stream 作队列」的基础上,若需要 按时间触发(延迟 N 秒/分钟再消费),可加上 ZSET 做时间调度,把「到期」任务再搬进 Stream,形成延迟队列。

需求与架构

  • 任务先按 执行时间 排队,到期后再被消费;消费失败要能 按重试次数 重新入队。
  • ZSET:score = 执行时间戳,member 为 {retry}_{key}key 为业务唯一标识,如订单号),用 ZRANGEBYSCORE ZSET_KEY 0 now 拉取到期任务。
  • Dispatcher:单独线程循环拉取到期 member,从 ZSET 移除写入 Stream;只有真正移除成功才写入,避免多实例重复入队,因此 ZREM + XADDLua 原子 执行。
  • Consumer:与普通 Stream 队列一致,业务成功则 XACK + XDEL(Lua),失败则不 ACK;若需延迟重试,可解析 body 拆出 retry 与 key,未超最大重试则以 (retry+1)_key 再次 ZADD,超过最大重试则只打日志。

Member 格式与 retry 传递

  • 例如 0_order-1001:前半为 retry,第一个 _ 之后为业务 key。Dispatcher 把整段作为 Stream 的 body,把前半作为 retry;失败回写 1_order-1001 等,并配合最大重试次数(如 3)。

常量与配置

1
2
3
4
5
6
7
8
public final class DelayQueueConfig {
public static final String ZSET_KEY = "delay-queue:zset";
public static final String STREAM_KEY = "delay-queue";
public static final String GROUP_NAME = "delay-queue:consumer-group";
public static final String CONSUMER_NAME = "consumer_1";
public static final String BODY_FIELD = "body";
public static final String RETRY_FIELD = "retry";
}

生产者:ZADD 延迟任务

约定 member 为 0_{key},便于解析 retry;若直接以业务 key 入 ZSET(无下划线),Dispatcher 将 retry 视为 0

1
2
3
4
5
public void submit(String key, long delayMs) {
String member = "0_" + key;
long executeAt = System.currentTimeMillis() + delayMs;
redisTemplate.opsForZSet().add(DelayQueueConfig.ZSET_KEY, member, executeAt);
}

Dispatcher Lua:ZREM 成功才 XADD

1
2
3
4
5
local removed = redis.call('ZREM', KEYS[1], ARGV[1])
if removed == 1 then
return redis.call('XADD', KEYS[2], '*', ARGV[2], ARGV[3], ARGV[4], ARGV[5])
end
return nil

KEYS[1]=ZSET,KEYS[2]=STREAM;ARGV[1]=member,ARGV[2..5] 为 body 字段名、member 值、retry 字段名、retry 值。

Dispatcher:线程循环与 Java 调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
private static final RedisScript<String> SCRIPT_ZREM_THEN_XADD = RedisScript.of(
"local removed = redis.call('ZREM', KEYS[1], ARGV[1])\n"
+ "if removed == 1 then\n"
+ " return redis.call('XADD', KEYS[2], '*', ARGV[2], ARGV[3], ARGV[4], ARGV[5])\n"
+ "end\n"
+ "return nil",
String.class);

@PostConstruct
public void startDispatcher() {
dispatcherThread = new Thread(this::runLoop, "delay-queue-dispatcher");
dispatcherThread.start();
}

@PreDestroy
public void stopDispatcher() {
running.set(false);
if (dispatcherThread != null) {
dispatcherThread.interrupt();
}
}

private void runLoop() {
while (running.get()) {
try {
if (dispatchBatch() == 0) {
Thread.sleep(50);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.warn("Dispatcher 批次异常", e);
try {
Thread.sleep(50);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}

private int dispatchBatch() {
long now = System.currentTimeMillis();
Set<String> expired = redisTemplate.opsForZSet().rangeByScore(
DelayQueueConfig.ZSET_KEY, 0, now, 0, 100);
if (expired == null || expired.isEmpty()) {
return 0;
}
int count = 0;
List<String> redisKeys = List.of(DelayQueueConfig.ZSET_KEY, DelayQueueConfig.STREAM_KEY);
for (String member : expired) {
String retryPart = member.indexOf('_') >= 0
? member.substring(0, member.indexOf('_'))
: "0";
redisTemplate.execute(
SCRIPT_ZREM_THEN_XADD,
redisKeys,
member,
DelayQueueConfig.BODY_FIELD,
member,
DelayQueueConfig.RETRY_FIELD,
retryPart);
count++;
}
return count;
}

有到期任务时连续 dispatchBatch 直至本批为空再进入短休眠,避免固定周期拉长延迟;@PreDestroy 中断线程并优雅退出。

Consumer:成功 ACK+DEL,失败回写 ZSET(示例)

成功处理仍用前文 SCRIPT_ACK_AND_DELETE。失败时可选将任务重新写入 ZSET(注意与 PEL 策略二选一,避免重复消费):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void rescheduleToZset(String body, long delayMs, int maxRetry) {
int sep = body.indexOf('_');
if (sep < 0) {
return;
}
int retry = Integer.parseInt(body.substring(0, sep));
String key = body.substring(sep + 1);
if (retry >= maxRetry) {
log.warn("超过最大重试,丢弃: {}", body);
return;
}
String member = (retry + 1) + "_" + key;
long executeAt = System.currentTimeMillis() + delayMs;
redisTemplate.opsForZSet().add(DelayQueueConfig.ZSET_KEY, member, executeAt);
}

在 Listener 的 catch 中调用 rescheduleToZset(body, backoffMs, 3) 即可;若采用「失败不 ACK、仅靠 PEL/XAUTOCLAIM 重试」,则不必写 ZSET。

Listener:读取 body / retry 并处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void onMessage(MapRecord<String, String, String> message) {
String messageId = message.getId().getValue();
String body = message.getValue().get(DelayQueueConfig.BODY_FIELD);
String retried = message.getValue().get(DelayQueueConfig.RETRY_FIELD);
if (!StringUtils.hasLength(body) || !StringUtils.hasLength(retried)) {
log.warn("消息缺 body/retry, messageId: {}", messageId);
return;
}
log.info("收到延迟任务 body={}, retry={}, id={}", body, retried, messageId);
try {
processTask(body);
redisTemplate.execute(
SCRIPT_ACK_AND_DELETE,
List.of(DelayQueueConfig.STREAM_KEY),
DelayQueueConfig.GROUP_NAME,
messageId);
} catch (Exception e) {
log.error("处理失败,消息留在 PEL 或按需回写 ZSET, body={}", body, e);
throw e;
}
}

小结

  • Redis Stream 作为队列:XADD 投递 → 消费者组 XREADGROUP → 业务处理 → XACK(+ 可选 XDEL),建组在监听容器前完成(@DependsOn),ACK 与 XDEL 可用 Lua 原子化。
  • 延迟队列:ZSET member 为 {retry}_{key},Dispatcher 用 Lua 原子 ZREM+XADD 搬入 Stream;Consumer 成功则 XACK+XDEL,失败可按 (retry+1)_key 回写 ZSET 或依赖 PEL 重试,并限制最大重试。