浏览代码

feat:修改mqtt注入

yangliu 4 月之前
父节点
当前提交
e6cac0bd8f

+ 65 - 30
hfln-framework-design-starter/mqtt-spring-boot-starter/src/main/java/cn/hfln/framework/mqtt/config/MqttAutoConfiguration.java

@@ -16,6 +16,7 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -28,6 +29,8 @@ import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannel
 import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
 import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
 import org.springframework.messaging.MessageChannel;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
 import java.util.Collections;
 import java.util.List;
@@ -66,8 +69,17 @@ public class MqttAutoConfiguration {
     public MqttConnectOptions mqttConnectOptions() {
         MqttConnectOptions options = new MqttConnectOptions();
         options.setServerURIs(new String[]{mqttProperties.getBroker()});
-        options.setUserName(mqttProperties.getUsername());
-        options.setPassword(mqttProperties.getPassword().toCharArray());
+        
+        // 设置用户名,如果不为null
+        if (mqttProperties.getUsername() != null) {
+            options.setUserName(mqttProperties.getUsername());
+        }
+        
+        // 设置密码,如果不为null
+        if (mqttProperties.getPassword() != null) {
+            options.setPassword(mqttProperties.getPassword().toCharArray());
+        }
+        
         options.setKeepAliveInterval(mqttProperties.getKeepalive());
         options.setConnectionTimeout(mqttProperties.getTimeout());
         options.setCleanSession(mqttProperties.isCleanSession());
@@ -77,7 +89,7 @@ public class MqttAutoConfiguration {
 
     @Bean
     @ConditionalOnMissingBean
-    public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions mqttConnectOptions) {
+    public MqttPahoClientFactory mqttClientFactory(@Qualifier("mqttConnectOptions") MqttConnectOptions mqttConnectOptions) {
         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
         factory.setConnectionOptions(mqttConnectOptions);
         return factory;
@@ -85,7 +97,8 @@ public class MqttAutoConfiguration {
 
     @Bean
     @ConditionalOnMissingBean
-    public MqttClient mqttClient(MqttClientPersistence mqttClientPersistence) throws MqttException {
+    public MqttClient mqttClient(@Qualifier("mqttClientPersistence") MqttClientPersistence mqttClientPersistence,
+                                 @Qualifier("mqttConnectionListeners") List<MqttConnectionListener> connectionListeners) throws MqttException {
         String clientId = mqttProperties.getClientId();
         if (clientId == null || clientId.isEmpty()) {
             clientId = "mqtt-client-" + UUID.randomUUID();
@@ -94,7 +107,7 @@ public class MqttAutoConfiguration {
         MqttClient mqttClient = new MqttClient(mqttProperties.getBroker(), clientId, mqttClientPersistence);
         
         // 设置回调
-        mqttClient.setCallback(new MqttClientCallback(mqttClient, mqttConnectionListeners()));
+        mqttClient.setCallback(new MqttClientCallback(mqttClient, connectionListeners));
         
         // 连接MQTT服务器
         try {
@@ -102,23 +115,33 @@ public class MqttAutoConfiguration {
             mqttClient.connect(options);
             log.info("Connected to MQTT broker: {}", mqttProperties.getBroker());
         } catch (MqttException e) {
-            log.error("Failed to connect to MQTT broker: {}", mqttProperties.getBroker(), e);
-            // 后台尝试重连
-            if (mqttProperties.isAutomaticReconnect()) {
-                new Thread(() -> {
-                    int retryCount = 0;
-                    while (!mqttClient.isConnected() && retryCount < mqttProperties.getMaxReconnectAttempts()) {
-                        try {
-                            Thread.sleep(mqttProperties.getReconnectInterval());
-                            mqttClient.connect(mqttConnectOptions());
-                            log.info("Successfully reconnected to MQTT broker");
-                            break;
-                        } catch (Exception ex) {
-                            retryCount++;
-                            log.error("Failed to reconnect to MQTT broker, attempt: {}", retryCount, ex);
+            if (mqttProperties.isFailFastOnInitialConnection()) {
+                log.error("Failed to connect to MQTT broker and failFastOnInitialConnection is enabled: {}", mqttProperties.getBroker(), e);
+                throw e;
+            } else {
+                log.warn("Failed to connect to MQTT broker: {}, will retry in background", mqttProperties.getBroker());
+                log.debug("Connection error details", e);
+                
+                // 后台尝试重连
+                if (mqttProperties.isAutomaticReconnect()) {
+                    new Thread(() -> {
+                        int retryCount = 0;
+                        while (!mqttClient.isConnected() && retryCount < mqttProperties.getMaxReconnectAttempts()) {
+                            try {
+                                Thread.sleep(mqttProperties.getReconnectInterval());
+                                mqttClient.connect(mqttConnectOptions());
+                                log.info("Successfully reconnected to MQTT broker");
+                                break;
+                            } catch (Exception ex) {
+                                retryCount++;
+                                log.debug("Failed to reconnect to MQTT broker, attempt: {}", retryCount, ex);
+                            }
+                        }
+                        if (!mqttClient.isConnected()) {
+                            log.warn("Failed to reconnect to MQTT broker after {} attempts", mqttProperties.getMaxReconnectAttempts());
                         }
-                    }
-                }).start();
+                    }).start();
+                }
             }
         }
         
@@ -127,15 +150,16 @@ public class MqttAutoConfiguration {
 
     @Bean
     @ConditionalOnMissingBean
-    public MqttTemplate mqttTemplate(MqttClient mqttClient,
-                                     MessageConverter messageConverter) {
-        List<MqttMessageInterceptor> interceptors = mqttMessageInterceptors();
-        return new MqttTemplate(mqttClient, interceptors, messageConverter);
+    public MqttTemplate mqttTemplate(@Qualifier("mqttClient") MqttClient mqttClient,
+                                     @Qualifier("messageConverter") MessageConverter messageConverter,
+                                     @Qualifier("mqttMessageInterceptors") List<MqttMessageInterceptor> mqttMessageInterceptors) {
+        return new MqttTemplate(mqttClient, mqttMessageInterceptors, messageConverter);
     }
     
     @Bean
     @ConditionalOnMissingBean
-    public MqttGateway mqttGateway(MqttTemplate mqttTemplate, MqttClient mqttClient) {
+    public MqttGateway mqttGateway(@Qualifier("mqttTemplate") MqttTemplate mqttTemplate, 
+                                   @Qualifier("mqttClient") MqttClient mqttClient) {
         return new DefaultMqttGateway(mqttTemplate, mqttClient);
     }
 
@@ -159,13 +183,24 @@ public class MqttAutoConfiguration {
 
     @Bean
     @ConditionalOnMissingBean
+    public TaskScheduler taskScheduler() {
+        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+        scheduler.setPoolSize(1);
+        scheduler.setThreadNamePrefix("mqtt-task-");
+        scheduler.setWaitForTasksToCompleteOnShutdown(true);
+        scheduler.setAwaitTerminationSeconds(30);
+        return scheduler;
+    }
+
+    @Bean
+    @ConditionalOnMissingBean
     public MessageChannel mqttInputChannel() {
         return new DirectChannel();
     }
 
     @Bean
     @ConditionalOnMissingBean
-    public MqttPahoMessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
+    public MqttPahoMessageHandler mqttOutbound(@Qualifier("mqttClientFactory") MqttPahoClientFactory mqttClientFactory) {
         String clientId = mqttProperties.getClientId();
         if (clientId == null || clientId.isEmpty()) {
             clientId = "mqtt-outbound-" + UUID.randomUUID();
@@ -180,7 +215,7 @@ public class MqttAutoConfiguration {
 
     @Bean
     @ConditionalOnMissingBean
-    public MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttPahoClientFactory mqttClientFactory) {
+    public MqttPahoMessageDrivenChannelAdapter mqttInbound(@Qualifier("mqttClientFactory") MqttPahoClientFactory mqttClientFactory) {
         String clientId = mqttProperties.getClientId();
         if (clientId == null || clientId.isEmpty()) {
             clientId = "mqtt-inbound-" + UUID.randomUUID();
@@ -205,8 +240,8 @@ public class MqttAutoConfiguration {
     
     @Bean
     @ConditionalOnMissingBean
-    public MqttSubscriberProcessor mqttSubscriberProcessor(MqttPahoMessageDrivenChannelAdapter mqttInbound,
-                                                           MqttClient mqttClient) {
+    public MqttSubscriberProcessor mqttSubscriberProcessor(@Qualifier("mqttInbound") MqttPahoMessageDrivenChannelAdapter mqttInbound,
+                                                           @Qualifier("mqttClient") MqttClient mqttClient) {
         return new MqttSubscriberProcessor(mqttInbound, mqttClient);
     }
     

+ 0 - 5
hfln-framework-design-starter/mqtt-spring-boot-starter/src/main/java/cn/hfln/framework/mqtt/template/MqttTemplate.java

@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  * 提供发送MQTT消息的便捷方法
  */
 @Slf4j
-@Component
 public class MqttTemplate implements DisposableBean {
     private final MqttPahoMessageHandler mqttOutbound;
     private final JsonMessageConverter jsonConverter;
@@ -46,7 +45,6 @@ public class MqttTemplate implements DisposableBean {
     private final AtomicInteger failedMessages = new AtomicInteger(0);
     private final AtomicInteger retriedMessages = new AtomicInteger(0);
 
-    @Autowired(required = false)
     public MqttTemplate(MqttPahoMessageHandler mqttOutbound, JsonMessageConverter jsonConverter) {
         this.mqttOutbound = mqttOutbound;
         this.jsonConverter = jsonConverter;
@@ -58,7 +56,6 @@ public class MqttTemplate implements DisposableBean {
         this.retryExecutor = null;
     }
 
-    @Autowired(required = false)
     public MqttTemplate(MqttPahoMessageHandler mqttOutbound, JsonMessageConverter jsonConverter, 
                       List<MqttMessageInterceptor> interceptors) {
         this.mqttOutbound = mqttOutbound;
@@ -71,7 +68,6 @@ public class MqttTemplate implements DisposableBean {
         this.retryExecutor = null;
     }
     
-    @Autowired(required = false)
     public MqttTemplate(MqttClient mqttClient, List<MqttMessageInterceptor> interceptors,
                      MessageConverter messageConverter) {
         this.mqttClient = mqttClient;
@@ -85,7 +81,6 @@ public class MqttTemplate implements DisposableBean {
         this.retryExecutor = null;
     }
     
-    @Autowired(required = false)
     public MqttTemplate(MqttClient mqttClient, List<MqttMessageInterceptor> interceptors,
                      MessageConverter messageConverter, MqttProperties mqttProperties,
                      MqttReconnectManager reconnectManager) {