123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843 |
- package com.hfln.device.infrastructure.gateway;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.hfln.device.common.constant.mqtt.topic.MqttTopics;
- import com.hfln.device.domain.constant.DeviceConstants;
- import com.hfln.device.domain.entity.Device;
- import com.hfln.device.domain.gateway.MqttGateway;
- import lombok.extern.slf4j.Slf4j;
- import org.jetbrains.annotations.NotNull;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Primary;
- import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
- import org.springframework.integration.mqtt.support.MqttHeaders;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Component;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- /**
- * MQTT网关实现类 - 基于Spring Integration MQTT
- * 合并了原MqttGatewayDefaultImpl的优秀特性
- */
- @Component
- @Primary
- @Slf4j
- public class MqttGatewayImpl implements MqttGateway {
- @Autowired
- private MqttPahoMessageHandler mqttOutbound;
-
- @Autowired
- private ObjectMapper objectMapper;
-
- @Override
- public void initialize() {
- log.info("MQTT Gateway initialized with Spring Integration");
- }
-
- @Override
- public void publish(String topic, Object payload) {
- publish(topic, payload, 0, false);
- }
-
- @Override
- public void publish(String topic, Object payload, int qos, boolean retain) {
- try {
- String jsonPayload = objectMapper.writeValueAsString(payload);
- sendToMqtt(topic, jsonPayload, qos, retain);
- } catch (JsonProcessingException e) {
- log.error("Failed to serialize payload for topic: {}, error: {}", topic, e.getMessage());
- }
- }
-
- @Override
- public void sendMessage(String topic, String message) {
- sendMessage(topic, message, 0, false);
- }
-
- @Override
- public void sendMessage(String topic, String message, int qos, boolean retain) {
- sendToMqtt(topic, message, qos, retain);
- }
-
- @Override
- public void publishJson(String topic, Object payload) {
- publishJson(topic, payload, 0, false);
- }
-
- @Override
- public void publishJson(String topic, Object payload, int qos, boolean retain) {
- try {
- String jsonPayload = objectMapper.writeValueAsString(payload);
- sendToMqtt(topic, jsonPayload, qos, retain);
- } catch (JsonProcessingException e) {
- log.error("Failed to serialize JSON payload for topic: {}, error: {}", topic, e.getMessage());
- }
- }
-
- @Override
- public void sendSync(String topic, Object payload) throws Exception {
- String jsonPayload = objectMapper.writeValueAsString(payload);
- sendToMqtt(topic, jsonPayload, 0, false);
- }
-
- @Override
- public void subscribe(String topic, int qos) {
- log.info("Subscription managed by Spring Integration configuration for topic: {}", topic);
- }
-
- @Override
- public void unsubscribe(String topic) {
- log.info("Unsubscription managed by Spring Integration configuration for topic: {}", topic);
- }
-
- @Override
- public boolean isConnected() {
- return true; // Spring Integration handles connection management
- }
-
- @Override
- public void disconnect() {
- log.info("Disconnect managed by Spring Integration");
- }
-
- @Override
- public void sendDeviceStatusMessage(Device device) {
- try {
- Map<String, Object> message = new HashMap<>();
- message.put("message", "notify");
- message.put("timestamp", System.currentTimeMillis());
- message.put("dev_id", device.getDevId());
- message.put("online", device.getOnline());
- message.put("dev_type", device.getDevType());
- message.put("software", device.getSoftware());
- message.put("hardware", device.getHardware());
-
- // 网络信息
- if (device.getNetwork() != null) {
- Map<String, Object> network = new HashMap<>();
- network.put("ssid", device.getNetwork().getSsid());
- network.put("password", device.getNetwork().getPassword());
- network.put("ip", device.getNetwork().getIp());
- message.put("network", network);
- }
-
- // 雷达参数
- if (device.getInstallParam() != null) {
- Map<String, Object> radarParam = new HashMap<>();
- radarParam.put("mount_plain", device.getInstallParam().getMountPlain());
- radarParam.put("height", device.getInstallParam().getHeight());
-
- if (device.getInstallParam().getTrackingRegion() != null) {
- Map<String, Object> trackingRegion = new HashMap<>();
- trackingRegion.put("start_x", device.getInstallParam().getTrackingRegion().getStartX());
- trackingRegion.put("start_y", device.getInstallParam().getTrackingRegion().getStartY());
- trackingRegion.put("start_z", device.getInstallParam().getTrackingRegion().getStartZ());
- trackingRegion.put("stop_x", device.getInstallParam().getTrackingRegion().getStopX());
- trackingRegion.put("stop_y", device.getInstallParam().getTrackingRegion().getStopY());
- trackingRegion.put("stop_z", device.getInstallParam().getTrackingRegion().getStopZ());
- radarParam.put("tracking_region", trackingRegion);
- }
- message.put("radar_param", radarParam);
- }
-
- String topic = "/das/status";
- publishJson(topic, message, 1, false);
- log.debug("发送设备状态消息: deviceId={}, topic={}", device.getDevId(), topic);
- } catch (Exception e) {
- log.error("发送设备状态消息失败: deviceId={}, error={}", device.getDevId(), e.getMessage(), e);
- }
- }
-
- /**
- * 发送设备信息更新通知
- * 对应Python版本的mqtt_send.update_dev_info_msg(device)
- */
- @Override
- public void sendDeviceInfoUpdateNotification(Device device) {
- try {
- Map<String, Object> message = new HashMap<>();
- message.put("dev_id", device.getDevId());
- message.put("dev_type", device.getDevType());
- message.put("software", device.getSoftware());
- message.put("hardware", device.getHardware());
- message.put("online", device.getOnline());
-
- if (device.getNetwork() != null) {
- Map<String, Object> network = new HashMap<>();
- network.put("ssid", device.getNetwork().getSsid());
- network.put("ip", device.getNetwork().getIp());
- message.put("network", network);
- }
-
- if (device.getInstallParam() != null) {
- Map<String, Object> installParam = new HashMap<>();
- installParam.put("mount_plain", device.getInstallParam().getMountPlain());
- installParam.put("height", device.getInstallParam().getHeight());
- installParam.put("is_ceiling", device.getInstallParam().getIsCeiling());
-
- if (device.getInstallParam().getTrackingRegion() != null) {
- Map<String, Object> trackingRegion = new HashMap<>();
- trackingRegion.put("start_x", device.getInstallParam().getTrackingRegion().getStartX());
- trackingRegion.put("start_y", device.getInstallParam().getTrackingRegion().getStartY());
- trackingRegion.put("start_z", device.getInstallParam().getTrackingRegion().getStartZ());
- trackingRegion.put("stop_x", device.getInstallParam().getTrackingRegion().getStopX());
- trackingRegion.put("stop_y", device.getInstallParam().getTrackingRegion().getStopY());
- trackingRegion.put("stop_z", device.getInstallParam().getTrackingRegion().getStopZ());
- installParam.put("tracking_region", trackingRegion);
- }
-
- message.put("install_param", installParam);
- }
-
- String topic = "/mps/update_dev_info";
- publishJson(topic, message);
-
- log.info("Device info update notification sent: deviceId={}", device.getDevId());
- } catch (Exception e) {
- log.error("Failed to send device info update notification: deviceId={}, error={}",
- device.getDevId(), e.getMessage(), e);
- }
- }
-
- @Override
- public void sendRealtimePoseMessage(String deviceId, int pose, Object targetPoint) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("message", "notify");
- payload.put("message_type", DeviceConstants.MessageType.MSG_REALTIME_TARGET.getCode());
- payload.put("timestamp", System.currentTimeMillis());
- payload.put("dev_id", deviceId);
- payload.put("pose", pose);
- payload.put("target_point", targetPoint);
-
- sendMessage(MqttTopics.DAS_REALTIME_POS, payload);
- } catch (Exception e) {
- log.error("Error sending realtime pose message: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendAlarmMessage(String deviceId, String alarmType, Map<String, Object> data) {
- try {
- Map<String, Object> payload = new HashMap<>(data);
- payload.put("message", "notify");
- payload.put("message_type", DeviceConstants.MessageType.MSG_ALARM_EVENT.getCode());
- payload.put("dev_id", deviceId);
- payload.put("timestamp", System.currentTimeMillis());
- payload.put("alarmType", alarmType);
-
- sendMessage(MqttTopics.DAS_ALARM_EVENT, payload);
- log.info("Alarm message sent: {}, type: {}", deviceId, alarmType);
- } catch (Exception e) {
- log.error("Error sending alarm message: {}, type: {}", deviceId, alarmType, e);
- }
- }
-
- @Override
- public void sendBehaviorAnalysisResult(String deviceId, Object behaviorPattern) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("message", "notify");
- payload.put("dev_id", deviceId);
- payload.put("behaviorPattern", behaviorPattern);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(MqttTopics.DAS_BEHAVIOR_ANALYSIS, payload);
- log.debug("Behavior analysis result sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending behavior analysis result: {}", deviceId, e);
- }
- }
-
- @Override
- public boolean sendCommandToDevice(String deviceId, String command, Object payload) {
- try {
- Map<String, Object> message = new HashMap<>();
- message.put("command", command);
- message.put("payload", payload);
- message.put("timestamp", System.currentTimeMillis());
-
- String topic = String.format(MqttTopics.DEV_COMMAND, deviceId);
- sendMessage(topic, message);
- log.info("Command sent to device: {}, command: {}", deviceId, command);
- return true;
- } catch (Exception e) {
- log.error("Failed to send command to device: {}, command: {}, error: {}", deviceId, command, e.getMessage());
- return false;
- }
- }
-
- @Override
- public void sendFallAlarmMessage(String deviceId, int pose, List<Float> targetPoint) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("message", "notify");
- payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_FALL.getCode());
- payload.put("dev_id", deviceId);
- payload.put("pose", pose);
- payload.put("target_point", targetPoint);
- payload.put("alarmType", "fall");
- payload.put("timestamp", System.currentTimeMillis());
-
- // 跌倒告警使用QoS 2确保可靠传输
- sendMessage(MqttTopics.DAS_ALARM_EVENT, payload, 2);
- log.info("Fall alarm message sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending fall alarm message: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendDeviceRebootCommand(String deviceId) {
- try {
- Map<String, Object> payload = new HashMap<>();
- String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/reboot";
- sendMessage(topic, payload);
- log.info("Device reboot command sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending device reboot command: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendDeviceResetCommand(String deviceId) {
- try {
- Map<String, Object> payload = new HashMap<>();
- String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/reset";
- sendMessage(topic, payload);
- log.info("Device reset command sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending device reset command: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendDeviceCommand(String deviceId, String command, Map<String, Object> params) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("command", command);
- payload.put("params", params);
- payload.put("timestamp", System.currentTimeMillis());
-
- String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/" + command;
- sendMessage(topic, payload);
- log.info("Device command sent: {}, command: {}", deviceId, command);
- } catch (Exception e) {
- log.error("Error sending device command: {}, command: {}", deviceId, command, e);
- }
- }
-
- @Override
- public void sendDeviceKeepAliveResponse(String deviceId, int status) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("code", status);
-
- String topic = DeviceConstants.MqttConstant.TOPIC_DAS_PREFIX + deviceId + "/keepalive";
- sendMessage(topic, payload);
- log.debug("Device keepalive response sent: {}, code: {}", deviceId, status);
- } catch (Exception e) {
- log.error("Error sending device keepalive response: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendDeviceNotFoundResponse(String deviceId) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("code", 404);
- payload.put("message", "Device not found");
-
- String topic = MqttTopics.APP_DEVICE_INFO_RESPONSE;
- sendMessage(topic, payload);
- log.debug("Device not found response sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending device not found response: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendAlarmAckMessage(String deviceId, Long eventId) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("dev_id", deviceId);
- payload.put("event_id", eventId);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(MqttTopics.APP_FALL_EVENT_ACK, payload);
- log.debug("Alarm acknowledgment sent: {}, eventId: {}", deviceId, eventId);
- } catch (Exception e) {
- log.error("Error sending alarm acknowledgment: {}, eventId: {}", deviceId, eventId, e);
- }
- }
-
- @Override
- public void sendDeviceParamSetCommand(String deviceId, String paramType, String paramName, float value) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("param_type", paramType);
- payload.put("param_name", paramName);
- payload.put("value", value);
- payload.put("timestamp", System.currentTimeMillis());
-
- String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/set_param";
- sendMessage(topic, payload);
- log.info("Device parameter set command sent: {}, {}={}", deviceId, paramName, value);
- } catch (Exception e) {
- log.error("Error sending device parameter set command: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendSetDeviceParamCommand(String deviceId, String paramType, String paramName, Float value) {
- sendDeviceParamSetCommand(deviceId, paramType, paramName, value != null ? value : 0.0f);
- }
-
- @Override
- public void sendUpdateNetworkCommand(String deviceId, String ssid, String password) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("ssid", ssid);
- payload.put("password", password);
- payload.put("timestamp", System.currentTimeMillis());
-
- String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/network";
- sendMessage(topic, payload);
- log.info("Network update command sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending network update command: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendDeviceLoginResponse(String deviceId, int code) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("code", code);
- payload.put("expires", 90); // 过期时间,单位秒
-
- String topic = DeviceConstants.MqttConstant.TOPIC_DAS_PREFIX + deviceId + "/login";
- sendMessage(topic, payload);
- log.debug("Device login response sent: {}, code: {}", deviceId, code);
- } catch (Exception e) {
- log.error("Error sending device login response: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendEventMessage(String deviceId, List<List<Float>> rawPoints, int pose, List<List<Float>> targets, String event) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("message", "notify");
- payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_FALL.getCode());
- payload.put("dev_id", deviceId);
- payload.put("event", event);
- payload.put("timestamp", System.currentTimeMillis());
- payload.put("pose", pose);
- payload.put("RawPoints", rawPoints != null ? rawPoints : new ArrayList<>()); // 对应Python版本的RawPoints参数
- payload.put("targets", targets != null ? targets : new ArrayList<>()); // 对应Python版本的targets参数
-
- // 对于确认的跌倒事件,使用QoS 2
- int qos = "fall_confirmed".equals(event) ? 2 : 0;
- sendMessage(MqttTopics.DAS_EVENT, payload, qos);
- log.debug("Event message sent: deviceId={}, event={}, pose={}, targetsCount={}",
- deviceId, event, pose, targets != null ? targets.size() : 0);
- } catch (Exception e) {
- log.error("Error sending event message: {}, event: {}", deviceId, event, e);
- }
- }
-
- @Override
- public void sendAlarmEventMessage(String deviceId, String description, String table, int tableId) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("message", "notify");
- payload.put("message_type", DeviceConstants.MessageType.MSG_ALARM_EVENT.getCode());
- payload.put("dev_id", deviceId);
- payload.put("timestamp", System.currentTimeMillis());
- payload.put("desc", description);
- payload.put("table", table);
- payload.put("table_id", tableId);
-
- sendMessage(MqttTopics.DAS_ALARM_EVENT, payload);
- } catch (Exception e) {
- log.error("Error sending alarm event message: {}, desc: {}", deviceId, description, e);
- }
- }
-
- @Override
- public void sendExistenceMessage(String deviceId, String event) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("message", "notify");
- payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_EXIST.getCode());
- payload.put("dev_id", deviceId);
- payload.put("event", event);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(MqttTopics.DAS_EXIST_EVENT, payload);
- } catch (Exception e) {
- log.error("Error sending existence message: {}, event: {}", deviceId, event, e);
- }
- }
-
- @Override
- public void sendNetworkConfigUpdate(String deviceId, Device.NetworkInfo networkInfo) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("dev_id", deviceId);
- if (networkInfo != null) {
- payload.put("ssid", networkInfo.getSsid());
- payload.put("password", networkInfo.getPassword());
- payload.put("ip", networkInfo.getIp());
- }
- payload.put("timestamp", System.currentTimeMillis());
-
- String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/network_config";
- sendMessage(topic, payload);
- log.info("Network config update sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending network config update: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendInstallParamUpdate(String deviceId, Device.InstallParam installParam) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("dev_id", deviceId);
- if (installParam != null) {
- payload.put("mount_plain", installParam.getMountPlain());
- payload.put("height", installParam.getHeight());
-
- // 跟踪区域
- if (installParam.getTrackingRegion() != null) {
- Map<String, Object> trackingRegion = getStringObjectMap(installParam.getTrackingRegion());
- payload.put("tracking_region", trackingRegion);
- }
- }
- payload.put("timestamp", System.currentTimeMillis());
-
- String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/install_param";
- sendMessage(topic, payload);
- log.info("Install parameter update sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending install parameter update: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendTrackingRegionUpdate(String deviceId, Device.TrackingRegion trackingRegion) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("dev_id", deviceId);
- if (trackingRegion != null) {
- Map<String, Object> regionMap = getStringObjectMap(trackingRegion);
- payload.put("tracking_region", regionMap);
- }
- payload.put("timestamp", System.currentTimeMillis());
-
- String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/tracking_region";
- sendMessage(topic, payload);
- log.info("Tracking region update sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending tracking region update: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendAlarmScheduleUpdate(String deviceId, Map<String, Object> alarmSchedule) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("dev_id", deviceId);
- payload.put("alarm_schedule", alarmSchedule);
- payload.put("timestamp", System.currentTimeMillis());
-
- String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/alarm_schedule";
- sendMessage(topic, payload);
- log.info("Alarm schedule update sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending alarm schedule update: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendDeviceInfoResponse(String deviceId, Device device) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("dev_id", deviceId);
- payload.put("device", device);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(MqttTopics.APP_DEVICE_INFO_RESPONSE, payload);
- log.debug("Device info response sent: {}", deviceId);
- } catch (Exception e) {
- log.error("Error sending device info response: {}", deviceId, e);
- }
- }
-
- @Override
- public void sendStatusMessage(String deviceId, String status, Map<String, Object> data) {
- try {
- Map<String, Object> payload = new HashMap<>(data);
- payload.put("dev_id", deviceId);
- payload.put("status", status);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(MqttTopics.DAS_STATUS, payload);
- log.debug("Device status message sent: {}, status: {}", deviceId, status);
- } catch (Exception e) {
- log.error("Error sending status message: {}, status: {}", deviceId, status, e);
- }
- }
-
- @Override
- public void sendBehaviorMessage(String deviceId, String behaviorType, Map<String, Object> data) {
- try {
- Map<String, Object> payload = new HashMap<>(data);
- payload.put("dev_id", deviceId);
- payload.put("behaviorType", behaviorType);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(MqttTopics.DAS_BEHAVIOR_ANALYSIS, payload);
- log.debug("Behavior message sent: {}, type: {}", deviceId, behaviorType);
- } catch (Exception e) {
- log.error("Error sending behavior message: {}, type: {}", deviceId, behaviorType, e);
- }
- }
-
- @Override
- public void sendAlarmParamResponse(int code, Map<String, Object> globalConfig) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("code", code);
- payload.put("global", globalConfig);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(MqttTopics.DAS_REPORT_ALARM_PARAM, payload);
- log.debug("Alarm parameter response sent: code={}, config={}", code, globalConfig);
- } catch (Exception e) {
- log.error("Failed to send alarm parameter response", e);
- }
- }
-
- @Override
- public void sendSetAlarmParamAck(int code, Map<String, Object> globalConfig) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("code", code);
- payload.put("global", globalConfig);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(MqttTopics.DAS_SET_ALARM_PARAM_ACK, payload);
- log.debug("Set alarm parameter acknowledgment sent: code={}, config={}", code, globalConfig);
- } catch (Exception e) {
- log.error("Failed to send set alarm parameter acknowledgment", e);
- }
- }
-
- @Override
- public void sendResponse(String topic, int code, Map<String, Object> data) {
- try {
- Map<String, Object> payload = new HashMap<>(data);
- payload.put("code", code);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(topic, payload);
- log.debug("Response sent to topic: {}, code: {}", topic, code);
- } catch (Exception e) {
- log.error("Error sending response to topic: {}", topic, e);
- }
- }
-
- @Override
- public void sendCommand(String topic, String command, Map<String, Object> params) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("command", command);
- payload.put("params", params);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(topic, payload);
- log.debug("Command sent to topic: {}, command: {}", topic, command);
- } catch (Exception e) {
- log.error("Error sending command to topic: {}", topic, e);
- }
- }
-
- @Override
- public void sendGenericMessage(String topic, String messageType, Map<String, Object> messageData) {
- try {
- Map<String, Object> payload = new HashMap<>(messageData);
- payload.put("message_type", messageType);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(topic, payload);
- log.debug("Generic message sent to topic: {}, type: {}", topic, messageType);
- } catch (Exception e) {
- log.error("Error sending generic message to topic: {}", topic, e);
- }
- }
-
- @Override
- public void sendToMqtt(String topic, String payload) {
- sendToMqtt(topic, payload, 0, false);
- }
-
- /**
- * 发送消息到MQTT服务器 - 增强版
- * 支持动态QoS和消息保留设置
- * @param topic 主题
- * @param payload 负载
- * @param qos 质量等级
- * @param retain 是否保留
- */
- private void sendToMqtt(String topic, String payload, int qos, boolean retain) {
- try {
- Message<String> message = MessageBuilder
- .withPayload(payload)
- .setHeader(MqttHeaders.TOPIC, topic)
- .setHeader(MqttHeaders.QOS, qos)
- .setHeader(MqttHeaders.RETAINED, retain)
- .build();
-
- mqttOutbound.handleMessage(message);
- log.trace("MQTT message sent to topic: {}", topic);
- } catch (Exception e) {
- log.error("Failed to send MQTT message to topic: {}, error: {}", topic, e.getMessage());
- }
- }
-
- /**
- * 发送消息 - 私有方法
- * 默认QoS 1,适合大多数业务场景
- */
- private void sendMessage(String topic, Object payload) {
- sendMessage(topic, payload, 1);
- }
-
- /**
- * 发送消息 - 支持动态QoS
- */
- private void sendMessage(String topic, Object payload, int qos) {
- try {
- String json = objectMapper.writeValueAsString(payload);
- sendToMqtt(topic, json, qos, false);
- log.debug("MQTT message sent to topic: {}", topic);
- } catch (Exception e) {
- log.error("Error sending MQTT message to topic: {}", topic, e);
- }
- }
-
- @NotNull
- private static Map<String, Object> getStringObjectMap(Device.TrackingRegion trackingRegion) {
- Map<String, Object> trackingRegionMap = new HashMap<>();
- if (trackingRegion != null) {
- trackingRegionMap.put("start_x", trackingRegion.getStartX());
- trackingRegionMap.put("start_y", trackingRegion.getStartY());
- trackingRegionMap.put("start_z", trackingRegion.getStartZ());
- trackingRegionMap.put("stop_x", trackingRegion.getStopX());
- trackingRegionMap.put("stop_y", trackingRegion.getStopY());
- trackingRegionMap.put("stop_z", trackingRegion.getStopZ());
- }
- return trackingRegionMap;
- }
- @Override
- public void sendRealtimePositionMessage(String deviceId, List<List<Float>> rawPoints, List<Integer> pose, List<List<Float>> targets) {
- try {
- Map<String, Object> message = new HashMap<>();
- message.put("dev_id", deviceId);
- message.put("timestamp", System.currentTimeMillis());
-
- // 原始点云数据 (对应Python版本的raw_points)
- if (rawPoints != null) {
- message.put("raw_points", rawPoints);
- } else {
- message.put("raw_points", new ArrayList<>());
- }
-
- // 姿态信息 (对应Python版本的pose)
- if (pose != null && !pose.isEmpty()) {
- message.put("pose", pose.get(0)); // 取第一个姿态值
- } else {
- message.put("pose", 0); // 默认姿态
- }
-
- // 目标位置 (对应Python版本的targets)
- if (targets != null) {
- message.put("targets", targets);
- } else {
- message.put("targets", new ArrayList<>());
- }
-
- // 发送到实时位置主题 (对应Python版本的MQTT主题)
- String topic = "/mps/realtime_pos";
- publishJson(topic, message);
-
- log.trace("Realtime position message sent: deviceId={}, targetCount={}",
- deviceId, targets != null ? targets.size() : 0);
- } catch (Exception e) {
- log.error("Failed to send realtime position message: deviceId={}, error={}",
- deviceId, e.getMessage(), e);
- }
- }
- @Override
- public void sendExistEventMessage(String deviceId, String event) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("message", "notify");
- payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_EXIST.getCode());
- payload.put("dev_id", deviceId);
- payload.put("event", event);
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(MqttTopics.DAS_EXIST_EVENT, payload);
- log.debug("Exist event message sent: deviceId={}, event={}", deviceId, event);
- } catch (Exception e) {
- log.error("Error sending exist event message: {}, event: {}", deviceId, event, e);
- }
- }
-
- @Override
- public void sendDebugParamResponse(String deviceId, Map<String, Object> debugParams) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("message", "response");
- payload.put("dev_id", deviceId);
- payload.put("debug_params", debugParams != null ? debugParams : new HashMap<>());
- payload.put("timestamp", System.currentTimeMillis());
-
- String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/debug_params";
- sendMessage(topic, payload);
- log.debug("Debug param response sent: deviceId={}, paramsCount={}",
- deviceId, debugParams != null ? debugParams.size() : 0);
- } catch (Exception e) {
- log.error("Error sending debug param response: {}", deviceId, e);
- }
- }
-
- @Override
- public void reportAlarmParam(int code, Map<String, Object> alarmConfig) {
- try {
- Map<String, Object> payload = new HashMap<>();
- payload.put("code", code);
- payload.put("global", alarmConfig != null ? alarmConfig : new HashMap<>());
- payload.put("timestamp", System.currentTimeMillis());
-
- sendMessage(MqttTopics.DAS_REPORT_ALARM_PARAM, payload);
- log.debug("Alarm param report sent: code={}, configSize={}",
- code, alarmConfig != null ? alarmConfig.size() : 0);
- } catch (Exception e) {
- log.error("Error sending alarm param report: code={}", code, e);
- }
- }
- }
|