Browse Source

产线新增MQTT通信

zhangxin 1 tháng trước cách đây
mục cha
commit
70d323c7c5

+ 0 - 18
pom.xml

@@ -19,24 +19,6 @@
         <module>prod-line-bas</module>
     </modules>
 
-<!--<dependencies>
-    <dependency>
-        <groupId>cn.hutool</groupId>
-        <artifactId>hutool-all</artifactId>
-        <version>5.8.18</version>
-    </dependency>
-    <dependency>
-        <groupId>org.apache.poi</groupId>
-        <artifactId>poi-ooxml</artifactId>
-        <version>5.0.0</version>
-    </dependency>
-    &lt;!&ndash; https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml-lite &ndash;&gt;
-    <dependency>
-        <groupId>org.apache.poi</groupId>
-        <artifactId>poi-ooxml-lite</artifactId>
-        <version>5.0.0</version>
-    </dependency>
-</dependencies>-->
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>

+ 84 - 0
prod-line-admin/src/main/java/com/huaxia/MqttConsumerConfig.java

@@ -0,0 +1,84 @@
+package com.huaxia;
+
+import com.huaxia.imes.mqtt.MqttConsumerCallBack;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author zx
+ * @since 2024年11月13日
+ * MQTT 消费端配置
+ */
+@Configuration
+@Slf4j
+public class MqttConsumerConfig {
+
+    @Value("${mqtt.url}")
+    private String url;
+    @Value("${mqtt.username}")
+    private String username;
+    @Value("${mqtt.password}")
+    private String password;
+    @Value("${mqtt.client.id}")
+    private String clientId;
+    @Value("${mqtt.client.topic.default}")
+    private String topic;
+
+    /**
+     * mqtt 客户端
+     */
+    private MqttClient mqttClient;
+
+
+    /**
+     * 在bean初始化后连接mqtt服务器
+     */
+    @PostConstruct
+    public void init(){
+        connect();
+    }
+
+
+    /**
+     * 连接mqtt服务器
+     */
+    private void connect() {
+        try {
+            // 创建客户端
+            mqttClient=new MqttClient(url,clientId,new MemoryPersistence());
+            //连接参数设置
+            MqttConnectOptions options=new MqttConnectOptions();
+            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
+            options.setCleanSession(false);
+            //设置连接用户名和密码
+            options.setUserName(username);
+            options.setPassword(password.toCharArray());
+            //设置超时时间单位为秒
+            options.setConnectionTimeout(30);
+            //设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
+            options.setKeepAliveInterval(60);
+            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外中断,服务器将发布客户端的遗嘱信息
+            options.setWill(topic, (clientId+"与服务器断开连接").getBytes(), 0, false);
+            //设置回调
+            mqttClient.setCallback(new MqttConsumerCallBack());
+            // 连接服务器
+            mqttClient.connect(options);
+            // 订阅主题
+            //消息等级
+            int[] qos={1,1,2};
+            //主题
+            final String topics[] = {topic, topic + "/+"/*, TestTopic + "/r1"*/, topic + "/r/+"/*, TestTopic + "/r2"*/};
+            //订阅主题
+            mqttClient.subscribe(topics,qos);
+        }catch (Exception e){
+           log.error("连接异常{}",e.getMessage());
+        }
+
+    }
+}

+ 13 - 0
prod-line-imes/pom.xml

@@ -32,6 +32,19 @@
             <version>${ruoyi.version}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.2</version>
+        </dependency>
     </dependencies>
 
+
+    <repositories>
+        <repository>
+            <id>Eclipse Paho Repository</id>
+            <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+        </repository>
+    </repositories>
+
 </project>

+ 103 - 0
prod-line-imes/src/main/java/com/huaxia/imes/mqtt/MqttConsumer.java

@@ -0,0 +1,103 @@
+package com.huaxia.imes.mqtt;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+/**
+ * @author zx
+ * @since 2024年11月11日
+ * MQTT 消费者
+ */
+@Slf4j
+public class MqttConsumer {
+
+
+    public static void main(String[] args) {
+
+        String borketUrl = "tcp://117.72.113.54:1883";
+        String clientId = "demo_client"+ System.currentTimeMillis();
+        String subTopic = "TestTopic";
+        String userName = "admin";
+        String password = "public";
+        // MQTT 连接选项
+        MqttConnectOptions mqtt = buildMqtt(userName, password);
+        // 内存存储
+        MemoryPersistence persistence = new MemoryPersistence();
+        try {
+            // 创建客户端
+            MqttClient client = new MqttClient(borketUrl, clientId, persistence);
+            client.setTimeToWait(5000L);
+            client.setCallback(new MqttCallbackExtended() {
+                // 连接成功
+                @Override
+                public void connectComplete(boolean b, String borketUrl) {
+                    log.info("连接成功,等待消费{},{}",b,borketUrl);
+                    try {
+                        final String topicFilter[] = {subTopic, subTopic + "/+"/*, firstTopic + "/r1"*/, subTopic + "/r/+"/*, firstTopic + "/r2"*/};
+                        final int[] qos = {1, 1, 2};
+                        client.subscribe(topicFilter, qos);
+                    } catch (MqttException e) {
+                        log.error("连接断开异常{}", e.getMessage());
+                    }
+                }
+                // 连接断开
+                @Override
+                public void connectionLost(Throwable throwable) {
+                    try {
+                        log.info("连接断开,正在重连>>>>>>");
+                        Thread.sleep(5000);
+                        client.connect(mqtt);
+                        log.info("重连成功,等待消费");
+                    } catch (Exception e) {
+                        log.error("连接断开异常{}", e.getMessage());
+                    }
+
+                }
+
+                // 消息到达
+                @Override
+                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
+                    log.info("开始消费:{},{}", s,new String(mqttMessage.getPayload()));
+                }
+
+                // 发布完成
+                @Override
+                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+                    log.info("发布完成{}", iMqttDeliveryToken.getClient());
+
+                }
+            });
+            // 连接
+            client.connect(mqtt);
+        }catch (Exception e){
+          log.error("连接异常{}",e.getMessage());
+        }
+
+
+    }
+
+    /**
+     * 构建Mqtt连接参数
+     *
+     * @param userName
+     * @param password
+     * @return
+     */
+    private static MqttConnectOptions buildMqtt(String userName, String password) {
+        //设置参数
+        MqttConnectOptions options = new MqttConnectOptions();
+        // 设置是否清空 session
+        options.setCleanSession(true);
+        //心跳间隔
+        options.setKeepAliveInterval(60);
+        // 设置连接密码
+        options.setPassword(password.toCharArray());
+        options.setUserName(userName);
+        //是否自动重连
+        options.setAutomaticReconnect(true);
+        // 最大挂起消息条数
+        options.setMaxInflight(10000);
+        return options;
+    }
+}

+ 58 - 0
prod-line-imes/src/main/java/com/huaxia/imes/mqtt/MqttConsumerCallBack.java

@@ -0,0 +1,58 @@
+package com.huaxia.imes.mqtt;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+import java.util.Arrays;
+
+import static com.sun.deploy.util.ArrayUtil.arrayToString;
+
+/**
+ * @author zx
+ * @since 2024年11月13日
+ * @Description 消费者回调
+ */
+@Slf4j
+public class MqttConsumerCallBack implements MqttCallback {
+
+    /**
+     * 连接丢失
+     * @param throwable
+     */
+    @Override
+    public void connectionLost(Throwable throwable) {
+
+        log.error("连接断开异常{}",throwable.getMessage());
+
+    }
+
+    /**
+     * 消息到达
+     * @param s
+     * @param mqttMessage
+     * @throws Exception
+     */
+    @Override
+    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
+         log.info("收到消息主题信息:{}",s);
+         log.info("收到消息内容:{}",new String(mqttMessage.getPayload()));
+         log.info("收到消息质量:{}",mqttMessage.getQos());
+         log.info("收到消息是否重复:{}",mqttMessage.isDuplicate());
+         log.info("收到消息是否已发布:{}",mqttMessage.isRetained());
+
+    }
+
+    /**
+     * 消息发布成功的回馈
+     * @param iMqttDeliveryToken
+     */
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+        log.info("发布完成客户端信息{}", iMqttDeliveryToken.getClient());
+        log.info("发布完成主题信息{}", arrayToString(iMqttDeliveryToken.getTopics()));
+        log.info("发布完成消息id{}", iMqttDeliveryToken.getMessageId());
+
+    }
+}

+ 125 - 0
prod-line-imes/src/main/java/com/huaxia/imes/mqtt/MqttProducer.java

@@ -0,0 +1,125 @@
+package com.huaxia.imes.mqtt;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+/**
+ * @author zx
+ * @since 2024年11月11日
+ * MQTT 消息生产者
+ */
+@Slf4j
+public class MqttProducer {
+
+    public static void main(String[] args) {
+        String borketUrl = "tcp://117.72.113.54:1883";
+        String clientId = "demo_client"+System.currentTimeMillis();
+        String subTopic = "TestTopic";
+        int Qos = 1;
+        String userName = "admin";
+        String password = "public";
+        // MQTT 连接选项设置连接参数
+        MqttConnectOptions  options = buildMqttConnectOptions(userName, password);
+        // 内存存储
+        MemoryPersistence persistence = new MemoryPersistence();
+        try {
+            // 创建客户端
+            MqttClient client = new MqttClient(borketUrl, clientId, persistence);
+            // 设置超时时间
+            client.setTimeToWait(5000);
+            // 设置回调
+            client.setCallback(new MqttCallbackExtended() {
+                // 连接断开
+                @Override
+                public void connectionLost(Throwable throwable) {
+                    log.error("连接断开");
+                    try {
+                        log.info("连接断开,正在重连>>>>>>");
+                        Thread.sleep(5000);
+                        client.connect(options);
+                        log.info("重连成功");
+                    } catch (Exception e) {
+                        log.error("连接断开异常{}", e.getMessage());
+                    }
+                }
+
+                // 消息到达
+                @Override
+                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
+                    log.info("发送消息内容:" + new String(mqttMessage.getPayload()));
+                    log.info("消息主题:" + s);
+                    log.info("消息质量:" + mqttMessage.getQos());
+                    log.info("消息是否重复:" + mqttMessage.isDuplicate());
+                    log.info("消息是否已发布:" + mqttMessage.isRetained());
+                }
+
+                // 发布完成
+                @Override
+                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+                    log.info("发布完成客户端信息:{}",iMqttDeliveryToken.getClient().getClientId());
+                }
+                // 连接完成
+                @Override
+                public void connectComplete(boolean b, String s) {
+                    log.info("连接完成{}",s);
+                    log.info("b{}",b);
+                }
+            });
+            // 连接
+            client.connect(options);
+            // 订阅主题
+            client.subscribe(subTopic);
+            //循环发送消息
+            boolean keepSending = true;
+            int i = 0;
+            while (keepSending) {
+                // 控制发送次数
+                i++;
+                // 发布消息
+                MqttMessage message = new MqttMessage(("第" + i + "只杨羊").getBytes());
+                // 设置消息质量
+                message.setQos(Qos);
+                // 发布消息
+                client.publish(subTopic, message);
+                // 控制发送频率
+                Thread.sleep(1000); // 每秒发送一次
+                if (i >= 1000) { // 发送 10 次后停止
+                    keepSending = false;
+                }
+            }
+            // 断开连接
+            client.disconnect();
+            // 关闭客户端
+            client.close();
+
+        } catch (Exception e) {
+            log.error("连接异常{}", e.getMessage());
+        }
+
+
+    }
+
+    /**
+     * 构建连接参数
+     *
+     * @param userName
+     * @param password
+     * @return
+     */
+    private static MqttConnectOptions  buildMqttConnectOptions(String userName, String password) {
+        MqttConnectOptions  connOpts = new MqttConnectOptions();
+        // 连接 MQTT Broker 的用户名密码
+        connOpts.setUserName(userName);
+        connOpts.setPassword(password.toCharArray());
+        // 设置是否清除会话
+        connOpts.setCleanSession(true);
+        //心跳间隔
+        connOpts.setKeepAliveInterval(60);
+        // 连接超时时间,单位为秒
+        connOpts.setConnectionTimeout(30);
+        // 是否自动重连
+        connOpts.setAutomaticReconnect(true);
+        return connOpts;
+    }
+}