Browse Source

线程池 改写

chejianzheng 2 months ago
parent
commit
60361c5b98

+ 5 - 0
device-service-common/src/main/java/com/hfln/device/common/constant/mqtt/topic/MqttTopics.java

@@ -17,14 +17,19 @@ public class MqttTopics {
      */
     public static final String DEV_ALL = "/dev/#";
     public static final String DEV_LOGIN = "/dev/+/login";
+    public static final String SHARE_DEV_LOGIN = "$share/device//dev/+/login";
     public static final String DEV_KEEPALIVE = "/dev/+/keepalive";
+    public static final String SHARE_DEV_KEEPALIVE = "$share/device//dev/+/keepalive";
     public static final String DEV_REP_DEV_INFO = "/dev/+/report_device_info";
     public static final String DEV_REP_DEV_PARAM = "/dev/+/report_device_param";
+    public static final String SHARE_DEV_REP_DEV_PARAM = "$share/device//dev/+/report_device_param";
     public static final String DEV_CLOUDPOINT = "/dev/+/cloudpoint";
     public static final String DEV_REP_FALL_EVENT = "/dev/+/report_falling_event";
     public static final String DEV_REP_PRES_EVENT = "/dev/+/report_presence_event";
     public static final String DEV_DSP_DATA = "/dev/+/dsp_data";
+    public static final String SHARE_DEV_DSP_DATA = "$share/device//dev/+/dsp_data";
     public static final String DEV_DISCONNECT = "/dev/+/disconnect";
+    public static final String SHARE_DEV_DISCONNECT = "$share/device//dev/+/disconnect";
     public static final String DEV_UPDATE_FIRMWARE = "/dev/+/update_firmware";
     public static final String DEV_REBOOT = "/dev/+/reboot";
     public static final String DEV_REP_DEBUG_PARAM = "/dev/+/report_debug_param";

+ 6 - 0
device-service-infrastructure/pom.xml

@@ -75,6 +75,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.github.phantomthief</groupId>
+            <artifactId>more-lambdas</artifactId>
+            <version>0.1.55</version>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 11 - 5
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/config/MqttConfig.java

@@ -117,13 +117,19 @@ public class MqttConfig {
     @Bean
     public MessageProducer deviceInbound() {
         String[] topics = {
-            MqttTopics.DEV_LOGIN,
-            MqttTopics.DEV_KEEPALIVE,
-            MqttTopics.DEV_DSP_DATA,
+//            MqttTopics.DEV_LOGIN,
+//            MqttTopics.DEV_KEEPALIVE,
+//            MqttTopics.DEV_DSP_DATA,
+//            MqttTopics.DEV_REP_DEV_PARAM,
+//            MqttTopics.DEV_DISCONNECT,
+
+            MqttTopics.SHARE_DEV_LOGIN,
+            MqttTopics.SHARE_DEV_KEEPALIVE,
+            MqttTopics.SHARE_DEV_DSP_DATA,
+            MqttTopics.SHARE_DEV_REP_DEV_PARAM,
+            MqttTopics.SHARE_DEV_DISCONNECT,
 //            MqttTopics.DEV_CLOUDPOINT,
 //            MqttTopics.DEV_REP_DEV_INFO,
-            MqttTopics.DEV_REP_DEV_PARAM,
-            MqttTopics.DEV_DISCONNECT,
 //            MqttTopics.DEV_REP_FALL_EVENT,
 //            MqttTopics.DEV_REP_PRES_EVENT,
 //            MqttTopics.DEV_SET_DEBUG,

+ 35 - 0
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/config/TopicDealExecutor.java

@@ -0,0 +1,35 @@
+package com.hfln.device.infrastructure.config;
+import com.github.phantomthief.pool.KeyAffinityExecutor;
+import com.github.phantomthief.util.ThrowableRunnable;
+import org.springframework.stereotype.Component;
+
+// 优化配置(适用于万级Key+10万QPS)
+// 处理设备实时点位数据
+@Component
+public class TopicDealExecutor {
+
+    // 核心配置参数
+    private static final int PARALLELISM = 6;   // 并行线程数(建议CPU核心数×4)
+    private static final int QUEUE_SIZE = 256;   // 每个线程队列容量 (建议: 线程数×500)
+    private static final int MAX_KEY_PER_THREAD = 100; // 单线程处理Key上限
+
+    private final KeyAffinityExecutor executor = KeyAffinityExecutor.newSerializingExecutor(
+            PARALLELISM,
+            QUEUE_SIZE,
+            "TopicSub-%d"
+    );
+
+    public void submitTask(Object key, ThrowableRunnable<Exception> task) {
+
+        executor.executeEx(key, task);
+    }
+
+    public void close() {
+        try {
+            executor.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

+ 105 - 31
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/mqtt/handler/DeviceMessageHandler.java

@@ -3,6 +3,7 @@ package com.hfln.device.infrastructure.mqtt.handler;
 import com.hfln.device.common.util.JsonUtil;
 import com.hfln.device.domain.port.DeviceEventPort;
 import com.hfln.device.domain.service.DeviceManagerService;
+import com.hfln.device.infrastructure.config.TopicDealExecutor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -12,6 +13,7 @@ import org.springframework.stereotype.Component;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -51,6 +53,9 @@ public class DeviceMessageHandler {
     @Autowired
     private DeviceManagerService deviceManagerService;
 
+    @Autowired
+    private TopicDealExecutor topicDealExecutor;
+
     /**
      * MQTT消息统一入口处理方法
      * 
@@ -72,32 +77,105 @@ public class DeviceMessageHandler {
                 return;
             }
 
-            log.debug("Received device message: topic={}, payload={}", topic, payload);
+//            log.debug("Received device message: topic={}, payload={}", topic, payload);
 
             // 根据主题路由到不同的处理方法
             // 提取主题的操作部分(最后一段)
-            String action = extractActionFromTopic(topic);
-            if (action != null) {
-                switch (action) {
-                    case "login":
-                        handleDeviceLogin(topic, payload);
-                        break;
-                    case "keepalive":
-                        handleDeviceKeepAlive(topic, payload);
-                        break;
-                    case "report_device_info":
-                        handleDeviceReportDeviceInfo(topic, payload);
-                        break;
-                    case "report_device_param":
-                        handleDeviceReportDeviceParam(topic, payload);
-                        break;
-                    case "dsp_data":
-                        handleDeviceDspData(topic, payload);
-                        break;
+            String[] parts = topic.split("/");
+            String action = parts[parts.length - 1];
+            String devId = parts[parts.length - 2];
+
+            try {
+
+                topicDealExecutor.submitTask(devId, () -> this.handleMessage(action, topic, payload));
+            } catch (RejectedExecutionException e) {
+
+                log.error("Rejected execution message, devId:{}, topic:{}", devId, topic, e);
+                try {
+                    this.handleMessage(action, topic, payload);
+                } catch (Exception ex) {
+                    log.error("Error handling message", ex);
+                }
+            } catch (Exception e) {
+                log.error("Error handling message, devId:{}, topic:{}", devId, topic, e);
+            }
+//
+//            if (action != null) {
+//                switch (action) {
+//                    case "login":
+//                        handleDeviceLogin(topic, payload);
+//                        break;
+//                    case "keepalive":
+//                        handleDeviceKeepAlive(topic, payload);
+//                        break;
+//                    case "report_device_info":
+//                        handleDeviceReportDeviceInfo(topic, payload);
+//                        break;
+//                    case "report_device_param":
+//                        handleDeviceReportDeviceParam(topic, payload);
+//                        break;
+//                    case "dsp_data":
+//                        handleDeviceDspData(topic, payload);
+//                        break;
+//
+//                    case "disconnect":
+//                        handleDeviceDisconnect(topic, payload);
+//                        break;
+////                    case "cloudpoint":
+////                        // todo 目前没有 lna 设备,暂不考虑点云数据改造
+////                        handleDeviceCloudPoint(topic, payload);
+////                        break;
+////                    case "report_falling_event":
+////                        // todo 这个 主题 待确认是否废弃
+////                        handleDeviceReportFallEvent(topic, payload);
+////                        break;
+////                    case "report_presence_event":
+////                        handleDeviceReportPresenceEvent(topic, payload);
+////                        break;
+////
+////                        // todo 待确认是否废弃
+////                    case "set_debug_param":
+////                        handleSetDebugParam(topic, payload);
+////                        break;
+////                    case "get_debug_param":
+////                        handleGetDebugParam(topic, payload);
+////                        break;
+//                    default:
+//                        log.debug("Unhandled device topic action: {} for topic: {}", action, topic);
+//                        break;
+//                }
+//            } else {
+//                log.debug("Could not extract action from device topic: {}", topic);
+//            }
+
+        } catch (Exception e) {
+            log.error("Error handling device message: {}", e.getMessage(), e);
+        }
+    }
 
-                    case "disconnect":
-                        handleDeviceDisconnect(topic, payload);
-                        break;
+    private void handleMessage(String action, String topic, String payload) {
+
+        if (action != null) {
+            switch (action) {
+                case "login":
+                    handleDeviceLogin(topic, payload);
+                    break;
+                case "keepalive":
+                    handleDeviceKeepAlive(topic, payload);
+                    break;
+                case "report_device_info":
+                    handleDeviceReportDeviceInfo(topic, payload);
+                    break;
+                case "report_device_param":
+                    handleDeviceReportDeviceParam(topic, payload);
+                    break;
+                case "dsp_data":
+                    handleDeviceDspData(topic, payload);
+                    break;
+
+                case "disconnect":
+                    handleDeviceDisconnect(topic, payload);
+                    break;
 //                    case "cloudpoint":
 //                        // todo 目前没有 lna 设备,暂不考虑点云数据改造
 //                        handleDeviceCloudPoint(topic, payload);
@@ -117,16 +195,12 @@ public class DeviceMessageHandler {
 //                    case "get_debug_param":
 //                        handleGetDebugParam(topic, payload);
 //                        break;
-                    default:
-                        log.debug("Unhandled device topic action: {} for topic: {}", action, topic);
-                        break;
-                }
-            } else {
-                log.debug("Could not extract action from device topic: {}", topic);
+                default:
+                    log.debug("Unhandled device topic action: {} for topic: {}", action, topic);
+                    break;
             }
-
-        } catch (Exception e) {
-            log.error("Error handling device message: {}", e.getMessage(), e);
+        } else {
+            log.debug("Could not extract action from device topic: {}", topic);
         }
     }
 

+ 30 - 0
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/mqtt/handler/ExecutorTest.java

@@ -0,0 +1,30 @@
+package com.hfln.device.infrastructure.mqtt.handler;
+
+
+import com.hfln.device.infrastructure.config.TopicDealExecutor;
+
+import java.time.LocalDateTime;
+
+public class ExecutorTest {
+
+    public static void main(String[] args) {
+
+        TopicDealExecutor executor = new TopicDealExecutor();
+
+        for (int i = 0; i < 100; i++) {
+
+            int key = (int) (10 * Math.random());
+            executor.submitTask(key, () -> {
+
+                Thread thread = Thread.currentThread();
+                System.out.println("key = " + key + " thread = " + thread.getName() + " time = " + LocalDateTime.now());
+                Thread.sleep(50000);
+            });
+        }
+
+        System.out.println("main thread end , time = " + LocalDateTime.now());
+        executor.close();
+        System.out.println("main thread executor end , time = " + LocalDateTime.now());
+
+    }
+}