123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520 |
- package com.hfln.device.infrastructure.mqtt;
- import com.alibaba.fastjson2.JSON;
- import com.alibaba.fastjson2.JSONObject;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.hfln.device.domain.constant.DeviceConstants;
- import com.hfln.device.domain.entity.Device;
- import com.hfln.device.domain.gateway.DeviceGateway;
- import com.hfln.device.domain.gateway.MqttGateway;
- import com.hfln.device.domain.service.DeviceManagerService;
- import com.hfln.device.domain.port.DeviceEventPort;
- import com.hfln.device.infrastructure.mapper.FallEventMapper;
- import com.hfln.device.infrastructure.po.FallEvent;
- import com.hfln.device.common.constant.mqtt.topic.MqttTopics;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.integration.annotation.ServiceActivator;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.MessageHandler;
- import org.springframework.messaging.MessagingException;
- import org.springframework.stereotype.Component;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Optional;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
- import java.util.stream.Collectors;
- import java.time.LocalDateTime;
- /**
- * MQTT消息处理器
- *
- * ⚠️ 注意:为避免与@MqttSubscriber注解方式产生重复消费,
- * 此类中的@ServiceActivator方法已被禁用。
- *
- * 现在统一使用各个Subscriber类处理MQTT消息:
- * - DeviceMessageSubscriber: 处理设备相关消息
- * - MpsMessageSubscriber: 处理小程序消息
- * - AppMessageSubscriber: 处理应用消息
- * - OpcMessageSubscriber: 处理OPC消息
- */
- @Component
- @Slf4j
- public class MqttMessageHandler {
- @Autowired
- private DeviceManagerService deviceManagerService;
- @Autowired
- private DeviceGateway deviceGateway;
- @Autowired
- private MqttGateway mqttGateway;
- @Autowired
- private FallEventMapper fallEventMapper;
- @Autowired
- private DeviceEventPort deviceEventPort;
- private final ObjectMapper objectMapper = new ObjectMapper();
- // 设备登录主题匹配模式
- private static final Pattern DEV_LOGIN_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_LOGIN);
-
- // 设备保活主题匹配模式
- private static final Pattern DEV_KEEPALIVE_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_KEEPALIVE);
-
- // 设备上报信息主题匹配模式
- private static final Pattern DEV_REPORT_INFO_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_REP_DEV_INFO);
-
- // 设备上报参数主题匹配模式
- private static final Pattern DEV_REPORT_PARAM_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_REP_DEV_PARAM);
-
- // 设备上报跌倒事件主题匹配模式
- private static final Pattern DEV_REPORT_FALL_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_REP_FALL_EVENT);
- // 设备上报点云数据主题匹配模式
- private static final Pattern DEV_CLOUDPOINT_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_CLOUDPOINT);
- // 设备DSP数据主题匹配模式
- private static final Pattern DEV_DSP_DATA_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_DSP_DATA);
- // 小程序跌倒确认主题匹配模式
- private static final Pattern MPS_FALL_EVENT_ACK_PATTERN = Pattern.compile(MqttTopics.Pattern.MPS_FALL_EVENT_ACK);
- // 设备重启主题匹配模式
- private static final Pattern DEV_REBOOT_PATTERN = Pattern.compile(MqttTopics.Pattern.MPS_DEV_REBOOT);
- /**
- * ⚠️ 已禁用:处理MQTT入站消息
- *
- * 为避免与@MqttSubscriber注解方式产生重复消费,此方法已被注释。
- * 现在统一使用各个Subscriber类处理MQTT消息。
- *
- * 如果需要重新启用,请确保:
- * 1. 移除相应的@MqttSubscriber注解方法
- * 2. 或者为此handler配置不同的MQTT客户端
- */
- /*
- @Bean
- @ServiceActivator(inputChannel = "mqttInputChannel")
- public MessageHandler handleMessage() {
- return new MessageHandler() {
- @Override
- public void handleMessage(Message<?> message) throws MessagingException {
- try {
- String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
- String payload = (String) message.getPayload();
-
- log.debug("Received MQTT message: topic={}, payload={}", topic, payload);
-
- if (topic == null) {
- return;
- }
-
- // 处理不同类型的消息
- if (matchTopic(DEV_LOGIN_PATTERN, topic)) {
- handleDeviceLogin(topic, payload);
- } else if (matchTopic(DEV_KEEPALIVE_PATTERN, topic)) {
- handleDeviceKeepAlive(topic, payload);
- } else if (matchTopic(DEV_REPORT_INFO_PATTERN, topic)) {
- handleDeviceReportInfo(topic, payload);
- } else if (matchTopic(DEV_REPORT_PARAM_PATTERN, topic)) {
- handleDeviceReportParam(topic, payload);
- } else if (matchTopic(DEV_REPORT_FALL_PATTERN, topic)) {
- handleDeviceReportFall(topic, payload);
- } else if (matchTopic(DEV_CLOUDPOINT_PATTERN, topic)) {
- handleDeviceCloudPoint(topic, payload);
- } else if (matchTopic(DEV_DSP_DATA_PATTERN, topic)) {
- handleDeviceDspData(topic, payload);
- } else if (matchTopic(MPS_FALL_EVENT_ACK_PATTERN, topic)) {
- handleFallEventAck(topic, payload);
- } else if (matchTopic(DEV_REBOOT_PATTERN, topic)) {
- handleDeviceReboot(topic, payload);
- }
-
- } catch (Exception e) {
- log.error("Error handling MQTT message: {}", e.getMessage(), e);
- }
- }
- };
- }
- */
-
- /**
- * ⚠️ 以下方法保留用于工具类用途,不再直接处理MQTT消息
- * 如果需要在Subscriber中复用这些逻辑,可以将此类改为@Service
- */
-
- /**
- * 处理设备登录消息
- */
- private void handleDeviceLogin(String topic, String payload) {
- try {
- JSONObject jsonObject = JSON.parseObject(payload);
- Map<String, Object> deviceInfo = (Map<String, Object>) jsonObject.get("device_info");
-
- if (deviceInfo == null) {
- log.warn("Invalid device login message, missing device_info: {}", payload);
- return;
- }
-
- String deviceId = (String) deviceInfo.get("deviceid");
-
- // 委托给应用层服务处理,传入完整的payload作为第三个参数
- Map<String, Object> fullPayload = objectMapper.readValue(payload, HashMap.class);
- deviceEventPort.handleDeviceLogin(deviceId, deviceInfo, fullPayload);
-
- } catch (Exception e) {
- log.error("Error handling device login: {}", e.getMessage(), e);
- }
- }
-
- /**
- * 处理设备保活消息
- */
- private void handleDeviceKeepAlive(String topic, String payload) {
- try {
- Matcher matcher = DEV_KEEPALIVE_PATTERN.matcher(topic);
- if (matcher.find()) {
- String deviceId = matcher.group(1);
-
- // 委托给应用层服务处理
- deviceEventPort.handleDeviceKeepAlive(deviceId);
- }
- } catch (Exception e) {
- log.error("Error handling device keepalive: {}", e.getMessage(), e);
- }
- }
-
- /**
- * 处理设备上报信息
- *
- * 注意:Python版本中,设备信息上报不会自动发送状态消息
- * 仅在设备状态发生变化(如从离线变为在线)时才发送状态通知
- */
- private void handleDeviceReportInfo(String topic, String payload) {
- try {
- JSONObject message = JSON.parseObject(payload);
-
- if (!message.containsKey("deviceid") ||
- !message.containsKey("device_type") ||
- !message.containsKey("firmware") ||
- !message.containsKey("device_ip")) {
- log.warn("Invalid device info report: {}", payload);
- return;
- }
-
- String deviceId = (String) message.get("deviceid");
- String deviceType = (String) message.get("device_type");
- String software = (String) message.get("firmware");
- String deviceIp = (String) message.get("device_ip");
-
- // 检查设备是否已注册
- Optional<Device> existingDevice = deviceManagerService.getDeviceFromCache(deviceId);
-
- if (existingDevice.isPresent()) {
- Device device = existingDevice.get();
- boolean statusChanged = false;
-
- // 更新设备信息
- device.setDevType(deviceType);
- device.setSoftware(software);
- if (device.getNetwork() != null) {
- device.getNetwork().setIp(deviceIp);
- }
-
- // 仅在设备状态发生变化时才标记需要发送状态消息
- if (device.getOnline() != 1) {
- device.updateOnlineStatus(1);
- device.updateKeepAliveTime(System.currentTimeMillis());
- statusChanged = true;
-
- // 更新数据库
- deviceGateway.updateDeviceOnlineStatus(deviceId, 1);
- }
-
- deviceManagerService.updateDeviceInCache(device);
-
- // 只有在状态发生变化时才发送设备状态通知
- if (statusChanged) {
- mqttGateway.sendDeviceStatusMessage(device);
- log.info("Device status changed to online: {}", deviceId);
- } else {
- log.debug("Device info updated without status change: {}", deviceId);
- }
- }
-
- } catch (Exception e) {
- log.error("Error handling device report info: {}", e.getMessage(), e);
- }
- }
-
- /**
- * 处理设备上报参数
- *
- * 注意:Python版本中,参数更新不会自动发送状态消息
- * 仅更新设备信息,不进行额外的消息发送
- */
- private void handleDeviceReportParam(String topic, String payload) {
- try {
- Matcher matcher = DEV_REPORT_PARAM_PATTERN.matcher(topic);
- if (matcher.find()) {
- String deviceId = matcher.group(1);
- JSONObject message = JSON.parseObject(payload);
-
- log.debug("Device param report: {}, payload: {}", deviceId, payload);
-
- // 获取设备参数
- Map<String, Object> deviceParams = (Map<String, Object>) message.get("device_param");
- if (deviceParams != null) {
- Optional<Device> existingDevice = deviceManagerService.getDeviceFromCache(deviceId);
-
- if (existingDevice.isPresent()) {
- Device device = existingDevice.get();
-
- // 更新设备参数
- if (deviceParams.containsKey("mounting_plain")) {
- String mountPlain = (String) deviceParams.get("mounting_plain");
- if (device.getInstallParam() != null) {
- device.getInstallParam().setMountPlain(mountPlain);
- }
- }
-
- if (deviceParams.containsKey("sensor_height")) {
- Float height = ((Number) deviceParams.get("sensor_height")).floatValue();
- if (device.getInstallParam() != null) {
- device.getInstallParam().setHeight(height);
- }
- }
-
- // 更新跟踪区域
- Map<String, Object> trackingRegion = (Map<String, Object>) deviceParams.get("tracking_region");
- if (trackingRegion != null && device.getInstallParam() != null &&
- device.getInstallParam().getTrackingRegion() != null) {
-
- Device.TrackingRegion region = device.getInstallParam().getTrackingRegion();
-
- if (trackingRegion.containsKey("start_x")) {
- region.setStartX(((Number) trackingRegion.get("start_x")).intValue());
- }
- if (trackingRegion.containsKey("start_y")) {
- region.setStartY(((Number) trackingRegion.get("start_y")).intValue());
- }
- if (trackingRegion.containsKey("start_z")) {
- region.setStartZ(((Number) trackingRegion.get("start_z")).intValue());
- }
- if (trackingRegion.containsKey("stop_x")) {
- region.setStopX(((Number) trackingRegion.get("stop_x")).intValue());
- }
- if (trackingRegion.containsKey("stop_y")) {
- region.setStopY(((Number) trackingRegion.get("stop_y")).intValue());
- }
- if (trackingRegion.containsKey("stop_z")) {
- region.setStopZ(((Number) trackingRegion.get("stop_z")).intValue());
- }
- }
-
- // 更新设备缓存
- deviceManagerService.updateDeviceInCache(device);
-
- // 更新数据库
- deviceGateway.saveDevice(device);
-
- // Python版本不在此处发送设备状态消息,仅记录日志
- log.info("Device parameters updated: {}", deviceId);
- }
- }
- }
- } catch (Exception e) {
- log.error("Error handling device report param: {}", e.getMessage(), e);
- }
- }
-
- /**
- * 处理设备上报跌倒事件
- */
- private void handleDeviceReportFall(String topic, String payload) {
- try {
- Matcher matcher = DEV_REPORT_FALL_PATTERN.matcher(topic);
- if (matcher.find()) {
- String deviceId = matcher.group(1);
- Map<String, Object> message = objectMapper.readValue(payload, HashMap.class);
-
- log.info("Device fall event: {}, payload: {}", deviceId, payload);
-
- // 获取跌倒事件信息
- String event = (String) message.get("event");
- Integer pose = ((Number) message.get("pose")).intValue();
- List<Number> targetPointList = (List<Number>) message.get("target_point");
-
- // 转换目标点数据
- List<Float> targetPoint = targetPointList.stream()
- .map(number -> number.floatValue())
- .collect(Collectors.toList());
-
- // 委托给应用层服务处理
- // 转换为新的方法签名:handleFallEvent(deviceId, timestamp, type, event, fallLocX, fallLocY, fallLocZ, tarHeightEst)
- Long timestamp = System.currentTimeMillis();
- Float fallLocX = targetPoint.size() > 0 ? targetPoint.get(0) : 0.0f;
- Float fallLocY = targetPoint.size() > 1 ? targetPoint.get(1) : 0.0f;
- Float fallLocZ = targetPoint.size() > 2 ? targetPoint.get(2) : 0.0f;
- Float tarHeightEst = targetPoint.size() > 3 ? targetPoint.get(3) : 0.0f;
-
- deviceEventPort.handleFallEvent(deviceId, timestamp, "fall", event, fallLocX, fallLocY, fallLocZ, tarHeightEst);
-
- // 保存事件到数据库
- if ("fall_confirmed".equals(event)) {
- saveFallEvent(deviceId, pose, targetPoint);
- }
- }
- } catch (Exception e) {
- log.error("Error handling device fall event: {}", e.getMessage(), e);
- }
- }
-
- /**
- * 保存跌倒事件
- */
- private void saveFallEvent(String deviceId, int pose, List<Float> targetPoint) {
- try {
- // 创建跌倒事件记录
- FallEvent fallEvent = new FallEvent();
- fallEvent.setDevId(deviceId);
- fallEvent.setEventTime(new Date());
- fallEvent.setEventType("fall");
- fallEvent.setPose(pose);
-
- // 设置目标点坐标
- if (targetPoint != null && targetPoint.size() >= 3) {
- fallEvent.setTargetX(targetPoint.get(0));
- fallEvent.setTargetY(targetPoint.get(1));
- fallEvent.setTargetZ(targetPoint.get(2));
- }
-
- // 设置状态为未处理
- fallEvent.setStatus(0);
-
- // 保存到数据库
- fallEventMapper.insert(fallEvent);
-
- log.info("Fall event saved: {}", deviceId);
- } catch (Exception e) {
- log.error("Error saving fall event: {}", e.getMessage(), e);
- }
- }
-
- /**
- * 处理设备上报点云数据
- *
- * 注意:Python版本中,点云数据的处理更加谨慎
- * 不会自动发送实时姿态消息,仅进行数据记录和处理
- */
- private void handleDeviceCloudPoint(String topic, String payload) {
- try {
- Matcher matcher = DEV_CLOUDPOINT_PATTERN.matcher(topic);
- if (matcher.find()) {
- String deviceId = matcher.group(1);
- Map<String, Object> message = objectMapper.readValue(payload, HashMap.class);
-
- log.debug("Device cloud point: {}", deviceId);
-
- // 获取点云数据
- List<List<Number>> pointCloud = (List<List<Number>>) message.get("point_cloud");
-
- // Python版本中,点云数据主要用于算法处理
- // 不会自动转发实时姿态消息,仅进行内部处理
- // 这里可以添加算法处理,如姿态识别等
-
- // 注释掉自动转发功能,避免重复消息
- // mqttGateway.sendRealtimePoseMessage(deviceId,
- // DeviceConstants.PoseEnum.POSE_STANDING.getCode(), // 默认姿态
- // pointCloud.get(0)); // 使用第一个点作为目标点
-
- log.debug("Cloud point data processed for device: {}", deviceId);
- }
- } catch (Exception e) {
- log.error("Error handling device cloud point: {}", e.getMessage(), e);
- }
- }
-
- /**
- * 处理设备DSP数据
- */
- private void handleDeviceDspData(String topic, String payload) {
- try {
- Matcher matcher = DEV_DSP_DATA_PATTERN.matcher(topic);
- if (matcher.find()) {
- String deviceId = matcher.group(1);
-
- log.debug("Device DSP data: {}", deviceId);
-
- // 这里可以添加DSP数据处理逻辑
- // Python版本中,DSP数据主要用于内部算法处理
- }
- } catch (Exception e) {
- log.error("Error handling device DSP data: {}", e.getMessage(), e);
- }
- }
-
- /**
- * 处理跌倒事件确认
- */
- private void handleFallEventAck(String topic, String payload) {
- try {
- Map<String, Object> message = objectMapper.readValue(payload, HashMap.class);
-
- String deviceId = (String) message.get("deviceId");
- Long eventId = ((Number) message.get("eventId")).longValue();
-
- // 委托给应用层服务处理
- deviceEventPort.handleAlarmAck(deviceId, eventId);
-
- } catch (Exception e) {
- log.error("Error handling fall event acknowledgement: {}", e.getMessage(), e);
- }
- }
-
- /**
- * 处理设备重启命令
- */
- private void handleDeviceReboot(String topic, String payload) {
- try {
- Matcher matcher = DEV_REBOOT_PATTERN.matcher(topic);
- if (matcher.find()) {
- String deviceId = matcher.group(1);
-
- log.info("Device reboot command: {}", deviceId);
-
- // 向设备发送重启命令
- mqttGateway.sendDeviceRebootCommand(deviceId);
- }
- } catch (Exception e) {
- log.error("Error handling device reboot command: {}", e.getMessage(), e);
- }
- }
-
- /**
- * 匹配主题
- */
- private boolean matchTopic(Pattern pattern, String topic) {
- return pattern.matcher(topic).matches();
- }
-
- /**
- * 从主题中提取设备ID
- */
- private String extractDeviceId(Pattern pattern, String topic) {
- Matcher matcher = pattern.matcher(topic);
- if (matcher.find()) {
- return matcher.group(1);
- }
- return null;
- }
- }
|