chejianzheng hai 2 meses
pai
achega
bade302979

+ 1 - 0
device-service-common/src/main/java/com/hfln/device/common/constant/mqtt/topic/MqttTopics.java

@@ -61,6 +61,7 @@ public class MqttTopics {
     public static final String DAS_STATUS = "/das/status";
     public static final String DAS_CLOUDPOINT = "/das/cloudpoint";
     public static final String DAS_REALTIME_POS = "/das/realtime_pos";
+    public static final String DAS_REALTIME_POS_02 = "/das/%s/realtime_pos";
     public static final String DAS_EVENT = "/das/event";
     public static final String DAS_EXIST_EVENT = "/das/exist";
     public static final String DAS_ALARM_EVENT = "/das/alarm_event";

+ 13 - 13
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/config/MqttConfig.java

@@ -100,10 +100,10 @@ public class MqttConfig {
         }
         
         // 优化连接稳定性
-        options.setKeepAliveInterval(30);     // 减少keep-alive间隔到30秒,更快检测连接状态
+        options.setKeepAliveInterval(60);     // 减少keep-alive间隔到30秒,更快检测连接状态
         options.setConnectionTimeout(10);     // 减少连接超时到10秒
-        options.setCleanSession(false);       // 保持会话,避免重连后丢失订阅
-        options.setMaxInflight(500);          // 适中的最大并发发布数
+        options.setCleanSession(true);       // 保持会话,避免重连后丢失订阅
+        options.setMaxInflight(1000);          // 适中的最大并发发布数
         
         // 添加重要的MQTT连接参数来缓解"Too many publishes in progress"错误
         options.setAutomaticReconnect(true);  // 启用自动重连
@@ -126,17 +126,17 @@ public class MqttConfig {
     @Bean
     public MessageProducer deviceInbound() {
         String[] topics = {
-//            MqttTopics.DEV_LOGIN,
-//            MqttTopics.DEV_KEEPALIVE,
-//            MqttTopics.DEV_DSP_DATA,
-//            MqttTopics.DEV_REP_DEV_PARAM,
-//            MqttTopics.DEV_DISCONNECT,
+            MqttTopics.DEV_LOGIN,
+            MqttTopics.DEV_KEEPALIVE,
+            MqttTopics.DEV_DSP_DATA,
+            MqttTopics.DEV_REP_DEV_PARAM,
+            MqttTopics.DEV_DISCONNECT,
 
-            MqttTopics.SHARE_DEV_LOGIN,
-            MqttTopics.SHARE_DEV_KEEPALIVE,
-            MqttTopics.SHARE_DEV_DSP_DATA,
-            MqttTopics.SHARE_DEV_REP_DEV_PARAM,
-            MqttTopics.SHARE_DEV_DISCONNECT,
+//            MqttTopics.SHARE_DEV_LOGIN,
+//            MqttTopics.SHARE_DEV_KEEPALIVE,
+//            MqttTopics.SHARE_DEV_DSP_DATA,
+//            MqttTopics.SHARE_DEV_REP_DEV_PARAM,
+//            MqttTopics.SHARE_DEV_DISCONNECT,
 //            MqttTopics.DEV_CLOUDPOINT,
 //            MqttTopics.DEV_REP_DEV_INFO,
 //            MqttTopics.DEV_REP_FALL_EVENT,

+ 90 - 65
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/gateway/MqttGatewayImpl.java

@@ -41,16 +41,16 @@ public class MqttGatewayImpl implements MqttGateway {
     @Autowired
     private ObjectMapper objectMapper;
     
-    @Autowired
-    private MqttHealthService mqttHealthService;
-    
-    @Autowired
-    private MqttMessageBufferService mqttBufferService;
-    
-    // 移除信号量控制,改为使用缓冲服务
-    // 重试配置
-    private static final int MAX_RETRY_ATTEMPTS = 3;
-    private static final long RETRY_DELAY_MS = 100;
+//    @Autowired
+//    private MqttHealthService mqttHealthService;
+//
+//    @Autowired
+//    private MqttMessageBufferService mqttBufferService;
+//
+//    // 移除信号量控制,改为使用缓冲服务
+//    // 重试配置
+//    private static final int MAX_RETRY_ATTEMPTS = 3;
+//    private static final long RETRY_DELAY_MS = 100;
 
     @Override
     public void initialize() {
@@ -713,7 +713,6 @@ public class MqttGatewayImpl implements MqttGateway {
     /**
      * 发送消息到MQTT服务器 - 增强版
      * 支持动态QoS和消息保留设置
-     * 使用缓冲服务确保数据不丢失,特别是实时点位数据
      *
      * @param topic   主题
      * @param payload 负载
@@ -722,63 +721,89 @@ public class MqttGatewayImpl implements MqttGateway {
      */
     private void sendToMqtt(String topic, String payload, int qos, boolean retain) {
         try {
-            // 判断是否为高优先级消息(实时点位数据)
-            boolean isHighPriority = isHighPriorityTopic(topic);
-            
-            if (isHighPriority) {
-                // 高优先级消息使用阻塞队列,确保不丢失
-                mqttBufferService.sendHighPriorityMessage(topic, payload, qos, retain);
-                mqttHealthService.recordMessageSent();
-                log.trace("High priority MQTT message queued: topic={}", topic);
-            } else {
-                // 普通消息使用非阻塞队列
-                boolean queued = mqttBufferService.sendNormalPriorityMessage(topic, payload, qos, retain);
-                if (queued) {
-                    mqttHealthService.recordMessageSent();
-                    log.trace("Normal priority MQTT message queued: topic={}", topic);
-                } else {
-                    mqttHealthService.recordMessageDropped();
-                    log.warn("Normal priority message queue full, dropped: topic={}", topic);
-                }
-            }
+            Message<String> message = MessageBuilder
+                    .withPayload(payload)
+                    .setHeader(MqttHeaders.TOPIC, topic)
+                    .setHeader(MqttHeaders.QOS, qos)
+                    .setHeader(MqttHeaders.RETAINED, retain)
+                    .build();
+
+            mqttOutbound.handleMessage(message);
+            log.trace("MQTT message sent to topic: {}", topic);
         } catch (Exception e) {
-            log.error("Failed to queue MQTT message to topic: {}, error: {}", topic, e.getMessage());
-            mqttHealthService.recordMessageFailed();
-        }
-    }
-    
-    /**
-     * 判断是否为高优先级主题
-     * 实时点位数据等关键数据使用高优先级
-     */
-    private boolean isHighPriorityTopic(String topic) {
-        if (topic == null) {
-            return false;
-        }
-        
-        // 实时点位数据 - 最高优先级
-        if (topic.equals("/das/realtime_pos") || 
-            topic.equals(MqttTopics.DAS_REALTIME_POS)) {
-            return true;
+            log.error("Failed to send MQTT message to topic: {}, error: {}", topic, e.getMessage());
         }
-        
-        // 告警事件 - 高优先级
-        if (topic.contains("/alarm_event") || 
-            topic.contains("/event") ||
-            topic.contains("/fall")) {
-            return true;
-        }
-        
-        // 设备状态 - 中等优先级
-        if (topic.contains("/status") || 
-            topic.contains("/keepalive")) {
-            return false;
-        }
-        
-        // 其他消息 - 普通优先级
-        return false;
     }
 
+//    /**
+//     * 发送消息到MQTT服务器 - 增强版
+//     * 支持动态QoS和消息保留设置
+//     * 使用缓冲服务确保数据不丢失,特别是实时点位数据
+//     *
+//     * @param topic   主题
+//     * @param payload 负载
+//     * @param qos     质量等级
+//     * @param retain  是否保留
+//     */
+//    private void sendToMqtt(String topic, String payload, int qos, boolean retain) {
+//        try {
+//            // 判断是否为高优先级消息(实时点位数据)
+//            boolean isHighPriority = isHighPriorityTopic(topic);
+//
+//            if (isHighPriority) {
+//                // 高优先级消息使用阻塞队列,确保不丢失
+//                mqttBufferService.sendHighPriorityMessage(topic, payload, qos, retain);
+//                mqttHealthService.recordMessageSent();
+//                log.trace("High priority MQTT message queued: topic={}", topic);
+//            } else {
+//                // 普通消息使用非阻塞队列
+//                boolean queued = mqttBufferService.sendNormalPriorityMessage(topic, payload, qos, retain);
+//                if (queued) {
+//                    mqttHealthService.recordMessageSent();
+//                    log.trace("Normal priority MQTT message queued: topic={}", topic);
+//                } else {
+//                    mqttHealthService.recordMessageDropped();
+//                    log.warn("Normal priority message queue full, dropped: topic={}", topic);
+//                }
+//            }
+//        } catch (Exception e) {
+//            log.error("Failed to queue MQTT message to topic: {}, error: {}", topic, e.getMessage());
+//            mqttHealthService.recordMessageFailed();
+//        }
+//    }
+//
+//    /**
+//     * 判断是否为高优先级主题
+//     * 实时点位数据等关键数据使用高优先级
+//     */
+//    private boolean isHighPriorityTopic(String topic) {
+//        if (topic == null) {
+//            return false;
+//        }
+//
+//        // 实时点位数据 - 最高优先级
+//        if (topic.equals("/das/realtime_pos") ||
+//            topic.equals(MqttTopics.DAS_REALTIME_POS)) {
+//            return true;
+//        }
+//
+//        // 告警事件 - 高优先级
+//        if (topic.contains("/alarm_event") ||
+//            topic.contains("/event") ||
+//            topic.contains("/fall")) {
+//            return true;
+//        }
+//
+//        // 设备状态 - 中等优先级
+//        if (topic.contains("/status") ||
+//            topic.contains("/keepalive")) {
+//            return false;
+//        }
+//
+//        // 其他消息 - 普通优先级
+//        return false;
+//    }
+
     /**
      * 发送消息 - 私有方法
      * 默认QoS 1,适合大多数业务场景
@@ -879,7 +904,7 @@ public class MqttGatewayImpl implements MqttGateway {
                 message.put("target_point", new ArrayList<>());
             }
 
-            publishJson(MqttTopics.DAS_REALTIME_POS, message);
+            publishJson(String.format(MqttTopics.DAS_REALTIME_POS_02, deviceId), message);
             log.trace("Realtime position message sent: deviceId={}, targetCount={}",
                     deviceId, targets != null ? targets.size() : 0);
         } catch (Exception e) {