|
@@ -1,540 +0,0 @@
|
|
-package cn.hfln.framework.mqtt.template;
|
|
|
|
-
|
|
|
|
-import cn.hfln.framework.mqtt.config.MqttProperties;
|
|
|
|
-import cn.hfln.framework.mqtt.connection.MqttReconnectManager;
|
|
|
|
-import cn.hfln.framework.mqtt.converter.JsonMessageConverter;
|
|
|
|
-import cn.hfln.framework.mqtt.converter.MessageConverter;
|
|
|
|
-import cn.hfln.framework.mqtt.exception.MqttExceptionHandler;
|
|
|
|
-import cn.hfln.framework.mqtt.interceptor.MqttMessageInterceptor;
|
|
|
|
-import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
|
-import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
|
|
-import org.eclipse.paho.client.mqttv3.MqttException;
|
|
|
|
-import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
|
-import org.springframework.beans.factory.DisposableBean;
|
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
-import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
|
|
-import org.springframework.integration.mqtt.support.MqttHeaders;
|
|
|
|
-import org.springframework.messaging.Message;
|
|
|
|
-import org.springframework.messaging.support.MessageBuilder;
|
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
|
-
|
|
|
|
-import java.util.Collections;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.concurrent.*;
|
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
-
|
|
|
|
-/**
|
|
|
|
- * MQTT消息模板
|
|
|
|
- * 提供发送MQTT消息的便捷方法
|
|
|
|
- */
|
|
|
|
-@Slf4j
|
|
|
|
-@Component
|
|
|
|
-public class MqttTemplate implements DisposableBean {
|
|
|
|
- private final MqttPahoMessageHandler mqttOutbound;
|
|
|
|
- private final JsonMessageConverter jsonConverter;
|
|
|
|
- private final List<MqttMessageInterceptor> interceptors;
|
|
|
|
- private final MqttClient mqttClient;
|
|
|
|
- private final MessageConverter messageConverter;
|
|
|
|
- private final MqttProperties mqttProperties;
|
|
|
|
- private final MqttReconnectManager reconnectManager;
|
|
|
|
-
|
|
|
|
- // 用于重试发送消息的线程池
|
|
|
|
- private final ScheduledExecutorService retryExecutor;
|
|
|
|
-
|
|
|
|
- // 消息发送统计
|
|
|
|
- private final AtomicInteger totalMessagesSent = new AtomicInteger(0);
|
|
|
|
- private final AtomicInteger failedMessages = new AtomicInteger(0);
|
|
|
|
- private final AtomicInteger retriedMessages = new AtomicInteger(0);
|
|
|
|
-
|
|
|
|
- @Autowired(required = false)
|
|
|
|
- public MqttTemplate(MqttPahoMessageHandler mqttOutbound, JsonMessageConverter jsonConverter) {
|
|
|
|
- this.mqttOutbound = mqttOutbound;
|
|
|
|
- this.jsonConverter = jsonConverter;
|
|
|
|
- this.interceptors = Collections.emptyList();
|
|
|
|
- this.mqttClient = null;
|
|
|
|
- this.messageConverter = null;
|
|
|
|
- this.mqttProperties = null;
|
|
|
|
- this.reconnectManager = null;
|
|
|
|
- this.retryExecutor = null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Autowired(required = false)
|
|
|
|
- public MqttTemplate(MqttPahoMessageHandler mqttOutbound, JsonMessageConverter jsonConverter,
|
|
|
|
- List<MqttMessageInterceptor> interceptors) {
|
|
|
|
- this.mqttOutbound = mqttOutbound;
|
|
|
|
- this.jsonConverter = jsonConverter;
|
|
|
|
- this.interceptors = interceptors != null ? interceptors : Collections.emptyList();
|
|
|
|
- this.mqttClient = null;
|
|
|
|
- this.messageConverter = null;
|
|
|
|
- this.mqttProperties = null;
|
|
|
|
- this.reconnectManager = null;
|
|
|
|
- this.retryExecutor = null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Autowired(required = false)
|
|
|
|
- public MqttTemplate(MqttClient mqttClient, List<MqttMessageInterceptor> interceptors,
|
|
|
|
- MessageConverter messageConverter) {
|
|
|
|
- this.mqttClient = mqttClient;
|
|
|
|
- this.messageConverter = messageConverter;
|
|
|
|
- this.interceptors = interceptors != null ? interceptors : Collections.emptyList();
|
|
|
|
- this.mqttOutbound = null;
|
|
|
|
- this.jsonConverter = (messageConverter instanceof JsonMessageConverter) ?
|
|
|
|
- (JsonMessageConverter) messageConverter : null;
|
|
|
|
- this.mqttProperties = null;
|
|
|
|
- this.reconnectManager = null;
|
|
|
|
- this.retryExecutor = null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Autowired(required = false)
|
|
|
|
- public MqttTemplate(MqttClient mqttClient, List<MqttMessageInterceptor> interceptors,
|
|
|
|
- MessageConverter messageConverter, MqttProperties mqttProperties,
|
|
|
|
- MqttReconnectManager reconnectManager) {
|
|
|
|
- this.mqttClient = mqttClient;
|
|
|
|
- this.messageConverter = messageConverter;
|
|
|
|
- this.interceptors = interceptors != null ? interceptors : Collections.emptyList();
|
|
|
|
- this.mqttOutbound = null;
|
|
|
|
- this.jsonConverter = (messageConverter instanceof JsonMessageConverter) ?
|
|
|
|
- (JsonMessageConverter) messageConverter : null;
|
|
|
|
- this.mqttProperties = mqttProperties;
|
|
|
|
- this.reconnectManager = reconnectManager;
|
|
|
|
-
|
|
|
|
- // 创建重试线程池
|
|
|
|
- ThreadFactory threadFactory = r -> {
|
|
|
|
- Thread t = new Thread(r, "mqtt-retry-thread");
|
|
|
|
- t.setDaemon(true);
|
|
|
|
- return t;
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
- this.retryExecutor = Executors.newScheduledThreadPool(
|
|
|
|
- Runtime.getRuntime().availableProcessors(),
|
|
|
|
- threadFactory
|
|
|
|
- );
|
|
|
|
-
|
|
|
|
- log.info("MqttTemplate initialized with {} interceptors", this.interceptors.size());
|
|
|
|
-
|
|
|
|
- // 添加JVM关闭钩子,确保线程池正确关闭
|
|
|
|
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
|
|
|
- shutdown();
|
|
|
|
- }));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 发送字符串消息
|
|
|
|
- *
|
|
|
|
- * @param topic 主题
|
|
|
|
- * @param payload 消息内容
|
|
|
|
- * @return CompletableFuture<Void>
|
|
|
|
- */
|
|
|
|
- public CompletableFuture<Void> send(String topic, String payload) {
|
|
|
|
- return send(topic, payload, mqttProperties != null ? mqttProperties.getDefaultQos() : 0, false);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 发送字符串消息(指定QoS和保留标志)
|
|
|
|
- *
|
|
|
|
- * @param topic 主题
|
|
|
|
- * @param payload 消息内容
|
|
|
|
- * @param qos QoS级别
|
|
|
|
- * @param retain 是否保留
|
|
|
|
- * @return CompletableFuture<Void>
|
|
|
|
- */
|
|
|
|
- public CompletableFuture<Void> send(String topic, String payload, int qos, boolean retain) {
|
|
|
|
- CompletableFuture<Void> future = new CompletableFuture<>();
|
|
|
|
- totalMessagesSent.incrementAndGet();
|
|
|
|
-
|
|
|
|
- // 应用拦截器
|
|
|
|
- String processedPayload = payload;
|
|
|
|
- for (MqttMessageInterceptor interceptor : interceptors) {
|
|
|
|
- try {
|
|
|
|
- processedPayload = interceptor.beforePublish(topic, processedPayload);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- log.error("Error in message interceptor", e);
|
|
|
|
- failedMessages.incrementAndGet();
|
|
|
|
- future.completeExceptionally(e);
|
|
|
|
- return future;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- final String finalPayload = processedPayload;
|
|
|
|
- if (mqttOutbound != null) {
|
|
|
|
- Message<String> message = MessageBuilder
|
|
|
|
- .withPayload(finalPayload)
|
|
|
|
- .setHeader(MqttHeaders.TOPIC, topic)
|
|
|
|
- .setHeader(MqttHeaders.QOS, qos)
|
|
|
|
- .setHeader(MqttHeaders.RETAINED, retain)
|
|
|
|
- .build();
|
|
|
|
-
|
|
|
|
- mqttOutbound.handleMessage(message);
|
|
|
|
-
|
|
|
|
- // 通知拦截器
|
|
|
|
- for (MqttMessageInterceptor interceptor : interceptors) {
|
|
|
|
- try {
|
|
|
|
- interceptor.afterPublish(topic, finalPayload);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- log.error("Error in message interceptor after publish", e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- future.complete(null);
|
|
|
|
- log.debug("Sent message to topic: {}", topic);
|
|
|
|
- } else if (mqttClient != null) {
|
|
|
|
- publishWithRetry(topic, finalPayload.getBytes(), qos, retain, future, 0);
|
|
|
|
- return future;
|
|
|
|
- } else {
|
|
|
|
- throw new IllegalStateException("No MQTT client or message handler available");
|
|
|
|
- }
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- failedMessages.incrementAndGet();
|
|
|
|
-
|
|
|
|
- // 使用异常处理器处理异常
|
|
|
|
- if (e instanceof MqttException) {
|
|
|
|
- final String finalPayload = processedPayload;
|
|
|
|
- MqttExceptionHandler.handleException((MqttException) e, ex -> {
|
|
|
|
- if (mqttProperties != null && mqttProperties.isRetryOnPublishFailure() &&
|
|
|
|
- MqttExceptionHandler.isRecoverableByReconnect((MqttException) e)) {
|
|
|
|
- if (retryExecutor != null && !retryExecutor.isShutdown()) {
|
|
|
|
- log.info("Scheduling retry for message to topic: {}", topic);
|
|
|
|
- retryExecutor.schedule(() -> {
|
|
|
|
- retriedMessages.incrementAndGet();
|
|
|
|
- send(topic, finalPayload, qos, retain).thenAccept(v -> future.complete(null))
|
|
|
|
- .exceptionally(t -> {
|
|
|
|
- future.completeExceptionally(t);
|
|
|
|
- return null;
|
|
|
|
- });
|
|
|
|
- }, mqttProperties.getPublishRetryInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
- } else {
|
|
|
|
- future.completeExceptionally(e);
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- future.completeExceptionally(e);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- } else {
|
|
|
|
- future.completeExceptionally(e);
|
|
|
|
- log.error("Failed to send message to topic: {}", topic, e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return future;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 带重试机制的消息发布
|
|
|
|
- *
|
|
|
|
- * @param topic 主题
|
|
|
|
- * @param payload 消息内容
|
|
|
|
- * @param qos QoS级别
|
|
|
|
- * @param retain 是否保留
|
|
|
|
- * @param future 完成回调
|
|
|
|
- * @param retryCount 当前重试次数
|
|
|
|
- */
|
|
|
|
- private void publishWithRetry(String topic, byte[] payload, int qos, boolean retain,
|
|
|
|
- CompletableFuture<Void> future, int retryCount) {
|
|
|
|
- try {
|
|
|
|
- MqttMessage message = new MqttMessage(payload);
|
|
|
|
- message.setQos(qos);
|
|
|
|
- message.setRetained(retain);
|
|
|
|
-
|
|
|
|
- if (!mqttClient.isConnected()) {
|
|
|
|
- if (mqttProperties != null && mqttProperties.isAutomaticReconnect() && reconnectManager != null) {
|
|
|
|
- log.warn("MQTT client not connected, triggering reconnect before sending message");
|
|
|
|
- reconnectManager.triggerReconnect();
|
|
|
|
-
|
|
|
|
- // 如果启用了重试,则安排重试
|
|
|
|
- if (shouldRetry(retryCount)) {
|
|
|
|
- retriedMessages.incrementAndGet();
|
|
|
|
- scheduleRetry(topic, payload, qos, retain, future, retryCount + 1);
|
|
|
|
- return;
|
|
|
|
- } else {
|
|
|
|
- failedMessages.incrementAndGet();
|
|
|
|
- future.completeExceptionally(new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED));
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- failedMessages.incrementAndGet();
|
|
|
|
- future.completeExceptionally(new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED));
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // 应用拦截器
|
|
|
|
- for (MqttMessageInterceptor interceptor : interceptors) {
|
|
|
|
- if (!interceptor.preSend(topic, message)) {
|
|
|
|
- log.warn("Message sending cancelled by interceptor for topic: {}", topic);
|
|
|
|
- failedMessages.incrementAndGet();
|
|
|
|
- future.completeExceptionally(new IllegalStateException("Message sending cancelled by interceptor"));
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- mqttClient.publish(topic, message);
|
|
|
|
- future.complete(null);
|
|
|
|
- log.debug("Successfully published message to topic: {}", topic);
|
|
|
|
-
|
|
|
|
- // 通知拦截器
|
|
|
|
- for (MqttMessageInterceptor interceptor : interceptors) {
|
|
|
|
- try {
|
|
|
|
- interceptor.postSend(topic, message);
|
|
|
|
- interceptor.afterPublish(topic, new String(payload));
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- log.error("Error in message interceptor after publish", e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } catch (MqttException e) {
|
|
|
|
- // 处理异常
|
|
|
|
- String detailedError = MqttExceptionHandler.getDetailedErrorMessage(e);
|
|
|
|
- log.error("Failed to publish message to topic {}: {}", topic, detailedError);
|
|
|
|
-
|
|
|
|
- // 如果是可恢复错误并且启用了重试,则安排重试
|
|
|
|
- if (MqttExceptionHandler.isRecoverableByReconnect(e) && shouldRetry(retryCount)) {
|
|
|
|
- log.info("Will retry publishing message to topic {} (attempt {}/{})",
|
|
|
|
- topic, retryCount + 1, mqttProperties.getMaxPublishRetryAttempts());
|
|
|
|
- retriedMessages.incrementAndGet();
|
|
|
|
- scheduleRetry(topic, payload, qos, retain, future, retryCount + 1);
|
|
|
|
- } else {
|
|
|
|
- failedMessages.incrementAndGet();
|
|
|
|
- future.completeExceptionally(e);
|
|
|
|
-
|
|
|
|
- // 提供解决方案建议
|
|
|
|
- String recommendation = MqttExceptionHandler.getRecommendedAction(e);
|
|
|
|
- log.info("Recommended action: {}", recommendation);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 判断是否应该重试发送消息
|
|
|
|
- *
|
|
|
|
- * @param retryCount 当前重试次数
|
|
|
|
- * @return 是否应该重试
|
|
|
|
- */
|
|
|
|
- private boolean shouldRetry(int retryCount) {
|
|
|
|
- return mqttProperties != null &&
|
|
|
|
- mqttProperties.isRetryOnPublishFailure() &&
|
|
|
|
- retryCount < mqttProperties.getMaxPublishRetryAttempts();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 安排重试发送消息
|
|
|
|
- *
|
|
|
|
- * @param topic 主题
|
|
|
|
- * @param payload 消息内容
|
|
|
|
- * @param qos QoS级别
|
|
|
|
- * @param retain 是否保留
|
|
|
|
- * @param future 完成回调
|
|
|
|
- * @param retryCount 当前重试次数
|
|
|
|
- */
|
|
|
|
- private void scheduleRetry(String topic, byte[] payload, int qos, boolean retain,
|
|
|
|
- CompletableFuture<Void> future, int retryCount) {
|
|
|
|
- if (retryExecutor != null && !retryExecutor.isShutdown()) {
|
|
|
|
- long delay = calculateRetryDelay(retryCount);
|
|
|
|
- log.debug("Scheduling retry {} for topic {} in {} ms", retryCount, topic, delay);
|
|
|
|
-
|
|
|
|
- retryExecutor.schedule(
|
|
|
|
- () -> publishWithRetry(topic, payload, qos, retain, future, retryCount),
|
|
|
|
- delay,
|
|
|
|
- TimeUnit.MILLISECONDS
|
|
|
|
- );
|
|
|
|
- } else {
|
|
|
|
- future.completeExceptionally(new IllegalStateException("Retry executor is not available"));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 计算重试延迟时间,使用指数退避算法
|
|
|
|
- *
|
|
|
|
- * @param retryCount 当前重试次数
|
|
|
|
- * @return 延迟时间(毫秒)
|
|
|
|
- */
|
|
|
|
- private long calculateRetryDelay(int retryCount) {
|
|
|
|
- if (mqttProperties == null) {
|
|
|
|
- return 1000; // 默认1秒
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- long baseDelay = mqttProperties.getPublishRetryInterval();
|
|
|
|
- double factor = Math.pow(mqttProperties.getReconnectBackoffMultiplier(), retryCount - 1);
|
|
|
|
- long delay = (long)(baseDelay * factor);
|
|
|
|
-
|
|
|
|
- // 确保不超过最大重试间隔
|
|
|
|
- return Math.min(delay, mqttProperties.getMaxReconnectInterval());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 发送JSON对象消息
|
|
|
|
- *
|
|
|
|
- * @param topic 主题
|
|
|
|
- * @param payload 消息内容对象
|
|
|
|
- * @return CompletableFuture<Void>
|
|
|
|
- */
|
|
|
|
- public CompletableFuture<Void> sendJson(String topic, Object payload) {
|
|
|
|
- return sendJson(topic, payload, mqttProperties != null ? mqttProperties.getDefaultQos() : 0, false);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 发送JSON对象消息(指定QoS和保留标志)
|
|
|
|
- *
|
|
|
|
- * @param topic 主题
|
|
|
|
- * @param payload 消息内容对象
|
|
|
|
- * @param qos QoS级别
|
|
|
|
- * @param retain 是否保留
|
|
|
|
- * @return CompletableFuture<Void>
|
|
|
|
- */
|
|
|
|
- public CompletableFuture<Void> sendJson(String topic, Object payload, int qos, boolean retain) {
|
|
|
|
- CompletableFuture<Void> future = new CompletableFuture<>();
|
|
|
|
- try {
|
|
|
|
- if (jsonConverter != null) {
|
|
|
|
- String json = jsonConverter.toJson(payload);
|
|
|
|
- return send(topic, json, qos, retain);
|
|
|
|
- } else if (messageConverter != null) {
|
|
|
|
- MqttMessage message = messageConverter.toMessage(payload);
|
|
|
|
- message.setQos(qos);
|
|
|
|
- message.setRetained(retain);
|
|
|
|
-
|
|
|
|
- if (mqttClient != null) {
|
|
|
|
- publishWithRetry(topic, message.getPayload(), qos, retain, future, 0);
|
|
|
|
- return future;
|
|
|
|
- } else {
|
|
|
|
- throw new IllegalStateException("No MQTT client available");
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- throw new IllegalStateException("No message converter available");
|
|
|
|
- }
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- failedMessages.incrementAndGet();
|
|
|
|
- future.completeExceptionally(e);
|
|
|
|
- log.error("Failed to convert object to JSON: {}", payload, e);
|
|
|
|
- return future;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 同步发送消息
|
|
|
|
- *
|
|
|
|
- * @param topic 主题
|
|
|
|
- * @param payload 消息内容
|
|
|
|
- * @throws Exception 发送异常
|
|
|
|
- */
|
|
|
|
- public void sendSync(String topic, String payload) throws Exception {
|
|
|
|
- sendSync(topic, payload, mqttProperties != null ? mqttProperties.getDefaultQos() : 0, false);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 同步发送消息(指定QoS和保留标志)
|
|
|
|
- *
|
|
|
|
- * @param topic 主题
|
|
|
|
- * @param payload 消息内容
|
|
|
|
- * @param qos QoS级别
|
|
|
|
- * @param retain 是否保留
|
|
|
|
- * @throws Exception 发送异常
|
|
|
|
- */
|
|
|
|
- public void sendSync(String topic, String payload, int qos, boolean retain) throws Exception {
|
|
|
|
- try {
|
|
|
|
- send(topic, payload, qos, retain).get(
|
|
|
|
- mqttProperties != null ? mqttProperties.getCompletionTimeout() : 5000,
|
|
|
|
- TimeUnit.MILLISECONDS
|
|
|
|
- );
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- Thread.currentThread().interrupt();
|
|
|
|
- throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION, e);
|
|
|
|
- } catch (ExecutionException e) {
|
|
|
|
- if (e.getCause() instanceof MqttException) {
|
|
|
|
- throw (MqttException) e.getCause();
|
|
|
|
- } else {
|
|
|
|
- throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION, e.getCause());
|
|
|
|
- }
|
|
|
|
- } catch (TimeoutException e) {
|
|
|
|
- throw new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT, e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 同步发送JSON对象消息
|
|
|
|
- *
|
|
|
|
- * @param topic 主题
|
|
|
|
- * @param payload 消息内容对象
|
|
|
|
- * @throws Exception 发送异常
|
|
|
|
- */
|
|
|
|
- public void sendJsonSync(String topic, Object payload) throws Exception {
|
|
|
|
- sendJsonSync(topic, payload, mqttProperties != null ? mqttProperties.getDefaultQos() : 0, false);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 同步发送JSON对象消息(指定QoS和保留标志)
|
|
|
|
- *
|
|
|
|
- * @param topic 主题
|
|
|
|
- * @param payload 消息内容对象
|
|
|
|
- * @param qos QoS级别
|
|
|
|
- * @param retain 是否保留
|
|
|
|
- * @throws Exception 发送异常
|
|
|
|
- */
|
|
|
|
- public void sendJsonSync(String topic, Object payload, int qos, boolean retain) throws Exception {
|
|
|
|
- try {
|
|
|
|
- sendJson(topic, payload, qos, retain).get(
|
|
|
|
- mqttProperties != null ? mqttProperties.getCompletionTimeout() : 5000,
|
|
|
|
- TimeUnit.MILLISECONDS
|
|
|
|
- );
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- Thread.currentThread().interrupt();
|
|
|
|
- throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION, e);
|
|
|
|
- } catch (ExecutionException e) {
|
|
|
|
- if (e.getCause() instanceof MqttException) {
|
|
|
|
- throw (MqttException) e.getCause();
|
|
|
|
- } else {
|
|
|
|
- throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION, e.getCause());
|
|
|
|
- }
|
|
|
|
- } catch (TimeoutException e) {
|
|
|
|
- throw new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT, e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 获取消息发送统计信息
|
|
|
|
- *
|
|
|
|
- * @return 统计信息
|
|
|
|
- */
|
|
|
|
- public String getStatistics() {
|
|
|
|
- int total = totalMessagesSent.get();
|
|
|
|
- int failed = failedMessages.get();
|
|
|
|
- int retried = retriedMessages.get();
|
|
|
|
- int success = total - failed;
|
|
|
|
- double successRate = total > 0 ? (double)success / total * 100 : 0;
|
|
|
|
-
|
|
|
|
- return String.format(
|
|
|
|
- "Messages - Total: %d, Success: %d (%.2f%%), Failed: %d, Retried: %d",
|
|
|
|
- total, success, successRate, failed, retried
|
|
|
|
- );
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 重置统计信息
|
|
|
|
- */
|
|
|
|
- public void resetStatistics() {
|
|
|
|
- totalMessagesSent.set(0);
|
|
|
|
- failedMessages.set(0);
|
|
|
|
- retriedMessages.set(0);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * 关闭资源
|
|
|
|
- */
|
|
|
|
- private void shutdown() {
|
|
|
|
- if (retryExecutor != null && !retryExecutor.isShutdown()) {
|
|
|
|
- log.info("Shutting down MQTT template retry executor");
|
|
|
|
- retryExecutor.shutdown();
|
|
|
|
- try {
|
|
|
|
- if (!retryExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
|
|
|
- retryExecutor.shutdownNow();
|
|
|
|
- }
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- retryExecutor.shutdownNow();
|
|
|
|
- Thread.currentThread().interrupt();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void destroy() throws Exception {
|
|
|
|
- shutdown();
|
|
|
|
- }
|
|
|
|
-}
|
|
|