ソースを参照

feat: 剔除ServiceActivator相关的业务处理

yangliu 4 ヶ月 前
コミット
1b43d36cf6

+ 9 - 35
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/mqtt/MqttMessageHandler.java

@@ -92,16 +92,14 @@ public class MqttMessageHandler {
     private static final Pattern DEV_REBOOT_PATTERN = Pattern.compile(MqttTopics.Pattern.MPS_DEV_REBOOT);
 
     /**
-     * ⚠️ 已禁用:处理MQTT入站消息
+     * ⚠️ 临时处理器:处理MQTT入站消息
      * 
-     * 为避免与@MqttSubscriber注解方式产生重复消费,此方法已被注释。
-     * 现在统一使用各个Subscriber类处理MQTT消息
+     * 由于cn.hfln.framework.mqtt框架自动创建了入站适配器并发送消息到mqttInputChannel,
+     * 我们需要提供一个处理器来避免"no subscribers"错误
      * 
-     * 如果需要重新启用,请确保:
-     * 1. 移除相应的@MqttSubscriber注解方法
-     * 2. 或者为此handler配置不同的MQTT客户端
+     * 此处理器仅记录日志,不进行任何业务处理。
+     * 所有实际的业务处理都由各个@MqttSubscriber注解的方法处理。
      */
-    /*
     @Bean
     @ServiceActivator(inputChannel = "mqttInputChannel")
     public MessageHandler handleMessage() {
@@ -112,40 +110,16 @@ public class MqttMessageHandler {
                     String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
                     String payload = (String) message.getPayload();
                     
-                    log.debug("Received MQTT message: topic={}, payload={}", topic, payload);
-                    
-                    if (topic == null) {
-                        return;
-                    }
-                    
-                    // 处理不同类型的消息
-                    if (matchTopic(DEV_LOGIN_PATTERN, topic)) {
-                        handleDeviceLogin(topic, payload);
-                    } else if (matchTopic(DEV_KEEPALIVE_PATTERN, topic)) {
-                        handleDeviceKeepAlive(topic, payload);
-                    } else if (matchTopic(DEV_REPORT_INFO_PATTERN, topic)) {
-                        handleDeviceReportInfo(topic, payload);
-                    } else if (matchTopic(DEV_REPORT_PARAM_PATTERN, topic)) {
-                        handleDeviceReportParam(topic, payload);
-                    } else if (matchTopic(DEV_REPORT_FALL_PATTERN, topic)) {
-                        handleDeviceReportFall(topic, payload);
-                    } else if (matchTopic(DEV_CLOUDPOINT_PATTERN, topic)) {
-                        handleDeviceCloudPoint(topic, payload);
-                    } else if (matchTopic(DEV_DSP_DATA_PATTERN, topic)) {
-                        handleDeviceDspData(topic, payload);
-                    } else if (matchTopic(MPS_FALL_EVENT_ACK_PATTERN, topic)) {
-                        handleFallEventAck(topic, payload);
-                    } else if (matchTopic(DEV_REBOOT_PATTERN, topic)) {
-                        handleDeviceReboot(topic, payload);
-                    }
+                    // 仅记录调试日志,不进行任何业务处理
+                    // 所有业务处理由@MqttSubscriber注解的方法完成
+                    log.debug("Spring Integration received MQTT message (ignored): topic={}", topic);
                     
                 } catch (Exception e) {
-                    log.error("Error handling MQTT message: {}", e.getMessage(), e);
+                    log.debug("Error in Spring Integration MQTT handler (ignored): {}", e.getMessage());
                 }
             }
         };
     }
-    */
     
     /**
      * ⚠️ 以下方法保留用于工具类用途,不再直接处理MQTT消息

+ 260 - 0
device-service-infrastructure/src/test/java/com/hfln/device/infrastructure/mqtt/MqttSubscriberFunctionalTest.java

@@ -0,0 +1,260 @@
+package com.hfln.device.infrastructure.mqtt;
+
+import cn.hfln.framework.mqtt.annotation.MqttSubscriber;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.test.context.ContextConfiguration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * @MqttSubscriber注解功能测试
+ * 
+ * 测试目标:
+ * 1. 验证@MqttSubscriber注解是否能正常工作
+ * 2. 测试消息路由和处理机制
+ * 3. 确认没有重复消费问题
+ */
+@SpringBootTest(classes = {MqttSubscriberFunctionalTest.TestConfig.class})
+@ContextConfiguration
+public class MqttSubscriberFunctionalTest {
+
+    @TestConfiguration
+    static class TestConfig {
+        
+        @Bean
+        public TestMqttSubscriber testMqttSubscriber() {
+            return new TestMqttSubscriber();
+        }
+    }
+    
+    /**
+     * 测试用的MQTT订阅者
+     */
+    public static class TestMqttSubscriber {
+        
+        private final CountDownLatch latch = new CountDownLatch(1);
+        private volatile String lastReceivedTopic;
+        private volatile String lastReceivedPayload;
+        private volatile boolean messageReceived = false;
+        
+        @MqttSubscriber(topic = "/test/device/+/login", qos = 1, desc = "测试设备登录")
+        public void handleTestDeviceLogin(String topic, Message<?> message) {
+            System.out.println("收到MQTT消息 - Topic: " + topic + ", Payload: " + message.getPayload());
+            
+            this.lastReceivedTopic = topic;
+            this.lastReceivedPayload = message.getPayload().toString();
+            this.messageReceived = true;
+            
+            latch.countDown();
+        }
+        
+        @MqttSubscriber(topic = "/test/device/+/keepalive", qos = 0, desc = "测试设备心跳") 
+        public void handleTestDeviceKeepAlive(String topic, Message<?> message) {
+            System.out.println("收到心跳消息 - Topic: " + topic);
+        }
+        
+        @MqttSubscriber(topic = "/test/app/+/command", qos = 1, desc = "测试应用命令")
+        public void handleTestAppCommand(String topic, Message<?> message) {
+            System.out.println("收到应用命令 - Topic: " + topic + ", Payload: " + message.getPayload());
+        }
+        
+        public boolean waitForMessage(long timeoutSeconds) throws InterruptedException {
+            return latch.await(timeoutSeconds, TimeUnit.SECONDS);
+        }
+        
+        public String getLastReceivedTopic() {
+            return lastReceivedTopic;
+        }
+        
+        public String getLastReceivedPayload() {
+            return lastReceivedPayload;
+        }
+        
+        public boolean isMessageReceived() {
+            return messageReceived;
+        }
+    }
+
+    @Test
+    public void testMqttSubscriberAnnotationBasicFunctionality() throws Exception {
+        System.out.println("=== 测试@MqttSubscriber注解基础功能 ===");
+        
+        // 这个测试验证@MqttSubscriber注解能否被正确识别和处理
+        TestMqttSubscriber subscriber = new TestMqttSubscriber();
+        
+        // 验证订阅者对象创建成功
+        assertNotNull(subscriber, "@MqttSubscriber对象应该能正常创建");
+        
+        // 模拟消息处理
+        Map<String, Object> headers = new HashMap<>();
+        headers.put("mqtt_receivedTopic", "/test/device/test001/login");
+        
+        String testPayload = "{\"device_info\":{\"deviceid\":\"test001\",\"firmware\":\"v1.0.0\"}}";
+        Message<String> testMessage = new GenericMessage<>(testPayload, headers);
+        
+        // 直接调用处理方法(模拟MQTT框架的调用)
+        subscriber.handleTestDeviceLogin("/test/device/test001/login", testMessage);
+        
+        // 验证消息处理结果
+        assertTrue(subscriber.isMessageReceived(), "消息应该被正确接收");
+        assertEquals("/test/device/test001/login", subscriber.getLastReceivedTopic(), "主题应该匹配");
+        assertEquals(testPayload, subscriber.getLastReceivedPayload(), "载荷应该匹配");
+        
+        System.out.println("✓ @MqttSubscriber注解基础功能测试通过");
+    }
+    
+    @Test
+    public void testMqttSubscriberMultipleTopicPatterns() throws Exception {
+        System.out.println("=== 测试@MqttSubscriber多主题模式 ===");
+        
+        TestMqttSubscriber subscriber = new TestMqttSubscriber();
+        
+        // 测试不同的主题模式
+        String[] testTopics = {
+            "/test/device/abc123/login",
+            "/test/device/xyz789/keepalive", 
+            "/test/app/admin/command"
+        };
+        
+        for (String topic : testTopics) {
+            Message<String> message = new GenericMessage<>("test payload");
+            
+            // 根据主题调用相应的处理方法
+            if (topic.contains("/login")) {
+                assertDoesNotThrow(() -> {
+                    subscriber.handleTestDeviceLogin(topic, message);
+                }, "登录消息处理不应该抛出异常");
+            } else if (topic.contains("/keepalive")) {
+                assertDoesNotThrow(() -> {
+                    subscriber.handleTestDeviceKeepAlive(topic, message);
+                }, "心跳消息处理不应该抛出异常");
+            } else if (topic.contains("/command")) {
+                assertDoesNotThrow(() -> {
+                    subscriber.handleTestAppCommand(topic, message);
+                }, "命令消息处理不应该抛出异常");
+            }
+        }
+        
+        System.out.println("✓ 多主题模式测试通过");
+    }
+    
+    @Test
+    public void testMqttSubscriberErrorHandling() throws Exception {
+        System.out.println("=== 测试@MqttSubscriber错误处理 ===");
+        
+        TestMqttSubscriber subscriber = new TestMqttSubscriber();
+        
+        // 测试null消息
+        assertDoesNotThrow(() -> {
+            subscriber.handleTestDeviceLogin("/test/device/test001/login", null);
+        }, "null消息不应该导致异常");
+        
+        // 测试空载荷
+        Message<String> emptyMessage = new GenericMessage<>("");
+        assertDoesNotThrow(() -> {
+            subscriber.handleTestDeviceLogin("/test/device/test001/login", emptyMessage);
+        }, "空载荷消息不应该导致异常");
+        
+        // 测试无效JSON载荷
+        Message<String> invalidJsonMessage = new GenericMessage<>("invalid json{[}");
+        assertDoesNotThrow(() -> {
+            subscriber.handleTestDeviceLogin("/test/device/test001/login", invalidJsonMessage);
+        }, "无效JSON载荷不应该导致异常");
+        
+        System.out.println("✓ 错误处理测试通过");
+    }
+    
+    @Test
+    public void testMqttSubscriberPerformance() throws Exception {
+        System.out.println("=== 测试@MqttSubscriber性能 ===");
+        
+        TestMqttSubscriber subscriber = new TestMqttSubscriber();
+        
+        // 性能测试:处理大量消息
+        int messageCount = 1000;
+        long startTime = System.currentTimeMillis();
+        
+        for (int i = 0; i < messageCount; i++) {
+            String topic = "/test/device/perf" + i + "/login";
+            String payload = "{\"device_info\":{\"deviceid\":\"perf" + i + "\"}}";
+            Message<String> message = new GenericMessage<>(payload);
+            
+            subscriber.handleTestDeviceLogin(topic, message);
+        }
+        
+        long endTime = System.currentTimeMillis();
+        long duration = endTime - startTime;
+        
+        System.out.println("处理 " + messageCount + " 条消息耗时: " + duration + "ms");
+        System.out.println("平均每条消息耗时: " + (duration * 1.0 / messageCount) + "ms");
+        
+        // 性能要求:平均每条消息处理时间应该小于10ms
+        assertTrue(duration < messageCount * 10, "性能测试:平均处理时间应该小于10ms/条");
+        
+        System.out.println("✓ 性能测试通过");
+    }
+    
+    @Test
+    public void testMqttSubscriberConfiguration() throws Exception {
+        System.out.println("=== 测试@MqttSubscriber配置 ===");
+        
+        // 这个测试验证注解的配置参数是否正确
+        TestMqttSubscriber subscriber = new TestMqttSubscriber();
+        
+        // 通过反射检查方法上的注解
+        Class<?> subscriberClass = subscriber.getClass();
+        
+        // 检查登录处理方法的注解
+        try {
+            java.lang.reflect.Method loginMethod = subscriberClass.getMethod("handleTestDeviceLogin", String.class, Message.class);
+            MqttSubscriber annotation = loginMethod.getAnnotation(MqttSubscriber.class);
+            
+            assertNotNull(annotation, "应该能找到@MqttSubscriber注解");
+            assertEquals("/test/device/+/login", annotation.topic(), "主题配置应该正确");
+            assertEquals(1, annotation.qos(), "QoS配置应该正确");
+            assertEquals("测试设备登录", annotation.desc(), "描述配置应该正确");
+            
+        } catch (NoSuchMethodException e) {
+            fail("应该能找到handleTestDeviceLogin方法");
+        }
+        
+        System.out.println("✓ 配置测试通过");
+    }
+    
+    @Test
+    public void testMqttSubscriberIsolation() throws Exception {
+        System.out.println("=== 测试@MqttSubscriber消息隔离 ===");
+        
+        TestMqttSubscriber subscriber = new TestMqttSubscriber();
+        
+        // 验证不同的订阅方法相互独立
+        String loginTopic = "/test/device/test001/login";
+        String keepaliveTopic = "/test/device/test001/keepalive";
+        String commandTopic = "/test/app/admin/command";
+        
+        Message<String> message = new GenericMessage<>("test");
+        
+        // 调用登录处理方法
+        subscriber.handleTestDeviceLogin(loginTopic, message);
+        assertTrue(subscriber.isMessageReceived(), "登录消息应该被处理");
+        
+        // 调用其他方法不应该影响登录消息的状态
+        subscriber.handleTestDeviceKeepAlive(keepaliveTopic, message);
+        subscriber.handleTestAppCommand(commandTopic, message);
+        
+        // 验证消息状态仍然正确
+        assertEquals(loginTopic, subscriber.getLastReceivedTopic(), "最后接收的主题应该仍是登录主题");
+        
+        System.out.println("✓ 消息隔离测试通过");
+    }
+} 

+ 228 - 0
device-service-infrastructure/src/test/java/com/hfln/device/infrastructure/mqtt/SimpleMqttSubscriberTest.java

@@ -0,0 +1,228 @@
+package com.hfln.device.infrastructure.mqtt;
+
+import cn.hfln.framework.mqtt.annotation.MqttSubscriber;
+import org.junit.jupiter.api.Test;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * 简单的@MqttSubscriber注解测试
+ * 
+ * 不依赖Spring Boot Context,仅测试注解本身的功能
+ */
+public class SimpleMqttSubscriberTest {
+
+    /**
+     * 测试类,包含@MqttSubscriber注解的方法
+     */
+    public static class TestSubscriber {
+        
+        @MqttSubscriber(topic = "/test/device/+/login", qos = 1, desc = "测试设备登录")
+        public void handleDeviceLogin(String topic, Message<?> message) {
+            System.out.println("处理设备登录: " + topic + " -> " + message.getPayload());
+        }
+        
+        @MqttSubscriber(topic = "/test/device/+/keepalive", qos = 0, desc = "测试设备心跳")
+        public void handleDeviceKeepAlive(String topic, Message<?> message) {
+            System.out.println("处理设备心跳: " + topic);
+        }
+        
+        @MqttSubscriber(topic = "/test/app/+/command", qos = 1, desc = "测试应用命令")
+        public void handleAppCommand(String topic, Message<?> message) {
+            System.out.println("处理应用命令: " + topic + " -> " + message.getPayload());
+        }
+        
+        // 没有注解的方法
+        public void handleRegularMethod(String topic, Message<?> message) {
+            System.out.println("普通方法: " + topic);
+        }
+    }
+
+    @Test
+    public void testMqttSubscriberAnnotationPresence() {
+        System.out.println("=== 测试@MqttSubscriber注解是否存在 ===");
+        
+        TestSubscriber subscriber = new TestSubscriber();
+        Class<?> subscriberClass = subscriber.getClass();
+        
+        // 获取所有方法
+        Method[] methods = subscriberClass.getDeclaredMethods();
+        
+        int annotatedMethodCount = 0;
+        
+        for (Method method : methods) {
+            MqttSubscriber annotation = method.getAnnotation(MqttSubscriber.class);
+            
+            if (annotation != null) {
+                annotatedMethodCount++;
+                System.out.println("找到@MqttSubscriber注解方法: " + method.getName());
+                System.out.println("  - 主题: " + annotation.topic());
+                System.out.println("  - QoS: " + annotation.qos());
+                System.out.println("  - 描述: " + annotation.desc());
+                
+                // 验证注解属性
+                assertNotNull(annotation.topic(), "topic不应该为null");
+                assertTrue(annotation.topic().length() > 0, "topic不应该为空");
+                assertTrue(annotation.qos() >= 0 && annotation.qos() <= 2, "QoS应该在0-2之间");
+                assertNotNull(annotation.desc(), "desc不应该为null");
+            }
+        }
+        
+        // 应该有3个带@MqttSubscriber注解的方法
+        assertEquals(3, annotatedMethodCount, "应该有3个带@MqttSubscriber注解的方法");
+        
+        System.out.println("✓ 注解存在性测试通过");
+    }
+    
+    @Test
+    public void testMqttSubscriberAnnotationConfiguration() {
+        System.out.println("=== 测试@MqttSubscriber注解配置 ===");
+        
+        TestSubscriber subscriber = new TestSubscriber();
+        Class<?> subscriberClass = subscriber.getClass();
+        
+        try {
+            // 测试设备登录方法的注解
+            Method loginMethod = subscriberClass.getMethod("handleDeviceLogin", String.class, Message.class);
+            MqttSubscriber loginAnnotation = loginMethod.getAnnotation(MqttSubscriber.class);
+            
+            assertNotNull(loginAnnotation, "应该能找到登录方法的@MqttSubscriber注解");
+            assertEquals("/test/device/+/login", loginAnnotation.topic(), "登录方法的主题应该正确");
+            assertEquals(1, loginAnnotation.qos(), "登录方法的QoS应该是1");
+            assertEquals("测试设备登录", loginAnnotation.desc(), "登录方法的描述应该正确");
+            
+            // 测试设备心跳方法的注解
+            Method keepaliveMethod = subscriberClass.getMethod("handleDeviceKeepAlive", String.class, Message.class);
+            MqttSubscriber keepaliveAnnotation = keepaliveMethod.getAnnotation(MqttSubscriber.class);
+            
+            assertNotNull(keepaliveAnnotation, "应该能找到心跳方法的@MqttSubscriber注解");
+            assertEquals("/test/device/+/keepalive", keepaliveAnnotation.topic(), "心跳方法的主题应该正确");
+            assertEquals(0, keepaliveAnnotation.qos(), "心跳方法的QoS应该是0");
+            assertEquals("测试设备心跳", keepaliveAnnotation.desc(), "心跳方法的描述应该正确");
+            
+            // 测试应用命令方法的注解
+            Method commandMethod = subscriberClass.getMethod("handleAppCommand", String.class, Message.class);
+            MqttSubscriber commandAnnotation = commandMethod.getAnnotation(MqttSubscriber.class);
+            
+            assertNotNull(commandAnnotation, "应该能找到命令方法的@MqttSubscriber注解");
+            assertEquals("/test/app/+/command", commandAnnotation.topic(), "命令方法的主题应该正确");
+            assertEquals(1, commandAnnotation.qos(), "命令方法的QoS应该是1");
+            assertEquals("测试应用命令", commandAnnotation.desc(), "命令方法的描述应该正确");
+            
+            // 测试普通方法没有注解
+            Method regularMethod = subscriberClass.getMethod("handleRegularMethod", String.class, Message.class);
+            MqttSubscriber regularAnnotation = regularMethod.getAnnotation(MqttSubscriber.class);
+            
+            assertNull(regularAnnotation, "普通方法不应该有@MqttSubscriber注解");
+            
+        } catch (NoSuchMethodException e) {
+            fail("应该能找到测试方法: " + e.getMessage());
+        }
+        
+        System.out.println("✓ 注解配置测试通过");
+    }
+    
+    @Test
+    public void testMqttSubscriberMethodInvocation() {
+        System.out.println("=== 测试@MqttSubscriber方法调用 ===");
+        
+        TestSubscriber subscriber = new TestSubscriber();
+        
+        // 创建测试消息
+        Message<String> testMessage = new GenericMessage<>("test payload");
+        
+        // 测试各个方法是否能正常调用
+        assertDoesNotThrow(() -> {
+            subscriber.handleDeviceLogin("/test/device/test001/login", testMessage);
+        }, "设备登录方法调用不应该抛出异常");
+        
+        assertDoesNotThrow(() -> {
+            subscriber.handleDeviceKeepAlive("/test/device/test001/keepalive", testMessage);
+        }, "设备心跳方法调用不应该抛出异常");
+        
+        assertDoesNotThrow(() -> {
+            subscriber.handleAppCommand("/test/app/admin/command", testMessage);
+        }, "应用命令方法调用不应该抛出异常");
+        
+        assertDoesNotThrow(() -> {
+            subscriber.handleRegularMethod("/test/regular", testMessage);
+        }, "普通方法调用不应该抛出异常");
+        
+        System.out.println("✓ 方法调用测试通过");
+    }
+    
+    @Test
+    public void testMqttSubscriberTopicPatterns() {
+        System.out.println("=== 测试@MqttSubscriber主题模式 ===");
+        
+        TestSubscriber subscriber = new TestSubscriber();
+        Class<?> subscriberClass = subscriber.getClass();
+        
+        // 收集所有的主题模式
+        String[] expectedTopics = {
+            "/test/device/+/login",
+            "/test/device/+/keepalive", 
+            "/test/app/+/command"
+        };
+        
+        int foundTopics = 0;
+        Method[] methods = subscriberClass.getDeclaredMethods();
+        
+        for (Method method : methods) {
+            MqttSubscriber annotation = method.getAnnotation(MqttSubscriber.class);
+            if (annotation != null) {
+                String topic = annotation.topic();
+                
+                // 检查是否是预期的主题之一
+                boolean isExpectedTopic = false;
+                for (String expectedTopic : expectedTopics) {
+                    if (expectedTopic.equals(topic)) {
+                        isExpectedTopic = true;
+                        foundTopics++;
+                        break;
+                    }
+                }
+                
+                assertTrue(isExpectedTopic, "主题 " + topic + " 应该是预期的主题之一");
+                
+                // 验证主题格式
+                assertTrue(topic.startsWith("/"), "主题应该以/开头");
+                assertTrue(topic.contains("+"), "测试主题应该包含通配符+");
+            }
+        }
+        
+        assertEquals(expectedTopics.length, foundTopics, "应该找到所有预期的主题");
+        
+        System.out.println("✓ 主题模式测试通过");
+    }
+    
+    @Test
+    public void testMqttSubscriberErrorHandling() {
+        System.out.println("=== 测试@MqttSubscriber错误处理 ===");
+        
+        TestSubscriber subscriber = new TestSubscriber();
+        
+        // 测试null参数处理
+        assertDoesNotThrow(() -> {
+            subscriber.handleDeviceLogin(null, null);
+        }, "null参数不应该导致异常");
+        
+        // 测试空主题
+        Message<String> message = new GenericMessage<>("test");
+        assertDoesNotThrow(() -> {
+            subscriber.handleDeviceLogin("", message);
+        }, "空主题不应该导致异常");
+        
+        // 测试空载荷
+        Message<String> emptyMessage = new GenericMessage<>("");
+        assertDoesNotThrow(() -> {
+            subscriber.handleDeviceLogin("/test/device/test001/login", emptyMessage);
+        }, "空载荷不应该导致异常");
+        
+        System.out.println("✓ 错误处理测试通过");
+    }
+}