使用工厂模式实现连接emqx

先将配置类和常量类创建好

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Data
@Component
@ConfigurationProperties(prefix = "emqx.client")
public class EmqxProperties {

    private String clientId;
    private String username;
    private String password;
    private String serverURI;
    private int keepAliveInterval;
    private int connectionTimeout;

}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class EmqxConstants {
    /** 充电宝插入,柜机发布Topic消息, 服务器监听消息 */
    public final static String TOPIC_POWERBANK_CONNECTED = "/sys/powerBank/connected";

    /** 用户扫码,服务器发布Topic消息 柜机监听消息  */
    public final static String TOPIC_SCAN_SUBMIT = "/sys/scan/submit/%s";

    /** 充电宝弹出,柜机发布Topic消息,服务器监听消息  */
    public final static String TOPIC_POWERBANK_UNLOCK = "/sys/powerBank/unlock";

    /** 柜机属性上报,服务器监听消息  */
    public final static String TOPIC_PROPERTY_POST = "/sys/property/post";

}

这里相关类的UML类图

接下来针对逐个类进行分析

首先是负责连接和发送emqx的EmqxClientWrapper , 这里比较重要的是MqttCallback mqttCallback 回调对象,这个是我们后面实现对topic动态的监听关键

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Component
@Slf4j
public class EmqxClientWrapper {

    @Autowired
    private EmqxProperties emqxProperties;
    @Autowired
    private MqttCallback mqttCallback;

    private MqttClient client;

    @PostConstruct
    private void init() {
        MqttClientPersistence mqttClientPersistence = new MemoryPersistence();
        try {
            //新建客户端 参数:MQTT服务的地址,客户端名称,持久化
            client = new MqttClient(emqxProperties.getServerURI(), emqxProperties.getClientId(), mqttClientPersistence);
            // 设置回调
            client.setCallback(mqttCallback);
            // 建立连接
            connect();


        } catch (MqttException e) {
            log.info("MqttClient创建失败");
            throw new RuntimeException(e);
        }
    }

    private void connect() {
        try {
            MqttConnectOptions options = mqttConnectOptions();
            client.connect(options);
            log.info("连接成功。。。。。。。");

            //订阅topic
            client.subscribe(new String[]{EmqxConstants.TOPIC_POWERBANK_CONNECTED, EmqxConstants.TOPIC_POWERBANK_UNLOCK, EmqxConstants.TOPIC_PROPERTY_POST});

        } catch (MqttException e) {
            throw new RuntimeException(e);
        }

    }

    private MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setPassword(emqxProperties.getPassword().toCharArray());
        mqttConnectOptions.setUserName(emqxProperties.getUsername());
        mqttConnectOptions.setAutomaticReconnect(true);//是否自动重新连接
        mqttConnectOptions.setCleanSession(true);//是否清除之前的连接信息
        mqttConnectOptions.setConnectionTimeout(emqxProperties.getConnectionTimeout());//连接超时时间
        mqttConnectOptions.setKeepAliveInterval(emqxProperties.getKeepAliveInterval());//心跳
        return mqttConnectOptions;
    }


    public void publish(String topic, String data) {
        MqttMessage mqttMessage = new MqttMessage(data.getBytes());
        mqttMessage.setQos(2);
        try {
            client.publish(topic, mqttMessage);
        } catch (MqttException e) {
            log.error("发送消息失败,topic:" + topic);
            throw new RuntimeException(e);
        }
    }


}

假设我们有两个业务需求需要监听不同的topic

业务A

1
2
3
4
5
6
7
8
@Component
@Slf4j
public class PowerBankConnectedHandler {
    @Override
    public void handleMessage(JSONObject message) {
        log.info("handleMessage: {}", message.toJSONString());
    }
}

业务B

1
2
3
4
5
6
7
8
@Component
@Slf4j
public class PropertyPostHandler{
    @Override
    public void handleMessage(JSONObject message) {
        log.info("handleMessage: {}", message.toJSONString());
    }
}

正常来说,我们会去创建MqttCallback实现类,然后注入业务A 和 业务 B对象,最后根据具体的topic判断调用相应业务类方法。

but ….,这样就会在后续我们要加入业务C,D,E······时要不断的在MqttCallback实现类上加入对象和相应的判断。

这样真是灾难, so 这里我们引入一个设计模式,借用工厂来创建好对象,然后MqttCallback实现类中去工厂类里面去拿对象,MqttCallback实现类中无需自行注入对象

首先我们创建一个自定义注解(用于标记topic)和一个接口用于规范所有要使用emqx的业务

1
2
3
4
5
6
7
8
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyEmqx {

    String topic();

}
1
2
3
4
5
6
7
8
9
public interface MassageHandler {

    /**
     * 策略接口
     * @param message
     */
    void handleMessage(JSONObject message);

}

然后有进化版的业务A、B,这样我们在工厂里利用多态的特性来实现MassageHandler实现类和topic的绑定

1
2
3
4
5
6
7
8
9
@Component
@Slf4j
@MyEmqx(topic = EmqxConstants.TOPIC_POWERBANK_CONNECTED)
public class PowerBankConnectedHandler implements MassageHandler {
    @Override
    public void handleMessage(JSONObject message) {
        log.info("handleMessage: {}", message.toJSONString());
    }
}
1
2
3
4
5
6
7
8
9
@Component
@Slf4j
@MyEmqx(topic = EmqxConstants.TOPIC_POWERBANK_UNLOCK)
public class PropertyPostHandler implements MassageHandler {
    @Override
    public void handleMessage(JSONObject message) {
        log.info("handleMessage: {}", message.toJSONString());
    }
}

重头戏来了,他就是我们的工厂类,考虑到后续可能存在不同的工厂(创建逻辑不一样),所以可以先暂时将工厂接口化

1
2
3
4
5
public interface MessageHandlerFactory {

    MassageHandler getMassageHandler(String topic);

}

具体的工厂实现类,该类实现了MessageHandlerFactory, ApplicationContextAware接口,ApplicationContextAware主要用于从spring容器中获取对象,然后在获取到相应对象上的注解属性实现动态绑定topic

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Service
public class MessageHandlerFactoryImpl implements MessageHandlerFactory, ApplicationContextAware {

    Map<String, MassageHandler> massageHandlerBeanMap = new HashMap<>();

    @Override
    public MassageHandler getMassageHandler(String topic) {
        return massageHandlerBeanMap.get(topic);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, MassageHandler> beans = applicationContext.getBeansOfType(MassageHandler.class);
        for (MassageHandler massageHandler : beans.values()) {
            //            从 massageHandler 的类上查找 @MyEmqx 注解,并获取第一个找到的注解实例。
            MyEmqx myEmqx = AnnotatedElementUtils.findAllMergedAnnotations(massageHandler.getClass(), MyEmqx.class).iterator().next();
            if (myEmqx != null){
                String topic = myEmqx.topic();
                massageHandlerBeanMap.put(topic, massageHandler);
            }
        }
    }
}

最后我们实现MqttCallback接口来实现消息的监听

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Component
@Slf4j
@AllArgsConstructor
public class OnMessageCallback implements MqttCallback {

    private final MessageHandlerFactory messageHandlerFactoryImpl;

    @Override
    public void connectionLost(Throwable throwable) {
        // 连接丢失后,一般在这里面进行重连
        log.warn("连接断开,可以做重连" + throwable);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // subscribe后得到的消息会执行到这里面
        log.info("接收消息主题:" + topic);
        log.info("接收消息Qos:" + message.getQos());
        log.info("接收消息内容:" + new String(message.getPayload()));

        //根据topic获取相应的消息处理类
        MassageHandler massageHandler = messageHandlerFactoryImpl.getMassageHandler(topic);
        if (null != massageHandler) {
            String content = new String(message.getPayload());
            massageHandler.handleMessage(JSONObject.parseObject(content));
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("deliveryComplete---------" + token.isComplete());
    }
}

这里我们可以看到我通过 MassageHandler massageHandler = messageHandlerFactoryImpl.getMassageHandler(topic);来获取业务类。这样在后续业务拓展的时候就只需要关注业务了,真是妙哉。