1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
@Service
@Slf4j
@RequiredArgsConstructor
public class RedisService {
private final RedissonClient redissonClient;
/**
* 发送消息到 Stream
*/
public String streamAdd(String streamKey, Map<String, String> message, int maxLen) {
RStream<String, String> stream = redissonClient.getStream(streamKey, StringCodec.INSTANCE);
StreamAddArgs<String, String> addArgs = StreamAddArgs.entries(message);
if (maxLen > 0) {
// 非严格裁剪,超过最大长度时裁剪旧消息
addArgs.trimNonStrict().maxLen(maxLen);
}
StreamMessageId streamMessageId = stream.add(addArgs);
return streamMessageId.toString();
}
/**
* 创建消费者组
*/
public void createConsumer(String streamKey, String groupName) {
RStream<Object, Object> stream = redissonClient.getStream(streamKey, StringCodec.INSTANCE);
try {
stream.createGroup(StreamCreateGroupArgs.name(groupName).makeStream());
} catch (Exception e) {
// 忽略组已存在的异常
if (!e.getMessage().contains("BUSYGROUP")) {
log.warn("创建消费者组失败: {}", e.getMessage());
}
}
}
/**
* 消费消息(阻塞模式)
*/
public boolean streamConsumeMessages(String streamKey, String groupName,
String consumerName, int count, long blockTimeoutMs,
StreamMessageProcessor processor) {
RStream<String, String> stream = redissonClient.getStream(streamKey, StringCodec.INSTANCE);
try {
Map<StreamMessageId, Map<String, String>> messages = stream.readGroup(
groupName, consumerName,
StreamReadGroupArgs.neverDelivered().count(count)
.timeout(Duration.ofMillis(blockTimeoutMs)));
if (messages == null || messages.isEmpty()) {
return false;
}
for (StreamMessageId streamMessageId : messages.keySet()) {
processor.process(streamMessageId, messages.get(streamMessageId));
}
return true;
} catch (Exception e) {
// Redisson bug: 无消息时返回 EmptyList
return false;
}
}
/**
* 确认消息
*/
public void streamAck(String streamKey, String groupName, StreamMessageId... ids) {
RStream<Object, Object> stream = redissonClient.getStream(streamKey, StringCodec.INSTANCE);
stream.ack(groupName, ids);
}
@FunctionalInterface
public interface StreamMessageProcessor {
void process(StreamMessageId messageId, Map<String, String> data);
}
}
|