|
@@ -0,0 +1,301 @@
|
|
|
+package com.hfln.device.infrastructure.service;
|
|
|
+
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
|
+import org.springframework.integration.mqtt.support.MqttHeaders;
|
|
|
+import org.springframework.messaging.Message;
|
|
|
+import org.springframework.messaging.support.MessageBuilder;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+
|
|
|
+/**
|
|
|
+ * MQTT消息缓冲服务
|
|
|
+ * 专门处理实时点位数据,确保数据不丢失
|
|
|
+ * 使用多级缓冲策略和智能重试机制
|
|
|
+ */
|
|
|
+@Service
|
|
|
+@Slf4j
|
|
|
+public class MqttMessageBufferService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private MqttPahoMessageHandler mqttOutbound;
|
|
|
+
|
|
|
+ // 高优先级队列 - 用于实时点位数据
|
|
|
+ private final BlockingQueue<MqttMessage> highPriorityQueue = new LinkedBlockingQueue<>(10000);
|
|
|
+
|
|
|
+ // 普通优先级队列 - 用于其他消息
|
|
|
+ private final BlockingQueue<MqttMessage> normalPriorityQueue = new LinkedBlockingQueue<>(5000);
|
|
|
+
|
|
|
+ // 死信队列 - 存储发送失败的消息,用于后续重试
|
|
|
+ private final BlockingQueue<MqttMessage> deadLetterQueue = new LinkedBlockingQueue<>(1000);
|
|
|
+
|
|
|
+ // 线程池 - 专门处理MQTT消息发送
|
|
|
+ private final ExecutorService mqttExecutor = Executors.newFixedThreadPool(3, r -> {
|
|
|
+ Thread t = new Thread(r, "mqtt-sender-" + System.currentTimeMillis());
|
|
|
+ t.setDaemon(true);
|
|
|
+ return t;
|
|
|
+ });
|
|
|
+
|
|
|
+ // 控制标志
|
|
|
+ private final AtomicBoolean running = new AtomicBoolean(true);
|
|
|
+
|
|
|
+ // 统计信息
|
|
|
+ private final AtomicLong totalMessages = new AtomicLong(0);
|
|
|
+ private final AtomicLong sentMessages = new AtomicLong(0);
|
|
|
+ private final AtomicLong failedMessages = new AtomicLong(0);
|
|
|
+ private final AtomicLong bufferedMessages = new AtomicLong(0);
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ // 启动消息处理线程
|
|
|
+ startMessageProcessors();
|
|
|
+ log.info("MQTT Message Buffer Service initialized");
|
|
|
+ }
|
|
|
+
|
|
|
+ @PreDestroy
|
|
|
+ public void shutdown() {
|
|
|
+ running.set(false);
|
|
|
+ mqttExecutor.shutdown();
|
|
|
+ try {
|
|
|
+ if (!mqttExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
|
|
|
+ mqttExecutor.shutdownNow();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ mqttExecutor.shutdownNow();
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ log.info("MQTT Message Buffer Service shutdown completed");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送高优先级消息(如实时点位数据)
|
|
|
+ * 如果队列满,会阻塞等待,确保数据不丢失
|
|
|
+ */
|
|
|
+ public void sendHighPriorityMessage(String topic, String payload, int qos, boolean retain) {
|
|
|
+ MqttMessage message = new MqttMessage(topic, payload, qos, retain, System.currentTimeMillis(), true);
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 对于高优先级消息,使用阻塞方式,确保不丢失
|
|
|
+ highPriorityQueue.put(message);
|
|
|
+ totalMessages.incrementAndGet();
|
|
|
+ bufferedMessages.incrementAndGet();
|
|
|
+
|
|
|
+ if (log.isTraceEnabled()) {
|
|
|
+ log.trace("High priority message queued: topic={}, queueSize={}", topic, highPriorityQueue.size());
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ log.error("Failed to queue high priority message: {}", topic, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送普通优先级消息
|
|
|
+ * 如果队列满,会尝试非阻塞方式,避免阻塞高优先级消息
|
|
|
+ */
|
|
|
+ public boolean sendNormalPriorityMessage(String topic, String payload, int qos, boolean retain) {
|
|
|
+ MqttMessage message = new MqttMessage(topic, payload, qos, retain, System.currentTimeMillis(), false);
|
|
|
+
|
|
|
+ // 对于普通消息,使用非阻塞方式
|
|
|
+ boolean offered = normalPriorityQueue.offer(message);
|
|
|
+ if (offered) {
|
|
|
+ totalMessages.incrementAndGet();
|
|
|
+ bufferedMessages.incrementAndGet();
|
|
|
+
|
|
|
+ if (log.isTraceEnabled()) {
|
|
|
+ log.trace("Normal priority message queued: topic={}, queueSize={}", topic, normalPriorityQueue.size());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.warn("Normal priority queue is full, message dropped: topic={}", topic);
|
|
|
+ }
|
|
|
+
|
|
|
+ return offered;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 启动消息处理器
|
|
|
+ */
|
|
|
+ private void startMessageProcessors() {
|
|
|
+ // 高优先级消息处理器
|
|
|
+ mqttExecutor.submit(() -> processHighPriorityMessages());
|
|
|
+
|
|
|
+ // 普通优先级消息处理器
|
|
|
+ mqttExecutor.submit(() -> processNormalPriorityMessages());
|
|
|
+
|
|
|
+ // 死信队列重试处理器
|
|
|
+ mqttExecutor.submit(() -> processDeadLetterQueue());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理高优先级消息
|
|
|
+ */
|
|
|
+ private void processHighPriorityMessages() {
|
|
|
+ while (running.get()) {
|
|
|
+ try {
|
|
|
+ MqttMessage message = highPriorityQueue.take();
|
|
|
+ if (sendMessageWithRetry(message)) {
|
|
|
+ sentMessages.incrementAndGet();
|
|
|
+ bufferedMessages.decrementAndGet();
|
|
|
+ } else {
|
|
|
+ failedMessages.incrementAndGet();
|
|
|
+ // 高优先级消息失败后放入死信队列重试
|
|
|
+ deadLetterQueue.offer(message);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ break;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error processing high priority message", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理普通优先级消息
|
|
|
+ */
|
|
|
+ private void processNormalPriorityMessages() {
|
|
|
+ while (running.get()) {
|
|
|
+ try {
|
|
|
+ MqttMessage message = normalPriorityQueue.poll(100, TimeUnit.MILLISECONDS);
|
|
|
+ if (message != null) {
|
|
|
+ if (sendMessageWithRetry(message)) {
|
|
|
+ sentMessages.incrementAndGet();
|
|
|
+ bufferedMessages.decrementAndGet();
|
|
|
+ } else {
|
|
|
+ failedMessages.incrementAndGet();
|
|
|
+ // 普通消息失败后也放入死信队列重试
|
|
|
+ deadLetterQueue.offer(message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ break;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error processing normal priority message", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理死信队列
|
|
|
+ */
|
|
|
+ private void processDeadLetterQueue() {
|
|
|
+ while (running.get()) {
|
|
|
+ try {
|
|
|
+ MqttMessage message = deadLetterQueue.poll(5000, TimeUnit.MILLISECONDS);
|
|
|
+ if (message != null) {
|
|
|
+ // 延迟重试,避免立即重试
|
|
|
+ if (System.currentTimeMillis() - message.getTimestamp() > 10000) { // 10秒后重试
|
|
|
+ if (sendMessageWithRetry(message)) {
|
|
|
+ sentMessages.incrementAndGet();
|
|
|
+ log.info("Dead letter message retry successful: topic={}", message.getTopic());
|
|
|
+ } else {
|
|
|
+ // 重试失败,重新放入死信队列
|
|
|
+ deadLetterQueue.offer(message);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 时间未到,重新放入队列
|
|
|
+ deadLetterQueue.offer(message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ break;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error processing dead letter queue", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送消息并重试
|
|
|
+ */
|
|
|
+ private boolean sendMessageWithRetry(MqttMessage message) {
|
|
|
+ int maxRetries = message.isHighPriority() ? 5 : 3; // 高优先级消息重试更多次
|
|
|
+
|
|
|
+ for (int attempt = 1; attempt <= maxRetries; attempt++) {
|
|
|
+ try {
|
|
|
+ Message<String> springMessage = MessageBuilder
|
|
|
+ .withPayload(message.getPayload())
|
|
|
+ .setHeader(MqttHeaders.TOPIC, message.getTopic())
|
|
|
+ .setHeader(MqttHeaders.QOS, message.getQos())
|
|
|
+ .setHeader(MqttHeaders.RETAINED, message.isRetain())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ mqttOutbound.handleMessage(springMessage);
|
|
|
+ return true;
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (attempt < maxRetries) {
|
|
|
+ try {
|
|
|
+ // 指数退避重试
|
|
|
+ long delay = Math.min(1000L * (1L << (attempt - 1)), 10000L);
|
|
|
+ Thread.sleep(delay);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.error("Failed to send message after {} attempts: topic={}, error={}",
|
|
|
+ maxRetries, message.getTopic(), e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取队列状态信息
|
|
|
+ */
|
|
|
+ public String getQueueStatus() {
|
|
|
+ return String.format("Queue Status - HighPriority: %d, Normal: %d, DeadLetter: %d, Total: %d, Sent: %d, Failed: %d, Buffered: %d",
|
|
|
+ highPriorityQueue.size(), normalPriorityQueue.size(), deadLetterQueue.size(),
|
|
|
+ totalMessages.get(), sentMessages.get(), failedMessages.get(), bufferedMessages.get());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 定时输出队列状态
|
|
|
+ */
|
|
|
+ @Scheduled(fixedRate = 30000) // 每30秒输出一次状态
|
|
|
+ public void logQueueStatus() {
|
|
|
+ if (log.isInfoEnabled()) {
|
|
|
+ log.info(getQueueStatus());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * MQTT消息内部类
|
|
|
+ */
|
|
|
+ private static class MqttMessage {
|
|
|
+ private final String topic;
|
|
|
+ private final String payload;
|
|
|
+ private final int qos;
|
|
|
+ private final boolean retain;
|
|
|
+ private final long timestamp;
|
|
|
+ private final boolean highPriority;
|
|
|
+
|
|
|
+ public MqttMessage(String topic, String payload, int qos, boolean retain, long timestamp, boolean highPriority) {
|
|
|
+ this.topic = topic;
|
|
|
+ this.payload = payload;
|
|
|
+ this.qos = qos;
|
|
|
+ this.retain = retain;
|
|
|
+ this.timestamp = timestamp;
|
|
|
+ this.highPriority = highPriority;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Getters
|
|
|
+ public String getTopic() { return topic; }
|
|
|
+ public String getPayload() { return payload; }
|
|
|
+ public int getQos() { return qos; }
|
|
|
+ public boolean isRetain() { return retain; }
|
|
|
+ public long getTimestamp() { return timestamp; }
|
|
|
+ public boolean isHighPriority() { return highPriority; }
|
|
|
+ }
|
|
|
+}
|