MqttMessageHandler.java 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. package com.hfln.device.infrastructure.mqtt;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.alibaba.fastjson2.JSONObject;
  4. import com.fasterxml.jackson.databind.ObjectMapper;
  5. import com.hfln.device.domain.constant.DeviceConstants;
  6. import com.hfln.device.domain.entity.Device;
  7. import com.hfln.device.domain.gateway.DeviceGateway;
  8. import com.hfln.device.domain.gateway.MqttGateway;
  9. import com.hfln.device.domain.service.DeviceManagerService;
  10. import com.hfln.device.domain.port.DeviceEventPort;
  11. import com.hfln.device.infrastructure.mapper.FallEventMapper;
  12. import com.hfln.device.infrastructure.po.FallEvent;
  13. import com.hfln.device.common.constant.mqtt.topic.MqttTopics;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.context.annotation.Bean;
  17. import org.springframework.integration.annotation.ServiceActivator;
  18. import org.springframework.messaging.Message;
  19. import org.springframework.messaging.MessageHandler;
  20. import org.springframework.messaging.MessagingException;
  21. import org.springframework.stereotype.Component;
  22. import java.util.Date;
  23. import java.util.HashMap;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.Optional;
  27. import java.util.regex.Matcher;
  28. import java.util.regex.Pattern;
  29. import java.util.stream.Collectors;
  30. import java.time.LocalDateTime;
  31. /**
  32. * MQTT消息处理器
  33. *
  34. * ⚠️ 注意:为避免与@MqttSubscriber注解方式产生重复消费,
  35. * 此类中的@ServiceActivator方法已被禁用。
  36. *
  37. * 现在统一使用各个Subscriber类处理MQTT消息:
  38. * - DeviceMessageSubscriber: 处理设备相关消息
  39. * - MpsMessageSubscriber: 处理小程序消息
  40. * - AppMessageSubscriber: 处理应用消息
  41. * - OpcMessageSubscriber: 处理OPC消息
  42. */
  43. @Component
  44. @Slf4j
  45. public class MqttMessageHandler {
  46. @Autowired
  47. private DeviceManagerService deviceManagerService;
  48. @Autowired
  49. private DeviceGateway deviceGateway;
  50. @Autowired
  51. private MqttGateway mqttGateway;
  52. @Autowired
  53. private FallEventMapper fallEventMapper;
  54. @Autowired
  55. private DeviceEventPort deviceEventPort;
  56. private final ObjectMapper objectMapper = new ObjectMapper();
  57. // 设备登录主题匹配模式
  58. private static final Pattern DEV_LOGIN_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_LOGIN);
  59. // 设备保活主题匹配模式
  60. private static final Pattern DEV_KEEPALIVE_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_KEEPALIVE);
  61. // 设备上报信息主题匹配模式
  62. private static final Pattern DEV_REPORT_INFO_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_REP_DEV_INFO);
  63. // 设备上报参数主题匹配模式
  64. private static final Pattern DEV_REPORT_PARAM_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_REP_DEV_PARAM);
  65. // 设备上报跌倒事件主题匹配模式
  66. private static final Pattern DEV_REPORT_FALL_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_REP_FALL_EVENT);
  67. // 设备上报点云数据主题匹配模式
  68. private static final Pattern DEV_CLOUDPOINT_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_CLOUDPOINT);
  69. // 设备DSP数据主题匹配模式
  70. private static final Pattern DEV_DSP_DATA_PATTERN = Pattern.compile(MqttTopics.Pattern.DEV_DSP_DATA);
  71. // 小程序跌倒确认主题匹配模式
  72. private static final Pattern MPS_FALL_EVENT_ACK_PATTERN = Pattern.compile(MqttTopics.Pattern.MPS_FALL_EVENT_ACK);
  73. // 设备重启主题匹配模式
  74. private static final Pattern DEV_REBOOT_PATTERN = Pattern.compile(MqttTopics.Pattern.MPS_DEV_REBOOT);
  75. /**
  76. * ⚠️ 已禁用:处理MQTT入站消息
  77. *
  78. * 为避免与@MqttSubscriber注解方式产生重复消费,此方法已被注释。
  79. * 现在统一使用各个Subscriber类处理MQTT消息。
  80. *
  81. * 如果需要重新启用,请确保:
  82. * 1. 移除相应的@MqttSubscriber注解方法
  83. * 2. 或者为此handler配置不同的MQTT客户端
  84. */
  85. /*
  86. @Bean
  87. @ServiceActivator(inputChannel = "mqttInputChannel")
  88. public MessageHandler handleMessage() {
  89. return new MessageHandler() {
  90. @Override
  91. public void handleMessage(Message<?> message) throws MessagingException {
  92. try {
  93. String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
  94. String payload = (String) message.getPayload();
  95. log.debug("Received MQTT message: topic={}, payload={}", topic, payload);
  96. if (topic == null) {
  97. return;
  98. }
  99. // 处理不同类型的消息
  100. if (matchTopic(DEV_LOGIN_PATTERN, topic)) {
  101. handleDeviceLogin(topic, payload);
  102. } else if (matchTopic(DEV_KEEPALIVE_PATTERN, topic)) {
  103. handleDeviceKeepAlive(topic, payload);
  104. } else if (matchTopic(DEV_REPORT_INFO_PATTERN, topic)) {
  105. handleDeviceReportInfo(topic, payload);
  106. } else if (matchTopic(DEV_REPORT_PARAM_PATTERN, topic)) {
  107. handleDeviceReportParam(topic, payload);
  108. } else if (matchTopic(DEV_REPORT_FALL_PATTERN, topic)) {
  109. handleDeviceReportFall(topic, payload);
  110. } else if (matchTopic(DEV_CLOUDPOINT_PATTERN, topic)) {
  111. handleDeviceCloudPoint(topic, payload);
  112. } else if (matchTopic(DEV_DSP_DATA_PATTERN, topic)) {
  113. handleDeviceDspData(topic, payload);
  114. } else if (matchTopic(MPS_FALL_EVENT_ACK_PATTERN, topic)) {
  115. handleFallEventAck(topic, payload);
  116. } else if (matchTopic(DEV_REBOOT_PATTERN, topic)) {
  117. handleDeviceReboot(topic, payload);
  118. }
  119. } catch (Exception e) {
  120. log.error("Error handling MQTT message: {}", e.getMessage(), e);
  121. }
  122. }
  123. };
  124. }
  125. */
  126. /**
  127. * ⚠️ 以下方法保留用于工具类用途,不再直接处理MQTT消息
  128. * 如果需要在Subscriber中复用这些逻辑,可以将此类改为@Service
  129. */
  130. /**
  131. * 处理设备登录消息
  132. */
  133. private void handleDeviceLogin(String topic, String payload) {
  134. try {
  135. JSONObject jsonObject = JSON.parseObject(payload);
  136. Map<String, Object> deviceInfo = (Map<String, Object>) jsonObject.get("device_info");
  137. if (deviceInfo == null) {
  138. log.warn("Invalid device login message, missing device_info: {}", payload);
  139. return;
  140. }
  141. String deviceId = (String) deviceInfo.get("deviceid");
  142. // 委托给应用层服务处理,传入完整的payload作为第三个参数
  143. Map<String, Object> fullPayload = objectMapper.readValue(payload, HashMap.class);
  144. deviceEventPort.handleDeviceLogin(deviceId, deviceInfo, fullPayload);
  145. } catch (Exception e) {
  146. log.error("Error handling device login: {}", e.getMessage(), e);
  147. }
  148. }
  149. /**
  150. * 处理设备保活消息
  151. */
  152. private void handleDeviceKeepAlive(String topic, String payload) {
  153. try {
  154. Matcher matcher = DEV_KEEPALIVE_PATTERN.matcher(topic);
  155. if (matcher.find()) {
  156. String deviceId = matcher.group(1);
  157. // 委托给应用层服务处理
  158. deviceEventPort.handleDeviceKeepAlive(deviceId);
  159. }
  160. } catch (Exception e) {
  161. log.error("Error handling device keepalive: {}", e.getMessage(), e);
  162. }
  163. }
  164. /**
  165. * 处理设备上报信息
  166. *
  167. * 注意:Python版本中,设备信息上报不会自动发送状态消息
  168. * 仅在设备状态发生变化(如从离线变为在线)时才发送状态通知
  169. */
  170. private void handleDeviceReportInfo(String topic, String payload) {
  171. try {
  172. JSONObject message = JSON.parseObject(payload);
  173. if (!message.containsKey("deviceid") ||
  174. !message.containsKey("device_type") ||
  175. !message.containsKey("firmware") ||
  176. !message.containsKey("device_ip")) {
  177. log.warn("Invalid device info report: {}", payload);
  178. return;
  179. }
  180. String deviceId = (String) message.get("deviceid");
  181. String deviceType = (String) message.get("device_type");
  182. String software = (String) message.get("firmware");
  183. String deviceIp = (String) message.get("device_ip");
  184. // 检查设备是否已注册
  185. Optional<Device> existingDevice = deviceManagerService.getDeviceFromCache(deviceId);
  186. if (existingDevice.isPresent()) {
  187. Device device = existingDevice.get();
  188. boolean statusChanged = false;
  189. // 更新设备信息
  190. device.setDevType(deviceType);
  191. device.setSoftware(software);
  192. if (device.getNetwork() != null) {
  193. device.getNetwork().setIp(deviceIp);
  194. }
  195. // 仅在设备状态发生变化时才标记需要发送状态消息
  196. if (device.getOnline() != 1) {
  197. device.updateOnlineStatus(1);
  198. device.updateKeepAliveTime(System.currentTimeMillis());
  199. statusChanged = true;
  200. // 更新数据库
  201. deviceGateway.updateDeviceOnlineStatus(deviceId, 1);
  202. }
  203. deviceManagerService.updateDeviceInCache(device);
  204. // 只有在状态发生变化时才发送设备状态通知
  205. if (statusChanged) {
  206. mqttGateway.sendDeviceStatusMessage(device);
  207. log.info("Device status changed to online: {}", deviceId);
  208. } else {
  209. log.debug("Device info updated without status change: {}", deviceId);
  210. }
  211. }
  212. } catch (Exception e) {
  213. log.error("Error handling device report info: {}", e.getMessage(), e);
  214. }
  215. }
  216. /**
  217. * 处理设备上报参数
  218. *
  219. * 注意:Python版本中,参数更新不会自动发送状态消息
  220. * 仅更新设备信息,不进行额外的消息发送
  221. */
  222. private void handleDeviceReportParam(String topic, String payload) {
  223. try {
  224. Matcher matcher = DEV_REPORT_PARAM_PATTERN.matcher(topic);
  225. if (matcher.find()) {
  226. String deviceId = matcher.group(1);
  227. JSONObject message = JSON.parseObject(payload);
  228. log.debug("Device param report: {}, payload: {}", deviceId, payload);
  229. // 获取设备参数
  230. Map<String, Object> deviceParams = (Map<String, Object>) message.get("device_param");
  231. if (deviceParams != null) {
  232. Optional<Device> existingDevice = deviceManagerService.getDeviceFromCache(deviceId);
  233. if (existingDevice.isPresent()) {
  234. Device device = existingDevice.get();
  235. // 更新设备参数
  236. if (deviceParams.containsKey("mounting_plain")) {
  237. String mountPlain = (String) deviceParams.get("mounting_plain");
  238. if (device.getInstallParam() != null) {
  239. device.getInstallParam().setMountPlain(mountPlain);
  240. }
  241. }
  242. if (deviceParams.containsKey("sensor_height")) {
  243. Float height = ((Number) deviceParams.get("sensor_height")).floatValue();
  244. if (device.getInstallParam() != null) {
  245. device.getInstallParam().setHeight(height);
  246. }
  247. }
  248. // 更新跟踪区域
  249. Map<String, Object> trackingRegion = (Map<String, Object>) deviceParams.get("tracking_region");
  250. if (trackingRegion != null && device.getInstallParam() != null &&
  251. device.getInstallParam().getTrackingRegion() != null) {
  252. Device.TrackingRegion region = device.getInstallParam().getTrackingRegion();
  253. if (trackingRegion.containsKey("start_x")) {
  254. region.setStartX(((Number) trackingRegion.get("start_x")).intValue());
  255. }
  256. if (trackingRegion.containsKey("start_y")) {
  257. region.setStartY(((Number) trackingRegion.get("start_y")).intValue());
  258. }
  259. if (trackingRegion.containsKey("start_z")) {
  260. region.setStartZ(((Number) trackingRegion.get("start_z")).intValue());
  261. }
  262. if (trackingRegion.containsKey("stop_x")) {
  263. region.setStopX(((Number) trackingRegion.get("stop_x")).intValue());
  264. }
  265. if (trackingRegion.containsKey("stop_y")) {
  266. region.setStopY(((Number) trackingRegion.get("stop_y")).intValue());
  267. }
  268. if (trackingRegion.containsKey("stop_z")) {
  269. region.setStopZ(((Number) trackingRegion.get("stop_z")).intValue());
  270. }
  271. }
  272. // 更新设备缓存
  273. deviceManagerService.updateDeviceInCache(device);
  274. // 更新数据库
  275. deviceGateway.saveDevice(device);
  276. // Python版本不在此处发送设备状态消息,仅记录日志
  277. log.info("Device parameters updated: {}", deviceId);
  278. }
  279. }
  280. }
  281. } catch (Exception e) {
  282. log.error("Error handling device report param: {}", e.getMessage(), e);
  283. }
  284. }
  285. /**
  286. * 处理设备上报跌倒事件
  287. */
  288. private void handleDeviceReportFall(String topic, String payload) {
  289. try {
  290. Matcher matcher = DEV_REPORT_FALL_PATTERN.matcher(topic);
  291. if (matcher.find()) {
  292. String deviceId = matcher.group(1);
  293. Map<String, Object> message = objectMapper.readValue(payload, HashMap.class);
  294. log.info("Device fall event: {}, payload: {}", deviceId, payload);
  295. // 获取跌倒事件信息
  296. String event = (String) message.get("event");
  297. Integer pose = ((Number) message.get("pose")).intValue();
  298. List<Number> targetPointList = (List<Number>) message.get("target_point");
  299. // 转换目标点数据
  300. List<Float> targetPoint = targetPointList.stream()
  301. .map(number -> number.floatValue())
  302. .collect(Collectors.toList());
  303. // 委托给应用层服务处理
  304. // 转换为新的方法签名:handleFallEvent(deviceId, timestamp, type, event, fallLocX, fallLocY, fallLocZ, tarHeightEst)
  305. Long timestamp = System.currentTimeMillis();
  306. Float fallLocX = targetPoint.size() > 0 ? targetPoint.get(0) : 0.0f;
  307. Float fallLocY = targetPoint.size() > 1 ? targetPoint.get(1) : 0.0f;
  308. Float fallLocZ = targetPoint.size() > 2 ? targetPoint.get(2) : 0.0f;
  309. Float tarHeightEst = targetPoint.size() > 3 ? targetPoint.get(3) : 0.0f;
  310. deviceEventPort.handleFallEvent(deviceId, timestamp, "fall", event, fallLocX, fallLocY, fallLocZ, tarHeightEst);
  311. // 保存事件到数据库
  312. if ("fall_confirmed".equals(event)) {
  313. saveFallEvent(deviceId, pose, targetPoint);
  314. }
  315. }
  316. } catch (Exception e) {
  317. log.error("Error handling device fall event: {}", e.getMessage(), e);
  318. }
  319. }
  320. /**
  321. * 保存跌倒事件
  322. */
  323. private void saveFallEvent(String deviceId, int pose, List<Float> targetPoint) {
  324. try {
  325. // 创建跌倒事件记录
  326. FallEvent fallEvent = new FallEvent();
  327. fallEvent.setDevId(deviceId);
  328. fallEvent.setEventTime(new Date());
  329. fallEvent.setEventType("fall");
  330. fallEvent.setPose(pose);
  331. // 设置目标点坐标
  332. if (targetPoint != null && targetPoint.size() >= 3) {
  333. fallEvent.setTargetX(targetPoint.get(0));
  334. fallEvent.setTargetY(targetPoint.get(1));
  335. fallEvent.setTargetZ(targetPoint.get(2));
  336. }
  337. // 设置状态为未处理
  338. fallEvent.setStatus(0);
  339. // 保存到数据库
  340. fallEventMapper.insert(fallEvent);
  341. log.info("Fall event saved: {}", deviceId);
  342. } catch (Exception e) {
  343. log.error("Error saving fall event: {}", e.getMessage(), e);
  344. }
  345. }
  346. /**
  347. * 处理设备上报点云数据
  348. *
  349. * 注意:Python版本中,点云数据的处理更加谨慎
  350. * 不会自动发送实时姿态消息,仅进行数据记录和处理
  351. */
  352. private void handleDeviceCloudPoint(String topic, String payload) {
  353. try {
  354. Matcher matcher = DEV_CLOUDPOINT_PATTERN.matcher(topic);
  355. if (matcher.find()) {
  356. String deviceId = matcher.group(1);
  357. Map<String, Object> message = objectMapper.readValue(payload, HashMap.class);
  358. log.debug("Device cloud point: {}", deviceId);
  359. // 获取点云数据
  360. List<List<Number>> pointCloud = (List<List<Number>>) message.get("point_cloud");
  361. // Python版本中,点云数据主要用于算法处理
  362. // 不会自动转发实时姿态消息,仅进行内部处理
  363. // 这里可以添加算法处理,如姿态识别等
  364. // 注释掉自动转发功能,避免重复消息
  365. // mqttGateway.sendRealtimePoseMessage(deviceId,
  366. // DeviceConstants.PoseEnum.POSE_STANDING.getCode(), // 默认姿态
  367. // pointCloud.get(0)); // 使用第一个点作为目标点
  368. log.debug("Cloud point data processed for device: {}", deviceId);
  369. }
  370. } catch (Exception e) {
  371. log.error("Error handling device cloud point: {}", e.getMessage(), e);
  372. }
  373. }
  374. /**
  375. * 处理设备DSP数据
  376. */
  377. private void handleDeviceDspData(String topic, String payload) {
  378. try {
  379. Matcher matcher = DEV_DSP_DATA_PATTERN.matcher(topic);
  380. if (matcher.find()) {
  381. String deviceId = matcher.group(1);
  382. log.debug("Device DSP data: {}", deviceId);
  383. // 这里可以添加DSP数据处理逻辑
  384. // Python版本中,DSP数据主要用于内部算法处理
  385. }
  386. } catch (Exception e) {
  387. log.error("Error handling device DSP data: {}", e.getMessage(), e);
  388. }
  389. }
  390. /**
  391. * 处理跌倒事件确认
  392. */
  393. private void handleFallEventAck(String topic, String payload) {
  394. try {
  395. Map<String, Object> message = objectMapper.readValue(payload, HashMap.class);
  396. String deviceId = (String) message.get("deviceId");
  397. Long eventId = ((Number) message.get("eventId")).longValue();
  398. // 委托给应用层服务处理
  399. deviceEventPort.handleAlarmAck(deviceId, eventId);
  400. } catch (Exception e) {
  401. log.error("Error handling fall event acknowledgement: {}", e.getMessage(), e);
  402. }
  403. }
  404. /**
  405. * 处理设备重启命令
  406. */
  407. private void handleDeviceReboot(String topic, String payload) {
  408. try {
  409. Matcher matcher = DEV_REBOOT_PATTERN.matcher(topic);
  410. if (matcher.find()) {
  411. String deviceId = matcher.group(1);
  412. log.info("Device reboot command: {}", deviceId);
  413. // 向设备发送重启命令
  414. mqttGateway.sendDeviceRebootCommand(deviceId);
  415. }
  416. } catch (Exception e) {
  417. log.error("Error handling device reboot command: {}", e.getMessage(), e);
  418. }
  419. }
  420. /**
  421. * 匹配主题
  422. */
  423. private boolean matchTopic(Pattern pattern, String topic) {
  424. return pattern.matcher(topic).matches();
  425. }
  426. /**
  427. * 从主题中提取设备ID
  428. */
  429. private String extractDeviceId(Pattern pattern, String topic) {
  430. Matcher matcher = pattern.matcher(topic);
  431. if (matcher.find()) {
  432. return matcher.group(1);
  433. }
  434. return null;
  435. }
  436. }