先将配置类和常量类创建好
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@Data
@Component
@ConfigurationProperties(prefix = "emqx.client")
public class EmqxProperties {
private String clientId;
private String username;
private String password;
private String serverURI;
private int keepAliveInterval;
private int connectionTimeout;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public class EmqxConstants {
/** 充电宝插入,柜机发布Topic消息, 服务器监听消息 */
public final static String TOPIC_POWERBANK_CONNECTED = "/sys/powerBank/connected";
/** 用户扫码,服务器发布Topic消息 柜机监听消息 */
public final static String TOPIC_SCAN_SUBMIT = "/sys/scan/submit/%s";
/** 充电宝弹出,柜机发布Topic消息,服务器监听消息 */
public final static String TOPIC_POWERBANK_UNLOCK = "/sys/powerBank/unlock";
/** 柜机属性上报,服务器监听消息 */
public final static String TOPIC_PROPERTY_POST = "/sys/property/post";
}
|
这里相关类的UML类图

接下来针对逐个类进行分析
首先是负责连接和发送emqx的EmqxClientWrapper , 这里比较重要的是MqttCallback mqttCallback 回调对象,这个是我们后面实现对topic动态的监听关键
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
@Component
@Slf4j
public class EmqxClientWrapper {
@Autowired
private EmqxProperties emqxProperties;
@Autowired
private MqttCallback mqttCallback;
private MqttClient client;
@PostConstruct
private void init() {
MqttClientPersistence mqttClientPersistence = new MemoryPersistence();
try {
//新建客户端 参数:MQTT服务的地址,客户端名称,持久化
client = new MqttClient(emqxProperties.getServerURI(), emqxProperties.getClientId(), mqttClientPersistence);
// 设置回调
client.setCallback(mqttCallback);
// 建立连接
connect();
} catch (MqttException e) {
log.info("MqttClient创建失败");
throw new RuntimeException(e);
}
}
private void connect() {
try {
MqttConnectOptions options = mqttConnectOptions();
client.connect(options);
log.info("连接成功。。。。。。。");
//订阅topic
client.subscribe(new String[]{EmqxConstants.TOPIC_POWERBANK_CONNECTED, EmqxConstants.TOPIC_POWERBANK_UNLOCK, EmqxConstants.TOPIC_PROPERTY_POST});
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
private MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setPassword(emqxProperties.getPassword().toCharArray());
mqttConnectOptions.setUserName(emqxProperties.getUsername());
mqttConnectOptions.setAutomaticReconnect(true);//是否自动重新连接
mqttConnectOptions.setCleanSession(true);//是否清除之前的连接信息
mqttConnectOptions.setConnectionTimeout(emqxProperties.getConnectionTimeout());//连接超时时间
mqttConnectOptions.setKeepAliveInterval(emqxProperties.getKeepAliveInterval());//心跳
return mqttConnectOptions;
}
public void publish(String topic, String data) {
MqttMessage mqttMessage = new MqttMessage(data.getBytes());
mqttMessage.setQos(2);
try {
client.publish(topic, mqttMessage);
} catch (MqttException e) {
log.error("发送消息失败,topic:" + topic);
throw new RuntimeException(e);
}
}
}
|
假设我们有两个业务需求需要监听不同的topic
业务A
1
2
3
4
5
6
7
8
|
@Component
@Slf4j
public class PowerBankConnectedHandler {
@Override
public void handleMessage(JSONObject message) {
log.info("handleMessage: {}", message.toJSONString());
}
}
|
业务B
1
2
3
4
5
6
7
8
|
@Component
@Slf4j
public class PropertyPostHandler{
@Override
public void handleMessage(JSONObject message) {
log.info("handleMessage: {}", message.toJSONString());
}
}
|
正常来说,我们会去创建MqttCallback实现类,然后注入业务A 和 业务 B对象,最后根据具体的topic判断调用相应业务类方法。
but ….,这样就会在后续我们要加入业务C,D,E······时要不断的在MqttCallback实现类上加入对象和相应的判断。
这样真是灾难, so 这里我们引入一个设计模式,借用工厂来创建好对象,然后MqttCallback实现类中去工厂类里面去拿对象,MqttCallback实现类中无需自行注入对象
首先我们创建一个自定义注解(用于标记topic)和一个接口用于规范所有要使用emqx的业务
1
2
3
4
5
6
7
8
|
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyEmqx {
String topic();
}
|
1
2
3
4
5
6
7
8
9
|
public interface MassageHandler {
/**
* 策略接口
* @param message
*/
void handleMessage(JSONObject message);
}
|
然后有进化版的业务A、B,这样我们在工厂里利用多态的特性来实现MassageHandler实现类和topic的绑定
1
2
3
4
5
6
7
8
9
|
@Component
@Slf4j
@MyEmqx(topic = EmqxConstants.TOPIC_POWERBANK_CONNECTED)
public class PowerBankConnectedHandler implements MassageHandler {
@Override
public void handleMessage(JSONObject message) {
log.info("handleMessage: {}", message.toJSONString());
}
}
|
1
2
3
4
5
6
7
8
9
|
@Component
@Slf4j
@MyEmqx(topic = EmqxConstants.TOPIC_POWERBANK_UNLOCK)
public class PropertyPostHandler implements MassageHandler {
@Override
public void handleMessage(JSONObject message) {
log.info("handleMessage: {}", message.toJSONString());
}
}
|
重头戏来了,他就是我们的工厂类,考虑到后续可能存在不同的工厂(创建逻辑不一样),所以可以先暂时将工厂接口化
1
2
3
4
5
|
public interface MessageHandlerFactory {
MassageHandler getMassageHandler(String topic);
}
|
具体的工厂实现类,该类实现了MessageHandlerFactory, ApplicationContextAware接口,ApplicationContextAware主要用于从spring容器中获取对象,然后在获取到相应对象上的注解属性实现动态绑定topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@Service
public class MessageHandlerFactoryImpl implements MessageHandlerFactory, ApplicationContextAware {
Map<String, MassageHandler> massageHandlerBeanMap = new HashMap<>();
@Override
public MassageHandler getMassageHandler(String topic) {
return massageHandlerBeanMap.get(topic);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, MassageHandler> beans = applicationContext.getBeansOfType(MassageHandler.class);
for (MassageHandler massageHandler : beans.values()) {
// 从 massageHandler 的类上查找 @MyEmqx 注解,并获取第一个找到的注解实例。
MyEmqx myEmqx = AnnotatedElementUtils.findAllMergedAnnotations(massageHandler.getClass(), MyEmqx.class).iterator().next();
if (myEmqx != null){
String topic = myEmqx.topic();
massageHandlerBeanMap.put(topic, massageHandler);
}
}
}
}
|
最后我们实现MqttCallback接口来实现消息的监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
@Component
@Slf4j
@AllArgsConstructor
public class OnMessageCallback implements MqttCallback {
private final MessageHandlerFactory messageHandlerFactoryImpl;
@Override
public void connectionLost(Throwable throwable) {
// 连接丢失后,一般在这里面进行重连
log.warn("连接断开,可以做重连" + throwable);
}
@Override
public void messageArrived(String topic, MqttMessage message) {
// subscribe后得到的消息会执行到这里面
log.info("接收消息主题:" + topic);
log.info("接收消息Qos:" + message.getQos());
log.info("接收消息内容:" + new String(message.getPayload()));
//根据topic获取相应的消息处理类
MassageHandler massageHandler = messageHandlerFactoryImpl.getMassageHandler(topic);
if (null != massageHandler) {
String content = new String(message.getPayload());
massageHandler.handleMessage(JSONObject.parseObject(content));
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("deliveryComplete---------" + token.isComplete());
}
}
|
这里我们可以看到我通过 MassageHandler massageHandler = messageHandlerFactoryImpl.getMassageHandler(topic);来获取业务类。这样在后续业务拓展的时候就只需要关注业务了,真是妙哉。