|
|
@@ -1,32 +1,121 @@
|
|
|
package cn.hfln.framework.mqtt.template;
|
|
|
|
|
|
+import cn.hfln.framework.mqtt.config.MqttProperties;
|
|
|
+import cn.hfln.framework.mqtt.connection.MqttReconnectManager;
|
|
|
+import cn.hfln.framework.mqtt.converter.JsonMessageConverter;
|
|
|
import cn.hfln.framework.mqtt.converter.MessageConverter;
|
|
|
+import cn.hfln.framework.mqtt.exception.MqttExceptionHandler;
|
|
|
import cn.hfln.framework.mqtt.interceptor.MqttMessageInterceptor;
|
|
|
-import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
-import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.eclipse.paho.client.mqttv3.*;
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttException;
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
+import org.springframework.beans.factory.DisposableBean;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
|
+import org.springframework.integration.mqtt.support.MqttHeaders;
|
|
|
+import org.springframework.messaging.Message;
|
|
|
+import org.springframework.messaging.support.MessageBuilder;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
-import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
-import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
+/**
|
|
|
+ * MQTT消息模板
|
|
|
+ * 提供发送MQTT消息的便捷方法
|
|
|
+ */
|
|
|
@Slf4j
|
|
|
@Component
|
|
|
-public class MqttTemplate {
|
|
|
- private final MqttClient mqttClient;
|
|
|
- private final ObjectMapper objectMapper;
|
|
|
+public class MqttTemplate implements DisposableBean {
|
|
|
+ private final MqttPahoMessageHandler mqttOutbound;
|
|
|
+ private final JsonMessageConverter jsonConverter;
|
|
|
private final List<MqttMessageInterceptor> interceptors;
|
|
|
+ private final MqttClient mqttClient;
|
|
|
private final MessageConverter messageConverter;
|
|
|
+ private final MqttProperties mqttProperties;
|
|
|
+ private final MqttReconnectManager reconnectManager;
|
|
|
+
|
|
|
+ // 用于重试发送消息的线程池
|
|
|
+ private final ScheduledExecutorService retryExecutor;
|
|
|
+
|
|
|
+ // 消息发送统计
|
|
|
+ private final AtomicInteger totalMessagesSent = new AtomicInteger(0);
|
|
|
+ private final AtomicInteger failedMessages = new AtomicInteger(0);
|
|
|
+ private final AtomicInteger retriedMessages = new AtomicInteger(0);
|
|
|
+
|
|
|
+ @Autowired(required = false)
|
|
|
+ public MqttTemplate(MqttPahoMessageHandler mqttOutbound, JsonMessageConverter jsonConverter) {
|
|
|
+ this.mqttOutbound = mqttOutbound;
|
|
|
+ this.jsonConverter = jsonConverter;
|
|
|
+ this.interceptors = Collections.emptyList();
|
|
|
+ this.mqttClient = null;
|
|
|
+ this.messageConverter = null;
|
|
|
+ this.mqttProperties = null;
|
|
|
+ this.reconnectManager = null;
|
|
|
+ this.retryExecutor = null;
|
|
|
+ }
|
|
|
|
|
|
- @Autowired
|
|
|
- public MqttTemplate(MqttClient mqttClient, List<MqttMessageInterceptor> interceptors, MessageConverter messageConverter) {
|
|
|
+ @Autowired(required = false)
|
|
|
+ public MqttTemplate(MqttPahoMessageHandler mqttOutbound, JsonMessageConverter jsonConverter,
|
|
|
+ List<MqttMessageInterceptor> interceptors) {
|
|
|
+ this.mqttOutbound = mqttOutbound;
|
|
|
+ this.jsonConverter = jsonConverter;
|
|
|
+ this.interceptors = interceptors != null ? interceptors : Collections.emptyList();
|
|
|
+ this.mqttClient = null;
|
|
|
+ this.messageConverter = null;
|
|
|
+ this.mqttProperties = null;
|
|
|
+ this.reconnectManager = null;
|
|
|
+ this.retryExecutor = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Autowired(required = false)
|
|
|
+ public MqttTemplate(MqttClient mqttClient, List<MqttMessageInterceptor> interceptors,
|
|
|
+ MessageConverter messageConverter) {
|
|
|
+ this.mqttClient = mqttClient;
|
|
|
+ this.messageConverter = messageConverter;
|
|
|
+ this.interceptors = interceptors != null ? interceptors : Collections.emptyList();
|
|
|
+ this.mqttOutbound = null;
|
|
|
+ this.jsonConverter = (messageConverter instanceof JsonMessageConverter) ?
|
|
|
+ (JsonMessageConverter) messageConverter : null;
|
|
|
+ this.mqttProperties = null;
|
|
|
+ this.reconnectManager = null;
|
|
|
+ this.retryExecutor = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Autowired(required = false)
|
|
|
+ public MqttTemplate(MqttClient mqttClient, List<MqttMessageInterceptor> interceptors,
|
|
|
+ MessageConverter messageConverter, MqttProperties mqttProperties,
|
|
|
+ MqttReconnectManager reconnectManager) {
|
|
|
this.mqttClient = mqttClient;
|
|
|
- this.objectMapper = new ObjectMapper();
|
|
|
- this.interceptors = interceptors;
|
|
|
this.messageConverter = messageConverter;
|
|
|
+ this.interceptors = interceptors != null ? interceptors : Collections.emptyList();
|
|
|
+ this.mqttOutbound = null;
|
|
|
+ this.jsonConverter = (messageConverter instanceof JsonMessageConverter) ?
|
|
|
+ (JsonMessageConverter) messageConverter : null;
|
|
|
+ this.mqttProperties = mqttProperties;
|
|
|
+ this.reconnectManager = reconnectManager;
|
|
|
+
|
|
|
+ // 创建重试线程池
|
|
|
+ ThreadFactory threadFactory = r -> {
|
|
|
+ Thread t = new Thread(r, "mqtt-retry-thread");
|
|
|
+ t.setDaemon(true);
|
|
|
+ return t;
|
|
|
+ };
|
|
|
+
|
|
|
+ this.retryExecutor = Executors.newScheduledThreadPool(
|
|
|
+ Runtime.getRuntime().availableProcessors(),
|
|
|
+ threadFactory
|
|
|
+ );
|
|
|
+
|
|
|
+ log.info("MqttTemplate initialized with {} interceptors", this.interceptors.size());
|
|
|
+
|
|
|
+ // 添加JVM关闭钩子,确保线程池正确关闭
|
|
|
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
|
|
+ shutdown();
|
|
|
+ }));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -37,7 +126,7 @@ public class MqttTemplate {
|
|
|
* @return CompletableFuture<Void>
|
|
|
*/
|
|
|
public CompletableFuture<Void> send(String topic, String payload) {
|
|
|
- return send(topic, payload.getBytes(StandardCharsets.UTF_8), 0, false);
|
|
|
+ return send(topic, payload, mqttProperties != null ? mqttProperties.getDefaultQos() : 0, false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -50,88 +139,270 @@ public class MqttTemplate {
|
|
|
* @return CompletableFuture<Void>
|
|
|
*/
|
|
|
public CompletableFuture<Void> send(String topic, String payload, int qos, boolean retain) {
|
|
|
- return send(topic, payload.getBytes(StandardCharsets.UTF_8), qos, retain);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 发送字节数组消息
|
|
|
- *
|
|
|
- * @param topic 主题
|
|
|
- * @param payload 消息内容
|
|
|
- * @return CompletableFuture<Void>
|
|
|
- */
|
|
|
- public CompletableFuture<Void> send(String topic, byte[] payload) {
|
|
|
- return send(topic, payload, 0, false);
|
|
|
+ CompletableFuture<Void> future = new CompletableFuture<>();
|
|
|
+ totalMessagesSent.incrementAndGet();
|
|
|
+
|
|
|
+ // 应用拦截器
|
|
|
+ String processedPayload = payload;
|
|
|
+ for (MqttMessageInterceptor interceptor : interceptors) {
|
|
|
+ try {
|
|
|
+ processedPayload = interceptor.beforePublish(topic, processedPayload);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error in message interceptor", e);
|
|
|
+ failedMessages.incrementAndGet();
|
|
|
+ future.completeExceptionally(e);
|
|
|
+ return future;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ final String finalPayload = processedPayload;
|
|
|
+ if (mqttOutbound != null) {
|
|
|
+ Message<String> message = MessageBuilder
|
|
|
+ .withPayload(finalPayload)
|
|
|
+ .setHeader(MqttHeaders.TOPIC, topic)
|
|
|
+ .setHeader(MqttHeaders.QOS, qos)
|
|
|
+ .setHeader(MqttHeaders.RETAINED, retain)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ mqttOutbound.handleMessage(message);
|
|
|
+
|
|
|
+ // 通知拦截器
|
|
|
+ for (MqttMessageInterceptor interceptor : interceptors) {
|
|
|
+ try {
|
|
|
+ interceptor.afterPublish(topic, finalPayload);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error in message interceptor after publish", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ future.complete(null);
|
|
|
+ log.debug("Sent message to topic: {}", topic);
|
|
|
+ } else if (mqttClient != null) {
|
|
|
+ publishWithRetry(topic, finalPayload.getBytes(), qos, retain, future, 0);
|
|
|
+ return future;
|
|
|
+ } else {
|
|
|
+ throw new IllegalStateException("No MQTT client or message handler available");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ failedMessages.incrementAndGet();
|
|
|
+
|
|
|
+ // 使用异常处理器处理异常
|
|
|
+ if (e instanceof MqttException) {
|
|
|
+ final String finalPayload = processedPayload;
|
|
|
+ MqttExceptionHandler.handleException((MqttException) e, ex -> {
|
|
|
+ if (mqttProperties != null && mqttProperties.isRetryOnPublishFailure() &&
|
|
|
+ MqttExceptionHandler.isRecoverableByReconnect((MqttException) e)) {
|
|
|
+ if (retryExecutor != null && !retryExecutor.isShutdown()) {
|
|
|
+ log.info("Scheduling retry for message to topic: {}", topic);
|
|
|
+ retryExecutor.schedule(() -> {
|
|
|
+ retriedMessages.incrementAndGet();
|
|
|
+ send(topic, finalPayload, qos, retain).thenAccept(v -> future.complete(null))
|
|
|
+ .exceptionally(t -> {
|
|
|
+ future.completeExceptionally(t);
|
|
|
+ return null;
|
|
|
+ });
|
|
|
+ }, mqttProperties.getPublishRetryInterval(), TimeUnit.MILLISECONDS);
|
|
|
+ } else {
|
|
|
+ future.completeExceptionally(e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ future.completeExceptionally(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ future.completeExceptionally(e);
|
|
|
+ log.error("Failed to send message to topic: {}", topic, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return future;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * 发送字节数组消息(指定QoS和保留标志)
|
|
|
- *
|
|
|
- * @param topic 主题
|
|
|
+ * 带重试机制的消息发布
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
* @param payload 消息内容
|
|
|
- * @param qos QoS级别
|
|
|
- * @param retain 是否保留
|
|
|
- * @return CompletableFuture<Void>
|
|
|
+ * @param qos QoS级别
|
|
|
+ * @param retain 是否保留
|
|
|
+ * @param future 完成回调
|
|
|
+ * @param retryCount 当前重试次数
|
|
|
*/
|
|
|
- public CompletableFuture<Void> send(String topic, byte[] payload, int qos, boolean retain) {
|
|
|
- CompletableFuture<Void> future = new CompletableFuture<>();
|
|
|
+ private void publishWithRetry(String topic, byte[] payload, int qos, boolean retain,
|
|
|
+ CompletableFuture<Void> future, int retryCount) {
|
|
|
try {
|
|
|
MqttMessage message = new MqttMessage(payload);
|
|
|
message.setQos(qos);
|
|
|
message.setRetained(retain);
|
|
|
-
|
|
|
- // 执行发送前拦截
|
|
|
- boolean shouldSend = interceptors.stream()
|
|
|
- .allMatch(interceptor -> interceptor.preSend(topic, message));
|
|
|
-
|
|
|
- if (shouldSend) {
|
|
|
- mqttClient.publish(topic, message);
|
|
|
- // 执行发送后拦截
|
|
|
- interceptors.forEach(interceptor -> interceptor.postSend(topic, message));
|
|
|
- future.complete(null);
|
|
|
- } else {
|
|
|
- future.completeExceptionally(new RuntimeException("Message sending was intercepted"));
|
|
|
+
|
|
|
+ if (!mqttClient.isConnected()) {
|
|
|
+ if (mqttProperties != null && mqttProperties.isAutomaticReconnect() && reconnectManager != null) {
|
|
|
+ log.warn("MQTT client not connected, triggering reconnect before sending message");
|
|
|
+ reconnectManager.triggerReconnect();
|
|
|
+
|
|
|
+ // 如果启用了重试,则安排重试
|
|
|
+ if (shouldRetry(retryCount)) {
|
|
|
+ retriedMessages.incrementAndGet();
|
|
|
+ scheduleRetry(topic, payload, qos, retain, future, retryCount + 1);
|
|
|
+ return;
|
|
|
+ } else {
|
|
|
+ failedMessages.incrementAndGet();
|
|
|
+ future.completeExceptionally(new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ failedMessages.incrementAndGet();
|
|
|
+ future.completeExceptionally(new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 应用拦截器
|
|
|
+ for (MqttMessageInterceptor interceptor : interceptors) {
|
|
|
+ if (!interceptor.preSend(topic, message)) {
|
|
|
+ log.warn("Message sending cancelled by interceptor for topic: {}", topic);
|
|
|
+ failedMessages.incrementAndGet();
|
|
|
+ future.completeExceptionally(new IllegalStateException("Message sending cancelled by interceptor"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ mqttClient.publish(topic, message);
|
|
|
+ future.complete(null);
|
|
|
+ log.debug("Successfully published message to topic: {}", topic);
|
|
|
+
|
|
|
+ // 通知拦截器
|
|
|
+ for (MqttMessageInterceptor interceptor : interceptors) {
|
|
|
+ try {
|
|
|
+ interceptor.postSend(topic, message);
|
|
|
+ interceptor.afterPublish(topic, new String(payload));
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error in message interceptor after publish", e);
|
|
|
+ }
|
|
|
}
|
|
|
} catch (MqttException e) {
|
|
|
- future.completeExceptionally(e);
|
|
|
+ // 处理异常
|
|
|
+ String detailedError = MqttExceptionHandler.getDetailedErrorMessage(e);
|
|
|
+ log.error("Failed to publish message to topic {}: {}", topic, detailedError);
|
|
|
+
|
|
|
+ // 如果是可恢复错误并且启用了重试,则安排重试
|
|
|
+ if (MqttExceptionHandler.isRecoverableByReconnect(e) && shouldRetry(retryCount)) {
|
|
|
+ log.info("Will retry publishing message to topic {} (attempt {}/{})",
|
|
|
+ topic, retryCount + 1, mqttProperties.getMaxPublishRetryAttempts());
|
|
|
+ retriedMessages.incrementAndGet();
|
|
|
+ scheduleRetry(topic, payload, qos, retain, future, retryCount + 1);
|
|
|
+ } else {
|
|
|
+ failedMessages.incrementAndGet();
|
|
|
+ future.completeExceptionally(e);
|
|
|
+
|
|
|
+ // 提供解决方案建议
|
|
|
+ String recommendation = MqttExceptionHandler.getRecommendedAction(e);
|
|
|
+ log.info("Recommended action: {}", recommendation);
|
|
|
+ }
|
|
|
}
|
|
|
- return future;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 判断是否应该重试发送消息
|
|
|
+ *
|
|
|
+ * @param retryCount 当前重试次数
|
|
|
+ * @return 是否应该重试
|
|
|
+ */
|
|
|
+ private boolean shouldRetry(int retryCount) {
|
|
|
+ return mqttProperties != null &&
|
|
|
+ mqttProperties.isRetryOnPublishFailure() &&
|
|
|
+ retryCount < mqttProperties.getMaxPublishRetryAttempts();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 安排重试发送消息
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ * @param payload 消息内容
|
|
|
+ * @param qos QoS级别
|
|
|
+ * @param retain 是否保留
|
|
|
+ * @param future 完成回调
|
|
|
+ * @param retryCount 当前重试次数
|
|
|
+ */
|
|
|
+ private void scheduleRetry(String topic, byte[] payload, int qos, boolean retain,
|
|
|
+ CompletableFuture<Void> future, int retryCount) {
|
|
|
+ if (retryExecutor != null && !retryExecutor.isShutdown()) {
|
|
|
+ long delay = calculateRetryDelay(retryCount);
|
|
|
+ log.debug("Scheduling retry {} for topic {} in {} ms", retryCount, topic, delay);
|
|
|
+
|
|
|
+ retryExecutor.schedule(
|
|
|
+ () -> publishWithRetry(topic, payload, qos, retain, future, retryCount),
|
|
|
+ delay,
|
|
|
+ TimeUnit.MILLISECONDS
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ future.completeExceptionally(new IllegalStateException("Retry executor is not available"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 计算重试延迟时间,使用指数退避算法
|
|
|
+ *
|
|
|
+ * @param retryCount 当前重试次数
|
|
|
+ * @return 延迟时间(毫秒)
|
|
|
+ */
|
|
|
+ private long calculateRetryDelay(int retryCount) {
|
|
|
+ if (mqttProperties == null) {
|
|
|
+ return 1000; // 默认1秒
|
|
|
+ }
|
|
|
+
|
|
|
+ long baseDelay = mqttProperties.getPublishRetryInterval();
|
|
|
+ double factor = Math.pow(mqttProperties.getReconnectBackoffMultiplier(), retryCount - 1);
|
|
|
+ long delay = (long)(baseDelay * factor);
|
|
|
+
|
|
|
+ // 确保不超过最大重试间隔
|
|
|
+ return Math.min(delay, mqttProperties.getMaxReconnectInterval());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 发送JSON对象消息
|
|
|
*
|
|
|
* @param topic 主题
|
|
|
- * @param obj 对象
|
|
|
+ * @param payload 消息内容对象
|
|
|
* @return CompletableFuture<Void>
|
|
|
*/
|
|
|
- public CompletableFuture<Void> sendJson(String topic, Object obj) {
|
|
|
- try {
|
|
|
- String json = objectMapper.writeValueAsString(obj);
|
|
|
- return send(topic, json);
|
|
|
- } catch (JsonProcessingException e) {
|
|
|
- CompletableFuture<Void> future = new CompletableFuture<>();
|
|
|
- future.completeExceptionally(e);
|
|
|
- return future;
|
|
|
- }
|
|
|
+ public CompletableFuture<Void> sendJson(String topic, Object payload) {
|
|
|
+ return sendJson(topic, payload, mqttProperties != null ? mqttProperties.getDefaultQos() : 0, false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 发送JSON对象消息(指定QoS和保留标志)
|
|
|
*
|
|
|
* @param topic 主题
|
|
|
- * @param obj 对象
|
|
|
+ * @param payload 消息内容对象
|
|
|
* @param qos QoS级别
|
|
|
* @param retain 是否保留
|
|
|
* @return CompletableFuture<Void>
|
|
|
*/
|
|
|
- public CompletableFuture<Void> sendJson(String topic, Object obj, int qos, boolean retain) {
|
|
|
+ public CompletableFuture<Void> sendJson(String topic, Object payload, int qos, boolean retain) {
|
|
|
+ CompletableFuture<Void> future = new CompletableFuture<>();
|
|
|
try {
|
|
|
- String json = objectMapper.writeValueAsString(obj);
|
|
|
- return send(topic, json, qos, retain);
|
|
|
- } catch (JsonProcessingException e) {
|
|
|
- CompletableFuture<Void> future = new CompletableFuture<>();
|
|
|
+ if (jsonConverter != null) {
|
|
|
+ String json = jsonConverter.toJson(payload);
|
|
|
+ return send(topic, json, qos, retain);
|
|
|
+ } else if (messageConverter != null) {
|
|
|
+ MqttMessage message = messageConverter.toMessage(payload);
|
|
|
+ message.setQos(qos);
|
|
|
+ message.setRetained(retain);
|
|
|
+
|
|
|
+ if (mqttClient != null) {
|
|
|
+ publishWithRetry(topic, message.getPayload(), qos, retain, future, 0);
|
|
|
+ return future;
|
|
|
+ } else {
|
|
|
+ throw new IllegalStateException("No MQTT client available");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw new IllegalStateException("No message converter available");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ failedMessages.incrementAndGet();
|
|
|
future.completeExceptionally(e);
|
|
|
+ log.error("Failed to convert object to JSON: {}", payload, e);
|
|
|
return future;
|
|
|
}
|
|
|
}
|
|
|
@@ -141,10 +412,10 @@ public class MqttTemplate {
|
|
|
*
|
|
|
* @param topic 主题
|
|
|
* @param payload 消息内容
|
|
|
- * @throws MqttException 发送异常
|
|
|
+ * @throws Exception 发送异常
|
|
|
*/
|
|
|
- public void sendSync(String topic, String payload) throws MqttException {
|
|
|
- sendSync(topic, payload.getBytes(StandardCharsets.UTF_8), 0, false);
|
|
|
+ public void sendSync(String topic, String payload) throws Exception {
|
|
|
+ sendSync(topic, payload, mqttProperties != null ? mqttProperties.getDefaultQos() : 0, false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -154,23 +425,25 @@ public class MqttTemplate {
|
|
|
* @param payload 消息内容
|
|
|
* @param qos QoS级别
|
|
|
* @param retain 是否保留
|
|
|
- * @throws MqttException 发送异常
|
|
|
+ * @throws Exception 发送异常
|
|
|
*/
|
|
|
- public void sendSync(String topic, byte[] payload, int qos, boolean retain) throws MqttException {
|
|
|
- MqttMessage message = new MqttMessage(payload);
|
|
|
- message.setQos(qos);
|
|
|
- message.setRetained(retain);
|
|
|
-
|
|
|
- // 执行发送前拦截
|
|
|
- boolean shouldSend = interceptors.stream()
|
|
|
- .allMatch(interceptor -> interceptor.preSend(topic, message));
|
|
|
-
|
|
|
- if (shouldSend) {
|
|
|
- mqttClient.publish(topic, message);
|
|
|
- // 执行发送后拦截
|
|
|
- interceptors.forEach(interceptor -> interceptor.postSend(topic, message));
|
|
|
- } else {
|
|
|
- throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION);
|
|
|
+ public void sendSync(String topic, String payload, int qos, boolean retain) throws Exception {
|
|
|
+ try {
|
|
|
+ send(topic, payload, qos, retain).get(
|
|
|
+ mqttProperties != null ? mqttProperties.getCompletionTimeout() : 5000,
|
|
|
+ TimeUnit.MILLISECONDS
|
|
|
+ );
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION, e);
|
|
|
+ } catch (ExecutionException e) {
|
|
|
+ if (e.getCause() instanceof MqttException) {
|
|
|
+ throw (MqttException) e.getCause();
|
|
|
+ } else {
|
|
|
+ throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION, e.getCause());
|
|
|
+ }
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ throw new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT, e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -178,33 +451,89 @@ public class MqttTemplate {
|
|
|
* 同步发送JSON对象消息
|
|
|
*
|
|
|
* @param topic 主题
|
|
|
- * @param obj 对象
|
|
|
- * @throws MqttException 发送异常
|
|
|
+ * @param payload 消息内容对象
|
|
|
+ * @throws Exception 发送异常
|
|
|
*/
|
|
|
- public void sendJsonSync(String topic, Object obj) throws MqttException {
|
|
|
- try {
|
|
|
- String json = objectMapper.writeValueAsString(obj);
|
|
|
- sendSync(topic, json);
|
|
|
- } catch (JsonProcessingException e) {
|
|
|
- throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION, e);
|
|
|
- }
|
|
|
+ public void sendJsonSync(String topic, Object payload) throws Exception {
|
|
|
+ sendJsonSync(topic, payload, mqttProperties != null ? mqttProperties.getDefaultQos() : 0, false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 同步发送JSON对象消息(指定QoS和保留标志)
|
|
|
*
|
|
|
* @param topic 主题
|
|
|
- * @param obj 对象
|
|
|
+ * @param payload 消息内容对象
|
|
|
* @param qos QoS级别
|
|
|
* @param retain 是否保留
|
|
|
- * @throws MqttException 发送异常
|
|
|
+ * @throws Exception 发送异常
|
|
|
*/
|
|
|
- public void sendJsonSync(String topic, Object obj, int qos, boolean retain) throws MqttException {
|
|
|
+ public void sendJsonSync(String topic, Object payload, int qos, boolean retain) throws Exception {
|
|
|
try {
|
|
|
- String json = objectMapper.writeValueAsString(obj);
|
|
|
- sendSync(topic, json.getBytes(StandardCharsets.UTF_8), qos, retain);
|
|
|
- } catch (JsonProcessingException e) {
|
|
|
+ sendJson(topic, payload, qos, retain).get(
|
|
|
+ mqttProperties != null ? mqttProperties.getCompletionTimeout() : 5000,
|
|
|
+ TimeUnit.MILLISECONDS
|
|
|
+ );
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION, e);
|
|
|
+ } catch (ExecutionException e) {
|
|
|
+ if (e.getCause() instanceof MqttException) {
|
|
|
+ throw (MqttException) e.getCause();
|
|
|
+ } else {
|
|
|
+ throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION, e.getCause());
|
|
|
+ }
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ throw new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT, e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取消息发送统计信息
|
|
|
+ *
|
|
|
+ * @return 统计信息
|
|
|
+ */
|
|
|
+ public String getStatistics() {
|
|
|
+ int total = totalMessagesSent.get();
|
|
|
+ int failed = failedMessages.get();
|
|
|
+ int retried = retriedMessages.get();
|
|
|
+ int success = total - failed;
|
|
|
+ double successRate = total > 0 ? (double)success / total * 100 : 0;
|
|
|
+
|
|
|
+ return String.format(
|
|
|
+ "Messages - Total: %d, Success: %d (%.2f%%), Failed: %d, Retried: %d",
|
|
|
+ total, success, successRate, failed, retried
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 重置统计信息
|
|
|
+ */
|
|
|
+ public void resetStatistics() {
|
|
|
+ totalMessagesSent.set(0);
|
|
|
+ failedMessages.set(0);
|
|
|
+ retriedMessages.set(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关闭资源
|
|
|
+ */
|
|
|
+ private void shutdown() {
|
|
|
+ if (retryExecutor != null && !retryExecutor.isShutdown()) {
|
|
|
+ log.info("Shutting down MQTT template retry executor");
|
|
|
+ retryExecutor.shutdown();
|
|
|
+ try {
|
|
|
+ if (!retryExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
|
|
+ retryExecutor.shutdownNow();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ retryExecutor.shutdownNow();
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void destroy() throws Exception {
|
|
|
+ shutdown();
|
|
|
+ }
|
|
|
}
|