整合MQ的公共模块

正在做一个mq发送的业务,在微服务模块中,如果每个模块都要单独去引入MQ并且实现各种配置,感觉这个操作太乱了。突发奇想自己试着实现一个统一的模块来实现管理,接下来就是上手时间。

代码结构

我以整合RabbitMQ类捋一下我的思路

一、定义一个消息体接口,用于统一的发送接口的参数

1
2
3
public interface MessageModel {

}

二、定义一个发送消息接口,用于多态实现统一的发送接口

1
2
3
4
5
public interface MqSendModel {

    boolean sendMessage(MessageModel messageModel);

}

接下来就是整合RabbitMQ的具体操作了

三、加入依赖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
     <groupId>com.alibaba.fastjson2</groupId>
     <artifactId>fastjson2</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

<!-- 缓存服务 -->
<dependency>
    <groupId>com.share</groupId>
    <artifactId>share-common-redis</artifactId>
</dependency>

四、连接信息类RabbitMqProperties

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Data
@ConditionalOnProperty(name = "share.rabbit.active", havingValue = "true")
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Component
public class RabbitMqProperties {
    private String host;
    private int port;
    private String username;
    private String password;
    private String virtualHost;
    private int connectionTimeout =5000;
    private int requestedHeartbeat =60;
    private String publisherConfirmType;
    private Boolean publisherReturns;
}

使用了@ConditionalOnProperty@ConfigurationProperties来实现当配置了share.rabbit.active = true 时,该配置类才会注册

五、连接配置类RabbitConfig

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package com.xxx.mq.rabbit.config;


import com.alibaba.fastjson2.JSON;
import com.xxx.mq.rabbit.dto.RabbitCorrelationData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.redis.core.RedisTemplate;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;


@ConditionalOnBean(RabbitMqProperties.class)  // 使用类字面量
@AutoConfigureAfter(RabbitMqProperties.class)
@Configuration
@Slf4j
public class RabbitConfig {

    @Autowired
    private RabbitMqProperties rabbitMqProperties;
    @Autowired
    private RedisTemplate redisTemplate;

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitMqProperties.getHost());
        connectionFactory.setPort(rabbitMqProperties.getPort());
        connectionFactory.setUsername(rabbitMqProperties.getUsername());
        connectionFactory.setPassword(rabbitMqProperties.getPassword());
        connectionFactory.setVirtualHost(rabbitMqProperties.getVirtualHost());
        String publisherConfirmType = rabbitMqProperties.getPublisherConfirmType();
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.valueOf(publisherConfirmType));
        connectionFactory.setConnectionTimeout(rabbitMqProperties.getConnectionTimeout());
        connectionFactory.setRequestedHeartBeat(rabbitMqProperties.getRequestedHeartbeat());
        connectionFactory.setPublisherReturns(rabbitMqProperties.getPublisherReturns());
        log.info("自定义RabbitMQ连接工厂已创建,连接至: {}:{}", rabbitMqProperties.getHost(), rabbitMqProperties.getPort());
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置消息转换器
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        // 开启返回模式
        rabbitTemplate.setMandatory(true);
        // 确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!(correlationData instanceof RabbitCorrelationData rabbitCorrelationData)) {
                log.warn("非RabbitCorrelationData类型,忽略确认回调: {}", correlationData);
                return;
            }
            if (ack) {
                log.info("消息[{}]发送成功(交换机: {})",
                        rabbitCorrelationData.getId(), rabbitCorrelationData.getExchangeName());
                // 成功后删除Redis备份(避免冗余)
                redisTemplate.delete(rabbitCorrelationData.getId());
            } else {
                log.error("消息[{}]发送失败(交换机: {}),原因: {}",
                        rabbitCorrelationData.getId(), rabbitCorrelationData.getExchangeName(), cause);
                retrySendMsg(rabbitCorrelationData, rabbitTemplate); // 投递失败重试
            }
        });
        // 返回回调
        rabbitTemplate.setReturnsCallback(returned -> {
            Message message = returned.getMessage();
            if (message == null) {
                log.error("路由失败的消息为空,忽略处理");
                return;
            }
            // 提取消息元数据(防御空指针)
            MessageProperties props = message.getMessageProperties();
            String correlationId = props.getHeader("correlation_id");
            Boolean isDelay = props.getHeader("is_delay");

            // 关键:延迟消息的NO_ROUTE是正常暂存,无需重发
            if (isDelay != null && isDelay) {
                log.info("延迟消息[{}]暂存中,忽略NO_ROUTE(交换机: {}, 路由键: {})",
                        correlationId, returned.getExchange(), returned.getRoutingKey());
                return;
            }

            // 非延迟消息的路由失败:触发重发
            log.error("非延迟消息路由失败!correlationId: {}, replyCode: {}, replyText: {}, 交换机/路由键: {}/{}",
                    correlationId, returned.getReplyCode(), returned.getReplyText(),
                    returned.getExchange(), returned.getRoutingKey());

            // 构建CorrelationData并重试
            if (correlationId != null) {
                String correlationDataStr = (String) redisTemplate.opsForValue().get(correlationId);
                if (correlationDataStr != null) {
                    RabbitCorrelationData rabbitCorrelationData = JSON.parseObject(correlationDataStr, RabbitCorrelationData.class);
                    retrySendMsg(rabbitCorrelationData, rabbitTemplate);
                } else {
                    log.error("消息[{}]的元数据在Redis中不存在,无法重试", correlationId);
                }
            }
        });

        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitAdmin amqpAdmin(CachingConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * 消息重新发送
     *
     * @param correlationData
     */
    private void retrySendMsg(CorrelationData correlationData, RabbitTemplate rabbitTemplate) {
        //获取相关数据
        RabbitCorrelationData rabbitCorrelationData = (RabbitCorrelationData) correlationData;
        //获取redis中存放重试次数
        //先重发,在写会到redis中次数
        int retryCount = rabbitCorrelationData.getRetryCount();
        if (retryCount >= 3) {
            //超过最大重试次数
            log.error("生产者超过最大重试次数,将失败的消息存入数据库用人工处理;给管理员发送邮件;给管理员发送短信;");
            return;
        }
        //重发消息
        sendMessage(rabbitCorrelationData, rabbitTemplate);
        //重发次数+1
        retryCount += 1;
        rabbitCorrelationData.setRetryCount(retryCount);
        redisTemplate.opsForValue().set(rabbitCorrelationData.getId(), JSON.toJSONString(rabbitCorrelationData), 10, TimeUnit.MINUTES);
        log.info("进行消息重发!");
    }

    private void sendMessage(RabbitCorrelationData rabbitCorrelationData, RabbitTemplate rabbitTemplate) {
        Objects.requireNonNull(rabbitCorrelationData.getExchangeName(), "交换机名称不能为空");
        Objects.requireNonNull(rabbitCorrelationData.getRoutingKey(), "路由键不能为空");
        Objects.requireNonNull(rabbitCorrelationData.getMessage(), "消息体不能为空");
        byte[] messageBody;
        if (rabbitCorrelationData.getMessage() instanceof String) {
            messageBody = JSON.toJSONString(rabbitCorrelationData.getMessage()).getBytes(StandardCharsets.UTF_8);
        } else {
            messageBody = JSON.toJSONString(rabbitCorrelationData).getBytes(StandardCharsets.UTF_8);
        }
        // .withBody(messageBody) 部分是 真正传递到消费者的
        // send参数中的rabbitCorrelationData只会在回调中使用,不会到消费者
        Message message = MessageBuilder
                .withBody(messageBody).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding(StandardCharsets.UTF_8.name())
                .setHeader("correlation_id", rabbitCorrelationData.getId())
                .setHeader("retry_count", rabbitCorrelationData.getRetryCount())
                .setHeader("is_delay", rabbitCorrelationData.isDelay())
                .build();
        if (rabbitCorrelationData.getDelayTime() != null) {
            message.getMessageProperties()
                    .setDelay(rabbitCorrelationData.getDelayTime());
        }
        rabbitTemplate.send(rabbitCorrelationData.getExchangeName(), rabbitCorrelationData.getRoutingKey(), message, rabbitCorrelationData);
    }

}
这部分遇到了几个问题,在这里总结一下
  1. 问题一:加载时机的问题

问题原因:在上面配置信息类中使用的时@Component注解将配置信息注入到Spring容器中,但是这里是配置类使用的注解@Configuration在正常不做干扰的情况下执行顺序是比@Component早的。而我为了防止在获取配置属性空指针,加上了@ConditionalOnBean(RabbitMqProperties.class)注解,只有当Spring容器中存在RabbitMqProperties对象时才会构建当前类,那么问题就出现了,我都比你先加载了,我还能要求你存在吗,这肯定不符合逻辑。虽然一开始我加上了@DependsOn("rabbitMqProperties"),但是这个注解只针对控制普通 bean 的初始化顺序,和@Configuration不搭。

解决办法:使用@AutoConfigureAfter(RabbitMqProperties.class)注解。

  1. 问题二:延时消息的发送一定会进入返回回调setReturnsCallback中,判断有问题导致一直在重发

问题原因:这里讲一下消息发送的过程

普通消息:客户端发送消息 ——> 交换机(根据路由匹配队列)——> 队列

延时消息:客户端发送消息 ——> 交换机不会马上路由匹配队列,而是暂存消息,等到设置的时间到了才会路由匹配队列——> 队列

会导致进入setReturnsCallback的原因:

  1. 对于普通交换机(如 direct/topic):消息到达后立即匹配路由键,若有队列绑定则路由,无则触发 NO_ROUTE
  2. 对于延迟交换机:消息到达后不立即路由,而是暂存,因此 RabbitMQ 会判定 “当前无队列匹配”,即使后续延迟到期会路由,也会在 消息到达时即时触发 NO_ROUTE,进而触发 ReturnsCallback

所以在进入回调后需要判断是否是由延迟消息导致,如果是就不能进行重发了。

六、消息实现类RabbitCorrelationData

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Data
public class RabbitCorrelationData extends CorrelationData implements MessageModel {
    //消息体
    private Object message;
    //交换机名称
    private String exchangeName;
    //路由键
    private String routingKey;
    //重试次数
    private int retryCount = 0;
    //是否延迟消息
    private boolean isDelay ;
    //延迟时长
    private Integer delayTime ;
    //交换机类型
    private MqConstants.ExchangeType exchangeType = MqConstants.ExchangeType.DIRECT_EXCHANGE;
}

public interface MqConstants {


    @Getter
    @AllArgsConstructor
    enum ExchangeType {

        /**
         * 精确路由键匹配 一对一通信、精确路由
         */
        DIRECT_EXCHANGE("direct"),
        /**
         * 通配符路由键匹配 按主题 / 类别路由(如日志)
         */
        TOPIC_EXCHANGE("topic"),
        /**
         * (广播)广播通知、多队列同步
         */
        FANOUT_EXCHANGE("fanout"),

        /**
         * 消息头信息匹配 复杂多条件路由(较少用)
         */
        HEADERS_EXCHANGE("headers"),

        /**
         * 延迟时间 + 基础路由规则中延迟任务、定时投递
         */
//        DELAY_EXCHANGE("delay"),

        ;
        private String name;


    }
}

扩展:

交换机类型 匹配规则 效率 通信
DirectExchange 精确路由键匹配 一对一通信、精确路由
TopicExchange 通配符路由键匹配 按主题 / 类别路由(如日志)
FanoutExchange 无(广播) 广播通知、多队列同步
HeadersExchange 消息头信息匹配 复杂多条件路由(较少用)
DelayExchange 延迟时间 + 基础路由规则 延迟任务、定时投递

七、消息发送类RabbitMqService

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package com.xxx.mq.rabbit.service;


import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.xxx.common.core.exception.ServiceException;
import com.xxx.mq.model.MessageModel;
import com.xxx.mq.model.MqSendModel;
import com.xxx.mq.rabbit.constants.MqConstants;
import com.xxx.mq.rabbit.dto.RabbitCorrelationData;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

@Component
@ConditionalOnBean(name = "rabbitTemplate")
@AllArgsConstructor
@Slf4j
public class RabbitMqService implements MqSendModel {

    private final RabbitTemplate rabbitTemplate;
    private final RedisTemplate redisTemplate;
    private final RabbitAdmin rabbitAdmin;



    @Override
    public boolean sendMessage(MessageModel messageModel, Integer delaySeconds) {
        if (!(messageModel instanceof RabbitCorrelationData)) {
            throw new IllegalArgumentException("RabbitMQ发送失败,期望参数类型为:RabbitCorrelationData , 实际参数类型为:" + messageModel.getClass());
        }
        RabbitCorrelationData rabbitCorrelationData = (RabbitCorrelationData) messageModel;
        rabbitCorrelationData.setDelay(delaySeconds != null);
        rabbitCorrelationData.setDelayTime(delaySeconds);
        //redis 存储消息元数据
        int time = delaySeconds == null ? 0 : delaySeconds;
        redisTemplate.opsForValue().set(rabbitCorrelationData.getId(), JSON.toJSONString(rabbitCorrelationData), 10 + ((time / 60) > 0 ? time / 60 : 1), TimeUnit.MINUTES);
        //发送MQ消息
        sendMessage(rabbitCorrelationData, delaySeconds);
        return true;
    }

    private void sendMessage(RabbitCorrelationData rabbitCorrelationData, Integer delaySeconds) {
        byte[] messageBody = null;
        if (rabbitCorrelationData.getMessage() instanceof String) {
            messageBody = JSON.toJSONString(rabbitCorrelationData.getMessage()).getBytes(StandardCharsets.UTF_8);
        } else {
            messageBody = JSON.toJSONString(rabbitCorrelationData).getBytes(StandardCharsets.UTF_8);
        }
        // .withBody(messageBody) 部分是 真正传递到消费者的
        // send参数中的rabbitCorrelationData只会在回调中使用,不会到消费者
        Message message = MessageBuilder
                .withBody(messageBody).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setHeader("correlation_id", rabbitCorrelationData.getId())
                .setHeader("retry_count", rabbitCorrelationData.getRetryCount())
                .setHeader("is_delay", rabbitCorrelationData.isDelay())
                .build();
        if (delaySeconds != null) {
            message.getMessageProperties()
                    .setDelay(delaySeconds * 1000);
        }
        exchangeAndQueue(rabbitCorrelationData, delaySeconds != null);
        rabbitTemplate.send(rabbitCorrelationData.getExchangeName(), rabbitCorrelationData.getRoutingKey(), message, rabbitCorrelationData);
    }

    private void exchangeAndQueue(RabbitCorrelationData rabbitCorrelationData, boolean isDelay) {
        Exchange exchange = null;
        // 1. 处理交换机
        //交换机是否存在
        boolean exchangeExists = isExchangeExists(rabbitCorrelationData.getExchangeName());
        if (!exchangeExists) {
            //是否创建队列
            exchange = createExchange(rabbitCorrelationData.getExchangeName(), rabbitCorrelationData.getExchangeType(), isDelay);
            rabbitAdmin.declareExchange(exchange);
            log.info("完成创建交换机:{} 类型 :{}", exchange.getName(), exchange.getType());
        }
        // 2. 处理队列
        String queueName = generateQueueName(rabbitCorrelationData.getExchangeName(), rabbitCorrelationData.getRoutingKey());
        boolean queueExists = isQueueExists(queueName);
        Queue queue = null;
        if (!queueExists) {
            queue = new Queue(queueName, true, false, false);
            rabbitAdmin.declareQueue(queue);
            log.info("完成创建队列:{}", queue.getName());
        }
        Binding binding = null;

        // 3. 确保绑定存在(直接声明,利用异常处理)
        ensureBindingExists(rabbitCorrelationData.getExchangeName(), queueName, rabbitCorrelationData.getRoutingKey());

    }

    private void ensureBindingExists(String exchangeName, String queueName, String routingKey) {
        try {
            Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
            rabbitAdmin.declareBinding(binding);
            log.info("绑定已就绪: {}->{}[{}]", exchangeName, queueName, routingKey);
        } catch (Exception e) {
            // 绑定已存在的异常可以安全忽略
            if (e.getMessage().contains("already exists") ||
                    (e.getCause() != null && e.getCause().getMessage().contains("no exchange"))) {
                log.debug("绑定已存在: {}->{}[{}]", exchangeName, queueName, routingKey);
            } else {
                log.warn("绑定声明异常(可能已存在): {}->{}[{}]", exchangeName, queueName, routingKey, e);
            }
        }
    }


    private Exchange createExchange(String exchangeName, MqConstants.ExchangeType exchangeType, Boolean isDelay) {
        if (isDelay) {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", exchangeType.getName()); // 指定交换机类型(如 direct、topic 等)
            return new CustomExchange(exchangeName, "x-delayed-message", true, false, args);
        }
        switch (exchangeType) {
            case FANOUT_EXCHANGE:
                return new FanoutExchange(exchangeName, true, false);
            case DIRECT_EXCHANGE:
                return new DirectExchange(exchangeName, true, false);
            case TOPIC_EXCHANGE:
                return new TopicExchange(exchangeName, true, false);
            case HEADERS_EXCHANGE:
//                return new HeadersExchange();
                throw new ServiceException("暂不支持该类型:" + MqConstants.ExchangeType.HEADERS_EXCHANGE.getName() + "交换机创建!");
            default:
                throw new ServiceException("暂不支持该类型:" + MqConstants.ExchangeType.HEADERS_EXCHANGE.getName() + "交换机创建!");
        }

    }

    public String generateQueueName(String exchangeName, String routingKey) {
        return exchangeName + StringPool.DOT + routingKey + StringPool.DOT + "queue";
    }

    private boolean isQueueExists(String queueName) {
        Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
        return queueProperties != null;
    }

    private boolean isExchangeExists(String exchangeName) {
        try {
            // 关键:通过 RabbitTemplate.execute() 获取 Spring 管理的 Channel
            // 被动声明交换机:仅查询,不创建
            // 无异常 → 交换机存在
            // 捕获 Channel 操作中的异常(如 404)
            // 交换机不存在
            // 其他异常向上抛出(被外层 try-catch 捕获)
            return Boolean.TRUE.equals(rabbitTemplate.execute(channel -> {
                try {
                    // 被动声明交换机:仅查询,不创建
                    channel.exchangeDeclarePassive(exchangeName);
                    // 无异常 → 交换机存在
                    return true;
                } catch (IOException e) {
                    // 捕获 Channel 操作中的异常(如 404)
                    if (e.getCause().getMessage().contains("404")) {
                        return false; // 交换机不存在
                    }
                    // 其他异常向上抛出(被外层 try-catch 捕获)
                    throw e;
                }
            }));
        } catch (Exception e) {
            // 处理整体异常(如连接超时、权限不足)
            if (e.getCause() instanceof IOException && e.getCause().getMessage().contains("404")) {
                return false;
            }
            throw new AmqpException("查询交换机失败:" + e.getMessage(), e);
        }
    }

}

在消息发送时判断了交换机、队列、路由是否存在,不存在则创建。

八、最后,为了在其他模块引入该模块这些类能正确被Spring扫描到,需要在org.springframework.boot.autoconfigure.AutoConfiguration.imports中加入配置

1
2
3
com.xxx.mq.rabbit.config.RabbitMqProperties
com.xxx.mq.rabbit.config.RabbitConfig
com.xxx.mq.rabbit.service.RabbitMqService

九、测试

  1. 在业务模块中引入该模块包依赖

  2. 配置相关信息

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    
    spring:
      rabbitmq:
          host: localhost
          port: 5672
          username: guest
          password: guest
          virtual-host: /share
          publisher-confirm-type: CORRELATED
          publisher-returns: true
          listener:
            simple:
              cknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
              prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发
    
    # 使用rabbitMQ
    share:
      rabbit:
        active: true   
    
  3. 发送消息

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    
    @RestController
    public class MqController {
    
    
        @Autowired
        private RabbitMqService rabbitMqService;
    
        @Operation(summary = "发送延迟消息")
        @GetMapping("/sendDelay")
        public AjaxResult sendDelay(String msg) {
            RabbitCorrelationData data = new RabbitCorrelationData();
            data.setMessage(msg);
            data.setExchangeName("test-delay-exchange");
            data.setRoutingKey("test");
            data.setId(UUID.randomUUID().toString());
            rabbitMqService.sendMessage( data, 15);
            return success();
    
        }
    
        @Operation(summary = "发送确认消息")
        @GetMapping("/send")
        public AjaxResult send(String msg) {
            RabbitCorrelationData data = new RabbitCorrelationData();
            data.setMessage(msg);
            data.setExchangeName("test-exchange");
            data.setRoutingKey("test");
            data.setId(UUID.randomUUID().toString());
            rabbitMqService.sendMessage( data, null);
            return success();
        }
    
    }
    
  4. 监听消息

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    
    @Component
    @Slf4j
    public class Test {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @RabbitListener(queues = "test-delay-exchange.test.queue")
        public void testListener(String msg, Message message, Channel channel) throws IOException {
            //接收消息,消费者端判断是否需要做幂等性处理
            //如果业务保证幂等性,基于redis setnx保证
            String key = "mq:" + msg;
            Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "", 200, TimeUnit.SECONDS);
            if (!flag) {
                //说明该业务数据已经被执行
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
    
            // 执行业务
            //  TODO
            log.info("接受到消息:{},主体为{},--{}" , msg , message.toString(),message.getMessageProperties().getDeliveryTag());
    
    
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    	 @RabbitListener(queues = "test-exchange.test.queue")
        public void testListener(String msg, Message message, Channel channel) throws IOException {
            //接收消息,消费者端判断是否需要做幂等性处理
            //如果业务保证幂等性,基于redis setnx保证
            String key = "mq:" + msg;
            Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, "", 200, TimeUnit.SECONDS);
            if (!flag) {
                //说明该业务数据已经被执行
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
    
            // 执行业务
            //  TODO
            log.info("接受到消息:{},主体为{},--{}" , msg , message.toString(),message.getMessageProperties().getDeliveryTag());
    
    
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
    }
    

    打印延迟消息发送日志,23:01:23.981发送,23:01:38.984接受成功,证明成功了

    1
    2
    3
    4
    5
    6
    
    23:01:23.973 [http-nio-9212-exec-1] INFO  c.s.m.r.s.RabbitMqService - [ensureBindingExists,114] - 绑定已就绪: test-exchange->test-exchange.test.queue[test]
    23:01:23.980 [connectionFactory1] INFO  c.s.m.r.c.RabbitConfig - [lambda$rabbitTemplate$1,96] - 延迟消息[3343986a-662e-41ae-8a0f-9dbbcc2793d2]暂存中,忽略NO_ROUTE(交换机: test-exchange, 路由键: test)
    23:01:23.981 [connectionFactory1] INFO  c.s.m.r.c.RabbitConfig - [lambda$rabbitTemplate$0,72] - 消息[3343986a-662e-41ae-8a0f-9dbbcc2793d2]发送成功(交换机: test-exchange)
    23:01:38.984 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] WARN  o.s.a.s.c.Jackson2JsonMessageConverter - [fromMessage,325] - Could not convert incoming message with content-type [text/plain], 'json' keyword missing.
    23:01:38.997 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  c.s.o.r.l.Test - [testListener,37] - 接受到消息:"正在进行测试延迟消息的发送32",主体为(Body:'"正在进行测试延迟消息的发送32"' MessageProperties [headers={spring_listener_return_correlation=bf33998d-639a-450e-a258-d59de7c19e6c, spring_returned_message_correlation=3343986a-662e-41ae-8a0f-9dbbcc2793d2, retry_count=0, correlation_id=3343986a-662e-41ae-8a0f-9dbbcc2793d2, is_delay=true}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test-exchange, receivedRoutingKey=test, receivedDelay=15000, deliveryTag=1, consumerTag=amq.ctag--7dyhSB05z_2PmUs7A6hbw, consumerQueue=test-exchange.test.queue]),--1
    23:01:39.001 [AMQP Connection 192.168.23.100:5672] ERROR o.s.a.r.c.CachingConnectionFactory - [log,742] - Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)