瀏覽代碼

feat:修改mqtt注入

yangliu 4 月之前
父節點
當前提交
001c4279ac

+ 12 - 0
hfln-framework-design-starter/mqtt-spring-boot-starter/src/main/java/cn/hfln/framework/mqtt/config/MqttAutoConfiguration.java

@@ -2,6 +2,9 @@ package cn.hfln.framework.mqtt.config;
 
 import cn.hfln.framework.mqtt.converter.JsonMessageConverter;
 import cn.hfln.framework.mqtt.converter.MessageConverter;
+import cn.hfln.framework.mqtt.interceptor.MqttMessageInterceptor;
+import cn.hfln.framework.mqtt.template.MqttTemplate;
+import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -9,6 +12,8 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
 
+import java.util.List;
+
 @Configuration
 @ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true", matchIfMissing = true)
 @EnableConfigurationProperties(MqttProperties.class)
@@ -20,4 +25,11 @@ public class MqttAutoConfiguration {
     public MessageConverter messageConverter() {
         return new JsonMessageConverter();
     }
+
+    @Bean
+    @ConditionalOnMissingBean
+    public MqttTemplate mqttTemplate(MqttClient mqttClient, List<MqttMessageInterceptor> interceptors, MessageConverter messageConverter) {
+        return new MqttTemplate(mqttClient, interceptors, messageConverter);
+    }
+
 } 

+ 5 - 7
hfln-framework-design-starter/mqtt-spring-boot-starter/src/main/java/cn/hfln/framework/mqtt/config/MqttClientConfig.java

@@ -1,24 +1,22 @@
 package cn.hfln.framework.mqtt.config;
 
-import cn.hfln.framework.mqtt.interceptor.MqttMessageInterceptor;
-import cn.hfln.framework.mqtt.listener.MqttConnectionListener;
-import cn.hfln.framework.mqtt.listener.MqttMessageListener;
 import cn.hfln.framework.mqtt.handler.MqttMessageHandler;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Lazy;
 
-import java.util.List;
-
 @Slf4j
 @Configuration
 @EnableConfigurationProperties(MqttProperties.class)
+@ComponentScan("cn.hfln.framework.mqtt")
 public class MqttClientConfig {
 
     @Autowired

+ 47 - 41
hfln-framework-design-starter/mqtt-spring-boot-starter/src/main/java/cn/hfln/framework/mqtt/handler/MqttMessageHandler.java

@@ -19,6 +19,12 @@ public class MqttMessageHandler implements MqttCallback {
     @Autowired
     private ApplicationContext applicationContext;
 
+    @Autowired
+    private MqttSubscribeProcessor subscribeProcessor;
+
+    @Autowired
+    private MessageConverter messageConverter;
+
     @Override
     public void connectionLost(Throwable cause) {
         log.error("Connection lost", cause);
@@ -26,56 +32,56 @@ public class MqttMessageHandler implements MqttCallback {
 
     @Override
     public void messageArrived(String topic, MqttMessage message) {
-        log.info("Message arrived on topic: {}", topic);
-        log.info("Message content: {}", new String(message.getPayload()));
+//        log.info("Message arrived on topic: {}", topic);
+//        log.debug("Message content: {}", new String(message.getPayload()));
 
-        // 从 ApplicationContext 中获取 MqttSubscribeProcessor
-        MqttSubscribeProcessor subscribeProcessor = applicationContext.getBean(MqttSubscribeProcessor.class);
         Method method = subscribeProcessor.getMethodForTopic(topic);
-        
-        if (method != null) {
-            try {
-                Object bean = applicationContext.getBean(method.getDeclaringClass());
-                log.info("Found handler bean: {} for topic: {}", bean.getClass().getName(), topic);
-                
-                // 获取方法参数类型
-                Class<?>[] parameterTypes = method.getParameterTypes();
-                Object[] args = new Object[parameterTypes.length];
-                
-                // 根据参数类型转换消息内容
-                if (parameterTypes.length == 1) {
-                    if (parameterTypes[0] == String.class) {
-                        args[0] = new String(message.getPayload(), StandardCharsets.UTF_8);
-                        log.info("Converting message to String: {}", args[0]);
-                    } else {
-                        // 尝试使用消息转换器转换
-                        MessageConverter converter = applicationContext.getBean(MessageConverter.class);
-                        args[0] = converter.fromMessage(message, parameterTypes[0]);
-                        log.info("Converting message to {}: {}", parameterTypes[0].getName(), args[0]);
-                    }
-                } else {
-                    log.error("Handler method must have exactly one parameter, but found {} parameters", parameterTypes.length);
-                    return;
-                }
-                
-                log.info("Invoking method: {} on bean: {} with args: {}", method.getName(), bean.getClass().getName(), args[0]);
-                Object result = method.invoke(bean, args);
-                log.info("Method invocation result: {}", result);
-                log.info("Successfully processed message for topic: {}", topic);
-            } catch (Exception e) {
-                log.error("Error invoking method for topic: {} - Error: {}", topic, e.getMessage(), e);
-                log.error("Stack trace:", e);
-            }
-        } else {
+        if (method == null) {
             log.warn("No handler method found for topic: {}", topic);
+            return;
+        }
+
+        try {
+            Object bean = applicationContext.getBean(method.getDeclaringClass());
+//            log.debug("Found handler bean: {} for topic: {}", bean.getClass().getName(), topic);
+            
+            // 获取方法参数类型
+            Class<?>[] parameterTypes = method.getParameterTypes();
+            if (parameterTypes.length != 1) {
+                log.error("Handler method must have exactly one parameter, but found {} parameters", parameterTypes.length);
+                return;
+            }
+
+            Object[] args = new Object[1];
+            Class<?> paramType = parameterTypes[0];
+            
+            // 根据参数类型转换消息内容
+            if (paramType == String.class) {
+                args[0] = new String(message.getPayload(), StandardCharsets.UTF_8);
+//                log.debug("Converting message to String: {}", args[0]);
+            } else if (paramType == byte[].class) {
+                args[0] = message.getPayload();
+//                log.debug("Using raw byte array as message payload");
+            } else {
+                // 使用消息转换器转换
+                args[0] = messageConverter.fromMessage(message, paramType);
+//                log.debug("Converting message to {}: {}", paramType.getName(), args[0]);
+            }
+            
+//            log.debug("Invoking method: {} on bean: {} with args: {}", method.getName(), bean.getClass().getName(), args[0]);
+            Object result = method.invoke(bean, args);
+//            log.debug("Method invocation result: {}", result);
+//            log.info("Successfully processed message for topic: {}", topic);
+        } catch (Exception e) {
+            log.error("Error invoking method for topic: {} - Error: {}", topic, e.getMessage(), e);
         }
     }
 
     @Override
     public void deliveryComplete(IMqttDeliveryToken token) {
-        log.info("Message delivery complete for token: {}", token);
         try {
-            log.info("Message delivered to topics: {}", token.getTopics());
+//            log.debug("Message delivery complete for token: {}", token);
+//            log.debug("Message delivered to topics: {}", (Object) token.getTopics());
         } catch (Exception e) {
             log.error("Error getting delivery topics", e);
         }

+ 52 - 17
hfln-framework-design-starter/mqtt-spring-boot-starter/src/main/java/cn/hfln/framework/mqtt/handler/MqttSubscribeProcessor.java

@@ -9,6 +9,7 @@ import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
+import org.springframework.util.ReflectionUtils;
 
 import java.lang.reflect.Method;
 import java.util.HashMap;
@@ -30,31 +31,65 @@ public class MqttSubscribeProcessor implements BeanPostProcessor {
 
     @Override
     public Object postProcessAfterInitialization(Object bean, String beanName) {
-        log.info("Processing bean: {} for MQTT subscriptions", beanName);
+//        log.info("Processing bean: {} for MQTT subscriptions", beanName);
 
         Class<?> beanClass = bean.getClass();
-        for (Method method : beanClass.getMethods()) {
+        
+        // 处理类级别的注解
+        MqttSubscribe classAnnotation = beanClass.getAnnotation(MqttSubscribe.class);
+        if (classAnnotation != null) {
+            processMqttSubscribe(bean, beanClass, classAnnotation);
+        }
+
+        // 处理方法级别的注解
+        ReflectionUtils.doWithMethods(beanClass, method -> {
             MqttSubscribe annotation = method.getAnnotation(MqttSubscribe.class);
             if (annotation != null) {
-                String topic = annotation.topic();
-                int qos = annotation.qos();
-                
-                if (!mqttClient.isConnected()) {
-                    log.error("MQTT client is not connected, cannot subscribe to topic: {}", topic);
-                    continue;
-                }
+                processMqttSubscribe(bean, method, annotation);
+            }
+        });
 
-                try {
-                    log.info("Subscribing to topic: {} with QoS: {}", topic, qos);
-                    mqttClient.subscribe(topic, qos);
+        return bean;
+    }
+
+    private void processMqttSubscribe(Object bean, Object target, MqttSubscribe annotation) {
+        String topic = annotation.topic();
+        int qos = annotation.qos();
+        
+        if (!mqttClient.isConnected()) {
+            log.error("MQTT client is not connected, cannot subscribe to topic: {}", topic);
+            return;
+        }
+
+        try {
+//            log.info("Subscribing to topic: {} with QoS: {}", topic, qos);
+            mqttClient.subscribe(topic, qos);
+            
+            if (target instanceof Method) {
+                topicMethodMap.put(topic, (Method) target);
+            } else if (target instanceof Class) {
+                // 对于类级别的注解,我们需要找到合适的处理方法
+                Method method = findHandlerMethod((Class<?>) target);
+                if (method != null) {
                     topicMethodMap.put(topic, method);
-                    log.info("Successfully subscribed to topic: {} with QoS: {}", topic, qos);
-                } catch (MqttException e) {
-                    log.error("Failed to subscribe to topic: {}", topic, e);
                 }
             }
+            
+//            log.info("Successfully subscribed to topic: {} with QoS: {}", topic, qos);
+        } catch (MqttException e) {
+            log.error("Failed to subscribe to topic: {}", topic, e);
         }
-        return bean;
+    }
+
+    private Method findHandlerMethod(Class<?> targetClass) {
+        for (Method method : targetClass.getMethods()) {
+            if (method.getParameterCount() == 1 && 
+                (method.getParameterTypes()[0] == String.class || 
+                 method.getParameterTypes()[0] == byte[].class)) {
+                return method;
+            }
+        }
+        return null;
     }
 
     public Method getMethodForTopic(String topic) {
@@ -70,7 +105,7 @@ public class MqttSubscribeProcessor implements BeanPostProcessor {
                     .replace("+", "[^/]+")  // 将 + 替换为匹配除 / 外的任意字符
                     .replace("#", ".*");     // 将 # 替换为匹配任意字符
             if (Pattern.matches(pattern, topic)) {
-                log.info("Found matching method for topic: {} using pattern: {}", topic, pattern);
+//                log.info("Found matching method for topic: {} using pattern: {}", topic, pattern);
                 return entry.getValue();
             }
         }

+ 3 - 3
hfln-framework-design-starter/mqtt-spring-boot-starter/src/main/java/cn/hfln/framework/mqtt/interceptor/DefaultMqttMessageInterceptor.java

@@ -9,7 +9,7 @@ import org.springframework.stereotype.Component;
 public class DefaultMqttMessageInterceptor implements MqttMessageInterceptor {
     @Override
     public boolean preSend(String topic, MqttMessage message) {
-        log.debug("Pre-send message to topic: {}", topic);
+//        log.debug("Pre-send message to topic: {}", topic);
         return true;
     }
 
@@ -20,12 +20,12 @@ public class DefaultMqttMessageInterceptor implements MqttMessageInterceptor {
 
     @Override
     public boolean preReceive(String topic, MqttMessage message) {
-        log.debug("Pre-receive message from topic: {}", topic);
+//        log.debug("Pre-receive message from topic: {}", topic);
         return true;
     }
 
     @Override
     public void postReceive(String topic, MqttMessage message) {
-        log.debug("Post-receive message from topic: {}", topic);
+//        log.debug("Post-receive message from topic: {}", topic);
     }
 }