正在做一个mq发送的业务,在微服务模块中,如果每个模块都要单独去引入MQ并且实现各种配置,感觉这个操作太乱了。突发奇想自己试着实现一个统一的模块来实现管理,接下来就是上手时间。
代码结构
我以整合RabbitMQ类捋一下我的思路
一、定义一个消息体接口,用于统一的发送接口的参数
|
|
二、定义一个发送消息接口,用于多态实现统一的发送接口
|
|
接下来就是整合RabbitMQ的具体操作了
三、加入依赖
|
|
四、连接信息类RabbitMqProperties
|
|
使用了@ConditionalOnProperty
和 @ConfigurationProperties
来实现当配置了share.rabbit.active = true 时,该配置类才会注册
五、连接配置类RabbitConfig
|
|
这部分遇到了几个问题,在这里总结一下
- 问题一:加载时机的问题
问题原因:在上面配置信息类中使用的时
@Component
注解将配置信息注入到Spring容器中,但是这里是配置类使用的注解@Configuration
在正常不做干扰的情况下执行顺序是比@Component
早的。而我为了防止在获取配置属性空指针,加上了@ConditionalOnBean(RabbitMqProperties.class)
注解,只有当Spring容器中存在RabbitMqProperties对象时才会构建当前类,那么问题就出现了,我都比你先加载了,我还能要求你存在吗,这肯定不符合逻辑。虽然一开始我加上了@DependsOn("rabbitMqProperties")
,但是这个注解只针对控制普通 bean 的初始化顺序,和@Configuration
不搭。解决办法:使用
@AutoConfigureAfter(RabbitMqProperties.class)
注解。
- 问题二:延时消息的发送一定会进入返回回调setReturnsCallback中,判断有问题导致一直在重发
问题原因:这里讲一下消息发送的过程
普通消息:客户端发送消息 ——> 交换机(根据路由匹配队列)——> 队列
延时消息:客户端发送消息 ——> 交换机不会马上路由匹配队列,而是暂存消息,等到设置的时间到了才会路由匹配队列——> 队列
会导致进入setReturnsCallback的原因:
- 对于普通交换机(如
direct
/topic
):消息到达后立即匹配路由键,若有队列绑定则路由,无则触发NO_ROUTE
。- 对于延迟交换机:消息到达后不立即路由,而是暂存,因此 RabbitMQ 会判定 “当前无队列匹配”,即使后续延迟到期会路由,也会在 消息到达时即时触发
NO_ROUTE
,进而触发ReturnsCallback
。所以在进入回调后需要判断是否是由延迟消息导致,如果是就不能进行重发了。
六、消息实现类RabbitCorrelationData
|
|
扩展:
交换机类型 | 匹配规则 | 效率 | 通信 |
---|---|---|---|
DirectExchange | 精确路由键匹配 | 低 | 一对一通信、精确路由 |
TopicExchange | 通配符路由键匹配 | 中 | 按主题 / 类别路由(如日志) |
FanoutExchange | 无(广播) | 低 | 广播通知、多队列同步 |
HeadersExchange | 消息头信息匹配 | 高 | 复杂多条件路由(较少用) |
DelayExchange | 延迟时间 + 基础路由规则 | 中 | 延迟任务、定时投递 |
七、消息发送类RabbitMqService
|
|
在消息发送时判断了交换机、队列、路由是否存在,不存在则创建。
八、最后,为了在其他模块引入该模块这些类能正确被Spring扫描到,需要在org.springframework.boot.autoconfigure.AutoConfiguration.imports中加入配置
|
|
九、测试
-
在业务模块中引入该模块包依赖
-
配置相关信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /share publisher-confirm-type: CORRELATED publisher-returns: true listener: simple: cknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发 # 使用rabbitMQ share: rabbit: active: true
-
发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
@RestController public class MqController { @Autowired private RabbitMqService rabbitMqService; @Operation(summary = "发送延迟消息") @GetMapping("/sendDelay") public AjaxResult sendDelay(String msg) { RabbitCorrelationData data = new RabbitCorrelationData(); data.setMessage(msg); data.setExchangeName("test-delay-exchange"); data.setRoutingKey("test"); data.setId(UUID.randomUUID().toString()); rabbitMqService.sendMessage( data, 15); return success(); } @Operation(summary = "发送确认消息") @GetMapping("/send") public AjaxResult send(String msg) { RabbitCorrelationData data = new RabbitCorrelationData(); data.setMessage(msg); data.setExchangeName("test-exchange"); data.setRoutingKey("test"); data.setId(UUID.randomUUID().toString()); rabbitMqService.sendMessage( data, null); return success(); } }
-
监听消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
@Component @Slf4j public class Test { @Autowired private RedisTemplate redisTemplate; @RabbitListener(queues = "test-delay-exchange.test.queue") public void testListener(String msg, Message message, Channel channel) throws IOException { //接收消息,消费者端判断是否需要做幂等性处理 //如果业务保证幂等性,基于redis setnx保证 String key = "mq:" + msg; Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "", 200, TimeUnit.SECONDS); if (!flag) { //说明该业务数据已经被执行 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } // 执行业务 // TODO log.info("接受到消息:{},主体为{},--{}" , msg , message.toString(),message.getMessageProperties().getDeliveryTag()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "test-exchange.test.queue") public void testListener(String msg, Message message, Channel channel) throws IOException { //接收消息,消费者端判断是否需要做幂等性处理 //如果业务保证幂等性,基于redis setnx保证 String key = "mq:" + msg; Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "", 200, TimeUnit.SECONDS); if (!flag) { //说明该业务数据已经被执行 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } // 执行业务 // TODO log.info("接受到消息:{},主体为{},--{}" , msg , message.toString(),message.getMessageProperties().getDeliveryTag()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
打印延迟消息发送日志,23:01:23.981发送,23:01:38.984接受成功,证明成功了
1 2 3 4 5 6
23:01:23.973 [http-nio-9212-exec-1] INFO c.s.m.r.s.RabbitMqService - [ensureBindingExists,114] - 绑定已就绪: test-exchange->test-exchange.test.queue[test] 23:01:23.980 [connectionFactory1] INFO c.s.m.r.c.RabbitConfig - [lambda$rabbitTemplate$1,96] - 延迟消息[3343986a-662e-41ae-8a0f-9dbbcc2793d2]暂存中,忽略NO_ROUTE(交换机: test-exchange, 路由键: test) 23:01:23.981 [connectionFactory1] INFO c.s.m.r.c.RabbitConfig - [lambda$rabbitTemplate$0,72] - 消息[3343986a-662e-41ae-8a0f-9dbbcc2793d2]发送成功(交换机: test-exchange) 23:01:38.984 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] WARN o.s.a.s.c.Jackson2JsonMessageConverter - [fromMessage,325] - Could not convert incoming message with content-type [text/plain], 'json' keyword missing. 23:01:38.997 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.s.o.r.l.Test - [testListener,37] - 接受到消息:"正在进行测试延迟消息的发送32",主体为(Body:'"正在进行测试延迟消息的发送32"' MessageProperties [headers={spring_listener_return_correlation=bf33998d-639a-450e-a258-d59de7c19e6c, spring_returned_message_correlation=3343986a-662e-41ae-8a0f-9dbbcc2793d2, retry_count=0, correlation_id=3343986a-662e-41ae-8a0f-9dbbcc2793d2, is_delay=true}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test-exchange, receivedRoutingKey=test, receivedDelay=15000, deliveryTag=1, consumerTag=amq.ctag--7dyhSB05z_2PmUs7A6hbw, consumerQueue=test-exchange.test.queue]),--1 23:01:39.001 [AMQP Connection 192.168.23.100:5672] ERROR o.s.a.r.c.CachingConnectionFactory - [log,742] - Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)