Ver Fonte

feat: 合并ServiceActivator相关的业务处理

yangliu há 4 meses atrás
pai
commit
684ce896a2

+ 215 - 0
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/mqtt/handler/AppMessageHandler.java

@@ -0,0 +1,215 @@
+package com.hfln.device.infrastructure.mqtt.handler;
+
+import com.hfln.device.domain.port.DeviceEventPort;
+import com.hfln.device.domain.entity.Device;
+import com.hfln.device.domain.service.DeviceManagerService;
+import com.hfln.device.application.service.DeviceCommandService;
+import com.hfln.device.common.util.JsonUtil;
+import com.hfln.device.common.constant.mqtt.topic.MqttTopics;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * 应用消息处理器
+ * 使用Spring Integration MQTT替代@MqttSubscriber注解
+ * 处理来自移动应用的MQTT消息
+ */
+@Component
+@Slf4j
+public class AppMessageHandler {
+    
+    @Autowired
+    @Qualifier("deviceEventServiceImpl")
+    private DeviceEventPort deviceEventPort;
+    
+    @Autowired
+    private DeviceCommandService deviceCommandService;
+    
+    @Autowired
+    private DeviceManagerService deviceManagerService;
+
+    public void handleMessage(Message<?> message) {
+        try {
+            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
+            String payload = message.getPayload().toString();
+            
+            if (topic == null) {
+                log.warn("Received message without topic header");
+                return;
+            }
+
+            log.debug("Received app message: topic={}, payload={}", topic, payload);
+
+            // 根据主题路由到不同的处理方法
+            switch (topic) {
+                case MqttTopics.APP_FALL_EVENT_ACK:
+                    handleFallEventAck(topic, payload);
+                    break;
+                case MqttTopics.APP_BIND_DEVICE:
+                    handleAddDevice(topic, payload);
+                    break;
+                case MqttTopics.APP_UNBIND_DEVICE:
+                    handleDeleteDevice(topic, payload);
+                    break;
+                case MqttTopics.APP_SET_DEVICE_PARAM:
+                    handleSetDeviceParam(topic, payload);
+                    break;
+                default:
+                    log.debug("Unhandled app topic: {}", topic);
+                    break;
+            }
+
+        } catch (Exception e) {
+            log.error("Error handling app message: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理跌倒事件确认
+     */
+    private void handleFallEventAck(String topic, String payload) {
+        try {
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            log.debug("Received fall event ack: {}", payload);
+            
+            String devId = (String) messageData.get("dev_id");
+            if (devId == null) {
+                log.debug("error: invalid param, {}", topic);
+                return;
+            }
+            
+            // 获取设备
+            Optional<Device> deviceOpt = deviceManagerService.getDeviceFromCache(devId);
+            if (!deviceOpt.isPresent()) {
+                log.debug("error: deal_fall_event_ack, no device: {}", devId);
+                return;
+            }
+            
+            Device device = deviceOpt.get();
+            
+            // 检查设备是否已确认
+            if (Boolean.TRUE.equals(device.getAlarmAck())) {
+                return; // 已确认,直接返回
+            }
+            
+            // 设置告警确认状态和时间
+            long now = System.currentTimeMillis();
+            device.setAlarmAck(true);
+            device.setLastAlarmAckTime(now);
+            
+            // 更新设备缓存
+            deviceManagerService.updateDeviceInCache(device);
+            
+        } catch (Exception e) {
+            log.error("Error handling fall event ack: {}", e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 处理添加设备请求
+     */
+    private void handleAddDevice(String topic, String payload) {
+        try {
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            log.debug("Received add device request: {}", payload);
+            
+            String devId = (String) messageData.get("dev_id");
+            if (devId == null) {
+                return;
+            }
+            
+            // 查询数据库
+            log.debug("TODO: Query device info from database for devId: {}", devId);
+            
+        } catch (Exception e) {
+            log.debug("deal_add_device error: {}", e.getMessage());
+        }
+    }
+    
+    /**
+     * 处理删除设备请求
+     */
+    private void handleDeleteDevice(String topic, String payload) {
+        try {
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            log.debug("Received delete device request: {}", payload);
+            
+            String devId = (String) messageData.get("dev_id");
+            if (devId == null) {
+                return;
+            }
+            
+            // 检查设备是否存在
+            Optional<Device> deviceOpt = deviceManagerService.getDeviceFromCache(devId);
+            if (!deviceOpt.isPresent()) {
+                log.debug("error: deal_del_device, no device: {}", devId);
+                return;
+            }
+            
+            // 删除设备
+            deviceManagerService.removeDeviceFromCache(devId);
+            
+        } catch (Exception e) {
+            log.error("deal_del_device error: {}", e.getMessage());
+        }
+    }
+    
+    /**
+     * 处理设备参数设置请求
+     */
+    private void handleSetDeviceParam(String topic, String payload) {
+        try {
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            log.debug("Received set device param request: {}", payload);
+            
+            // 严格验证所有必需字段
+            String devId = (String) messageData.get("dev_id");
+            String mountingPlain = (String) messageData.get("mounting_plain");
+            Number heightNum = (Number) messageData.get("height");
+            Map<String, Object> area = (Map<String, Object>) messageData.get("area");
+            
+            if (devId == null || mountingPlain == null || heightNum == null || area == null ||
+                !area.containsKey("start_x") || !area.containsKey("start_y") || !area.containsKey("start_z") ||
+                !area.containsKey("stop_x") || !area.containsKey("stop_y") || !area.containsKey("stop_z")) {
+                log.debug("error: invalid param, {}", topic);
+                return;
+            }
+            
+            // 检查设备是否存在
+            Optional<Device> deviceOpt = deviceManagerService.getDeviceFromCache(devId);
+            if (!deviceOpt.isPresent()) {
+                log.debug("error: no device: {}, {}", devId, topic);
+                return;
+            }
+            
+            Float height = heightNum.floatValue();
+            Number startX = (Number) area.get("start_x");
+            Number startY = (Number) area.get("start_y");
+            Number startZ = (Number) area.get("start_z");
+            Number stopX = (Number) area.get("stop_x");
+            Number stopY = (Number) area.get("stop_y");
+            Number stopZ = (Number) area.get("stop_z");
+            
+            // 格式化区域字符串
+            String areaStr = String.format("%s,%s,%s,%s,%s,%s", 
+                startX, stopX, startY, stopY, startZ, stopZ);
+            
+            // 向设备端发送参数设置命令
+            log.debug("TODO: Send device param setting command for devId: {}, mountingPlain: {}, area: {}, height: {}", 
+                     devId, mountingPlain, areaStr, height);
+            
+        } catch (Exception e) {
+            log.error("deal_set_dev_param error: {}", e.getMessage());
+        }
+    }
+} 

+ 371 - 0
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/mqtt/handler/DeviceMessageHandler.java

@@ -0,0 +1,371 @@
+package com.hfln.device.infrastructure.mqtt.handler;
+
+import com.hfln.device.domain.port.DeviceEventPort;
+import com.hfln.device.common.util.JsonUtil;
+import com.hfln.device.common.constant.mqtt.topic.MqttTopics;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * 设备消息处理器
+ * 使用Spring Integration MQTT替代@MqttSubscriber注解
+ * 处理设备相关的MQTT消息
+ */
+@Component
+@Slf4j
+public class DeviceMessageHandler {
+
+    private static final Pattern DEV_ID_PATTERN = Pattern.compile("^/dev/([^/]+)/.*$");
+    
+    @Autowired
+    @Qualifier("deviceEventServiceImpl")
+    private DeviceEventPort deviceEventPort;
+
+    public void handleMessage(Message<?> message) {
+        try {
+            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
+            String payload = message.getPayload().toString();
+            
+            if (topic == null) {
+                log.warn("Received message without topic header");
+                return;
+            }
+
+            log.debug("Received device message: topic={}, payload={}", topic, payload);
+
+            // 根据主题路由到不同的处理方法
+            // 提取主题的操作部分(最后一段)
+            String action = extractActionFromTopic(topic);
+            if (action != null) {
+                switch (action) {
+                    case "login":
+                        handleDeviceLogin(topic, payload);
+                        break;
+                    case "keepalive":
+                        handleDeviceKeepAlive(topic, payload);
+                        break;
+                    case "report_device_info":
+                        handleDeviceReportDeviceInfo(topic, payload);
+                        break;
+                    case "report_device_param":
+                        handleDeviceReportDeviceParam(topic, payload);
+                        break;
+                    case "dsp_data":
+                        handleDeviceDspData(topic, payload);
+                        break;
+                    case "cloudpoint":
+                        handleDeviceCloudPoint(topic, payload);
+                        break;
+                    case "report_falling_event":
+                        handleDeviceReportFallEvent(topic, payload);
+                        break;
+                    case "report_presence_event":
+                        handleDeviceReportPresenceEvent(topic, payload);
+                        break;
+                    case "set_debug_param":
+                        handleSetDebugParam(topic, payload);
+                        break;
+                    case "get_debug_param":
+                        handleGetDebugParam(topic, payload);
+                        break;
+                    default:
+                        log.debug("Unhandled device topic action: {} for topic: {}", action, topic);
+                        break;
+                }
+            } else {
+                log.debug("Could not extract action from device topic: {}", topic);
+            }
+
+        } catch (Exception e) {
+            log.error("Error handling device message: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理设备登录消息
+     */
+    private void handleDeviceLogin(String topic, String payload) {
+        try {
+            log.info("Received device login message: {}", topic);
+            
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            Map<String, Object> deviceInfo = (Map<String, Object>) messageData.get("device_info");
+            
+            if (deviceInfo == null) {
+                log.warn("Invalid device login message, missing device_info: {}", payload);
+                return;
+            }
+            
+            String deviceId = (String) deviceInfo.get("deviceid");
+            
+            // 委托给应用层服务处理
+            deviceEventPort.handleDeviceLogin(deviceId, deviceInfo, messageData);
+            
+        } catch (Exception e) {
+            log.error("Error handling device login: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理设备保活消息
+     */
+    private void handleDeviceKeepAlive(String topic, String payload) {
+        try {
+            String deviceId = extractDeviceIdFromTopic(topic);
+            if (deviceId != null) {
+                log.debug("Received device keepalive: {}", deviceId);
+                
+                // 委托给应用层服务处理
+                deviceEventPort.handleDeviceKeepAlive(deviceId);
+            }
+        } catch (Exception e) {
+            log.error("Error handling device keepalive: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理设备上报设备信息
+     */
+    private void handleDeviceReportDeviceInfo(String topic, String payload) {
+        try {
+            String deviceId = extractDeviceIdFromTopic(topic);
+            if (deviceId != null) {
+                Map<String, Object> messageData = JsonUtil.parseMap(payload);
+                
+                log.info("Device info report: {}, payload: {}", deviceId, payload);
+                
+                // 验证必要字段
+                if (!messageData.containsKey("deviceid") || !messageData.containsKey("device_type") ||
+                    !messageData.containsKey("firmware") || !messageData.containsKey("device_ip")) {
+                    log.warn("Invalid device info message, missing required fields: {}", payload);
+                    return;
+                }
+                
+                // 委托给应用层服务处理
+                deviceEventPort.handleDeviceLogin(deviceId, messageData, messageData);
+            }
+        } catch (Exception e) {
+            log.error("Error handling device info report: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理设备上报设备参数
+     */
+    private void handleDeviceReportDeviceParam(String topic, String payload) {
+        try {
+            String deviceId = extractDeviceIdFromTopic(topic);
+            if (deviceId != null) {
+                Map<String, Object> messageData = JsonUtil.parseMap(payload);
+                
+                log.info("Device parameter report: {}, payload: {}", deviceId, payload);
+                
+                // 委托给应用层服务处理
+                deviceEventPort.handleDeviceParamReport(deviceId, messageData);
+            }
+        } catch (Exception e) {
+            log.error("Error handling device parameter report: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理设备实时数据
+     */
+    private void handleDeviceDspData(String topic, String payload) {
+        try {
+            String deviceId = extractDeviceIdFromTopic(topic);
+            if (deviceId != null) {
+                Map<String, Object> messageData = JsonUtil.parseMap(payload);
+                
+                log.debug("Device dsp data: {}", deviceId);
+                
+                // 委托给应用层服务处理
+                deviceEventPort.handleDspData(deviceId, messageData);
+            }
+        } catch (Exception e) {
+            log.error("Error handling device dsp data: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理设备点云数据
+     */
+    private void handleDeviceCloudPoint(String topic, String payload) {
+        try {
+            String deviceId = extractDeviceIdFromTopic(topic);
+            if (deviceId != null) {
+                Map<String, Object> messageData = JsonUtil.parseMap(payload);
+                
+                log.debug("Device cloud point data: {}", deviceId);
+                
+                // 获取点云数据和目标点
+                List<List<Number>> cloudPoints = (List<List<Number>>) messageData.get("cloud_points");
+                List<List<Number>> trackerTargets = (List<List<Number>>) messageData.get("tracker_targets");
+                
+                // 转换数据类型
+                List<List<Float>> cloudPointsFloat = null;
+                if (cloudPoints != null) {
+                    cloudPointsFloat = cloudPoints.stream()
+                            .map(point -> point.stream()
+                                    .map(number -> number.floatValue())
+                                    .collect(Collectors.toList()))
+                            .collect(Collectors.toList());
+                }
+                
+                List<List<Float>> trackerTargetsFloat = null;
+                if (trackerTargets != null) {
+                    trackerTargetsFloat = trackerTargets.stream()
+                            .map(target -> target.stream()
+                                    .map(number -> number.floatValue())
+                                    .collect(Collectors.toList()))
+                            .collect(Collectors.toList());
+                }
+                
+                // 委托给应用层服务处理
+                deviceEventPort.handleCloudPoint(deviceId, cloudPointsFloat, trackerTargetsFloat);
+            }
+        } catch (Exception e) {
+            log.error("Error handling device cloud point data: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理设备上报跌倒事件
+     */
+    private void handleDeviceReportFallEvent(String topic, String payload) {
+        try {
+            String deviceId = extractDeviceIdFromTopic(topic);
+            if (deviceId != null) {
+                Map<String, Object> messageData = JsonUtil.parseMap(payload);
+                
+                log.info("Device fall event: {}, payload: {}", deviceId, payload);
+                
+                // 验证必要字段
+                if (!messageData.containsKey("timestamp") || !messageData.containsKey("type") ||
+                    !messageData.containsKey("event") || !messageData.containsKey("fallLocX") ||
+                    !messageData.containsKey("fallLocY") || !messageData.containsKey("fallLocZ") ||
+                    !messageData.containsKey("tarHeightEst")) {
+                    log.warn("Invalid fall event message, missing required fields: {}", payload);
+                    return;
+                }
+                
+                Long timestamp = ((Number) messageData.get("timestamp")).longValue();
+                String type = (String) messageData.get("type");
+                String event = (String) messageData.get("event");
+                // 坐标单位转换:厘米转米
+                Float fallLocX = ((Number) messageData.get("fallLocX")).floatValue() / 100.0f;
+                Float fallLocY = ((Number) messageData.get("fallLocY")).floatValue() / 100.0f;
+                Float fallLocZ = ((Number) messageData.get("fallLocZ")).floatValue() / 100.0f;
+                Float tarHeightEst = ((Number) messageData.get("tarHeightEst")).floatValue();
+                
+                // 委托给应用层服务处理
+                deviceEventPort.handleFallEvent(deviceId, timestamp, type, event, 
+                    fallLocX, fallLocY, fallLocZ, tarHeightEst);
+            }
+        } catch (Exception e) {
+            log.error("Error handling device fall event: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理设备上报存在事件
+     */
+    private void handleDeviceReportPresenceEvent(String topic, String payload) {
+        try {
+            String deviceId = extractDeviceIdFromTopic(topic);
+            if (deviceId != null) {
+                Map<String, Object> messageData = JsonUtil.parseMap(payload);
+                
+                log.info("Device presence event: {}, payload: {}", deviceId, payload);
+                
+                // 验证必要字段
+                if (!messageData.containsKey("timestamp") || !messageData.containsKey("type") ||
+                    !messageData.containsKey("event")) {
+                    log.warn("Invalid presence event message, missing required fields: {}", payload);
+                    return;
+                }
+                
+                Long timestamp = ((Number) messageData.get("timestamp")).longValue();
+                String type = (String) messageData.get("type");
+                String event = (String) messageData.get("event");
+                
+                // 委托给应用层服务处理
+                deviceEventPort.handleExistEvent(deviceId, timestamp, type, event);
+            }
+        } catch (Exception e) {
+            log.error("Error handling device presence event: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理设置调试参数
+     */
+    private void handleSetDebugParam(String topic, String payload) {
+        try {
+            String deviceId = extractDeviceIdFromTopic(topic);
+            if (deviceId != null) {
+                Map<String, Object> messageData = JsonUtil.parseMap(payload);
+                
+                log.info("Set debug param: {}, payload: {}", deviceId, payload);
+                
+                // 委托给应用层服务处理
+                deviceEventPort.handleSetDebugParam(deviceId, messageData);
+            }
+        } catch (Exception e) {
+            log.error("Error handling set debug param: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理获取调试参数
+     */
+    private void handleGetDebugParam(String topic, String payload) {
+        try {
+            String deviceId = extractDeviceIdFromTopic(topic);
+            if (deviceId != null) {
+                Map<String, Object> messageData = JsonUtil.parseMap(payload);
+                
+                log.info("Get debug param: {}, payload: {}", deviceId, payload);
+                
+                // 委托给应用层服务处理
+                deviceEventPort.handleGetDebugParam(deviceId, messageData);
+            }
+        } catch (Exception e) {
+            log.error("Error handling get debug param: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 从主题中提取设备ID
+     */
+    private String extractDeviceIdFromTopic(String topic) {
+        Matcher matcher = DEV_ID_PATTERN.matcher(topic);
+        if (matcher.find()) {
+            return matcher.group(1);
+        }
+        return null;
+    }
+    
+    /**
+     * 从主题中提取操作名称(最后一段)
+     * 格式:/dev/{device_id}/{action}
+     */
+    private String extractActionFromTopic(String topic) {
+        if (topic != null && topic.startsWith("/dev/")) {
+            String[] parts = topic.split("/");
+            if (parts.length >= 3) {
+                return parts[parts.length - 1]; // 返回最后一段
+            }
+        }
+        return null;
+    }
+} 

+ 258 - 0
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/mqtt/handler/MpsMessageHandler.java

@@ -0,0 +1,258 @@
+package com.hfln.device.infrastructure.mqtt.handler;
+
+import com.hfln.device.application.service.DeviceCommandService;
+import com.hfln.device.common.util.JsonUtil;
+import com.hfln.device.common.constant.mqtt.topic.MqttTopics;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * MPS消息处理器
+ * 使用Spring Integration MQTT替代@MqttSubscriber注解
+ * 处理小程序服务(Mini Program Service)相关的MQTT消息
+ */
+@Component
+@Slf4j
+public class MpsMessageHandler {
+
+    private static final Pattern DEV_REBOOT_PATTERN = Pattern.compile("^/mps/([^/]+)/reboot$");
+    
+    @Autowired
+    private DeviceCommandService deviceCommandService;
+
+    public void handleMessage(Message<?> message) {
+        try {
+            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
+            String payload = message.getPayload().toString();
+            
+            if (topic == null) {
+                log.warn("Received message without topic header");
+                return;
+            }
+
+            log.debug("Received mps message: topic={}, payload={}", topic, payload);
+
+            // 根据主题路由到不同的处理方法
+            if (topic.matches("^/mps/[^/]+/reboot$")) {
+                // 特殊处理动态主题,因为switch不能处理正则表达式
+                handleDeviceReboot(topic, payload);
+            } else {
+                switch (topic) {
+                    case MqttTopics.MPS_GET_DEV_INFO:
+                        handleGetDeviceInfo(topic, payload);
+                        break;
+                    case MqttTopics.MPS_GET_DEV_PARAM:
+                        handleGetDeviceParam(topic, payload);
+                        break;
+                    case MqttTopics.MPS_SET_DEV_PARAM:
+                        handleSetDeviceParam(topic, payload);
+                        break;
+                    case MqttTopics.MPS_ADD_DEVICE:
+                        handleAddDevice(topic, payload);
+                        break;
+                    case MqttTopics.MPS_DEL_DEVICE:
+                        handleDeleteDevice(topic, payload);
+                        break;
+                    case MqttTopics.MPS_FALL_EVENT_ACK:
+                        handleFallEventAck(topic, payload);
+                        break;
+                    default:
+                        log.debug("Unhandled mps topic: {}", topic);
+                        break;
+                }
+            }
+
+        } catch (Exception e) {
+            log.error("Error handling mps message: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 处理获取设备信息请求
+     */
+    private void handleGetDeviceInfo(String topic, String payload) {
+        try {
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            String devId = (String) messageData.get("dev_id");
+            if (devId == null) {
+                log.warn("Invalid get device info message, missing dev_id: {}", payload);
+                return;
+            }
+            
+            log.info("Received get device info request: {}", devId);
+            
+            // 委托给应用层服务处理
+            deviceCommandService.handleGetDeviceInfo(devId);
+            
+        } catch (Exception e) {
+            log.error("Error handling get device info request: {}", e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 处理获取设备参数请求
+     */
+    private void handleGetDeviceParam(String topic, String payload) {
+        try {
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            String devId = (String) messageData.get("dev_id");
+            if (devId == null) {
+                log.warn("Invalid get device param message, missing dev_id: {}", payload);
+                return;
+            }
+            
+            log.info("Received get device param request: {}", devId);
+            
+            // 委托给应用层服务处理
+            deviceCommandService.handleGetDeviceParam(devId);
+            
+        } catch (Exception e) {
+            log.error("Error handling get device param request: {}", e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 处理设置设备参数请求
+     */
+    private void handleSetDeviceParam(String topic, String payload) {
+        try {
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            String devId = (String) messageData.get("dev_id");
+            String mountingPlain = (String) messageData.get("mounting_plain");
+            Number height = (Number) messageData.get("height");
+            Map<String, Object> area = (Map<String, Object>) messageData.get("area");
+            
+            if (devId == null || mountingPlain == null || height == null || area == null) {
+                log.warn("Invalid set device param message, missing required fields: {}", payload);
+                return;
+            }
+            
+            // 验证area对象中的坐标字段
+            if (!area.containsKey("start_x") || !area.containsKey("start_y") || !area.containsKey("start_z") ||
+                !area.containsKey("stop_x") || !area.containsKey("stop_y") || !area.containsKey("stop_z")) {
+                log.warn("Invalid area parameters in set device param message: {}", payload);
+                return;
+            }
+            
+            log.info("Received set device param request: {}, payload: {}", devId, payload);
+            
+            // 提取area坐标并构建area_str
+            Number startX = (Number) area.get("start_x");
+            Number startY = (Number) area.get("start_y");
+            Number startZ = (Number) area.get("start_z");
+            Number stopX = (Number) area.get("stop_x");
+            Number stopY = (Number) area.get("stop_y");
+            Number stopZ = (Number) area.get("stop_z");
+            
+            String areaStr = String.format("%s,%s,%s,%s,%s,%s", 
+                startX, stopX, startY, stopY, startZ, stopZ);
+            
+            // 委托给应用层服务处理
+            deviceCommandService.handleSetDeviceParam(devId, mountingPlain, areaStr, height.floatValue());
+            
+        } catch (Exception e) {
+            log.error("Error handling set device param request: {}", e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 处理设备重启请求
+     */
+    private void handleDeviceReboot(String topic, String payload) {
+        try {
+            // 从topic解析设备ID
+            Matcher matcher = DEV_REBOOT_PATTERN.matcher(topic);
+            if (!matcher.find()) {
+                log.warn("Invalid reboot topic format: {}", topic);
+                return;
+            }
+            
+            String devId = matcher.group(1);
+            log.info("Received device reboot request: {}", devId);
+            
+            // 委托给应用层服务处理
+            deviceCommandService.handleDeviceReboot(devId);
+            
+        } catch (Exception e) {
+            log.error("Error handling device reboot request: {}", e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 处理添加设备请求
+     */
+    private void handleAddDevice(String topic, String payload) {
+        try {
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            String devId = (String) messageData.get("dev_id");
+            if (devId == null) {
+                log.warn("Invalid add device message, missing dev_id: {}", payload);
+                return;
+            }
+            
+            log.info("Received add device request: {}", devId);
+            
+            // 委托给应用层服务处理
+            deviceCommandService.handleAddDevice(devId);
+            
+        } catch (Exception e) {
+            log.error("Error handling add device request: {}", e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 处理删除设备请求
+     */
+    private void handleDeleteDevice(String topic, String payload) {
+        try {
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            String devId = (String) messageData.get("dev_id");
+            if (devId == null) {
+                log.warn("Invalid delete device message, missing dev_id: {}", payload);
+                return;
+            }
+            
+            log.info("Received delete device request: {}", devId);
+            
+            // 委托给应用层服务处理
+            deviceCommandService.handleDeleteDevice(devId);
+            
+        } catch (Exception e) {
+            log.error("Error handling delete device request: {}", e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 处理跌倒事件确认
+     */
+    private void handleFallEventAck(String topic, String payload) {
+        try {
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            String devId = (String) messageData.get("dev_id");
+            if (devId == null) {
+                log.warn("Invalid fall event ack message, missing dev_id: {}", payload);
+                return;
+            }
+            
+            log.info("Received fall event ack: {}", devId);
+            
+            // 委托给应用层服务处理
+            deviceCommandService.handleFallEventAck(devId);
+            
+        } catch (Exception e) {
+            log.error("Error handling fall event ack: {}", e.getMessage(), e);
+        }
+    }
+} 

+ 148 - 0
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/mqtt/handler/OpcMessageHandler.java

@@ -0,0 +1,148 @@
+package com.hfln.device.infrastructure.mqtt.handler;
+
+import com.hfln.device.common.util.JsonUtil;
+import com.hfln.device.common.constant.mqtt.topic.MqttTopics;
+import com.hfln.device.domain.port.DeviceEventPort;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * OPC消息处理器
+ * 使用Spring Integration MQTT替代@MqttSubscriber注解
+ * 负责处理OPC相关的MQTT消息
+ */
+@Component
+@Slf4j
+public class OpcMessageHandler {
+
+    @Autowired
+    @Qualifier("deviceEventServiceImpl")
+    private DeviceEventPort deviceEventPort;
+
+    public void handleMessage(Message<?> message) {
+        try {
+            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
+            String payload = message.getPayload().toString();
+            
+            if (topic == null) {
+                log.warn("Received message without topic header");
+                return;
+            }
+
+            log.debug("Received opc message: topic={}, payload={}", topic, payload);
+
+            // 根据主题路由到不同的处理方法
+            switch (topic) {
+                case MqttTopics.OPC_GET_ALARM_PARAM:
+                    handleGetAlarmParam(topic, payload);
+                    break;
+                case MqttTopics.OPC_SET_ALARM_PARAM:
+                    handleSetAlarmParam(topic, payload);
+                    break;
+                default:
+                    log.debug("Unhandled opc topic: {}", topic);
+                    break;
+            }
+
+        } catch (Exception e) {
+            log.error("Error handling opc message: {}", e.getMessage(), e);
+        }
+    }
+    
+    /**
+     * 获取告警参数消息处理
+     */
+    private void handleGetAlarmParam(String topic, String payload) {
+        log.debug("收到获取告警参数请求: topic={}, payload={}", topic, payload);
+        
+        try {
+            // 解析消息内容
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            // 检查请求类型并处理
+            if (messageData.containsKey("global")) {
+                // 获取全局告警参数
+                log.debug("处理全局告警参数获取请求");
+                deviceEventPort.handleGetGlobalAlarmParam(payload);
+            } else if (messageData.containsKey("toilet")) {
+                // 获取厕所告警参数
+                log.debug("处理厕所告警参数获取请求");
+                deviceEventPort.handleGetToiletAlarmParam(payload);
+            } else {
+                log.warn("无效的告警参数获取请求: {}", payload);
+            }
+        } catch (Exception e) {
+            log.error("处理获取告警参数请求失败: topic={}, payload={}", topic, payload, e);
+        }
+    }
+    
+    /**
+     * 设置告警参数消息处理
+     */
+    private void handleSetAlarmParam(String topic, String payload) {
+        log.debug("收到设置告警参数请求: topic={}, payload={}", topic, payload);
+        
+        try {
+            // 解析消息内容
+            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+            
+            // 检查参数类型并处理
+            if (messageData.containsKey("global")) {
+                // 设置全局告警参数
+                Map<String, Object> globalParam = (Map<String, Object>) messageData.get("global");
+                log.debug("处理全局告警参数设置请求: {}", globalParam);
+                
+                // 验证必要参数
+                if (validateAlarmParams(globalParam)) {
+                    deviceEventPort.handleSetGlobalAlarmParam(payload);
+                } else {
+                    log.warn("全局告警参数验证失败: {}", globalParam);
+                    deviceEventPort.sendSetAlarmParamAck(-1, "{}");
+                }
+                
+            } else if (messageData.containsKey("toilet")) {
+                // 设置厕所告警参数
+                Map<String, Object> toiletParam = (Map<String, Object>) messageData.get("toilet");
+                log.debug("处理厕所告警参数设置请求: {}", toiletParam);
+                
+                // 验证必要参数
+                if (validateAlarmParams(toiletParam)) {
+                    deviceEventPort.handleSetToiletAlarmParam(payload);
+                } else {
+                    log.warn("厕所告警参数验证失败: {}", toiletParam);
+                    deviceEventPort.sendSetAlarmParamAck(-1, "{}");
+                }
+                
+            } else {
+                log.warn("无效的告警参数设置请求,缺少global或toilet字段: {}", payload);
+                deviceEventPort.sendSetAlarmParamAck(-1, "{}");
+            }
+        } catch (Exception e) {
+            log.error("处理设置告警参数请求失败: topic={}, payload={}", topic, payload, e);
+            try {
+                deviceEventPort.sendSetAlarmParamAck(-1, "{}");
+            } catch (Exception ex) {
+                log.error("发送错误响应失败", ex);
+            }
+        }
+    }
+    
+    /**
+     * 验证告警参数的完整性
+     */
+    private boolean validateAlarmParams(Map<String, Object> alarmParams) {
+        if (alarmParams == null) {
+            return false;
+        }
+        
+        // 检查必要字段
+        return alarmParams.containsKey("retention_time") &&
+               alarmParams.containsKey("retention_keep_time") &&
+               alarmParams.containsKey("retention_alarm_time");
+    }
+} 

+ 83 - 83
device-service-server/src/main/resources/application.yml

@@ -1,83 +1,83 @@
-spring:
-  application:
-    name: device-service
-  # 数据库配置
-  datasource:
-    type: com.zaxxer.hikari.HikariDataSource
-    driver-class-name: com.mysql.cj.jdbc.Driver
-    url: jdbc:mysql://localhost:3306/hfln_device?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
-    username: root
-    password: 123456
-    hikari:
-      minimum-idle: 5
-      maximum-pool-size: 20
-      auto-commit: true
-      idle-timeout: 30000
-      pool-name: HikariCP
-      max-lifetime: 1800000
-      connection-timeout: 30000
-      connection-test-query: SELECT 1
-  redis:
-    host: localhost
-    port: 6379
-    password:
-    database: 0
-  jackson:
-    date-format: yyyy-MM-dd HH:mm:ss
-    time-zone: GMT+8
-
-# MQTT配置
-mqtt:
-  broker:
-    url: tcp://localhost:1883
-  client:
-    id: hfln-device-service
-  username: admin
-  password: public
-  topics: hfln/device/#
-  qos: 1
-  completion:
-    timeout: 5000
-
-# 告警配置
-alarm:
-  retention-time: 60
-  retention-keep-time: 30
-  retention-alarm-time: 180
-
-# MyBatis配置
-mybatis:
-  mapper-locations: classpath:mapper/*.xml
-  type-aliases-package: com.hfln.device.infrastructure.po
-  configuration:
-    map-underscore-to-camel-case: true
-    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
-
-# 日志配置
-logging:
-  level:
-    root: info
-    com.hfln: debug
-  file:
-    name: logs/device-service.log
-    max-size: 10MB
-    max-history: 30
-
-# 服务器配置
-server:
-  port: 8080
-  servlet:
-    context-path: /device-service
-
-# 设备配置
-device:
-  check-interval: 30
-  offline-timeout: 180
-  # 设备保活超时时间(毫秒)
-  keepalive-timeout: 60000
-  # 设备告警确认超时时间(毫秒)
-  alarm-ack-timeout: 300000
-  # 设备停留时间阈值(毫秒)
-  stay-time-threshold: 600000
-  # 设备滞留时间阈值(毫秒)
-  retention-time-threshold: 1800000 
+#spring:
+#  application:
+#    name: device-service
+#  # 数据库配置
+#  datasource:
+#    type: com.zaxxer.hikari.HikariDataSource
+#    driver-class-name: com.mysql.cj.jdbc.Driver
+#    url: jdbc:mysql://localhost:3306/hfln_device?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
+#    username: root
+#    password: 123456
+#    hikari:
+#      minimum-idle: 5
+#      maximum-pool-size: 20
+#      auto-commit: true
+#      idle-timeout: 30000
+#      pool-name: HikariCP
+#      max-lifetime: 1800000
+#      connection-timeout: 30000
+#      connection-test-query: SELECT 1
+#  redis:
+#    host: localhost
+#    port: 6379
+#    password:
+#    database: 0
+#  jackson:
+#    date-format: yyyy-MM-dd HH:mm:ss
+#    time-zone: GMT+8
+#
+## MQTT配置
+#mqtt:
+#  broker:
+#    url: tcp://localhost:1883
+#  client:
+#    id: hfln-device-service
+#  username: admin
+#  password: public
+#  topics: hfln/device/#
+#  qos: 1
+#  completion:
+#    timeout: 5000
+#
+## 告警配置
+#alarm:
+#  retention-time: 60
+#  retention-keep-time: 30
+#  retention-alarm-time: 180
+#
+## MyBatis配置
+#mybatis:
+#  mapper-locations: classpath:mapper/*.xml
+#  type-aliases-package: com.hfln.device.infrastructure.po
+#  configuration:
+#    map-underscore-to-camel-case: true
+#    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
+#
+## 日志配置
+#logging:
+#  level:
+#    root: info
+#    com.hfln: debug
+#  file:
+#    name: logs/device-service.log
+#    max-size: 10MB
+#    max-history: 30
+#
+## 服务器配置
+#server:
+#  port: 8080
+#  servlet:
+#    context-path: /device-service
+#
+## 设备配置
+#device:
+#  check-interval: 30
+#  offline-timeout: 180
+#  # 设备保活超时时间(毫秒)
+#  keepalive-timeout: 60000
+#  # 设备告警确认超时时间(毫秒)
+#  alarm-ack-timeout: 300000
+#  # 设备停留时间阈值(毫秒)
+#  stay-time-threshold: 600000
+#  # 设备滞留时间阈值(毫秒)
+#  retention-time-threshold: 1800000

+ 7 - 28
device-service-server/src/main/resources/bootstrap-local.yml

@@ -14,7 +14,6 @@ spring:
     host: 8.130.28.21
     port: 6379
     database: 5
-    password: Hfln@1024
     timeout: 10s
     lettuce:
       pool:
@@ -58,37 +57,17 @@ lnxx:
       sdkAppId: 1400966707
 
 
-emqx:
+mqtt:
   enabled: true
-  server-uris:
-    - tcp://8.130.28.21:1883
-  client-id: ${random.uuid}
+  broker: tcp://8.130.28.21:1883
+  client:
+    id: hfln-device-service-${random.uuid}
   username: admin
   password: public
-  connection-timeout: 30
-  keep-alive-interval: 60
-  automatic-reconnect: true
-  max-reconnect-delay: 10000
-  clean-session: true
-  max-inflight: 100
-  monitor:
-    enabled: true
-    interval: 30
-    max-reconnect-attempts: 10
-    reconnect-cooldown: 60
-  message:
-    process-timeout: 30
-
-# MQTT配置
-mqtt:
-  broker: 119.45.12.173
-  port: 1883
-  username: lnradar
-  password: lnradar
-  client:
-    id: device-service-
+  topics: hfln/device/#
+  qos: 1
   completion:
-    timeout: 30000
+    timeout: 5000
 
 # 设备配置
 device: