浏览代码

portal mqtt消费 --》 线程池
las 对接接口定义

chejianzheng 2 月之前
父节点
当前提交
f2f218811b
共有 20 个文件被更改,包括 491 次插入41 次删除
  1. 45 0
      portal-service-application/src/main/java/com/hfln/portal/application/controller/las/AlarmController.java
  2. 49 0
      portal-service-application/src/main/java/com/hfln/portal/application/controller/web/WebAlarmController.java
  3. 1 0
      portal-service-common/src/main/java/com/hfln/portal/common/constant/mqtt/topic/TopicConstants.java
  4. 48 0
      portal-service-common/src/main/java/com/hfln/portal/common/dto/data/event/AlarmPlanDTO.java
  5. 12 0
      portal-service-common/src/main/java/com/hfln/portal/common/request/event/AlarmEventNoticeReq.java
  6. 19 0
      portal-service-common/src/main/java/com/hfln/portal/common/request/event/AlarmPlanDelReq.java
  7. 12 0
      portal-service-common/src/main/java/com/hfln/portal/common/request/event/AlarmPlanQueryReq.java
  8. 12 0
      portal-service-common/src/main/java/com/hfln/portal/common/request/event/AlarmPlanReq.java
  9. 12 0
      portal-service-common/src/main/java/com/hfln/portal/common/request/event/AlarmPlanSaveReq.java
  10. 17 0
      portal-service-domain/src/main/java/com/hfln/portal/domain/gateway/AlarmGateway.java
  11. 16 0
      portal-service-domain/src/main/java/com/hfln/portal/domain/gateway/LasGateway.java
  12. 6 3
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/config/MqttConfig.java
  13. 34 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/gateway/impl/AlarmGatewayImpl.java
  14. 40 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/gateway/impl/LasGatewayImpl.java
  15. 9 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mapper/AlarmPlanMapper.java
  16. 11 38
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/MqttSubHandle.java
  17. 79 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/po/AlarmPlan.java
  18. 8 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/service/AlarmPlanService.java
  19. 12 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/service/impl/AlarmPlanServiceImpl.java
  20. 49 0
      portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/threadpool/ThreadPoolConfig.java

+ 45 - 0
portal-service-application/src/main/java/com/hfln/portal/application/controller/las/AlarmController.java

@@ -0,0 +1,45 @@
+
+package com.hfln.portal.application.controller.las;
+
+
+import cn.hfln.framework.catchlog.CatchAndLog;
+import cn.hfln.framework.dto.ApiResult;
+import com.hfln.portal.common.dto.data.event.AlarmPlanDTO;
+import com.hfln.portal.common.request.event.AlarmEventNoticeReq;
+import com.hfln.portal.common.request.event.AlarmPlanReq;
+import com.hfln.portal.domain.gateway.LasGateway;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.validation.Valid;
+import java.util.List;
+
+@RestController
+@CatchAndLog
+@Tag(name = "告警controller")
+@Slf4j
+@RequestMapping("/alarm")
+public class AlarmController {
+
+    @Autowired
+    private LasGateway lasGateway;
+
+    @Operation(summary = "告警计划查询")
+    @PostMapping("/plan/query")
+    public ApiResult<List<AlarmPlanDTO>> queryPlan(@Valid @RequestBody AlarmPlanReq req) {
+        return ApiResult.success(lasGateway.queryPlan(req));
+    }
+
+    @Operation(summary = "告警事件通知")
+    @PostMapping("/event/notice")
+    public ApiResult<Void> noticeEvent(@Valid @RequestBody AlarmEventNoticeReq req) {
+        return ApiResult.success(lasGateway.noticeEvent(req));
+    }
+
+}

+ 49 - 0
portal-service-application/src/main/java/com/hfln/portal/application/controller/web/WebAlarmController.java

@@ -0,0 +1,49 @@
+package com.hfln.portal.application.controller.web;
+
+
+import cn.hfln.framework.catchlog.CatchAndLog;
+import cn.hfln.framework.dto.ApiResult;
+import com.hfln.portal.common.dto.data.event.AlarmPlanDTO;
+import com.hfln.portal.common.request.event.AlarmPlanDelReq;
+import com.hfln.portal.common.request.event.AlarmPlanQueryReq;
+import com.hfln.portal.common.request.event.AlarmPlanSaveReq;
+import com.hfln.portal.domain.gateway.AlarmGateway;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import javax.validation.Valid;
+import java.util.List;
+
+@RestController
+@CatchAndLog
+@Tag(name = "web端告警相关")
+@Slf4j
+@RequestMapping("/web/alarm")
+public class WebAlarmController {
+
+    @Autowired
+    private AlarmGateway alarmGateway;
+
+    @GetMapping("/plan/query")
+    @Operation(summary = "告警计划查询")
+    public ApiResult<List<AlarmPlanDTO>> queryPlan(@Valid @RequestBody AlarmPlanQueryReq req) {
+        return ApiResult.success(alarmGateway.queryPlan(req));
+    }
+
+    @PostMapping("/plan/save")
+    @Operation(summary = "告警计划保存")
+    public ApiResult<Void> savePlan(@Valid @RequestBody AlarmPlanSaveReq req) {
+        alarmGateway.savePlan(req);
+        return ApiResult.success();
+    }
+
+    @PostMapping("/plan/del")
+    @Operation(summary = "告警计划删除")
+    public ApiResult<Void> savePlan(@Valid @RequestBody AlarmPlanDelReq req) {
+        alarmGateway.delPlan(req);
+        return ApiResult.success();
+    }
+}

+ 1 - 0
portal-service-common/src/main/java/com/hfln/portal/common/constant/mqtt/topic/TopicConstants.java

@@ -35,6 +35,7 @@ public interface TopicConstants {
     String TOPIC_DAS_ALARM_EVENT = "/das/alarm_event";
     // 实时点位
     String TOPIC_DAS_REALTIME_POS = "/das/realtime_pos";
+    String TOPIC_DAS_REALTIME_POS_2 = "/das/+/realtime_pos";
     // 信息更新
     String TOPIC_DAS_DEV_STATUS = "/das/dev_status";
     // 存在事件

+ 48 - 0
portal-service-common/src/main/java/com/hfln/portal/common/dto/data/event/AlarmPlanDTO.java

@@ -0,0 +1,48 @@
+package com.hfln.portal.common.dto.data.event;
+
+import com.hfln.portal.common.vo.BaseDto;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class AlarmPlanDTO extends BaseDto {
+
+    /**
+     * 告警表主键ID
+     */
+    @Schema(description = "告警表主键ID")
+    private Long alarmEventId;
+
+    /**
+     * 设备表主键ID
+     */
+    @Schema(description = "设备表主键ID")
+    private Long devId;
+
+    /**
+     * 姿势
+     */
+    @Schema(description = "姿势 字典值 person_pose")
+    private Byte pose;
+
+    /**
+     * 停留时间表主键id
+     */
+    @Schema(description = "事件表主键id")
+    private Long stayTimeId;
+
+    /**
+     * 告警类型
+     */
+    @Schema(description = "告警类型 字典值 alarm_event_type")
+    private Integer eventType;
+
+
+    /**
+     * 是否处理
+     */
+    @Schema(description = "是否处理:0-未处理,1-已处理")
+    private Integer isHandle;
+} 

+ 12 - 0
portal-service-common/src/main/java/com/hfln/portal/common/request/event/AlarmEventNoticeReq.java

@@ -0,0 +1,12 @@
+package com.hfln.portal.common.request.event;
+
+import com.hfln.portal.common.vo.BaseVO;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class AlarmEventNoticeReq extends BaseVO {
+
+}

+ 19 - 0
portal-service-common/src/main/java/com/hfln/portal/common/request/event/AlarmPlanDelReq.java

@@ -0,0 +1,19 @@
+package com.hfln.portal.common.request.event;
+
+import com.hfln.portal.common.vo.BaseVO;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class AlarmPlanDelReq extends BaseVO {
+
+    @Schema(description = "主键ID")
+    private Long alarmPlanId;
+
+    @Schema(description = "设备ID")
+    private Long devId;
+
+}

+ 12 - 0
portal-service-common/src/main/java/com/hfln/portal/common/request/event/AlarmPlanQueryReq.java

@@ -0,0 +1,12 @@
+package com.hfln.portal.common.request.event;
+
+import com.hfln.portal.common.vo.BaseVO;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class AlarmPlanQueryReq extends BaseVO {
+
+}

+ 12 - 0
portal-service-common/src/main/java/com/hfln/portal/common/request/event/AlarmPlanReq.java

@@ -0,0 +1,12 @@
+package com.hfln.portal.common.request.event;
+
+import com.hfln.portal.common.vo.BaseVO;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class AlarmPlanReq extends BaseVO {
+
+}

+ 12 - 0
portal-service-common/src/main/java/com/hfln/portal/common/request/event/AlarmPlanSaveReq.java

@@ -0,0 +1,12 @@
+package com.hfln.portal.common.request.event;
+
+import com.hfln.portal.common.vo.BaseVO;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@EqualsAndHashCode(callSuper = true)
+@Data
+public class AlarmPlanSaveReq extends BaseVO {
+
+}

+ 17 - 0
portal-service-domain/src/main/java/com/hfln/portal/domain/gateway/AlarmGateway.java

@@ -0,0 +1,17 @@
+package com.hfln.portal.domain.gateway;
+
+import com.hfln.portal.common.dto.data.event.AlarmPlanDTO;
+import com.hfln.portal.common.request.event.AlarmPlanDelReq;
+import com.hfln.portal.common.request.event.AlarmPlanQueryReq;
+import com.hfln.portal.common.request.event.AlarmPlanSaveReq;
+
+import java.util.List;
+
+public interface AlarmGateway {
+
+    List<AlarmPlanDTO> queryPlan(AlarmPlanQueryReq req);
+
+    void savePlan(AlarmPlanSaveReq req);
+
+    void delPlan(AlarmPlanDelReq req);
+}

+ 16 - 0
portal-service-domain/src/main/java/com/hfln/portal/domain/gateway/LasGateway.java

@@ -0,0 +1,16 @@
+package com.hfln.portal.domain.gateway;
+
+import com.hfln.portal.common.dto.data.event.AlarmPlanDTO;
+import com.hfln.portal.common.request.event.AlarmEventNoticeReq;
+import com.hfln.portal.common.request.event.AlarmPlanReq;
+
+import java.util.List;
+
+public interface LasGateway {
+
+
+    List<AlarmPlanDTO> queryPlan(AlarmPlanReq req);
+
+    Void noticeEvent(AlarmEventNoticeReq req);
+}
+

+ 6 - 3
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/config/MqttConfig.java

@@ -4,6 +4,7 @@ import com.hfln.portal.common.constant.mqtt.topic.TopicConstants;
 import com.hfln.portal.infrastructure.mqtt.MqttSubHandle;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
@@ -21,6 +22,7 @@ import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.messaging.MessagingException;
 import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
 /**
@@ -116,7 +118,8 @@ public class MqttConfig {
                 TopicConstants.TOPIC_DAS_ALARM_EVENT,
                 TopicConstants.TOPIC_DAS_DEV_STATUS,
                 TopicConstants.TOPIC_DAS_EXIST,
-                TopicConstants.TOPIC_DAS_REALTIME_POS
+//                TopicConstants.TOPIC_DAS_REALTIME_POS
+                TopicConstants.TOPIC_DAS_REALTIME_POS_2
 
                 , TopicConstants.TOPIC_MQTT_CLIENT_CONNECT
         };
@@ -134,11 +137,11 @@ public class MqttConfig {
 
     @Bean
     @ServiceActivator(inputChannel = "inputChannel")
-    public MessageHandler dasMqttMessageHandler(MqttSubHandle handler) {
+    public MessageHandler dasMqttMessageHandler(MqttSubHandle handler, @Qualifier("mqttSubExecutor") ThreadPoolTaskExecutor mqttSubExecutor) {
         return new MessageHandler() {
             @Override
             public void handleMessage(Message<?> message) throws MessagingException {
-                handler.handleMessage(message);
+                mqttSubExecutor.execute(() -> handler.handleMessage(message));
             }
         };
     }

+ 34 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/gateway/impl/AlarmGatewayImpl.java

@@ -0,0 +1,34 @@
+package com.hfln.portal.infrastructure.gateway.impl;
+
+import com.hfln.portal.common.dto.data.event.AlarmPlanDTO;
+import com.hfln.portal.common.request.event.AlarmPlanDelReq;
+import com.hfln.portal.common.request.event.AlarmPlanQueryReq;
+import com.hfln.portal.common.request.event.AlarmPlanSaveReq;
+import com.hfln.portal.domain.gateway.AlarmGateway;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+@Service
+public class AlarmGatewayImpl implements AlarmGateway {
+
+
+
+    @Override
+    public List<AlarmPlanDTO> queryPlan(AlarmPlanQueryReq req) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void savePlan(AlarmPlanSaveReq req) {
+
+    }
+
+    @Override
+    public void delPlan(AlarmPlanDelReq req) {
+
+    }
+}

+ 40 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/gateway/impl/LasGatewayImpl.java

@@ -0,0 +1,40 @@
+package com.hfln.portal.infrastructure.gateway.impl;
+
+import com.hfln.portal.common.dto.data.event.AlarmPlanDTO;
+import com.hfln.portal.common.request.event.AlarmEventNoticeReq;
+import com.hfln.portal.common.request.event.AlarmPlanReq;
+import com.hfln.portal.domain.gateway.LasGateway;
+import com.hfln.portal.infrastructure.service.AlarmEventService;
+import com.hfln.portal.infrastructure.service.AlarmPlanService;
+import com.hfln.portal.infrastructure.service.StayTimeService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+@Service
+public class LasGatewayImpl implements LasGateway {
+
+    @Autowired
+    private AlarmPlanService alarmPlanService;
+
+    @Autowired
+    private StayTimeService stayTimeService;
+
+    @Autowired
+    private AlarmEventService alarmEventService;
+
+
+    @Override
+    public List<AlarmPlanDTO> queryPlan(AlarmPlanReq req) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Void noticeEvent(AlarmEventNoticeReq req) {
+        return null;
+    }
+}

+ 9 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mapper/AlarmPlanMapper.java

@@ -0,0 +1,9 @@
+package com.hfln.portal.infrastructure.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.hfln.portal.infrastructure.po.AlarmPlan;
+import org.apache.ibatis.annotations.Mapper;
+
+@Mapper
+public interface AlarmPlanMapper extends BaseMapper<AlarmPlan> {
+} 

+ 11 - 38
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/mqtt/MqttSubHandle.java

@@ -85,24 +85,28 @@ public class MqttSubHandle {
                 return;
             }
 
+            String[] parts = topic.split("/");
+            String action = parts[parts.length - 1];
+//            String devId = parts[parts.length - 2];
+
             // 根据主题路由到不同的处理方法
-            switch (topic) {
-                case TopicConstants.TOPIC_DAS_EVENT:
+            switch (action) {
+                case "event":
                     subDasEvent(topic, payload);
                     break;
-                case TopicConstants.TOPIC_DAS_ALARM_EVENT:
+                case "alarm_event":
                     subDasAlarmEvent(topic, payload);
                     break;
-                case TopicConstants.TOPIC_DAS_REALTIME_POS:
+                case "realtime_pos":
                     subDasRealtimePos(topic, payload);
                     break;
-                case TopicConstants.TOPIC_DAS_DEV_STATUS:
+                case "dev_status":
                     subDasDevStatus(topic, payload);
                     break;
-                case TopicConstants.TOPIC_DAS_EXIST:
+                case "exist":
                     subDasExist(topic, payload);
                     break;
-                case TopicConstants.TOPIC_MQTT_CLIENT_CONNECT:
+                case "connect":
                     subMqttClientConnect(topic, payload);
                     break;
                 default:
@@ -438,37 +442,6 @@ public class MqttSubHandle {
             targetPoints[i] = new BigDecimal[]{targetPoint.getBigDecimal(0), targetPoint.getBigDecimal(1), targetPoint.getBigDecimal(2)};
         }
 
-//        String targetPointsStr = JSON.toJSONString(targetPoints);
-//        DevInfo dev = devInfoService.queryByClientId(clientId);
-//        if (dev != null) {
-//            if (!targetPointsStr.equals(dev.getTargetPoints())) {
-//                //  存储跌倒事件
-//                if (messageType == 3) {
-//
-//                    String event = obj.getString("event");
-//                    if (event.equals("fall_confirmed")) {
-//                        EventList eventListVO = new EventList();
-//                        eventListVO.setDevId(dev.getDevId());
-//                        eventListVO.setPose(pose);
-//                        eventListVO.setIsHandle(0);
-//                        eventListVO.setTargetPoints(targetPointsStr);
-//                        eventListVO.setEventType(messageType);
-//                        eventService.save(eventListVO);
-//                    }
-//                }
-//
-//                // todo 当前设备 监测 的最后一次 点位信息
-////                redisservice.hget(RedisCacheConstant.KEY_DEVICE_pre + clientId, "lastTargetTime“)
-////                redisservice.hget(RedisCacheConstant.KEY_DEVICE_pre + clientId, "lastTargetStr“)
-////                DevInfo devInfo = new DevInfo();
-////                devInfo.setTargetPoints(targetPointsStr);
-////                devInfo.setSignalTime(LocalDateTime.now());
-////                devInfoService.update(devInfo, new LambdaUpdateWrapper<DevInfo>().eq(DevInfo::getClientId, clientId));
-//            } else {
-//                log.info("此次targetPoints与上次相同!");
-//            }
-//        }
-
         //向前端发送数据
         JSONObject msg = new JSONObject();
         msg.put("targetPoints", targetPointArray);

+ 79 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/po/AlarmPlan.java

@@ -0,0 +1,79 @@
+package com.hfln.portal.infrastructure.po;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.math.BigDecimal;
+
+/**
+ * 告警计划表
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@TableName("alarm_plan")
+public class AlarmPlan extends BasePO {
+    /**
+     * 主键ID
+     */
+    @TableId(type = IdType.ASSIGN_ID)
+    private Long alarmPlanId;
+
+    /**
+     * 计划类型
+     */
+    private String planType;
+
+    /**
+     * 生效区域 startXx
+     */
+    private BigDecimal startXx;
+
+    /**
+     * 生效区域 endXx
+     */
+    private BigDecimal endXx;
+
+    /**
+     * 生效区域 startYy
+     */
+    private BigDecimal startYy;
+
+    /**
+     * 生效区域 endYy
+     */
+    private BigDecimal endYy;
+
+    /**
+     * 生效区域 startZz
+     */
+    private BigDecimal startZz;
+
+    /**
+     * 生效区域 endZz
+     */
+    private BigDecimal endZz;
+
+    /**
+     * 生效时间段
+     */
+    private String timeZone;
+
+    /**
+     * 一般滞留时间
+     */
+    private Long retentionTime;
+
+    /**
+     * 归并保持时间
+     */
+    private Long retentionKeepTime;
+
+    /**
+     * 异常滞留告警时间
+     */
+    private Long retentionAlarmTime;
+
+} 

+ 8 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/service/AlarmPlanService.java

@@ -0,0 +1,8 @@
+package com.hfln.portal.infrastructure.service;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.hfln.portal.infrastructure.po.AlarmPlan;
+
+public interface AlarmPlanService extends IService<AlarmPlan> {
+
+}

+ 12 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/service/impl/AlarmPlanServiceImpl.java

@@ -0,0 +1,12 @@
+package com.hfln.portal.infrastructure.service.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.hfln.portal.infrastructure.mapper.AlarmPlanMapper;
+import com.hfln.portal.infrastructure.po.AlarmPlan;
+import com.hfln.portal.infrastructure.service.AlarmPlanService;
+import org.springframework.stereotype.Service;
+
+@Service
+public class AlarmPlanServiceImpl extends ServiceImpl<AlarmPlanMapper, AlarmPlan> implements AlarmPlanService {
+
+}

+ 49 - 0
portal-service-infrastructure/src/main/java/com/hfln/portal/infrastructure/threadpool/ThreadPoolConfig.java

@@ -0,0 +1,49 @@
+package com.hfln.portal.infrastructure.threadpool;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+@Configuration
+public class ThreadPoolConfig {
+
+    // 从配置文件中注入参数(application.yml/properties)
+    @Value("${thread.pool.core-size:6}")
+    private int corePoolSize;
+
+    @Value("${thread.pool.max-size:6}")
+    private int maxPoolSize;
+
+    @Value("${thread.pool.queue-capacity:256}")
+    private int queueCapacity;
+
+    @Value("${thread.pool.keep-alive-seconds:60}")
+    private int keepAliveSeconds;
+
+    @Value("${thread.pool.thread-name-prefix:mqtt-sub-}")
+    private String threadNamePrefix;
+
+    @Value("${thread.pool.shutdown-timeout:30}")
+    private int shutdownTimeout;
+
+    @Bean("mqttSubExecutor")
+    public ThreadPoolTaskExecutor mqttSubExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(corePoolSize);
+        executor.setMaxPoolSize(maxPoolSize);
+        executor.setQueueCapacity(queueCapacity);
+        executor.setKeepAliveSeconds(keepAliveSeconds);
+        executor.setThreadNamePrefix(threadNamePrefix);
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+
+        // 优雅停机配置
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        executor.setAwaitTerminationSeconds(shutdownTimeout);  // 等待任务完成的最长时间
+
+        executor.initialize();
+        return executor;
+    }
+}