package com.hfln.device.infrastructure.mqtt; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; import com.hfln.device.domain.constant.DeviceConstants; import com.hfln.device.domain.entity.Device; import com.hfln.device.domain.gateway.DeviceGateway; import com.hfln.device.domain.gateway.MqttGateway; import com.hfln.device.domain.service.DeviceManagerService; import com.hfln.device.domain.port.DeviceEventPort; import com.hfln.device.infrastructure.mapper.FallEventMapper; import com.hfln.device.infrastructure.po.FallEvent; import com.hfln.device.common.constant.mqtt.topic.MqttTopics; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.time.LocalDateTime; /** * MQTT消息处理器 * * ⚠️ 注意:为避免与@MqttSubscriber注解方式产生重复消费, * 此类中的@ServiceActivator方法已被禁用。 * * 现在统一使用各个Subscriber类处理MQTT消息: * - DeviceMessageSubscriber: 处理设备相关消息 * - MpsMessageSubscriber: 处理小程序消息 * - AppMessageSubscriber: 处理应用消息 * - OpcMessageSubscriber: 处理OPC消息 */ @Component @Slf4j public class MqttMessageHandler { @Autowired private DeviceManagerService deviceManagerService; @Autowired private DeviceGateway deviceGateway; @Autowired private MqttGateway mqttGateway; @Autowired private FallEventMapper fallEventMapper; @Autowired private DeviceEventPort deviceEventPort; private final ObjectMapper objectMapper = new ObjectMapper(); // 设备登录主题匹配模式 private static final Pattern DEV_LOGIN_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_LOGIN); // 设备保活主题匹配模式 private static final Pattern DEV_KEEPALIVE_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_KEEPALIVE); // 设备上报信息主题匹配模式 private static final Pattern DEV_REPORT_INFO_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_REP_DEV_INFO); // 设备上报参数主题匹配模式 private static final Pattern DEV_REPORT_PARAM_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_REP_DEV_PARAM); // 设备上报跌倒事件主题匹配模式 private static final Pattern DEV_REPORT_FALL_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_REP_FALL_EVENT); // 设备上报点云数据主题匹配模式 private static final Pattern DEV_CLOUDPOINT_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_CLOUDPOINT); // 设备DSP数据主题匹配模式 private static final Pattern DEV_DSP_DATA_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_DSP_DATA); // 小程序跌倒确认主题匹配模式 private static final Pattern MPS_FALL_EVENT_ACK_PATTERN = Pattern.compile(MqttTopics.Pattern.MPS_FALL_EVENT_ACK); // 设备重启主题匹配模式 private static final Pattern DEV_REBOOT_PATTERN = Pattern.compile(MqttTopics.Pattern.MPS_DEV_REBOOT); /** * ⚠️ 已禁用:处理MQTT入站消息 * * 为避免与@MqttSubscriber注解方式产生重复消费,此方法已被注释。 * 现在统一使用各个Subscriber类处理MQTT消息。 * * 如果需要重新启用,请确保: * 1. 移除相应的@MqttSubscriber注解方法 * 2. 或者为此handler配置不同的MQTT客户端 */ /* @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handleMessage() { return new MessageHandler() { @Override public void handleMessage(Message message) throws MessagingException { try { String topic = (String) message.getHeaders().get("mqtt_receivedTopic"); String payload = (String) message.getPayload(); log.debug("Received MQTT message: topic={}, payload={}", topic, payload); if (topic == null) { return; } // 处理不同类型的消息 if (matchTopic(DEV_LOGIN_PATTERN, topic)) { handleDeviceLogin(topic, payload); } else if (matchTopic(DEV_KEEPALIVE_PATTERN, topic)) { handleDeviceKeepAlive(topic, payload); } else if (matchTopic(DEV_REPORT_INFO_PATTERN, topic)) { handleDeviceReportInfo(topic, payload); } else if (matchTopic(DEV_REPORT_PARAM_PATTERN, topic)) { handleDeviceReportParam(topic, payload); } else if (matchTopic(DEV_REPORT_FALL_PATTERN, topic)) { handleDeviceReportFall(topic, payload); } else if (matchTopic(DEV_CLOUDPOINT_PATTERN, topic)) { handleDeviceCloudPoint(topic, payload); } else if (matchTopic(DEV_DSP_DATA_PATTERN, topic)) { handleDeviceDspData(topic, payload); } else if (matchTopic(MPS_FALL_EVENT_ACK_PATTERN, topic)) { handleFallEventAck(topic, payload); } else if (matchTopic(DEV_REBOOT_PATTERN, topic)) { handleDeviceReboot(topic, payload); } } catch (Exception e) { log.error("Error handling MQTT message: {}", e.getMessage(), e); } } }; } */ /** * ⚠️ 以下方法保留用于工具类用途,不再直接处理MQTT消息 * 如果需要在Subscriber中复用这些逻辑,可以将此类改为@Service */ /** * 处理设备登录消息 */ private void handleDeviceLogin(String topic, String payload) { try { JSONObject jsonObject = JSON.parseObject(payload); Map deviceInfo = (Map) jsonObject.get("device_info"); if (deviceInfo == null) { log.warn("Invalid device login message, missing device_info: {}", payload); return; } String deviceId = (String) deviceInfo.get("deviceid"); // 委托给应用层服务处理,传入完整的payload作为第三个参数 Map fullPayload = objectMapper.readValue(payload, HashMap.class); deviceEventPort.handleDeviceLogin(deviceId, deviceInfo, fullPayload); } catch (Exception e) { log.error("Error handling device login: {}", e.getMessage(), e); } } /** * 处理设备保活消息 */ private void handleDeviceKeepAlive(String topic, String payload) { try { Matcher matcher = DEV_KEEPALIVE_PATTERN.matcher(topic); if (matcher.find()) { String deviceId = matcher.group(1); // 委托给应用层服务处理 deviceEventPort.handleDeviceKeepAlive(deviceId); } } catch (Exception e) { log.error("Error handling device keepalive: {}", e.getMessage(), e); } } /** * 处理设备上报信息 * * 注意:Python版本中,设备信息上报不会自动发送状态消息 * 仅在设备状态发生变化(如从离线变为在线)时才发送状态通知 */ private void handleDeviceReportInfo(String topic, String payload) { try { JSONObject message = JSON.parseObject(payload); if (!message.containsKey("deviceid") || !message.containsKey("device_type") || !message.containsKey("firmware") || !message.containsKey("device_ip")) { log.warn("Invalid device info report: {}", payload); return; } String deviceId = (String) message.get("deviceid"); String deviceType = (String) message.get("device_type"); String software = (String) message.get("firmware"); String deviceIp = (String) message.get("device_ip"); // 检查设备是否已注册 Optional existingDevice = deviceManagerService.getDeviceFromCache(deviceId); if (existingDevice.isPresent()) { Device device = existingDevice.get(); boolean statusChanged = false; // 更新设备信息 device.setDevType(deviceType); device.setSoftware(software); if (device.getNetwork() != null) { device.getNetwork().setIp(deviceIp); } // 仅在设备状态发生变化时才标记需要发送状态消息 if (device.getOnline() != 1) { device.updateOnlineStatus(1); device.updateKeepAliveTime(System.currentTimeMillis()); statusChanged = true; // 更新数据库 deviceGateway.updateDeviceOnlineStatus(deviceId, 1); } deviceManagerService.updateDeviceInCache(device); // 只有在状态发生变化时才发送设备状态通知 if (statusChanged) { mqttGateway.sendDeviceStatusMessage(device); log.info("Device status changed to online: {}", deviceId); } else { log.debug("Device info updated without status change: {}", deviceId); } } } catch (Exception e) { log.error("Error handling device report info: {}", e.getMessage(), e); } } /** * 处理设备上报参数 * * 注意:Python版本中,参数更新不会自动发送状态消息 * 仅更新设备信息,不进行额外的消息发送 */ private void handleDeviceReportParam(String topic, String payload) { try { Matcher matcher = DEV_REPORT_PARAM_PATTERN.matcher(topic); if (matcher.find()) { String deviceId = matcher.group(1); JSONObject message = JSON.parseObject(payload); log.debug("Device param report: {}, payload: {}", deviceId, payload); // 获取设备参数 Map deviceParams = (Map) message.get("device_param"); if (deviceParams != null) { Optional existingDevice = deviceManagerService.getDeviceFromCache(deviceId); if (existingDevice.isPresent()) { Device device = existingDevice.get(); // 更新设备参数 if (deviceParams.containsKey("mounting_plain")) { String mountPlain = (String) deviceParams.get("mounting_plain"); if (device.getInstallParam() != null) { device.getInstallParam().setMountPlain(mountPlain); } } if (deviceParams.containsKey("sensor_height")) { Float height = ((Number) deviceParams.get("sensor_height")).floatValue(); if (device.getInstallParam() != null) { device.getInstallParam().setHeight(height); } } // 更新跟踪区域 Map trackingRegion = (Map) deviceParams.get("tracking_region"); if (trackingRegion != null && device.getInstallParam() != null && device.getInstallParam().getTrackingRegion() != null) { Device.TrackingRegion region = device.getInstallParam().getTrackingRegion(); if (trackingRegion.containsKey("start_x")) { region.setStartX(((Number) trackingRegion.get("start_x")).intValue()); } if (trackingRegion.containsKey("start_y")) { region.setStartY(((Number) trackingRegion.get("start_y")).intValue()); } if (trackingRegion.containsKey("start_z")) { region.setStartZ(((Number) trackingRegion.get("start_z")).intValue()); } if (trackingRegion.containsKey("stop_x")) { region.setStopX(((Number) trackingRegion.get("stop_x")).intValue()); } if (trackingRegion.containsKey("stop_y")) { region.setStopY(((Number) trackingRegion.get("stop_y")).intValue()); } if (trackingRegion.containsKey("stop_z")) { region.setStopZ(((Number) trackingRegion.get("stop_z")).intValue()); } } // 更新设备缓存 deviceManagerService.updateDeviceInCache(device); // 更新数据库 deviceGateway.saveDevice(device); // Python版本不在此处发送设备状态消息,仅记录日志 log.info("Device parameters updated: {}", deviceId); } } } } catch (Exception e) { log.error("Error handling device report param: {}", e.getMessage(), e); } } /** * 处理设备上报跌倒事件 */ private void handleDeviceReportFall(String topic, String payload) { try { Matcher matcher = DEV_REPORT_FALL_PATTERN.matcher(topic); if (matcher.find()) { String deviceId = matcher.group(1); Map message = objectMapper.readValue(payload, HashMap.class); log.info("Device fall event: {}, payload: {}", deviceId, payload); // 获取跌倒事件信息 String event = (String) message.get("event"); Integer pose = ((Number) message.get("pose")).intValue(); List targetPointList = (List) message.get("target_point"); // 转换目标点数据 List targetPoint = targetPointList.stream() .map(number -> number.floatValue()) .collect(Collectors.toList()); // 委托给应用层服务处理 // 转换为新的方法签名:handleFallEvent(deviceId, timestamp, type, event, fallLocX, fallLocY, fallLocZ, tarHeightEst) Long timestamp = System.currentTimeMillis(); Float fallLocX = targetPoint.size() > 0 ? targetPoint.get(0) : 0.0f; Float fallLocY = targetPoint.size() > 1 ? targetPoint.get(1) : 0.0f; Float fallLocZ = targetPoint.size() > 2 ? targetPoint.get(2) : 0.0f; Float tarHeightEst = targetPoint.size() > 3 ? targetPoint.get(3) : 0.0f; deviceEventPort.handleFallEvent(deviceId, timestamp, "fall", event, fallLocX, fallLocY, fallLocZ, tarHeightEst); // 保存事件到数据库 if ("fall_confirmed".equals(event)) { saveFallEvent(deviceId, pose, targetPoint); } } } catch (Exception e) { log.error("Error handling device fall event: {}", e.getMessage(), e); } } /** * 保存跌倒事件 */ private void saveFallEvent(String deviceId, int pose, List targetPoint) { try { // 创建跌倒事件记录 FallEvent fallEvent = new FallEvent(); fallEvent.setDevId(deviceId); fallEvent.setEventTime(new Date()); fallEvent.setEventType("fall"); fallEvent.setPose(pose); // 设置目标点坐标 if (targetPoint != null && targetPoint.size() >= 3) { fallEvent.setTargetX(targetPoint.get(0)); fallEvent.setTargetY(targetPoint.get(1)); fallEvent.setTargetZ(targetPoint.get(2)); } // 设置状态为未处理 fallEvent.setStatus(0); // 保存到数据库 fallEventMapper.insert(fallEvent); log.info("Fall event saved: {}", deviceId); } catch (Exception e) { log.error("Error saving fall event: {}", e.getMessage(), e); } } /** * 处理设备上报点云数据 * * 注意:Python版本中,点云数据的处理更加谨慎 * 不会自动发送实时姿态消息,仅进行数据记录和处理 */ private void handleDeviceCloudPoint(String topic, String payload) { try { Matcher matcher = DEV_CLOUDPOINT_PATTERN.matcher(topic); if (matcher.find()) { String deviceId = matcher.group(1); Map message = objectMapper.readValue(payload, HashMap.class); log.debug("Device cloud point: {}", deviceId); // 获取点云数据 List> pointCloud = (List>) message.get("point_cloud"); // Python版本中,点云数据主要用于算法处理 // 不会自动转发实时姿态消息,仅进行内部处理 // 这里可以添加算法处理,如姿态识别等 // 注释掉自动转发功能,避免重复消息 // mqttGateway.sendRealtimePoseMessage(deviceId, // DeviceConstants.PoseEnum.POSE_STANDING.getCode(), // 默认姿态 // pointCloud.get(0)); // 使用第一个点作为目标点 log.debug("Cloud point data processed for device: {}", deviceId); } } catch (Exception e) { log.error("Error handling device cloud point: {}", e.getMessage(), e); } } /** * 处理设备DSP数据 */ private void handleDeviceDspData(String topic, String payload) { try { Matcher matcher = DEV_DSP_DATA_PATTERN.matcher(topic); if (matcher.find()) { String deviceId = matcher.group(1); log.debug("Device DSP data: {}", deviceId); // 这里可以添加DSP数据处理逻辑 // Python版本中,DSP数据主要用于内部算法处理 } } catch (Exception e) { log.error("Error handling device DSP data: {}", e.getMessage(), e); } } /** * 处理跌倒事件确认 */ private void handleFallEventAck(String topic, String payload) { try { Map message = objectMapper.readValue(payload, HashMap.class); String deviceId = (String) message.get("deviceId"); Long eventId = ((Number) message.get("eventId")).longValue(); // 委托给应用层服务处理 deviceEventPort.handleAlarmAck(deviceId, eventId); } catch (Exception e) { log.error("Error handling fall event acknowledgement: {}", e.getMessage(), e); } } /** * 处理设备重启命令 */ private void handleDeviceReboot(String topic, String payload) { try { Matcher matcher = DEV_REBOOT_PATTERN.matcher(topic); if (matcher.find()) { String deviceId = matcher.group(1); log.info("Device reboot command: {}", deviceId); // 向设备发送重启命令 mqttGateway.sendDeviceRebootCommand(deviceId); } } catch (Exception e) { log.error("Error handling device reboot command: {}", e.getMessage(), e); } } /** * 匹配主题 */ private boolean matchTopic(Pattern pattern, String topic) { return pattern.matcher(topic).matches(); } /** * 从主题中提取设备ID */ private String extractDeviceId(Pattern pattern, String topic) { Matcher matcher = pattern.matcher(topic); if (matcher.find()) { return matcher.group(1); } return null; } }