|
@@ -1,103 +0,0 @@
|
|
|
-package com.huaxia.imes.mqtt;
|
|
|
-
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.eclipse.paho.client.mqttv3.*;
|
|
|
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
-
|
|
|
-
|
|
|
- * @author zx
|
|
|
- * @since 2024年11月11日
|
|
|
- * MQTT 消费者
|
|
|
- */
|
|
|
-@Slf4j
|
|
|
-public class MqttConsumer {
|
|
|
-
|
|
|
-
|
|
|
- public static void main(String[] args) {
|
|
|
-
|
|
|
- String borketUrl = "tcp://117.72.113.54:1883";
|
|
|
- String clientId = "demo_client"+ System.currentTimeMillis();
|
|
|
- String subTopic = "TestTopic";
|
|
|
- String userName = "admin";
|
|
|
- String password = "public";
|
|
|
-
|
|
|
- MqttConnectOptions mqtt = buildMqtt(userName, password);
|
|
|
-
|
|
|
- MemoryPersistence persistence = new MemoryPersistence();
|
|
|
- try {
|
|
|
-
|
|
|
- MqttClient client = new MqttClient(borketUrl, clientId, persistence);
|
|
|
- client.setTimeToWait(5000L);
|
|
|
- client.setCallback(new MqttCallbackExtended() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void connectComplete(boolean b, String borketUrl) {
|
|
|
- log.info("连接成功,等待消费{},{}",b,borketUrl);
|
|
|
- try {
|
|
|
- final String topicFilter[] = {subTopic, subTopic + "/+", subTopic + "/r/+"};
|
|
|
- final int[] qos = {1, 1, 2};
|
|
|
- client.subscribe(topicFilter, qos);
|
|
|
- } catch (MqttException e) {
|
|
|
- log.error("连接断开异常{}", e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void connectionLost(Throwable throwable) {
|
|
|
- try {
|
|
|
- log.info("连接断开,正在重连>>>>>>");
|
|
|
- Thread.sleep(5000);
|
|
|
- client.connect(mqtt);
|
|
|
- log.info("重连成功,等待消费");
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("连接断开异常{}", e.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @Override
|
|
|
- public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
|
|
- log.info("开始消费:{},{}", s,new String(mqttMessage.getPayload()));
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @Override
|
|
|
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
|
- log.info("发布完成{}", iMqttDeliveryToken.getClient());
|
|
|
-
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- client.connect(mqtt);
|
|
|
- }catch (Exception e){
|
|
|
- log.error("连接异常{}",e.getMessage());
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- * 构建Mqtt连接参数
|
|
|
- *
|
|
|
- * @param userName
|
|
|
- * @param password
|
|
|
- * @return
|
|
|
- */
|
|
|
- private static MqttConnectOptions buildMqtt(String userName, String password) {
|
|
|
-
|
|
|
- MqttConnectOptions options = new MqttConnectOptions();
|
|
|
-
|
|
|
- options.setCleanSession(true);
|
|
|
-
|
|
|
- options.setKeepAliveInterval(60);
|
|
|
-
|
|
|
- options.setPassword(password.toCharArray());
|
|
|
- options.setUserName(userName);
|
|
|
-
|
|
|
- options.setAutomaticReconnect(true);
|
|
|
-
|
|
|
- options.setMaxInflight(10000);
|
|
|
- return options;
|
|
|
- }
|
|
|
-}
|