Sfoglia il codice sorgente

feat: 剔除ServiceActivator相关的业务处理

yangliu 4 mesi fa
parent
commit
c7e5789d31

+ 28 - 3
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/config/MqttConfig.java

@@ -24,6 +24,10 @@ import com.hfln.device.common.constant.mqtt.topic.MqttTopics;
 
 /**
  * MQTT配置类
+ * 
+ * ⚠️ 注意:为避免与@MqttSubscriber注解方式产生重复消费,
+ * 已禁用Spring Integration的入站适配器。
+ * 现在统一使用@MqttSubscriber注解方式处理MQTT消息。
  */
 @Configuration
 @Slf4j
@@ -80,6 +84,7 @@ public class MqttConfig {
     
     /**
      * MQTT消息输入通道
+     * 保留用于框架需要,但不再使用Spring Integration处理入站消息
      */
     @Bean
     public MessageChannel mqttInputChannel() {
@@ -95,8 +100,12 @@ public class MqttConfig {
     }
     
     /**
-     * MQTT消息驱动通道适配器
+     * ⚠️ 已禁用:MQTT消息驱动通道适配器
+     * 
+     * 为避免与@MqttSubscriber注解方式产生重复消费,此方法已被注释。
+     * 现在统一使用@MqttSubscriber注解方式处理MQTT消息。
      */
+    /*
     @Bean
     public MessageProducer inbound() {
         String clientIdWithRandom = clientId + System.currentTimeMillis();
@@ -110,9 +119,10 @@ public class MqttConfig {
         
         return adapter;
     }
+    */
     
     /**
-     * MQTT消息处理器
+     * MQTT消息处理器(出站)
      */
     @Bean
     @ServiceActivator(inputChannel = "mqttOutputChannel")
@@ -128,12 +138,15 @@ public class MqttConfig {
     }
     
     /**
-     * MQTT入站适配器
+     * ⚠️ 已禁用:MQTT入站适配器
+     * 为避免重复消费,统一使用@MqttSubscriber注解方式
      */
+    /*
     @Bean
     public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
         return (MqttPahoMessageDrivenChannelAdapter) inbound();
     }
+    */
     
     /**
      * MQTT出站处理器
@@ -153,6 +166,17 @@ public class MqttConfig {
         return mqttGatewayEmqx;
     }
 
+    /**
+     * ⚠️ 已禁用:OPC消息通道和适配器
+     * 
+     * 为了避免与@MqttSubscriber注解方式产生重复消费,
+     * OPC相关的Spring Integration配置已被注释。
+     * 
+     * 如果需要OPC功能,请:
+     * 1. 使用@MqttSubscriber注解在OpcMessageSubscriber中处理
+     * 2. 或者确保这些通道使用不同的MQTT客户端
+     */
+    /*
     // OPC消息通道
     @Bean
     public MessageChannel opcAllChannel() {
@@ -202,4 +226,5 @@ public class MqttConfig {
         adapter.setOutputChannel(opcSetAlarmParamChannel());
         return adapter;
     }
+    */
 } 

+ 54 - 15
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/mqtt/MqttMessageHandler.java

@@ -33,6 +33,15 @@ import java.time.LocalDateTime;
 
 /**
  * MQTT消息处理器
+ * 
+ * ⚠️ 注意:为避免与@MqttSubscriber注解方式产生重复消费,
+ * 此类中的@ServiceActivator方法已被禁用。
+ * 
+ * 现在统一使用各个Subscriber类处理MQTT消息:
+ * - DeviceMessageSubscriber: 处理设备相关消息
+ * - MpsMessageSubscriber: 处理小程序消息  
+ * - AppMessageSubscriber: 处理应用消息
+ * - OpcMessageSubscriber: 处理OPC消息
  */
 @Component
 @Slf4j
@@ -83,8 +92,16 @@ public class MqttMessageHandler {
     private static final Pattern DEV_REBOOT_PATTERN = Pattern.compile(MqttTopics.Pattern.MPS_DEV_REBOOT);
 
     /**
-     * 处理MQTT入站消息
+     * ⚠️ 已禁用:处理MQTT入站消息
+     * 
+     * 为避免与@MqttSubscriber注解方式产生重复消费,此方法已被注释。
+     * 现在统一使用各个Subscriber类处理MQTT消息。
+     * 
+     * 如果需要重新启用,请确保:
+     * 1. 移除相应的@MqttSubscriber注解方法
+     * 2. 或者为此handler配置不同的MQTT客户端
      */
+    /*
     @Bean
     @ServiceActivator(inputChannel = "mqttInputChannel")
     public MessageHandler handleMessage() {
@@ -128,6 +145,12 @@ public class MqttMessageHandler {
             }
         };
     }
+    */
+    
+    /**
+     * ⚠️ 以下方法保留用于工具类用途,不再直接处理MQTT消息
+     * 如果需要在Subscriber中复用这些逻辑,可以将此类改为@Service
+     */
     
     /**
      * 处理设备登录消息
@@ -172,6 +195,9 @@ public class MqttMessageHandler {
     
     /**
      * 处理设备上报信息
+     * 
+     * 注意:Python版本中,设备信息上报不会自动发送状态消息
+     * 仅在设备状态发生变化(如从离线变为在线)时才发送状态通知
      */
     private void handleDeviceReportInfo(String topic, String payload) {
         try {
@@ -195,6 +221,7 @@ public class MqttMessageHandler {
             
             if (existingDevice.isPresent()) {
                 Device device = existingDevice.get();
+                boolean statusChanged = false;
                 
                 // 更新设备信息
                 device.setDevType(deviceType);
@@ -203,10 +230,11 @@ public class MqttMessageHandler {
                     device.getNetwork().setIp(deviceIp);
                 }
                 
-                // 如果设备离线,更新为在线
+                // 仅在设备状态发生变化时才标记需要发送状态消息
                 if (device.getOnline() != 1) {
                     device.updateOnlineStatus(1);
                     device.updateKeepAliveTime(System.currentTimeMillis());
+                    statusChanged = true;
                     
                     // 更新数据库
                     deviceGateway.updateDeviceOnlineStatus(deviceId, 1);
@@ -214,10 +242,13 @@ public class MqttMessageHandler {
                 
                 deviceManagerService.updateDeviceInCache(device);
                 
-                // 发送设备状态通知
-                mqttGateway.sendDeviceStatusMessage(device);
-                
-                log.debug("Device info updated: {}", deviceId);
+                // 只有在状态发生变化时才发送设备状态通知
+                if (statusChanged) {
+                    mqttGateway.sendDeviceStatusMessage(device);
+                    log.info("Device status changed to online: {}", deviceId);
+                } else {
+                    log.debug("Device info updated without status change: {}", deviceId);
+                }
             }
             
         } catch (Exception e) {
@@ -227,6 +258,9 @@ public class MqttMessageHandler {
     
     /**
      * 处理设备上报参数
+     * 
+     * 注意:Python版本中,参数更新不会自动发送状态消息
+     * 仅更新设备信息,不进行额外的消息发送
      */
     private void handleDeviceReportParam(String topic, String payload) {
         try {
@@ -293,9 +327,7 @@ public class MqttMessageHandler {
                         // 更新数据库
                         deviceGateway.saveDevice(device);
                         
-                        // 发送设备状态通知
-                        mqttGateway.sendDeviceStatusMessage(device);
-                        
+                        // Python版本不在此处发送设备状态消息,仅记录日志
                         log.info("Device parameters updated: {}", deviceId);
                     }
                 }
@@ -380,6 +412,9 @@ public class MqttMessageHandler {
     
     /**
      * 处理设备上报点云数据
+     * 
+     * 注意:Python版本中,点云数据的处理更加谨慎
+     * 不会自动发送实时姿态消息,仅进行数据记录和处理
      */
     private void handleDeviceCloudPoint(String topic, String payload) {
         try {
@@ -393,13 +428,16 @@ public class MqttMessageHandler {
                 // 获取点云数据
                 List<List<Number>> pointCloud = (List<List<Number>>) message.get("point_cloud");
                 
-                // 处理点云数据(这里可以添加算法处理,如姿态识别等)
+                // Python版本中,点云数据主要用于算法处理
+                // 不会自动转发实时姿态消息,仅进行内部处理
+                // 这里可以添加算法处理,如姿态识别等
+                
+                // 注释掉自动转发功能,避免重复消息
+                // mqttGateway.sendRealtimePoseMessage(deviceId, 
+                //         DeviceConstants.PoseEnum.POSE_STANDING.getCode(), // 默认姿态
+                //         pointCloud.get(0)); // 使用第一个点作为目标点
                 
-                // 转发点云数据
-                // 这里简化处理,仅转发部分数据
-                mqttGateway.sendRealtimePoseMessage(deviceId, 
-                        DeviceConstants.PoseEnum.POSE_STANDING.getCode(), // 默认姿态
-                        pointCloud.get(0)); // 使用第一个点作为目标点
+                log.debug("Cloud point data processed for device: {}", deviceId);
             }
         } catch (Exception e) {
             log.error("Error handling device cloud point: {}", e.getMessage(), e);
@@ -418,6 +456,7 @@ public class MqttMessageHandler {
                 log.debug("Device DSP data: {}", deviceId);
                 
                 // 这里可以添加DSP数据处理逻辑
+                // Python版本中,DSP数据主要用于内部算法处理
             }
         } catch (Exception e) {
             log.error("Error handling device DSP data: {}", e.getMessage(), e);