|
@@ -2,11 +2,14 @@ package com.hfln.device.infrastructure.gateway;
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.hfln.device.common.constant.mqtt.topic.MqttTopics;
|
|
|
+import com.hfln.device.domain.constant.DeviceConstants;
|
|
|
import com.hfln.device.domain.entity.Device;
|
|
|
import com.hfln.device.domain.gateway.MqttGateway;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.jetbrains.annotations.NotNull;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.context.annotation.Primary;
|
|
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
|
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
|
|
import org.springframework.messaging.Message;
|
|
@@ -19,17 +22,16 @@ import java.util.Map;
|
|
|
|
|
|
/**
|
|
|
* MQTT网关实现类 - 基于Spring Integration MQTT
|
|
|
+ * 合并了原MqttGatewayDefaultImpl的优秀特性
|
|
|
*/
|
|
|
@Component
|
|
|
+@Primary
|
|
|
@Slf4j
|
|
|
public class MqttGatewayImpl implements MqttGateway {
|
|
|
|
|
|
@Autowired
|
|
|
private MqttPahoMessageHandler mqttOutbound;
|
|
|
|
|
|
- @Value("${mqtt.topic.prefix:hfln/device/}")
|
|
|
- private String topicPrefix;
|
|
|
-
|
|
|
@Autowired
|
|
|
private ObjectMapper objectMapper;
|
|
|
|
|
@@ -106,342 +108,460 @@ public class MqttGatewayImpl implements MqttGateway {
|
|
|
|
|
|
@Override
|
|
|
public void sendDeviceStatusMessage(Device device) {
|
|
|
- String deviceId = device.getDevId();
|
|
|
- String topic = topicPrefix + deviceId + "/status";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("online", device.getOnline());
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Device status message sent: {}", topic);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("message", "notify");
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+ payload.put("dev_id", device.getDevId());
|
|
|
+ payload.put("online", device.getOnline());
|
|
|
+ payload.put("dev_type", device.getDevType());
|
|
|
+ payload.put("software", device.getSoftware());
|
|
|
+ payload.put("hardware", device.getHardware());
|
|
|
+
|
|
|
+ // 网络信息
|
|
|
+ Map<String, Object> network = new HashMap<>();
|
|
|
+ if (device.getNetwork() != null) {
|
|
|
+ network.put("ssid", device.getNetwork().getSsid());
|
|
|
+ network.put("password", device.getNetwork().getPassword());
|
|
|
+ network.put("ip", device.getNetwork().getIp());
|
|
|
+ }
|
|
|
+ payload.put("network", network);
|
|
|
+
|
|
|
+ // 雷达参数
|
|
|
+ Map<String, Object> radarParam = new HashMap<>();
|
|
|
+ if (device.getInstallParam() != null) {
|
|
|
+ radarParam.put("mount_plain", device.getInstallParam().getMountPlain());
|
|
|
+ radarParam.put("height", device.getInstallParam().getHeight());
|
|
|
+
|
|
|
+ // 跟踪区域
|
|
|
+ Map<String, Object> trackingRegion = getStringObjectMap(device.getInstallParam().getTrackingRegion());
|
|
|
+ radarParam.put("tracking_region", trackingRegion);
|
|
|
+ }
|
|
|
+ payload.put("radar_param", radarParam);
|
|
|
+
|
|
|
+ sendMessage(MqttTopics.DAS_STATUS, payload);
|
|
|
+ log.debug("Device status message sent: {}", device.getDevId());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending device status message: {}", device.getDevId(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @NotNull
|
|
|
+ private static Map<String, Object> getStringObjectMap(Device.TrackingRegion trackingRegion) {
|
|
|
+ Map<String, Object> trackingRegionMap = new HashMap<>();
|
|
|
+ if (trackingRegion != null) {
|
|
|
+ trackingRegionMap.put("start_x", trackingRegion.getStartX());
|
|
|
+ trackingRegionMap.put("start_y", trackingRegion.getStartY());
|
|
|
+ trackingRegionMap.put("start_z", trackingRegion.getStartZ());
|
|
|
+ trackingRegionMap.put("stop_x", trackingRegion.getStopX());
|
|
|
+ trackingRegionMap.put("stop_y", trackingRegion.getStopY());
|
|
|
+ trackingRegionMap.put("stop_z", trackingRegion.getStopZ());
|
|
|
+ }
|
|
|
+ return trackingRegionMap;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendRealtimePoseMessage(String deviceId, int pose, Object targetPoint) {
|
|
|
- String topic = topicPrefix + deviceId + "/pose";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("pose", pose);
|
|
|
- payload.put("targetPoint", targetPoint);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Real-time pose message sent: {}", topic);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("message", "notify");
|
|
|
+ payload.put("message_type", DeviceConstants.MessageType.MSG_REALTIME_TARGET.getCode());
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("pose", pose);
|
|
|
+ payload.put("target_point", targetPoint);
|
|
|
+
|
|
|
+ sendMessage(MqttTopics.DAS_REALTIME_POS, payload);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending realtime pose message: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendAlarmMessage(String deviceId, String alarmType, Map<String, Object> data) {
|
|
|
- String topic = topicPrefix + deviceId + "/alarm";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>(data);
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("alarmType", alarmType);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Alarm message sent: {}, type: {}", topic, alarmType);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>(data);
|
|
|
+ payload.put("message", "notify");
|
|
|
+ payload.put("message_type", DeviceConstants.MessageType.MSG_ALARM_EVENT.getCode());
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+ payload.put("alarmType", alarmType);
|
|
|
+
|
|
|
+ sendMessage(MqttTopics.DAS_ALARM_EVENT, payload);
|
|
|
+ log.info("Alarm message sent: {}, type: {}", deviceId, alarmType);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending alarm message: {}, type: {}", deviceId, alarmType, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendBehaviorAnalysisResult(String deviceId, Object behaviorPattern) {
|
|
|
- String topic = topicPrefix + deviceId + "/behavior";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("behaviorPattern", behaviorPattern);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Behavior analysis result sent: {}", topic);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("message", "notify");
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("behaviorPattern", behaviorPattern);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ sendMessage(MqttTopics.DAS_BEHAVIOR_ANALYSIS, payload);
|
|
|
+ log.debug("Behavior analysis result sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending behavior analysis result: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public boolean sendCommandToDevice(String deviceId, String command, Object payload) {
|
|
|
- String topic = topicPrefix + deviceId + "/command";
|
|
|
-
|
|
|
- Map<String, Object> message = new HashMap<>();
|
|
|
- message.put("deviceId", deviceId);
|
|
|
- message.put("command", command);
|
|
|
- message.put("payload", payload);
|
|
|
- message.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- try {
|
|
|
- publish(topic, message);
|
|
|
- log.info("Command sent to device: {}, command: {}", topic, command);
|
|
|
+ try {
|
|
|
+ Map<String, Object> message = new HashMap<>();
|
|
|
+ message.put("command", command);
|
|
|
+ message.put("payload", payload);
|
|
|
+ message.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ String topic = String.format(MqttTopics.DEV_COMMAND, deviceId);
|
|
|
+ sendMessage(topic, message);
|
|
|
+ log.info("Command sent to device: {}, command: {}", deviceId, command);
|
|
|
return true;
|
|
|
} catch (Exception e) {
|
|
|
- log.error("Failed to send command to device: {}, command: {}, error: {}", topic, command, e.getMessage());
|
|
|
+ log.error("Failed to send command to device: {}, command: {}, error: {}", deviceId, command, e.getMessage());
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendFallAlarmMessage(String deviceId, int pose, List<Float> targetPoint) {
|
|
|
- String topic = topicPrefix + deviceId + "/fall";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("pose", pose);
|
|
|
- payload.put("targetPoint", targetPoint);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Fall alarm message sent: {}", topic);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("message", "notify");
|
|
|
+ payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_FALL.getCode());
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("pose", pose);
|
|
|
+ payload.put("target_point", targetPoint);
|
|
|
+ payload.put("alarmType", "fall");
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ // 跌倒告警使用QoS 2确保可靠传输
|
|
|
+ sendMessage(MqttTopics.DAS_ALARM_EVENT, payload, 2);
|
|
|
+ log.info("Fall alarm message sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending fall alarm message: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendDeviceRebootCommand(String deviceId) {
|
|
|
- String topic = topicPrefix + deviceId + "/command";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("command", "reboot");
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Device reboot command sent: {}", deviceId);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/reboot";
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.info("Device reboot command sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending device reboot command: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendDeviceResetCommand(String deviceId) {
|
|
|
- String topic = topicPrefix + deviceId + "/command";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("command", "reset");
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Device reset command sent: {}", deviceId);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/reset";
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.info("Device reset command sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending device reset command: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendDeviceCommand(String deviceId, String command, Map<String, Object> params) {
|
|
|
- String topic = topicPrefix + deviceId + "/command";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("command", command);
|
|
|
- payload.put("params", params);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Device command sent: {}, command: {}", deviceId, command);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("command", command);
|
|
|
+ payload.put("params", params);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/" + command;
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.info("Device command sent: {}, command: {}", deviceId, command);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending device command: {}, command: {}", deviceId, command, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendDeviceKeepAliveResponse(String deviceId, int status) {
|
|
|
- String topic = topicPrefix + deviceId + "/keepalive/response";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("status", status);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Keep-alive response sent: {}, status: {}", deviceId, status);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("code", status);
|
|
|
+
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DAS_PREFIX + deviceId + "/keepalive";
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.debug("Device keepalive response sent: {}, code: {}", deviceId, status);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending device keepalive response: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendDeviceNotFoundResponse(String deviceId) {
|
|
|
- String topic = topicPrefix + deviceId + "/response";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("status", "error");
|
|
|
- payload.put("message", "Device not found");
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.warn("Device not found response sent: {}", deviceId);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("code", 404);
|
|
|
+ payload.put("message", "Device not found");
|
|
|
+
|
|
|
+ String topic = MqttTopics.APP_DEVICE_INFO_RESPONSE;
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.debug("Device not found response sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending device not found response: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendAlarmAckMessage(String deviceId, Long eventId) {
|
|
|
- String topic = topicPrefix + deviceId + "/alarm/ack";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("eventId", eventId);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Alarm acknowledgment message sent: {}, eventId: {}", deviceId, eventId);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("event_id", eventId);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ sendMessage(MqttTopics.APP_FALL_EVENT_ACK, payload);
|
|
|
+ log.debug("Alarm acknowledgment sent: {}, eventId: {}", deviceId, eventId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending alarm acknowledgment: {}, eventId: {}", deviceId, eventId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendDeviceParamSetCommand(String deviceId, String paramType, String paramName, float value) {
|
|
|
- String topic = topicPrefix + deviceId + "/param/set";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("paramType", paramType);
|
|
|
- payload.put("paramName", paramName);
|
|
|
- payload.put("value", value);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Parameter set command sent: {}, type: {}, name: {}, value: {}", deviceId, paramType, paramName, value);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("param_type", paramType);
|
|
|
+ payload.put("param_name", paramName);
|
|
|
+ payload.put("value", value);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/set_param";
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.info("Device parameter set command sent: {}, {}={}", deviceId, paramName, value);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending device parameter set command: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendSetDeviceParamCommand(String deviceId, String paramType, String paramName, Float value) {
|
|
|
- sendDeviceParamSetCommand(deviceId, paramType, paramName, value);
|
|
|
+ sendDeviceParamSetCommand(deviceId, paramType, paramName, value != null ? value : 0.0f);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendUpdateNetworkCommand(String deviceId, String ssid, String password) {
|
|
|
- String topic = topicPrefix + deviceId + "/network/update";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("ssid", ssid);
|
|
|
- payload.put("password", password);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Network update command sent: {}, SSID: {}", deviceId, ssid);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("ssid", ssid);
|
|
|
+ payload.put("password", password);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/network";
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.info("Network update command sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending network update command: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendDeviceLoginResponse(String deviceId, int code) {
|
|
|
- String topic = topicPrefix + deviceId + "/login/response";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("code", code);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Device login response sent: {}, code: {}", deviceId, code);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("code", code);
|
|
|
+ payload.put("expires", 90); // 过期时间,单位秒
|
|
|
+
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DAS_PREFIX + deviceId + "/login";
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.debug("Device login response sent: {}, code: {}", deviceId, code);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending device login response: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendEventMessage(String deviceId, int pose, Object targetPoint, String event) {
|
|
|
- String topic = topicPrefix + deviceId + "/event";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("pose", pose);
|
|
|
- payload.put("targetPoint", targetPoint);
|
|
|
- payload.put("event", event);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Event message sent: {}, event: {}", deviceId, event);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("message", "notify");
|
|
|
+ payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_FALL.getCode());
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("event", event);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+ payload.put("pose", pose);
|
|
|
+ payload.put("target_point", targetPoint);
|
|
|
+
|
|
|
+ // 对于确认的跌倒事件,使用QoS 2
|
|
|
+ int qos = "fall_confirmed".equals(event) ? 2 : 0;
|
|
|
+ sendMessage(MqttTopics.DAS_EVENT, payload, qos);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending event message: {}, event: {}", deviceId, event, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendAlarmEventMessage(String deviceId, String description, String table, int tableId) {
|
|
|
- String topic = topicPrefix + deviceId + "/alarm/event";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("description", description);
|
|
|
- payload.put("table", table);
|
|
|
- payload.put("tableId", tableId);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Alarm event message sent: {}, description: {}", deviceId, description);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("message", "notify");
|
|
|
+ payload.put("message_type", DeviceConstants.MessageType.MSG_ALARM_EVENT.getCode());
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+ payload.put("desc", description);
|
|
|
+ payload.put("table", table);
|
|
|
+ payload.put("table_id", tableId);
|
|
|
+
|
|
|
+ sendMessage(MqttTopics.DAS_ALARM_EVENT, payload);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending alarm event message: {}, desc: {}", deviceId, description, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendExistenceMessage(String deviceId, String event) {
|
|
|
- String topic = topicPrefix + deviceId + "/existence";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("event", event);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Existence event message sent: {}, event: {}", deviceId, event);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("message", "notify");
|
|
|
+ payload.put("message_type", DeviceConstants.MessageType.MSG_EVENT_EXIST.getCode());
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("event", event);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ sendMessage(MqttTopics.DAS_EXIST_EVENT, payload);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending existence message: {}, event: {}", deviceId, event, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendNetworkConfigUpdate(String deviceId, Device.NetworkInfo networkInfo) {
|
|
|
- String topic = topicPrefix + deviceId + "/network/config";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("networkInfo", networkInfo);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Network config update sent: {}", deviceId);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ if (networkInfo != null) {
|
|
|
+ payload.put("ssid", networkInfo.getSsid());
|
|
|
+ payload.put("password", networkInfo.getPassword());
|
|
|
+ payload.put("ip", networkInfo.getIp());
|
|
|
+ }
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/network_config";
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.info("Network config update sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending network config update: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendInstallParamUpdate(String deviceId, Device.InstallParam installParam) {
|
|
|
- String topic = topicPrefix + deviceId + "/install/param";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("installParam", installParam);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Install parameter update sent: {}", deviceId);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ if (installParam != null) {
|
|
|
+ payload.put("mount_plain", installParam.getMountPlain());
|
|
|
+ payload.put("height", installParam.getHeight());
|
|
|
+
|
|
|
+ // 跟踪区域
|
|
|
+ if (installParam.getTrackingRegion() != null) {
|
|
|
+ Map<String, Object> trackingRegion = getStringObjectMap(installParam.getTrackingRegion());
|
|
|
+ payload.put("tracking_region", trackingRegion);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/install_param";
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.info("Install parameter update sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending install parameter update: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendTrackingRegionUpdate(String deviceId, Device.TrackingRegion trackingRegion) {
|
|
|
- String topic = topicPrefix + deviceId + "/tracking/region";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("trackingRegion", trackingRegion);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Tracking region update sent: {}", deviceId);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ if (trackingRegion != null) {
|
|
|
+ Map<String, Object> regionMap = getStringObjectMap(trackingRegion);
|
|
|
+ payload.put("tracking_region", regionMap);
|
|
|
+ }
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/tracking_region";
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.info("Tracking region update sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending tracking region update: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendAlarmScheduleUpdate(String deviceId, Map<String, Object> alarmSchedule) {
|
|
|
- String topic = topicPrefix + deviceId + "/alarm/schedule";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("alarmSchedule", alarmSchedule);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.info("Alarm schedule update sent: {}", deviceId);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("alarm_schedule", alarmSchedule);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ String topic = DeviceConstants.MqttConstant.TOPIC_DEVICE_PREFIX + deviceId + "/alarm_schedule";
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.info("Alarm schedule update sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending alarm schedule update: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendDeviceInfoResponse(String deviceId, Device device) {
|
|
|
- String topic = topicPrefix + deviceId + "/info/response";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("device", device);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Device info response sent: {}", deviceId);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("device", device);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ sendMessage(MqttTopics.APP_DEVICE_INFO_RESPONSE, payload);
|
|
|
+ log.debug("Device info response sent: {}", deviceId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending device info response: {}", deviceId, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendStatusMessage(String deviceId, String status, Map<String, Object> data) {
|
|
|
- String topic = topicPrefix + deviceId + "/status";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>(data);
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("status", status);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Device status message sent: {}, status: {}", topic, status);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>(data);
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("status", status);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ sendMessage(MqttTopics.DAS_STATUS, payload);
|
|
|
+ log.debug("Device status message sent: {}, status: {}", deviceId, status);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending status message: {}, status: {}", deviceId, status, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendBehaviorMessage(String deviceId, String behaviorType, Map<String, Object> data) {
|
|
|
- String topic = topicPrefix + deviceId + "/behavior";
|
|
|
-
|
|
|
- Map<String, Object> payload = new HashMap<>(data);
|
|
|
- payload.put("deviceId", deviceId);
|
|
|
- payload.put("behaviorType", behaviorType);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Behavior message sent: {}, type: {}", topic, behaviorType);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>(data);
|
|
|
+ payload.put("dev_id", deviceId);
|
|
|
+ payload.put("behaviorType", behaviorType);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ sendMessage(MqttTopics.DAS_BEHAVIOR_ANALYSIS, payload);
|
|
|
+ log.debug("Behavior message sent: {}, type: {}", deviceId, behaviorType);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending behavior message: {}, type: {}", deviceId, behaviorType, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -452,9 +572,7 @@ public class MqttGatewayImpl implements MqttGateway {
|
|
|
payload.put("global", globalConfig);
|
|
|
payload.put("timestamp", System.currentTimeMillis());
|
|
|
|
|
|
- String topic = "/das/report_alarm_param";
|
|
|
- publish(topic, payload);
|
|
|
-
|
|
|
+ sendMessage(MqttTopics.DAS_REPORT_ALARM_PARAM, payload);
|
|
|
log.debug("Alarm parameter response sent: code={}, config={}", code, globalConfig);
|
|
|
} catch (Exception e) {
|
|
|
log.error("Failed to send alarm parameter response", e);
|
|
@@ -469,9 +587,7 @@ public class MqttGatewayImpl implements MqttGateway {
|
|
|
payload.put("global", globalConfig);
|
|
|
payload.put("timestamp", System.currentTimeMillis());
|
|
|
|
|
|
- String topic = "/das/set_alarm_param_ack";
|
|
|
- publish(topic, payload);
|
|
|
-
|
|
|
+ sendMessage(MqttTopics.DAS_SET_ALARM_PARAM_ACK, payload);
|
|
|
log.debug("Set alarm parameter acknowledgment sent: code={}, config={}", code, globalConfig);
|
|
|
} catch (Exception e) {
|
|
|
log.error("Failed to send set alarm parameter acknowledgment", e);
|
|
@@ -480,35 +596,45 @@ public class MqttGatewayImpl implements MqttGateway {
|
|
|
|
|
|
@Override
|
|
|
public void sendResponse(String topic, int code, Map<String, Object> data) {
|
|
|
- Map<String, Object> payload = new HashMap<>(data);
|
|
|
- payload.put("code", code);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Response sent to topic: {}, code: {}", topic, code);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>(data);
|
|
|
+ payload.put("code", code);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.debug("Response sent to topic: {}, code: {}", topic, code);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending response to topic: {}", topic, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendCommand(String topic, String command, Map<String, Object> params) {
|
|
|
- Map<String, Object> payload = new HashMap<>();
|
|
|
- payload.put("command", command);
|
|
|
- if (params != null) {
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>();
|
|
|
+ payload.put("command", command);
|
|
|
payload.put("params", params);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.debug("Command sent to topic: {}, command: {}", topic, command);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending command to topic: {}", topic, e);
|
|
|
}
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Command sent to topic: {}, command: {}", topic, command);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void sendGenericMessage(String topic, String messageType, Map<String, Object> messageData) {
|
|
|
- Map<String, Object> payload = new HashMap<>(messageData);
|
|
|
- payload.put("messageType", messageType);
|
|
|
- payload.put("timestamp", System.currentTimeMillis());
|
|
|
-
|
|
|
- publish(topic, payload);
|
|
|
- log.debug("Generic message sent to topic: {}, type: {}", topic, messageType);
|
|
|
+ try {
|
|
|
+ Map<String, Object> payload = new HashMap<>(messageData);
|
|
|
+ payload.put("message_type", messageType);
|
|
|
+ payload.put("timestamp", System.currentTimeMillis());
|
|
|
+
|
|
|
+ sendMessage(topic, payload);
|
|
|
+ log.debug("Generic message sent to topic: {}, type: {}", topic, messageType);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending generic message to topic: {}", topic, e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -517,7 +643,8 @@ public class MqttGatewayImpl implements MqttGateway {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 发送消息到MQTT服务器
|
|
|
+ * 发送消息到MQTT服务器 - 增强版
|
|
|
+ * 支持动态QoS和消息保留设置
|
|
|
* @param topic 主题
|
|
|
* @param payload 负载
|
|
|
* @param qos 质量等级
|
|
@@ -538,4 +665,25 @@ public class MqttGatewayImpl implements MqttGateway {
|
|
|
log.error("Failed to send MQTT message to topic: {}, error: {}", topic, e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送消息 - 私有方法
|
|
|
+ * 默认QoS 1,适合大多数业务场景
|
|
|
+ */
|
|
|
+ private void sendMessage(String topic, Object payload) {
|
|
|
+ sendMessage(topic, payload, 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送消息 - 支持动态QoS
|
|
|
+ */
|
|
|
+ private void sendMessage(String topic, Object payload, int qos) {
|
|
|
+ try {
|
|
|
+ String json = objectMapper.writeValueAsString(payload);
|
|
|
+ sendToMqtt(topic, json, qos, false);
|
|
|
+ log.debug("MQTT message sent to topic: {}", topic);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error sending MQTT message to topic: {}", topic, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|