Explorar el Código

修改mqtt依赖

chejianzheng hace 4 meses
padre
commit
fc94667b63

+ 12 - 0
portal-service-common/src/main/java/com/hfln/portal/common/constant/mqtt/topic/TopicConstants.java

@@ -24,4 +24,16 @@ public interface TopicConstants {
     String TOPIC_SET_DEVICE_PARAM = "/mps/set_device_param";
 
 
+    // 订阅主题
+    String TOPIC_DAS_EVENT = "/das/event";
+    // alarm事件
+    String TOPIC_DAS_ALARM_EVENT = "/das/alarm_event";
+    // 实时点位
+    String TOPIC_DAS_REALTIME_POS = "/das/realtime_pos";
+    // 信息更新
+    String TOPIC_DAS_DEV_STATUS = "/das/dev_status";
+    // 存在事件
+    String TOPIC_DAS_ESIST = "/das/exist";
+
+
 }

+ 0 - 4
portal-service-domain/pom.xml

@@ -32,10 +32,6 @@
             <artifactId>tencentcloud-sdk-java</artifactId>
             <version>3.1.1190</version>
         </dependency>
-        <dependency>
-            <groupId>cn.hfln.framework</groupId>
-            <artifactId>mqtt-spring-boot-starter</artifactId>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>

+ 6 - 0
portal-service-infrastructure/pom.xml

@@ -70,6 +70,12 @@
             <artifactId>weixin-java-mp</artifactId>
         </dependency>
 
+        <!-- Spring Integration MQTT -->
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
+
 
     </dependencies>
 

+ 195 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/config/MqttConfig.java

@@ -0,0 +1,195 @@
+package com.hfln.portal.infrastructure.config;
+
+import com.hfln.portal.common.constant.mqtt.topic.TopicConstants;
+import com.hfln.portal.infrastructure.mqtt.MqttSubHandle;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+/**
+ * MQTT配置类
+ * 
+ * 统一管理所有MQTT相关配置,包括:
+ * 1. MQTT客户端工厂
+ * 2. Spring Integration入站和出站适配器
+ * 3. 消息处理器
+ * 4. 任务调度器
+ */
+@Configuration
+@Slf4j
+@ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true", matchIfMissing = true)
+public class MqttConfig {
+
+    @Value("${mqtt.broker:tcp://8.130.28.21:1883}")
+    private String serverUri;
+    
+    @Value("${mqtt.client.id:hfln-device-service}")
+    private String clientId;
+    
+    @Value("${mqtt.username:admin}")
+    private String username;
+    
+    @Value("${mqtt.password:public}")
+    private String password;
+    
+    @Value("${mqtt.connect.timeout:30}")
+    private int connectTimeout;
+    
+    @Value("${mqtt.keep.alive.interval:60}")
+    private int keepAliveInterval;
+    
+    @Value("${mqtt.clean.session:true}")
+    private boolean cleanSession;
+
+    // ===========================================
+    // 基础配置
+    // ===========================================
+
+    @Bean
+    public TaskScheduler taskScheduler() {
+        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+        scheduler.setPoolSize(10);
+        scheduler.setThreadNamePrefix("mqtt-task-");
+        scheduler.setWaitForTasksToCompleteOnShutdown(true);
+        scheduler.setAwaitTerminationSeconds(60);
+        return scheduler;
+    }
+    
+    /**
+     * MQTT客户端工厂
+     */
+    @Bean
+    public MqttPahoClientFactory mqttClientFactory() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        MqttConnectOptions options = new MqttConnectOptions();
+        
+        options.setServerURIs(new String[] { serverUri });
+        
+        if (username != null && !username.isEmpty()) {
+            options.setUserName(username);
+        }
+        
+        if (password != null && !password.isEmpty()) {
+            options.setPassword(password.toCharArray());
+        }
+        
+        options.setConnectionTimeout(connectTimeout);
+        options.setKeepAliveInterval(keepAliveInterval);
+        options.setCleanSession(cleanSession);
+        
+        factory.setConnectionOptions(options);
+        
+        return factory;
+    }
+
+
+    // ===========================================
+    // 消息通道和适配器
+    // ===========================================
+
+    @Bean
+    public MessageChannel inputChannel() {
+        return new DirectChannel();
+    }
+
+    @Bean
+    public MessageProducer mpsInbound() {
+        String[] topics = {
+                TopicConstants.TOPIC_DAS_EVENT,
+                TopicConstants.TOPIC_DAS_ALARM_EVENT,
+                TopicConstants.TOPIC_DAS_DEV_STATUS,
+                TopicConstants.TOPIC_DAS_ESIST,
+                TopicConstants.TOPIC_DAS_REALTIME_POS
+        };
+        
+        MqttPahoMessageDrivenChannelAdapter adapter =
+                new MqttPahoMessageDrivenChannelAdapter(clientId + "_das", mqttClientFactory(), topics);
+        adapter.setCompletionTimeout(5000);
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(2, 2, 0, 0, 0);
+        adapter.setOutputChannel(inputChannel());
+        adapter.setTaskScheduler(taskScheduler());
+        return adapter;
+    }
+
+    @Bean
+    @ServiceActivator(inputChannel = "inputChannel")
+    public MessageHandler dasMqttMessageHandler(MqttSubHandle handler) {
+        return new MessageHandler() {
+            @Override
+            public void handleMessage(Message<?> message) throws MessagingException {
+                handler.handleMessage(message);
+            }
+        };
+    }
+
+
+
+    // ===========================================
+    // 出站配置和框架兼容
+    // ===========================================
+    
+    /**
+     * MQTT消息输入通道(框架需要)
+     */
+    @Bean
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+    
+    /**
+     * MQTT消息输出通道
+     */
+    @Bean
+    public MessageChannel mqttOutputChannel() {
+        return new DirectChannel();
+    }
+    
+    /**
+     * MQTT消息处理器(出站)
+     */
+    @Bean
+    @ServiceActivator(inputChannel = "mqttOutputChannel")
+    public MessageHandler outbound() {
+        String clientIdWithRandom = clientId + System.currentTimeMillis();
+        MqttPahoMessageHandler messageHandler = 
+                new MqttPahoMessageHandler(clientIdWithRandom + "-out", mqttClientFactory());
+        
+        messageHandler.setAsync(true);
+        messageHandler.setDefaultQos(1);
+        
+        return messageHandler;
+    }
+    
+    /**
+     * MQTT出站处理器
+     */
+    @Bean
+    public MqttPahoMessageHandler mqttOutbound() {
+        String clientIdWithRandom = clientId + System.currentTimeMillis();
+        MqttPahoMessageHandler messageHandler = 
+                new MqttPahoMessageHandler(clientIdWithRandom + "-outbound", mqttClientFactory());
+        
+        messageHandler.setAsync(true);
+        messageHandler.setDefaultQos(1);
+        
+        return messageHandler;
+    }
+} 

+ 13 - 14
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/gateway/impl/DeviceGatewayImpl.java

@@ -1,7 +1,6 @@
 package com.hfln.portal.infrastructure.gateway.impl;
 
 import cn.hfln.framework.extension.BizException;
-import cn.hfln.framework.mqtt.template.MqttTemplate;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -20,7 +19,6 @@ import com.hfln.portal.common.dto.data.share.ShareDto;
 import com.hfln.portal.common.request.device.DeviceBandingParams;
 import com.hfln.portal.common.request.device.DeviceListParams;
 import com.hfln.portal.common.request.device.DeviceLocationParams;
-import com.hfln.portal.common.request.device.*;
 import com.hfln.portal.common.request.event.EventListParams;
 import com.hfln.portal.common.request.share.ShareConfirmParam;
 import com.hfln.portal.common.request.share.ShareParam;
@@ -30,6 +28,7 @@ import com.hfln.portal.domain.customer.OssBusiType;
 import com.hfln.portal.domain.customer.util.DevPosFixUtil;
 import com.hfln.portal.domain.exception.ErrorEnum;
 import com.hfln.portal.domain.gateway.DeviceGateway;
+import com.hfln.portal.infrastructure.mqtt.MqttClient;
 import com.hfln.portal.infrastructure.oss.OssClient;
 import com.hfln.portal.infrastructure.oss.OssUtils;
 import com.hfln.portal.infrastructure.po.*;
@@ -79,8 +78,8 @@ public class DeviceGatewayImpl implements DeviceGateway {
     @Autowired
     private UserService userService;
 
-    @Autowired(required = false)
-    private MqttTemplate mqttTemplate;
+    @Autowired
+    private MqttClient mqttClient;
 
     @Autowired
     private OssClient ossClient;
@@ -488,8 +487,8 @@ public class DeviceGatewayImpl implements DeviceGateway {
         JSONObject msg = new JSONObject();
         msg.put("dev_id", clientId);
 
-        if (mqttTemplate != null) {
-            mqttTemplate.send(topic, msg.toJSONString());
+        if (mqttClient != null) {
+            mqttClient.sendMessage(topic, msg.toJSONString());
         } else {
             log.warn("MQTT template is not available, message not sent to topic: {}", topic);
         }
@@ -500,8 +499,8 @@ public class DeviceGatewayImpl implements DeviceGateway {
 
         String topic = String.format(TopicConstants.TOPIC_DEV_REBOOT, clientId);
 
-        if (mqttTemplate != null) {
-            mqttTemplate.send(topic, "1");
+        if (mqttClient != null) {
+            mqttClient.sendMessage(topic, "1");
         } else {
             log.warn("MQTT template is not available, message not sent to topic: {}", topic);
         }
@@ -512,8 +511,8 @@ public class DeviceGatewayImpl implements DeviceGateway {
 
         String topic = String.format(TopicConstants.TOPIC_DEV_UPDATEOTA, clientId);
 
-        if (mqttTemplate != null) {
-            mqttTemplate.send(topic, "1");
+        if (mqttClient != null) {
+            mqttClient.sendMessage(topic, "1");
         } else {
             log.warn("MQTT template is not available, message not sent to topic: {}", topic);
         }
@@ -556,8 +555,8 @@ public class DeviceGatewayImpl implements DeviceGateway {
         JSONObject msg = new JSONObject();
         msg.put("dev_id", clientId);
 
-        if (mqttTemplate != null) {
-            mqttTemplate.send(TopicConstants.TOPIC_DEV_GET_INFO, msg.toJSONString());
+        if (mqttClient != null) {
+            mqttClient.sendMessage(TopicConstants.TOPIC_DEV_GET_INFO, msg.toJSONString());
         } else {
             log.warn("MQTT template is not available, message not sent to topic: {}", TopicConstants.TOPIC_DEV_GET_INFO);
         }
@@ -569,8 +568,8 @@ public class DeviceGatewayImpl implements DeviceGateway {
         JSONObject msg = new JSONObject();
         msg.put("dev_id", clientId);
 
-        if (mqttTemplate != null) {
-            mqttTemplate.send(TopicConstants.TOPIC_DEV_GET_PARAM, msg.toJSONString());
+        if (mqttClient != null) {
+            mqttClient.sendMessage(TopicConstants.TOPIC_DEV_GET_PARAM, msg.toJSONString());
         } else {
             log.warn("MQTT template is not available, message not sent to topic: {}", TopicConstants.TOPIC_DEV_GET_PARAM);
         }

+ 78 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/MqttClient.java

@@ -0,0 +1,78 @@
+package com.hfln.portal.infrastructure.mqtt;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Component;
+
+/**
+ * MQTT网关实现类 - 基于Spring Integration MQTT
+ * 合并了原MqttGatewayDefaultImpl的优秀特性
+ */
+@Component
+@Slf4j
+public class MqttClient {
+
+    @Autowired
+    private MqttPahoMessageHandler mqttOutbound;
+    
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * 发送消息
+     * 默认QoS 0,适合大多数业务场景
+     */
+    public void sendMessage(String topic, String payload) {
+        sendMessage(topic, payload, 0, false);
+    }
+
+    /**
+     * 发送消息
+     * 默认QoS 0,适合大多数业务场景
+     */
+    public void sendToMqtt(String topic, Object payload) {
+        sendToMqtt(topic, payload, 0, false);
+    }
+    
+    /**
+     * 发送消息 - 支持动态QoS
+     */
+    public void sendToMqtt(String topic, Object payload, int qos, boolean retain) {
+        try {
+            String json = objectMapper.writeValueAsString(payload);
+            sendMessage(topic, json, qos, retain);
+            log.debug("MQTT message sent to topic: {}", topic);
+        } catch (Exception e) {
+            log.error("Error sending MQTT message to topic: {}", topic, e);
+        }
+    }
+
+    /**
+     * 发送消息到MQTT服务器 - 增强版
+     * 支持动态QoS和消息保留设置
+     * @param topic 主题
+     * @param payload 负载
+     * @param qos 质量等级
+     * @param retain 是否保留
+     */
+    public void sendMessage(String topic, String payload, int qos, boolean retain) {
+        try {
+            Message<String> message = MessageBuilder
+                    .withPayload(payload)
+                    .setHeader(MqttHeaders.TOPIC, topic)
+                    .setHeader(MqttHeaders.QOS, qos)
+                    .setHeader(MqttHeaders.RETAINED, retain)
+                    .build();
+
+            mqttOutbound.handleMessage(message);
+            log.trace("MQTT message sent to topic: {}", topic);
+        } catch (Exception e) {
+            log.error("Failed to send MQTT message to topic: {}, error: {}", topic, e.getMessage());
+        }
+    }
+} 

+ 88 - 17
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/MqttSubHandle.java

@@ -1,7 +1,5 @@
 package com.hfln.portal.infrastructure.mqtt;
 
-import cn.hfln.framework.mqtt.annotation.MqttSubscribe;
-import cn.hfln.framework.mqtt.template.MqttTemplate;
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
@@ -14,6 +12,7 @@ import com.hfln.portal.infrastructure.service.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.Message;
 import org.springframework.stereotype.Component;
 
 import java.math.BigDecimal;
@@ -29,8 +28,8 @@ public class MqttSubHandle {
     private static final int QOS = 1;                  // MQTT服务质量等级
     private static final boolean RETAIN = true;        // MQTT保留消息标志
 
-    @Autowired(required = false)
-    private MqttTemplate mqttTemplate; // MQTT消息发布器
+    @Autowired
+    private MqttClient mqttClient; // MQTT消息发布器
 
     @Autowired
     private DevInfoService devInfoService;
@@ -53,8 +52,66 @@ public class MqttSubHandle {
     @Autowired
     private WxOfficeAccountClient wxOfficeAccountClient;
 
-    @MqttSubscribe(topic = "/das/event", qos = 2)
-    public void subDasEvent(String payload) {
+
+
+    /**
+     * MQTT消息统一入口处理方法
+     *
+     * 业务流程:
+     * 1. 从消息头提取MQTT主题和负载
+     * 2. 从主题路径中提取操作类型(action)
+     * 3. 根据操作类型路由到具体的处理方法
+     * 4. 异常情况统一捕获和日志记录
+     *
+     * @param message Spring Integration封装的MQTT消息对象
+     */
+    public void handleMessage(Message<?> message) {
+        try {
+            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
+            String payload = message.getPayload().toString();
+
+            if (topic == null) {
+                log.warn("Received message without topic header");
+                return;
+            }
+
+            log.debug("Received device message: topic={}, payload={}", topic, payload);
+
+            // 根据主题路由到不同的处理方法
+            // 提取主题的操作部分(最后一段)
+            String action = extractActionFromTopic(topic);
+            if (action != null) {
+                switch (action) {
+                    case "event":
+                        subDasEvent(topic, payload);
+                        break;
+                    case "alarm_event":
+                        subDasAlarmEvent(topic, payload);
+                        break;
+                    case "realtime_pos":
+                        subDasRealtimePos(topic, payload);
+                        break;
+                        case "dev_status":
+                        subDasDevStatus(topic, payload);
+                        break;
+                    case "exist":
+                        subDasExist(topic, payload);
+                        break;
+                    default:
+                        log.debug("Unhandled device topic action: {} for topic: {}", action, topic);
+                        break;
+                }
+            } else {
+                log.debug("Could not extract action from device topic: {}", topic);
+            }
+
+        } catch (Exception e) {
+            log.error("Error handling device message: {}", e.getMessage(), e);
+        }
+    }
+
+
+    public void subDasEvent(String topic, String payload) {
 
         JSONObject obj = JSONObject.parseObject(payload);
         String clientId = obj.getString("dev_id");
@@ -62,8 +119,8 @@ public class MqttSubHandle {
         String event = obj.getString("event");
         // 跌倒确认返回
         if (event.equals("跌倒确认")) {
-            if (mqttTemplate != null) {
-                mqttTemplate.send("/mps/fall_event/ack", clientId);
+            if (mqttClient != null) {
+                mqttClient.sendMessage("/mps/fall_event/ack", clientId);
             } else {
                 log.warn("MQTT template is not available, message not sent to topic: /mps/fall_event/ack");
             }
@@ -196,9 +253,7 @@ public class MqttSubHandle {
         }
 
 
-
-    @MqttSubscribe(topic = "/das/alarm_event", qos = 2)
-    public void subDasAlarmEvent(String payload) {
+        public void subDasAlarmEvent(String topic, String payload) {
 
         JSONObject obj = JSONObject.parseObject(payload);
         String clientId = obj.getString("dev_id");
@@ -248,8 +303,7 @@ public class MqttSubHandle {
         }
     }
 
-    @MqttSubscribe(topic = "/das/realtime_pos")
-    public void subDasRealtimePos(String payload) {
+    public void subDasRealtimePos(String topic, String payload) {
 
         JSONObject obj = JSONObject.parseObject(payload);
         String clientId = obj.getString("dev_id");
@@ -336,8 +390,7 @@ public class MqttSubHandle {
 //            PushMsgWebSocket.sendMessageTo(msg.toString(), dev_id);
     }
 
-    @MqttSubscribe(topic = "/das/dev_status")
-    public void subDasDevStatus(String payload) {
+    public void subDasDevStatus(String topic, String payload) {
 
         JSONObject obj = JSONObject.parseObject(payload);
         String clientId = obj.getString("dev_id");
@@ -416,8 +469,7 @@ public class MqttSubHandle {
         }
     }
 
-    @MqttSubscribe(topic = "/das/exist")
-    public void subDasExist(String payload) {
+    public void subDasExist(String topic, String payload) {
 
         JSONObject obj = JSONObject.parseObject(payload);
         String dev_id = obj.getString("dev_id");
@@ -431,4 +483,23 @@ public class MqttSubHandle {
         // todo websocketservice
 //        PushMsgWebSocket.sendMessageTo(msg.toString(), dev_id);
     }
+
+    /**
+     * 从MQTT主题中提取操作名称(最后一段路径)
+     *
+     * 主题格式:/dev/{device_id}/{action}
+     * 例如:/dev/123456/login 返回 "login"
+     *
+     * @param topic MQTT主题字符串
+     * @return 操作名称,如果解析失败返回null
+     */
+    private String extractActionFromTopic(String topic) {
+        if (topic != null && topic.startsWith("/dev/")) {
+            String[] parts = topic.split("/");
+            if (parts.length >= 3) {
+                return parts[parts.length - 1]; // 返回最后一段
+            }
+        }
+        return null;
+    }
 }

+ 0 - 6
portal-service-server/pom.xml

@@ -59,12 +59,6 @@
             <artifactId>spring-boot-starter-actuator</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>org.eclipse.paho</groupId>
-            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
-            <version>1.2.5</version>
-        </dependency>
-
         <!--        <dependency>-->
 <!--            <groupId>org.springframework.cloud</groupId>-->
 <!--            <artifactId>spring-cloud-starter-bootstrap</artifactId>-->