
Redis Stream Message Queue in Practice

What is Redis Stream?

Redis Stream is a data structure introduced in Redis 5.0, purpose-built for message queues. Compared with the traditional List and Pub/Sub approaches, Stream offers considerably more:

  • Message persistence: messages are stored in Redis and survive restarts
  • Consumer groups: multiple consumers can team up and share the processing load
  • Acknowledgements: ACK support ensures each message is processed correctly
  • Message IDs: every message gets a unique, monotonically increasing ID, enabling message tracing
  • Range queries: messages can be read by ID range
  • Bounded growth: a maximum length can be configured so old messages are trimmed automatically

Redis Stream Core Concepts

Stream Key: user:actions:stream
    |
    ├── Message ID (format: timestamp-sequence)
    |   ├── field1: value1
    |   ├── field2: value2
    |   └── field3: value3
    |
    ├── Message ID
    |   ├── field1: value1
    |   └── ...
    |
    └── Consumer groups
        ├── group1 (analyze-group)
        |   ├── consumer1 (analyze-consumer-xxx)
        |   └── consumer2 (analyze-consumer-yyy)
        |
        └── group2 (vectorize-group)
            ├── consumer1 (vectorize-consumer-xxx)
            └── ...

Key Terms

Term                        Description
Stream                      An append-only message log, similar to List but far more capable
Consumer Group              A group of consumers that share a stream's messages
Consumer                    A single consumer belonging to a group
Message ID                  Unique message identifier in timestamp-sequence format
PEL (Pending Entries List)  Messages delivered to a consumer but not yet acknowledged
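The Message ID format above can be unpacked in a few lines. A minimal standalone sketch (the `ParsedId` helper is illustrative, not part of the project):

```java
import java.time.Instant;

class StreamIdDemo {
    // "timestamp-sequence", e.g. "1689123456788-0": milliseconds since the
    // epoch, plus a sequence number for entries created in the same millisecond
    record ParsedId(long timestampMs, long sequence) {
        static ParsedId parse(String id) {
            String[] parts = id.split("-", 2);
            return new ParsedId(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
        }

        Instant time() {
            return Instant.ofEpochMilli(timestampMs);
        }
    }

    public static void main(String[] args) {
        ParsedId id = ParsedId.parse("1689123456788-0");
        System.out.println(id.timestampMs() + " " + id.sequence()); // prints: 1689123456788 0
    }
}
```

Because IDs sort by timestamp first, comparing two IDs also gives their insertion order.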

Project Integration

Maven Dependency

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <!-- Redisson releases are in the 3.x line; pick a current one -->
    <version>3.27.0</version>
</dependency>

The Redis Service Wrapper

The project wraps the full set of Redis Stream operations in a service:

@Service
@Slf4j
@RequiredArgsConstructor
public class RedisService {

    private final RedissonClient redissonClient;

    /**
     * Send a message to the Stream
     */
    public String streamAdd(String streamKey, Map<String, String> message, int maxLen) {
        RStream<String, String> stream = redissonClient.getStream(streamKey, StringCodec.INSTANCE);
        StreamAddArgs<String, String> addArgs = StreamAddArgs.entries(message);

        if (maxLen > 0) {
            // Non-strict (approximate) trimming: old entries are dropped once
            // the stream grows past maxLen
            addArgs.trimNonStrict().maxLen(maxLen);
        }
        StreamMessageId streamMessageId = stream.add(addArgs);
        return streamMessageId.toString();
    }

    /**
     * Create a consumer group
     */
    public void createConsumer(String streamKey, String groupName) {
        RStream<Object, Object> stream = redissonClient.getStream(streamKey, StringCodec.INSTANCE);
        try {
            stream.createGroup(StreamCreateGroupArgs.name(groupName).makeStream());
        } catch (Exception e) {
            // Ignore "group already exists" (BUSYGROUP) errors
            if (e.getMessage() == null || !e.getMessage().contains("BUSYGROUP")) {
                log.warn("Failed to create consumer group: {}", e.getMessage());
            }
        }
    }

    /**
     * Consume messages (blocking mode)
     */
    public boolean streamConsumeMessages(String streamKey, String groupName,
            String consumerName, int count, long blockTimeoutMs,
            StreamMessageProcessor processor) {
        RStream<String, String> stream = redissonClient.getStream(streamKey, StringCodec.INSTANCE);

        try {
            Map<StreamMessageId, Map<String, String>> messages = stream.readGroup(
                groupName, consumerName,
                StreamReadGroupArgs.neverDelivered().count(count)
                    .timeout(Duration.ofMillis(blockTimeoutMs)));

            if (messages == null || messages.isEmpty()) {
                return false;
            }

            for (StreamMessageId streamMessageId : messages.keySet()) {
                processor.process(streamMessageId, messages.get(streamMessageId));
            }
            return true;
        } catch (Exception e) {
            // Some Redisson versions throw here when the blocking read times
            // out with no messages; treat that as "nothing to consume"
            return false;
        }
    }

    /**
     * Acknowledge messages
     */
    public void streamAck(String streamKey, String groupName, StreamMessageId... ids) {
        RStream<Object, Object> stream = redissonClient.getStream(streamKey, StringCodec.INSTANCE);
        stream.ack(groupName, ids);
    }

    @FunctionalInterface
    public interface StreamMessageProcessor {
        void process(StreamMessageId messageId, Map<String, String> data);
    }
}

Producer Implementation

Abstract Base Class

The project provides an abstract base class that simplifies writing producers:

@Slf4j
public abstract class AbstractStreamProducer<T> {

    private final RedisService redisService;

    protected AbstractStreamProducer(RedisService redisService) {
        this.redisService = redisService;
    }

    protected String sendTask(T data) {
        try {
            String streamMessageId = redisService.streamAdd(
                streamKey(),
                buildMessage(data),
                // maxLen for trimming -- STREAM_MAX_LEN, not the read batch size
                AsyncTaskStreamConstants.STREAM_MAX_LEN
            );
            log.info("{} task sent to Stream: {}, messageId={}",
                taskDisplayName(), payloadIdentifier(data), streamMessageId);
            return streamMessageId;
        } catch (Exception e) {
            log.error("Failed to send {} task: {}", taskDisplayName(), e.getMessage(), e);
            onSendFailed(data, "Failed to enqueue task: " + e.getMessage());
        }
        return "";
    }

    protected abstract String taskDisplayName();
    protected abstract String streamKey();
    protected abstract Map<String, String> buildMessage(T payload);
    protected abstract String payloadIdentifier(T payload);
    protected abstract void onSendFailed(T payload, String error);
}

Concrete Producer Implementation

@Service
@Slf4j
public class AnalyzeStreamProducer
        extends AbstractStreamProducer<AnalyzeStreamProducer.AnalyzeTaskPayload> {

    private final ResumeRepository resumeRepository;
    private final ObjectMapper objectMapper;

    protected AnalyzeStreamProducer(RedisService redisService,
            ResumeRepository resumeRepository, ObjectMapper objectMapper) {
        super(redisService);
        this.resumeRepository = resumeRepository;
        this.objectMapper = objectMapper;
    }

    public record AnalyzeTaskPayload(Long resumeId, String content) {}

    public String analyzeStreamSend(Long resumeId, String content) {
        return sendTask(new AnalyzeTaskPayload(resumeId, content));
    }

    @Override
    protected String taskDisplayName() {
        return "resume analysis";
    }

    @Override
    protected String streamKey() {
        return AsyncTaskStreamConstants.RESUME_ANALYZE_STREAM_KEY;
    }

    @Override
    protected Map<String, String> buildMessage(AnalyzeTaskPayload payload) {
        return Map.of(
            AsyncTaskStreamConstants.FIELD_RESUME_ID, payload.resumeId().toString(),
            AsyncTaskStreamConstants.FIELD_CONTENT, payload.content(),
            AsyncTaskStreamConstants.FIELD_RETRY_COUNT, "0"
        );
    }

    @Override
    protected String payloadIdentifier(AnalyzeTaskPayload payload) {
        try {
            // writeValueAsString throws a checked JsonProcessingException
            return objectMapper.writeValueAsString(payload);
        } catch (JsonProcessingException e) {
            // Fall back to the id if serialization fails
            return String.valueOf(payload.resumeId());
        }
    }

    @Override
    protected void onSendFailed(AnalyzeTaskPayload payload, String error) {
        updateAnalyzeStatus(payload.resumeId(), AsyncTaskStatus.FAILED,
            truncateError(error));
    }
}

Consumer Implementation

Abstract Base Class

The consumer base class encapsulates the full message consumption flow:

@Slf4j
public abstract class AbstractStreamConsumer<T> {

    private final RedisService redisService;
    private ExecutorService executorService;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private String consumerName;

    protected AbstractStreamConsumer(RedisService redisService) {
        this.redisService = redisService;
    }

    @PostConstruct
    public void init() {
        // Create the consumer group (idempotent; the group may already exist)
        this.consumerName = consumerPrefix() + UUID.randomUUID().toString().substring(0, 8);
        try {
            redisService.createConsumer(streamKey(), groupName());
        } catch (Exception e) {
            log.error("Exception while creating consumer group (it may already exist): {}", e.getMessage());
        }

        // Single-threaded executor that runs the consume loop
        this.executorService = new ThreadPoolExecutor(1, 1, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
            r -> {
                Thread t = new Thread(r, threadName());
                t.setDaemon(true);
                return t;
            });

        running.set(true);
        executorService.submit(this::consumeLoop);
    }
    }

    @PreDestroy
    public void destroy() {
        running.set(false);
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    private void consumeLoop() {
        while (running.get()) {
            try {
                redisService.streamConsumeMessages(
                    streamKey(), groupName(), consumerName,
                    AsyncTaskStreamConstants.BATCH_SIZE,
                    AsyncTaskStreamConstants.POLL_INTERVAL_MS,
                    this::processMessage
                );
            } catch (Exception e) {
                log.error("Error while consuming messages: {}", e.getMessage(), e);
            }
        }
    }

    private void processMessage(StreamMessageId streamMessageId, Map<String, String> data) {
        T message = parsePayload(streamMessageId, data);
        if (message == null) {
            // Malformed message: acknowledge it so it does not linger in the PEL
            ackMessage(streamMessageId);
            return;
        }

        int retryCount = getRetryCount(data);

        try {
            markProcessing(message);
            processBusiness(message);
            markCompleted(message);
        } catch (Exception e) {
            if (retryCount < MAX_RETRY_COUNT) {
                retryMessage(message, retryCount);
            } else {
                markFailed(message, truncateError(
                    taskDisplayName() + " failed (retried " + retryCount + " times): " + e.getMessage()
                ));
            }
        } finally {
            // Always ACK: retries are re-enqueued as new messages, so this
            // delivery is finished either way
            ackMessage(streamMessageId);
        }
    }

    private void ackMessage(StreamMessageId streamMessageId) {
        redisService.streamAck(streamKey(), groupName(), streamMessageId);
    }

    protected abstract void retryMessage(T payload, int retryCount);
    protected abstract String taskDisplayName();
    protected abstract void markFailed(T message, String errorMsg);
    protected abstract void markCompleted(T message);
    protected abstract void processBusiness(T message);
    protected abstract void markProcessing(T message);
    protected abstract String payloadIdentifier(T message);
    protected abstract int getRetryCount(Map<String, String> data);
    protected abstract T parsePayload(StreamMessageId streamMessageId,
                                       Map<String, String> data);
    protected abstract String groupName();
    protected abstract String consumerPrefix();
    protected abstract String threadName();
    protected abstract String streamKey();
}
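`truncateError(...)` is called above but not defined in the excerpt. A plausible sketch, assuming the error message is persisted into a bounded database column (the 500-character limit is an assumption, not taken from the project):

```java
final class ErrorUtils {
    private static final int MAX_ERROR_LEN = 500; // assumed DB column limit

    // Cap the stored error message so it always fits the status column
    static String truncateError(String msg) {
        if (msg == null) {
            return "";
        }
        return msg.length() <= MAX_ERROR_LEN ? msg : msg.substring(0, MAX_ERROR_LEN);
    }

    public static void main(String[] args) {
        System.out.println(truncateError("boom").length()); // prints: 4
    }
}
```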

Concrete Consumer Implementation

@Service
@Slf4j
public class AnalyzeStreamConsumer
        extends AbstractStreamConsumer<AnalyzeStreamConsumer.AnalyzePayload> {

    private final ResumeRepository resumeRepository;
    private final ResumePersistenceService persistenceService;
    private final ResumeGradingService gradingService;

    record AnalyzePayload(Long resumeId, String content) {}

    protected AnalyzeStreamConsumer(RedisService redisService,
            ResumeRepository resumeRepository,
            ResumePersistenceService persistenceService,
            ResumeGradingService gradingService) {
        super(redisService);
        this.resumeRepository = resumeRepository;
        this.persistenceService = persistenceService;
        this.gradingService = gradingService;
    }

    @Override
    protected void retryMessage(AnalyzePayload payload, int retryCount) {
        // Re-enqueue with an incremented retry count; re-sending the same
        // count would retry forever
        redisService.streamAdd(streamKey(),
            Map.of(
                FIELD_RESUME_ID, payload.resumeId().toString(),
                FIELD_CONTENT, payload.content(),
                FIELD_RETRY_COUNT, String.valueOf(retryCount + 1)
            ),
            STREAM_MAX_LEN
        );
    }

    @Override
    protected void processBusiness(AnalyzePayload payload) {
        Long resumeId = payload.resumeId();
        if (!resumeRepository.existsById(resumeId)) {
            log.warn("Resume already deleted; skipping analysis task: resumeId={}", resumeId);
            return;
        }

        ResumeAnalysisResponse analysis = gradingService.analyzeResume(payload.content());
        ResumeEntity resume = resumeRepository.findById(resumeId).orElse(null);
        if (resume == null) {
            log.warn("Resume was deleted during analysis; skipping result save: resumeId={}", resumeId);
            return;
        }
        persistenceService.saveAnalysis(resume, analysis);
    }

    @Override
    protected int getRetryCount(Map<String, String> data) {
        // The map values are already Strings; no extra conversion needed
        return Integer.parseInt(data.getOrDefault(FIELD_RETRY_COUNT, "0"));
    }

    @Override
    protected AnalyzePayload parsePayload(StreamMessageId streamMessageId,
                                          Map<String, String> data) {
        String resumeIdStr = data.get(FIELD_RESUME_ID);
        String content = data.get(FIELD_CONTENT);
        if (resumeIdStr == null || content == null) {
            log.warn("Malformed message, skipping: messageId={}", streamMessageId);
            return null;
        }
        return new AnalyzePayload(Long.parseLong(resumeIdStr), content);
    }

    // ... remaining abstract method implementations
}
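The retry path above re-enqueues the failed message immediately. If the failure is transient (say, a rate-limited downstream API), a delay before re-enqueueing often helps. A standalone sketch of an exponential backoff policy (the delay values are assumptions, not project code; only `MAX_RETRY_COUNT = 3` mirrors the constants below):

```java
class RetryPolicy {
    static final int MAX_RETRY_COUNT = 3;     // mirrors AsyncTaskStreamConstants
    static final long BASE_DELAY_MS = 1_000;  // assumed base delay
    static final long MAX_DELAY_MS = 30_000;  // assumed cap

    // Delay before a given attempt: 1s, 2s, 4s, ... capped at 30s
    static long backoffMs(int retryCount) {
        return Math.min(BASE_DELAY_MS << retryCount, MAX_DELAY_MS);
    }

    static boolean shouldRetry(int retryCount) {
        return retryCount < MAX_RETRY_COUNT;
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt <= MAX_RETRY_COUNT; attempt++) {
            System.out.println("attempt=" + attempt
                + " retry=" + shouldRetry(attempt)
                + " delayMs=" + backoffMs(attempt));
        }
    }
}
```

In `retryMessage` this would translate to sleeping (or scheduling) `backoffMs(retryCount)` before the `streamAdd` call.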

Constants

public final class AsyncTaskStreamConstants {

    // Common settings
    public static final int MAX_RETRY_COUNT = 3;
    public static final int BATCH_SIZE = 10;
    public static final long POLL_INTERVAL_MS = 1000;
    public static final int STREAM_MAX_LEN = 1000;

    // Field names
    public static final String FIELD_RETRY_COUNT = "retryCount";
    public static final String FIELD_CONTENT = "content";
    public static final String FIELD_RESUME_ID = "resumeId";

    // Resume analysis stream
    public static final String RESUME_ANALYZE_STREAM_KEY = "resume:analyze:stream";
    public static final String RESUME_ANALYZE_GROUP_NAME = "analyze-group";
    public static final String RESUME_ANALYZE_CONSUMER_PREFIX = "analyze-consumer-";

    // Knowledge base stream
    public static final String KB_VECTORIZE_STREAM_KEY = "knowledgebase:vectorize:stream";
    public static final String KB_VECTORIZE_GROUP_NAME = "vectorize-group";

    // Interview evaluation stream
    public static final String INTERVIEW_EVALUATE_STREAM_KEY = "interview:evaluate:stream";
    public static final String INTERVIEW_EVALUATE_GROUP_NAME = "evaluate-group";

    // Voice interview stream
    public static final String VOICE_EVALUATE_STREAM_KEY = "voice:evaluate:stream";
    public static final String VOICE_EVALUATE_GROUP_NAME = "voice-evaluate-group";
}

Calling from Business Code

Use the producer in a service to send messages:

@Service
@RequiredArgsConstructor
public class ResumeUploadService {

    private final AnalyzeStreamProducer analyzeStreamProducer;
    private final ResumePersistenceService persistenceService;

    public Map<String, Object> uploadAndAnalyze(MultipartFile file) {
        // ... file validation, parsing, and storage logic

        // Save the resume to the database (status PENDING)
        ResumeEntity resume = persistenceService.saveResume(file, resumeText, fileKey, fileUrl);

        // Send the analysis task to the Redis Stream
        analyzeStreamProducer.analyzeStreamSend(resume.getId(), resumeText);

        return Map.of(
            "resume", Map.of(
                "id", resume.getId(),
                "analyzeStatus", AsyncTaskStatus.PENDING.name()
            )
        );
    }
}

Common Operational Commands

Using the Redis CLI

# Inspect the stream
XINFO STREAM resume:analyze:stream

# List consumer groups
XINFO GROUPS resume:analyze:stream

# Inspect consumers and their pending (PEL) counts
XINFO CONSUMERS resume:analyze:stream analyze-group

# Read all messages (without consuming)
XRANGE resume:analyze:stream - +

# Manually acknowledge a message
XACK resume:analyze:stream analyze-group 1689123456788-0

# Delete a message
XDEL resume:analyze:stream 1689123456788-0

# Trim the stream length
XTRIM resume:analyze:stream MAXLEN 1000

Handling Message Backlog

When consumers cannot keep up with producers, messages accumulate. Options include:

1. Add consumer instances

Start several consumer instances; the consumer group distributes messages among them automatically:

# Start several consumer processes
java -jar app.jar --consumer-id=1 &
java -jar app.jar --consumer-id=2 &
java -jar app.jar --consumer-id=3 &

2. Monitor the PEL size

public long getPendingCount(String streamKey, String groupName) {
    RStream<Object, Object> stream = redissonClient.getStream(streamKey);
    // listGroups() returns per-group info, including the pending (PEL) count
    return stream.listGroups().stream()
            .filter(g -> groupName.equals(g.getName()))
            .mapToLong(StreamGroup::getPending)
            .findFirst()
            .orElse(0L);
}

3. Dead-letter queue

Messages that have failed more than the maximum number of retries can be written to a dead-letter queue:

protected void markFailed(T message, String errorMsg) {
    // Write the message to a dead-letter stream (streamAdd also needs a maxLen)
    redisService.streamAdd("dlq:" + streamKey(),
        Map.of(
            "originalKey", streamKey(),
            "error", errorMsg,
            "payload", JSON.toJSONString(message),
            "failedAt", LocalDateTime.now().toString()
        ),
        STREAM_MAX_LEN
    );
}

Comparison with Kafka

Feature                  Redis Stream                      Kafka
Deployment complexity    Low (just a Redis instance)       High (separate cluster)
Message persistence      Supported                         Supported
Message replay           Supported                         Supported
Consumer groups          Supported                         Supported
Throughput               Medium                            High
Partitioning             Not supported                     Supported
Transactions             Not supported                     Supported
Best fit                 Small/medium, simple scenarios    Large-scale, complex scenarios

Best Practices

1. Tune batch size and poll interval sensibly

// Balance latency against throughput
private static final int BATCH_SIZE = 10;
private static final long POLL_INTERVAL_MS = 1000;

2. Enable automatic stream trimming

streamAdd(streamKey, message, STREAM_MAX_LEN); // cap the stream at 1000 entries

3. Shut consumers down gracefully

@PreDestroy
public void destroy() {
    running.set(false);
    executorService.shutdown();
}
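The `@PreDestroy` hook above stops new work but does not wait for the in-flight poll. A slightly fuller, standalone sketch that also waits briefly before forcing termination (the 5-second grace period is an assumption):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class GracefulShutdownDemo {
    static void shutdown(ExecutorService executor, AtomicBoolean running) {
        running.set(false);   // lets the consume loop exit at its next check
        executor.shutdown();  // stop accepting new tasks
        try {
            // Give the current blocking read up to 5s to finish, then force it
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean running = new AtomicBoolean(true);
        shutdown(executor, running);
        System.out.println(executor.isShutdown() + " " + running.get()); // prints: true false
    }
}
```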

4. Make processing idempotent

Messages can be delivered more than once, so the business logic must be idempotent:

protected void processBusiness(AnalyzePayload payload) {
    // Idempotency check: skip if this resume was already analyzed
    if (resume.getAnalyzeStatus() == AsyncTaskStatus.COMPLETED) {
        log.info("Resume already analyzed, skipping: resumeId={}", payload.resumeId());
        return;
    }
    // ... normal processing logic
}
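The status check can be reduced to a tiny in-memory model. In the real project the completed state lives in the database (`AsyncTaskStatus`); the `Set` here merely stands in for it:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class IdempotentProcessor {
    private final Set<Long> completed = ConcurrentHashMap.newKeySet();

    /** Returns true if processed now, false if skipped as a duplicate delivery. */
    boolean process(long resumeId) {
        if (!completed.add(resumeId)) {
            return false; // already handled: a duplicate delivery is a no-op
        }
        // ... the actual business logic would run here
        return true;
    }

    public static void main(String[] args) {
        IdempotentProcessor p = new IdempotentProcessor();
        System.out.println(p.process(42L) + " " + p.process(42L)); // prints: true false
    }
}
```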

5. Monitor key metrics

  • Stream length: stream.size()
  • Consumers in a group: stream.listConsumers(groupName)
  • PEL size: the per-group pending count from stream.listGroups()

Summary

Redis Stream is Redis's lightweight message queue solution, a good fit for small and medium-scale applications. With Redisson's wrappers we can conveniently implement:

  • Message sending (producers)
  • Message consumption (consumers)
  • Consumer group management
  • Message acknowledgement
  • Retries and dead-letter handling

The AbstractStreamProducer/AbstractStreamConsumer pattern used in this project keeps the design extensible: new asynchronous task types can be added with little effort.
