MqttGatewayImpl.java 36 KB


  1. package com.hfln.device.infrastructure.gateway;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.hfln.device.common.constant.mqtt.topic.MqttTopics;
  5. import com.hfln.device.domain.constant.DeviceConstants;
  6. import com.hfln.device.domain.entity.Device;
  7. import com.hfln.device.domain.gateway.MqttGateway;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.jetbrains.annotations.NotNull;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.context.annotation.Primary;
  12. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  13. import org.springframework.integration.mqtt.support.MqttHeaders;
  14. import org.springframework.messaging.Message;
  15. import org.springframework.messaging.support.MessageBuilder;
  16. import org.springframework.stereotype.Component;
  17. import java.util.ArrayList;
  18. import java.util.HashMap;
  19. import java.util.List;
  20. import java.util.Map;
  21. /**
  22. * MQTT网关实现类 - 基于Spring Integration MQTT
  23. * 合并了原MqttGatewayDefaultImpl的优秀特性
  24. */
  25. @Component
  26. @Primary
  27. @Slf4j
  28. public class MqttGatewayImpl implements MqttGateway {
  29. @Autowired
  30. private MqttPahoMessageHandler mqttOutbound;
  31. @Autowired
  32. private ObjectMapper objectMapper;
  33. @Override
  34. public void initialize() {
  35. log.info("MQTT Gateway initialized with Spring Integration");
  36. }
  37. @Override
  38. public void publish(String topic, Object payload) {
  39. publish(topic, payload, 0, false);
  40. }
  41. @Override
  42. public void publish(String topic, Object payload, int qos, boolean retain) {
  43. try {
  44. String jsonPayload = objectMapper.writeValueAsString(payload);
  45. sendToMqtt(topic, jsonPayload, qos, retain);
  46. } catch (JsonProcessingException e) {
  47. log.error("Failed to serialize payload for topic: {}, error: {}", topic, e.getMessage());
  48. }
  49. }
  50. @Override
  51. public void sendMessage(String topic, String message) {
  52. sendMessage(topic, message, 0, false);
  53. }
  54. @Override
  55. public void sendMessage(String topic, String message, int qos, boolean retain) {
  56. sendToMqtt(topic, message, qos, retain);
  57. }
  58. @Override
  59. public void publishJson(String topic, Object payload) {
  60. publishJson(topic, payload, 0, false);
  61. }
  62. @Override
  63. public void publishJson(String topic, Object payload, int qos, boolean retain) {
  64. try {
  65. String jsonPayload = objectMapper.writeValueAsString(payload);
  66. sendToMqtt(topic, jsonPayload, qos, retain);
  67. } catch (JsonProcessingException e) {
  68. log.error("Failed to serialize JSON payload for topic: {}, error: {}", topic, e.getMessage());
  69. }
  70. }
  71. @Override
  72. public void sendSync(String topic, Object payload) throws Exception {
  73. String jsonPayload = objectMapper.writeValueAsString(payload);
  74. sendToMqtt(topic, jsonPayload, 0, false);
  75. }
  76. @Override
  77. public void subscribe(String topic, int qos) {
  78. log.info("Subscription managed by Spring Integration configuration for topic: {}", topic);
  79. }
  80. @Override
  81. public void unsubscribe(String topic) {
  82. log.info("Unsubscription managed by Spring Integration configuration for topic: {}", topic);
  83. }
  84. @Override
  85. public boolean isConnected() {
  86. return true; // Spring Integration handles connection management
  87. }
  88. @Override
  89. public void disconnect() {
  90. log.info("Disconnect managed by Spring Integration");
  91. }
  92. @Override
  93. public void sendDeviceStatusMessage(Device device) {
  94. try {
  95. Map<String, Object> message = new HashMap<>();
  96. message.put("message", "notify");
  97. message.put("timestamp", System.currentTimeMillis());
  98. message.put("dev_id", device.getDevId());
  99. message.put("online", device.getOnline());
  100. message.put("dev_type", device.getDevType());
  101. message.put("software", device.getSoftware());
  102. message.put("hardware", device.getHardware());
  103. // 网络信息
  104. if (device.getNetwork() != null) {
  105. Map<String, Object> network = new HashMap<>();
  106. network.put("ssid", device.getNetwork().getSsid());
  107. network.put("password", device.getNetwork().getPassword());
  108. network.put("ip", device.getNetwork().getIp());
  109. message.put("network", network);
  110. }
  111. // 雷达参数
  112. if (device.getInstallParam() != null) {
  113. Map<String, Object> radarParam = new HashMap<>();
  114. radarParam.put("mount_plain", device.getInstallParam().getMountPlain());
  115. radarParam.put("height", device.getInstallParam().getHeight());
  116. if (device.getInstallParam().getTrackingRegion() != null) {
  117. Map<String, Object> trackingRegion = new HashMap<>();
  118. trackingRegion.put("start_x", device.getInstallParam().getTrackingRegion().getStartX());
  119. trackingRegion.put("start_y", device.getInstallParam().getTrackingRegion().getStartY());
  120. trackingRegion.put("start_z", device.getInstallParam().getTrackingRegion().getStartZ());
  121. trackingRegion.put("stop_x", device.getInstallParam().getTrackingRegion().getStopX());
  122. trackingRegion.put("stop_y", device.getInstallParam().getTrackingRegion().getStopY());
  123. trackingRegion.put("stop_z", device.getInstallParam().getTrackingRegion().getStopZ());
  124. radarParam.put("tracking_region", trackingRegion);
  125. }
  126. message.put("radar_param", radarParam);
  127. }
  128. String topic = "/das/status";
  129. publishJson(topic, message, 1, false);
  130. log.debug("发送设备状态消息: deviceId={}, topic={}", device.getDevId(), topic);
  131. } catch (Exception e) {
  132. log.error("发送设备状态消息失败: deviceId={}, error={}", device.getDevId(), e.getMessage(), e);
  133. }
  134. }
  135. /**
  136. * 发送设备信息更新通知
  137. * 对应Python版本的mqtt_send.update_dev_info_msg(device)
  138. */
  139. @Override
  140. public void sendDeviceInfoUpdateNotification(Device device) {
  141. try {
  142. Map<String, Object> message = new HashMap<>();
  143. message.put("dev_id", device.getDevId());
  144. message.put("dev_type", device.getDevType());
  145. message.put("software", device.getSoftware());
  146. message.put("hardware", device.getHardware());
  147. message.put("online", device.getOnline());
  148. if (device.getNetwork() != null) {
  149. Map<String, Object> network = new HashMap<>();
  150. network.put("ssid", device.getNetwork().getSsid());
  151. network.put("ip", device.getNetwork().getIp());
  152. message.put("network", network);
  153. }
  154. if (device.getInstallParam() != null) {
  155. Map<String, Object> installParam = new HashMap<>();
  156. installParam.put("mount_plain", device.getInstallParam().getMountPlain());
  157. installParam.put("height", device.getInstallParam().getHeight());
  158. installParam.put("is_ceiling", device.getInstallParam().getIsCeiling());
  159. if (device.getInstallParam().getTrackingRegion() != null) {
  160. Map<String, Object> trackingRegion = new HashMap<>();
  161. trackingRegion.put("start_x", device.getInstallParam().getTrackingRegion().getStartX());
  162. trackingRegion.put("start_y", device.getInstallParam().getTrackingRegion().getStartY());
  163. trackingRegion.put("start_z", device.getInstallParam().getTrackingRegion().getStartZ());
  164. trackingRegion.put("stop_x", device.getInstallParam().getTrackingRegion().getStopX());
  165. trackingRegion.put("stop_y", device.getInstallParam().getTrackingRegion().getStopY());
  166. trackingRegion.put("stop_z", device.getInstallParam().getTrackingRegion().getStopZ());
  167. installParam.put("tracking_region", trackingRegion);
  168. }
  169. message.put("install_param", installParam);
  170. }
  171. String topic = "/mps/update_dev_info";
  172. publishJson(topic, message);
  173. log.info("Device info update notification sent: deviceId={}", device.getDevId());
  174. } catch (Exception e) {
  175. log.error("Failed to send device info update notification: deviceId={}, error={}",
  176. device.getDevId(), e.getMessage(), e);
  177. }
  178. }
  179. @Override
  180. public void sendRealtimePoseMessage(String deviceId, int pose, Object targetPoint) {
  181. try {
  182. Map<String, Object> payload = new HashMap<>();
  183. payload.put("message", "notify");
  184. payload.put("message_type", DeviceConstants.MessageType.MSG_REALTIME_TARGET.getCode());
  185. payload.put("timestamp", System.currentTimeMillis());
  186. payload.put("dev_id", deviceId);
  187. payload.put("pose", pose);
  188. payload.put("target_point", targetPoint);
  189. sendMessage(MqttTopics.DAS_REALTIME_POS, payload);
  190. } catch (Exception e) {
  191. log.error("Error sending realtime pose message: {}", deviceId, e);
  192. }
  193. }
  194. @Override
  195. public void sendAlarmMessage(String deviceId, String alarmType, Map<String, Object> data) {
  196. try {
  197. Map<String, Object> payload = new HashMap<>(data);
  198. payload.put("message", "notify");
  199. payload.put("message_type", DeviceConstants.MessageType.MSG_ALARM_EVENT.getCode());
  200. payload.put("dev_id", deviceId);
  201. payload.put("timestamp", System.currentTimeMillis());
  202. payload.put("alarmType", alarmType);
  203. sendMessage(MqttTopics.DAS_ALARM_EVENT, payload);
  204. log.info("Alarm message sent: {}, type: {}", deviceId, alarmType);
  205. } catch (Exception e) {
  206. log.error("Error sending alarm message: {}, type: {}", deviceId, alarmType, e);
  207. }
  208. }
  209. @Override
  210. public void sendBehaviorAnalysisResult(String deviceId, Object behaviorPattern) {
  211. try {
  212. Map<String, Object> payload = new HashMap<>();
  213. payload.put("message", "notify");
  214. payload.put("dev_id", deviceId);
  215. payload.put("behaviorPattern", behaviorPattern);
  216. payload.put("timestamp", System.currentTimeMillis());
  217. sendMessage(MqttTopics.DAS_BEHAVIOR_ANALYSIS, payload);
  218. log.debug("Behavior analysis result sent: {}", deviceId);
  219. } catch (Exception e) {
  220. log.error("Error sending behavior analysis result: {}", deviceId, e);
  221. }
  222. }
  223. @Override
  224. public boolean sendCommandToDevice(String deviceId, String command, Object payload) {
  225. try {
  226. Map<String, Object> message = new HashMap<>();
  227. message.put("command", command);
  228. message.put("payload", payload);
  229. message.put("timestamp", System.currentTimeMillis());
  230. String topic = String.format(MqttTopics.DEV_COMMAND, deviceId);
  231. sendMessage(topic, message);
  232. log.info("Command sent to device: {}, command: {}", deviceId, command);
  233. return true;
  234. } catch (Exception e) {
  235. log.error("Failed to send command to device: {}, command: {}, error: {}", deviceId, command, e.getMessage());
  236. return false;
  237. }
  238. }
  239. @Override
  240. public void sendFallAlarmMessage(String deviceId, int pose, List<Float> targetPoint) {
  241. try {
  242. Map<String, Object> payload = new HashMap<>();
  243. payload.put("message", "notify");
  244. payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_FALL.getCode());
  245. payload.put("dev_id", deviceId);
  246. payload.put("pose", pose);
  247. payload.put("target_point", targetPoint);
  248. payload.put("alarmType", "fall");
  249. payload.put("timestamp", System.currentTimeMillis());
  250. // 跌倒告警使用QoS 2确保可靠传输
  251. sendMessage(MqttTopics.DAS_ALARM_EVENT, payload, 2);
  252. log.info("Fall alarm message sent: {}", deviceId);
  253. } catch (Exception e) {
  254. log.error("Error sending fall alarm message: {}", deviceId, e);
  255. }
  256. }
  257. @Override
  258. public void sendDeviceRebootCommand(String deviceId) {
  259. try {
  260. Map<String, Object> payload = new HashMap<>();
  261. String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/reboot";
  262. sendMessage(topic, payload);
  263. log.info("Device reboot command sent: {}", deviceId);
  264. } catch (Exception e) {
  265. log.error("Error sending device reboot command: {}", deviceId, e);
  266. }
  267. }
  268. @Override
  269. public void sendDeviceResetCommand(String deviceId) {
  270. try {
  271. Map<String, Object> payload = new HashMap<>();
  272. String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/reset";
  273. sendMessage(topic, payload);
  274. log.info("Device reset command sent: {}", deviceId);
  275. } catch (Exception e) {
  276. log.error("Error sending device reset command: {}", deviceId, e);
  277. }
  278. }
  279. @Override
  280. public void sendDeviceCommand(String deviceId, String command, Map<String, Object> params) {
  281. try {
  282. Map<String, Object> payload = new HashMap<>();
  283. payload.put("command", command);
  284. payload.put("params", params);
  285. payload.put("timestamp", System.currentTimeMillis());
  286. String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/" + command;
  287. sendMessage(topic, payload);
  288. log.info("Device command sent: {}, command: {}", deviceId, command);
  289. } catch (Exception e) {
  290. log.error("Error sending device command: {}, command: {}", deviceId, command, e);
  291. }
  292. }
  293. @Override
  294. public void sendDeviceKeepAliveResponse(String deviceId, int status) {
  295. try {
  296. Map<String, Object> payload = new HashMap<>();
  297. payload.put("code", status);
  298. String topic = DeviceConstants.MqttConstant.TOPIC_DAS_PREFIX + deviceId + "/keepalive";
  299. sendMessage(topic, payload);
  300. log.debug("Device keepalive response sent: {}, code: {}", deviceId, status);
  301. } catch (Exception e) {
  302. log.error("Error sending device keepalive response: {}", deviceId, e);
  303. }
  304. }
  305. @Override
  306. public void sendDeviceNotFoundResponse(String deviceId) {
  307. try {
  308. Map<String, Object> payload = new HashMap<>();
  309. payload.put("code", 404);
  310. payload.put("message", "Device not found");
  311. String topic = MqttTopics.APP_DEVICE_INFO_RESPONSE;
  312. sendMessage(topic, payload);
  313. log.debug("Device not found response sent: {}", deviceId);
  314. } catch (Exception e) {
  315. log.error("Error sending device not found response: {}", deviceId, e);
  316. }
  317. }
  318. @Override
  319. public void sendAlarmAckMessage(String deviceId, Long eventId) {
  320. try {
  321. Map<String, Object> payload = new HashMap<>();
  322. payload.put("dev_id", deviceId);
  323. payload.put("event_id", eventId);
  324. payload.put("timestamp", System.currentTimeMillis());
  325. sendMessage(MqttTopics.APP_FALL_EVENT_ACK, payload);
  326. log.debug("Alarm acknowledgment sent: {}, eventId: {}", deviceId, eventId);
  327. } catch (Exception e) {
  328. log.error("Error sending alarm acknowledgment: {}, eventId: {}", deviceId, eventId, e);
  329. }
  330. }
  331. @Override
  332. public void sendDeviceParamSetCommand(String deviceId, String paramType, String paramName, float value) {
  333. try {
  334. Map<String, Object> payload = new HashMap<>();
  335. payload.put("param_type", paramType);
  336. payload.put("param_name", paramName);
  337. payload.put("value", value);
  338. payload.put("timestamp", System.currentTimeMillis());
  339. String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/set_param";
  340. sendMessage(topic, payload);
  341. log.info("Device parameter set command sent: {}, {}={}", deviceId, paramName, value);
  342. } catch (Exception e) {
  343. log.error("Error sending device parameter set command: {}", deviceId, e);
  344. }
  345. }
  346. @Override
  347. public void sendSetDeviceParamCommand(String deviceId, String paramType, String paramName, Float value) {
  348. sendDeviceParamSetCommand(deviceId, paramType, paramName, value != null ? value : 0.0f);
  349. }
  350. @Override
  351. public void sendUpdateNetworkCommand(String deviceId, String ssid, String password) {
  352. try {
  353. Map<String, Object> payload = new HashMap<>();
  354. payload.put("ssid", ssid);
  355. payload.put("password", password);
  356. payload.put("timestamp", System.currentTimeMillis());
  357. String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/network";
  358. sendMessage(topic, payload);
  359. log.info("Network update command sent: {}", deviceId);
  360. } catch (Exception e) {
  361. log.error("Error sending network update command: {}", deviceId, e);
  362. }
  363. }
  364. @Override
  365. public void sendDeviceLoginResponse(String deviceId, int code) {
  366. try {
  367. Map<String, Object> payload = new HashMap<>();
  368. payload.put("code", code);
  369. payload.put("expires", 90); // 过期时间,单位秒
  370. String topic = DeviceConstants.MqttConstant.TOPIC_DAS_PREFIX + deviceId + "/login";
  371. sendMessage(topic, payload);
  372. log.debug("Device login response sent: {}, code: {}", deviceId, code);
  373. } catch (Exception e) {
  374. log.error("Error sending device login response: {}", deviceId, e);
  375. }
  376. }
  377. @Override
  378. public void sendEventMessage(String deviceId, List<List<Float>> rawPoints, int pose, List<List<Float>> targets, String event) {
  379. try {
  380. Map<String, Object> payload = new HashMap<>();
  381. payload.put("message", "notify");
  382. payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_FALL.getCode());
  383. payload.put("dev_id", deviceId);
  384. payload.put("event", event);
  385. payload.put("timestamp", System.currentTimeMillis());
  386. payload.put("pose", pose);
  387. payload.put("RawPoints", rawPoints != null ? rawPoints : new ArrayList<>()); // 对应Python版本的RawPoints参数
  388. payload.put("targets", targets != null ? targets : new ArrayList<>()); // 对应Python版本的targets参数
  389. // 对于确认的跌倒事件,使用QoS 2
  390. int qos = "fall_confirmed".equals(event) ? 2 : 0;
  391. sendMessage(MqttTopics.DAS_EVENT, payload, qos);
  392. log.debug("Event message sent: deviceId={}, event={}, pose={}, targetsCount={}",
  393. deviceId, event, pose, targets != null ? targets.size() : 0);
  394. } catch (Exception e) {
  395. log.error("Error sending event message: {}, event: {}", deviceId, event, e);
  396. }
  397. }
  398. @Override
  399. public void sendAlarmEventMessage(String deviceId, String description, String table, int tableId) {
  400. try {
  401. Map<String, Object> payload = new HashMap<>();
  402. payload.put("message", "notify");
  403. payload.put("message_type", DeviceConstants.MessageType.MSG_ALARM_EVENT.getCode());
  404. payload.put("dev_id", deviceId);
  405. payload.put("timestamp", System.currentTimeMillis());
  406. payload.put("desc", description);
  407. payload.put("table", table);
  408. payload.put("table_id", tableId);
  409. sendMessage(MqttTopics.DAS_ALARM_EVENT, payload);
  410. } catch (Exception e) {
  411. log.error("Error sending alarm event message: {}, desc: {}", deviceId, description, e);
  412. }
  413. }
  414. @Override
  415. public void sendExistenceMessage(String deviceId, String event) {
  416. try {
  417. Map<String, Object> payload = new HashMap<>();
  418. payload.put("message", "notify");
  419. payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_EXIST.getCode());
  420. payload.put("dev_id", deviceId);
  421. payload.put("event", event);
  422. payload.put("timestamp", System.currentTimeMillis());
  423. sendMessage(MqttTopics.DAS_EXIST_EVENT, payload);
  424. } catch (Exception e) {
  425. log.error("Error sending existence message: {}, event: {}", deviceId, event, e);
  426. }
  427. }
  428. @Override
  429. public void sendNetworkConfigUpdate(String deviceId, Device.NetworkInfo networkInfo) {
  430. try {
  431. Map<String, Object> payload = new HashMap<>();
  432. payload.put("dev_id", deviceId);
  433. if (networkInfo != null) {
  434. payload.put("ssid", networkInfo.getSsid());
  435. payload.put("password", networkInfo.getPassword());
  436. payload.put("ip", networkInfo.getIp());
  437. }
  438. payload.put("timestamp", System.currentTimeMillis());
  439. String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/network_config";
  440. sendMessage(topic, payload);
  441. log.info("Network config update sent: {}", deviceId);
  442. } catch (Exception e) {
  443. log.error("Error sending network config update: {}", deviceId, e);
  444. }
  445. }
  446. @Override
  447. public void sendInstallParamUpdate(String deviceId, Device.InstallParam installParam) {
  448. try {
  449. Map<String, Object> payload = new HashMap<>();
  450. payload.put("dev_id", deviceId);
  451. if (installParam != null) {
  452. payload.put("mount_plain", installParam.getMountPlain());
  453. payload.put("height", installParam.getHeight());
  454. // 跟踪区域
  455. if (installParam.getTrackingRegion() != null) {
  456. Map<String, Object> trackingRegion = getStringObjectMap(installParam.getTrackingRegion());
  457. payload.put("tracking_region", trackingRegion);
  458. }
  459. }
  460. payload.put("timestamp", System.currentTimeMillis());
  461. String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/install_param";
  462. sendMessage(topic, payload);
  463. log.info("Install parameter update sent: {}", deviceId);
  464. } catch (Exception e) {
  465. log.error("Error sending install parameter update: {}", deviceId, e);
  466. }
  467. }
  468. @Override
  469. public void sendTrackingRegionUpdate(String deviceId, Device.TrackingRegion trackingRegion) {
  470. try {
  471. Map<String, Object> payload = new HashMap<>();
  472. payload.put("dev_id", deviceId);
  473. if (trackingRegion != null) {
  474. Map<String, Object> regionMap = getStringObjectMap(trackingRegion);
  475. payload.put("tracking_region", regionMap);
  476. }
  477. payload.put("timestamp", System.currentTimeMillis());
  478. String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/tracking_region";
  479. sendMessage(topic, payload);
  480. log.info("Tracking region update sent: {}", deviceId);
  481. } catch (Exception e) {
  482. log.error("Error sending tracking region update: {}", deviceId, e);
  483. }
  484. }
  485. @Override
  486. public void sendAlarmScheduleUpdate(String deviceId, Map<String, Object> alarmSchedule) {
  487. try {
  488. Map<String, Object> payload = new HashMap<>();
  489. payload.put("dev_id", deviceId);
  490. payload.put("alarm_schedule", alarmSchedule);
  491. payload.put("timestamp", System.currentTimeMillis());
  492. String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/alarm_schedule";
  493. sendMessage(topic, payload);
  494. log.info("Alarm schedule update sent: {}", deviceId);
  495. } catch (Exception e) {
  496. log.error("Error sending alarm schedule update: {}", deviceId, e);
  497. }
  498. }
  499. @Override
  500. public void sendDeviceInfoResponse(String deviceId, Device device) {
  501. try {
  502. Map<String, Object> payload = new HashMap<>();
  503. payload.put("dev_id", deviceId);
  504. payload.put("device", device);
  505. payload.put("timestamp", System.currentTimeMillis());
  506. sendMessage(MqttTopics.APP_DEVICE_INFO_RESPONSE, payload);
  507. log.debug("Device info response sent: {}", deviceId);
  508. } catch (Exception e) {
  509. log.error("Error sending device info response: {}", deviceId, e);
  510. }
  511. }
  512. @Override
  513. public void sendStatusMessage(String deviceId, String status, Map<String, Object> data) {
  514. try {
  515. Map<String, Object> payload = new HashMap<>(data);
  516. payload.put("dev_id", deviceId);
  517. payload.put("status", status);
  518. payload.put("timestamp", System.currentTimeMillis());
  519. sendMessage(MqttTopics.DAS_STATUS, payload);
  520. log.debug("Device status message sent: {}, status: {}", deviceId, status);
  521. } catch (Exception e) {
  522. log.error("Error sending status message: {}, status: {}", deviceId, status, e);
  523. }
  524. }
  525. @Override
  526. public void sendBehaviorMessage(String deviceId, String behaviorType, Map<String, Object> data) {
  527. try {
  528. Map<String, Object> payload = new HashMap<>(data);
  529. payload.put("dev_id", deviceId);
  530. payload.put("behaviorType", behaviorType);
  531. payload.put("timestamp", System.currentTimeMillis());
  532. sendMessage(MqttTopics.DAS_BEHAVIOR_ANALYSIS, payload);
  533. log.debug("Behavior message sent: {}, type: {}", deviceId, behaviorType);
  534. } catch (Exception e) {
  535. log.error("Error sending behavior message: {}, type: {}", deviceId, behaviorType, e);
  536. }
  537. }
  538. @Override
  539. public void sendAlarmParamResponse(int code, Map<String, Object> globalConfig) {
  540. try {
  541. Map<String, Object> payload = new HashMap<>();
  542. payload.put("code", code);
  543. payload.put("global", globalConfig);
  544. payload.put("timestamp", System.currentTimeMillis());
  545. sendMessage(MqttTopics.DAS_REPORT_ALARM_PARAM, payload);
  546. log.debug("Alarm parameter response sent: code={}, config={}", code, globalConfig);
  547. } catch (Exception e) {
  548. log.error("Failed to send alarm parameter response", e);
  549. }
  550. }
  551. @Override
  552. public void sendSetAlarmParamAck(int code, Map<String, Object> globalConfig) {
  553. try {
  554. Map<String, Object> payload = new HashMap<>();
  555. payload.put("code", code);
  556. payload.put("global", globalConfig);
  557. payload.put("timestamp", System.currentTimeMillis());
  558. sendMessage(MqttTopics.DAS_SET_ALARM_PARAM_ACK, payload);
  559. log.debug("Set alarm parameter acknowledgment sent: code={}, config={}", code, globalConfig);
  560. } catch (Exception e) {
  561. log.error("Failed to send set alarm parameter acknowledgment", e);
  562. }
  563. }
  564. @Override
  565. public void sendResponse(String topic, int code, Map<String, Object> data) {
  566. try {
  567. Map<String, Object> payload = new HashMap<>(data);
  568. payload.put("code", code);
  569. payload.put("timestamp", System.currentTimeMillis());
  570. sendMessage(topic, payload);
  571. log.debug("Response sent to topic: {}, code: {}", topic, code);
  572. } catch (Exception e) {
  573. log.error("Error sending response to topic: {}", topic, e);
  574. }
  575. }
  576. @Override
  577. public void sendCommand(String topic, String command, Map<String, Object> params) {
  578. try {
  579. Map<String, Object> payload = new HashMap<>();
  580. payload.put("command", command);
  581. payload.put("params", params);
  582. payload.put("timestamp", System.currentTimeMillis());
  583. sendMessage(topic, payload);
  584. log.debug("Command sent to topic: {}, command: {}", topic, command);
  585. } catch (Exception e) {
  586. log.error("Error sending command to topic: {}", topic, e);
  587. }
  588. }
  589. @Override
  590. public void sendGenericMessage(String topic, String messageType, Map<String, Object> messageData) {
  591. try {
  592. Map<String, Object> payload = new HashMap<>(messageData);
  593. payload.put("message_type", messageType);
  594. payload.put("timestamp", System.currentTimeMillis());
  595. sendMessage(topic, payload);
  596. log.debug("Generic message sent to topic: {}, type: {}", topic, messageType);
  597. } catch (Exception e) {
  598. log.error("Error sending generic message to topic: {}", topic, e);
  599. }
  600. }
  601. @Override
  602. public void sendToMqtt(String topic, String payload) {
  603. sendToMqtt(topic, payload, 0, false);
  604. }
  605. /**
  606. * 发送消息到MQTT服务器 - 增强版
  607. * 支持动态QoS和消息保留设置
  608. * @param topic 主题
  609. * @param payload 负载
  610. * @param qos 质量等级
  611. * @param retain 是否保留
  612. */
  613. private void sendToMqtt(String topic, String payload, int qos, boolean retain) {
  614. try {
  615. Message<String> message = MessageBuilder
  616. .withPayload(payload)
  617. .setHeader(MqttHeaders.TOPIC, topic)
  618. .setHeader(MqttHeaders.QOS, qos)
  619. .setHeader(MqttHeaders.RETAINED, retain)
  620. .build();
  621. mqttOutbound.handleMessage(message);
  622. log.trace("MQTT message sent to topic: {}", topic);
  623. } catch (Exception e) {
  624. log.error("Failed to send MQTT message to topic: {}, error: {}", topic, e.getMessage());
  625. }
  626. }
  627. /**
  628. * 发送消息 - 私有方法
  629. * 默认QoS 1,适合大多数业务场景
  630. */
  631. private void sendMessage(String topic, Object payload) {
  632. sendMessage(topic, payload, 1);
  633. }
  634. /**
  635. * 发送消息 - 支持动态QoS
  636. */
  637. private void sendMessage(String topic, Object payload, int qos) {
  638. try {
  639. String json = objectMapper.writeValueAsString(payload);
  640. sendToMqtt(topic, json, qos, false);
  641. log.debug("MQTT message sent to topic: {}", topic);
  642. } catch (Exception e) {
  643. log.error("Error sending MQTT message to topic: {}", topic, e);
  644. }
  645. }
  646. @NotNull
  647. private static Map<String, Object> getStringObjectMap(Device.TrackingRegion trackingRegion) {
  648. Map<String, Object> trackingRegionMap = new HashMap<>();
  649. if (trackingRegion != null) {
  650. trackingRegionMap.put("start_x", trackingRegion.getStartX());
  651. trackingRegionMap.put("start_y", trackingRegion.getStartY());
  652. trackingRegionMap.put("start_z", trackingRegion.getStartZ());
  653. trackingRegionMap.put("stop_x", trackingRegion.getStopX());
  654. trackingRegionMap.put("stop_y", trackingRegion.getStopY());
  655. trackingRegionMap.put("stop_z", trackingRegion.getStopZ());
  656. }
  657. return trackingRegionMap;
  658. }
  659. @Override
  660. public void sendRealtimePositionMessage(String deviceId, List<List<Float>> rawPoints, List<Integer> pose, List<List<Float>> targets) {
  661. try {
  662. Map<String, Object> message = new HashMap<>();
  663. message.put("dev_id", deviceId);
  664. message.put("timestamp", System.currentTimeMillis());
  665. // 原始点云数据 (对应Python版本的raw_points)
  666. if (rawPoints != null) {
  667. message.put("raw_points", rawPoints);
  668. } else {
  669. message.put("raw_points", new ArrayList<>());
  670. }
  671. // 姿态信息 (对应Python版本的pose)
  672. if (pose != null && !pose.isEmpty()) {
  673. message.put("pose", pose.get(0)); // 取第一个姿态值
  674. } else {
  675. message.put("pose", 0); // 默认姿态
  676. }
  677. // 目标位置 (对应Python版本的targets)
  678. if (targets != null) {
  679. message.put("targets", targets);
  680. } else {
  681. message.put("targets", new ArrayList<>());
  682. }
  683. // 发送到实时位置主题 (对应Python版本的MQTT主题)
  684. String topic = "/mps/realtime_pos";
  685. publishJson(topic, message);
  686. log.trace("Realtime position message sent: deviceId={}, targetCount={}",
  687. deviceId, targets != null ? targets.size() : 0);
  688. } catch (Exception e) {
  689. log.error("Failed to send realtime position message: deviceId={}, error={}",
  690. deviceId, e.getMessage(), e);
  691. }
  692. }
  693. @Override
  694. public void sendExistEventMessage(String deviceId, String event) {
  695. try {
  696. Map<String, Object> payload = new HashMap<>();
  697. payload.put("message", "notify");
  698. payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_EXIST.getCode());
  699. payload.put("dev_id", deviceId);
  700. payload.put("event", event);
  701. payload.put("timestamp", System.currentTimeMillis());
  702. sendMessage(MqttTopics.DAS_EXIST_EVENT, payload);
  703. log.debug("Exist event message sent: deviceId={}, event={}", deviceId, event);
  704. } catch (Exception e) {
  705. log.error("Error sending exist event message: {}, event: {}", deviceId, event, e);
  706. }
  707. }
  708. @Override
  709. public void sendDebugParamResponse(String deviceId, Map<String, Object> debugParams) {
  710. try {
  711. Map<String, Object> payload = new HashMap<>();
  712. payload.put("message", "response");
  713. payload.put("dev_id", deviceId);
  714. payload.put("debug_params", debugParams != null ? debugParams : new HashMap<>());
  715. payload.put("timestamp", System.currentTimeMillis());
  716. String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/debug_params";
  717. sendMessage(topic, payload);
  718. log.debug("Debug param response sent: deviceId={}, paramsCount={}",
  719. deviceId, debugParams != null ? debugParams.size() : 0);
  720. } catch (Exception e) {
  721. log.error("Error sending debug param response: {}", deviceId, e);
  722. }
  723. }
  724. @Override
  725. public void reportAlarmParam(int code, Map<String, Object> alarmConfig) {
  726. try {
  727. Map<String, Object> payload = new HashMap<>();
  728. payload.put("code", code);
  729. payload.put("global", alarmConfig != null ? alarmConfig : new HashMap<>());
  730. payload.put("timestamp", System.currentTimeMillis());
  731. sendMessage(MqttTopics.DAS_REPORT_ALARM_PARAM, payload);
  732. log.debug("Alarm param report sent: code={}, configSize={}",
  733. code, alarmConfig != null ? alarmConfig.size() : 0);
  734. } catch (Exception e) {
  735. log.error("Error sending alarm param report: code={}", code, e);
  736. }
  737. }
  738. }