Browse Source

device redis缓存 改写

chejianzheng 3 months ago
parent
commit
5f2afbb67e

+ 1 - 1
device-service-application/src/main/java/com/hfln/device/application/service/DeviceSchedulerService.java

@@ -47,7 +47,7 @@ public class DeviceSchedulerService {
      * 定时检查设备告警确认状态
      * 每分钟执行一次
      */
-//    @Scheduled(fixedRate = 60000)
+    @Scheduled(fixedRate = 10000)
     public void checkDeviceAlarmAck() {
         try {
             log.debug("开始执行设备告警确认状态检查任务");

+ 5 - 0
device-service-application/src/main/java/com/hfln/device/application/service/impl/DeviceCommandServiceImpl.java

@@ -668,6 +668,11 @@ public class DeviceCommandServiceImpl implements DeviceCommandService {
             // 6. 更新确认时间(对应Python版本:now = get_utc_time_ms(); device.set_last_alarm_ack_time(now))
             long now = System.currentTimeMillis();
             device.setLastAlarmAckTime(now);
+
+            Map<String, Object> updateMap = new HashMap<>();
+            updateMap.put("alarmAck", Boolean.TRUE.toString());
+            updateMap.put("lastAlarmAckTime", now);
+            deviceManagerService.updateDeviceMapInCache(deviceId, updateMap);
             
         } catch (Exception e) {
             // 对应Python版本的异常处理逻辑

+ 16 - 2
device-service-application/src/main/java/com/hfln/device/application/service/impl/DeviceEventServiceImpl.java

@@ -1046,6 +1046,9 @@ public class DeviceEventServiceImpl implements DeviceEventService {
                 mqttGateway.sendEventMessage(deviceId, Collections.emptyList(), pose, targets, event);
                 
                 device.setLastFallTime(event, now);      // device.set_last_fall_time(event, now)
+                Map<String, Object> updateMap = new HashMap<>();
+                updateMap.put(event, now);
+                deviceManagerService.updateDeviceMapInCache(deviceId, updateMap);
                 String text = String.format("设备上报跌倒事件:躺, dev_id:%s, event:%s", deviceId, event);
                 log.debug(text);  // LOGDBG(text)
                 
@@ -1067,6 +1070,9 @@ public class DeviceEventServiceImpl implements DeviceEventService {
                 mqttGateway.sendEventMessage(deviceId, Collections.emptyList(), realtimePose.get(0).intValue(), targets, event);
                 
                 device.setLastReportFallTime(now);      // device.set_last_report_fall_time(now)
+                Map<String, Object> updateMap = new HashMap<>();
+                updateMap.put("lastReportFallTime", now);
+                deviceManagerService.updateDeviceMapInCache(deviceId, updateMap);
                 String text = String.format("设备上报跌倒事件:躺, dev_id:%s, event:%s", deviceId, event);
                 log.debug(text);  // LOGDBG(text)
             }
@@ -1231,6 +1237,10 @@ public class DeviceEventServiceImpl implements DeviceEventService {
                 String text = String.format("设备上报跌倒事件:躺, dev_id:%s, event:%s", deviceId, event);
                 log.debug(text); // LOGDBG(text)
             }
+
+            Map<String, Object> updateMap = new HashMap<>();
+            updateMap.put("lastReportFallTime", device.getLastFallTime());
+            deviceManagerService.updateDeviceMapInCache(deviceId, updateMap);
             
             log.info("跌倒事件处理完成: deviceId={}, event={}, type={}", deviceId, event, type);
             
@@ -1257,8 +1267,12 @@ public class DeviceEventServiceImpl implements DeviceEventService {
             //     mqtt_send.exist_msg(dev_id=dev_id, event=event)
             
             // 验证设备是否已注册(对应Python版本的设备检查)
-            Optional<Device> deviceOpt = deviceManagerService.getDeviceFromCache(deviceId);
-            if (!deviceOpt.isPresent()) {
+//            Optional<Device> deviceOpt = deviceManagerService.getDeviceFromCache(deviceId);
+//            if (!deviceOpt.isPresent()) {
+//                log.warn("设备未注册,忽略存在事件: deviceId={}", deviceId);
+//                return;
+//            }
+            if (!deviceManagerService.existInCache(deviceId)) {
                 log.warn("设备未注册,忽略存在事件: deviceId={}", deviceId);
                 return;
             }

+ 3 - 3
device-service-application/src/main/java/com/hfln/device/application/task/DeviceStatusCheckTask.java

@@ -37,7 +37,7 @@ public class DeviceStatusCheckTask {
      * 检查设备心跳超时 (参考Python版本的check_dev_keepalive函数)
      * 每5秒执行一次 - 平衡性能和及时性
      */
-    @Scheduled(fixedRate = 5000) // 5秒检查一次
+//    @Scheduled(fixedRate = 5000) // 5秒检查一次
     public void checkDeviceKeepAlive() {
         try {
             long currentTime = System.currentTimeMillis();
@@ -72,7 +72,7 @@ public class DeviceStatusCheckTask {
      * 检查设备告警确认超时 (参考Python版本的check_dev_alarm_ack函数)
      * 每10秒执行一次 - 平衡性能和及时性
      */
-    @Scheduled(fixedRate = 10000) // 10秒检查一次
+//    @Scheduled(fixedRate = 10000) // 10秒检查一次
     public void checkDeviceAlarmAck() {
         try {
             log.debug("开始检查设备告警确认状态...");
@@ -167,7 +167,7 @@ public class DeviceStatusCheckTask {
      * 检查所有设备告警计划 (参考Python版本的check_all_dev_alarm_plan函数)
      * 每秒执行一次 - 与Python版本保持一致
      */
-    @Scheduled(fixedRate = 1000) // 1秒检查一次 - 对应Python版本每秒检查
+//    @Scheduled(fixedRate = 1000) // 1秒检查一次 - 对应Python版本每秒检查
     public void checkAllDeviceAlarmPlan() {
         try {
             long currentTime = System.currentTimeMillis();

+ 1 - 1
device-service-common/src/main/java/com/hfln/device/common/constant/mqtt/topic/MqttTopics.java

@@ -56,7 +56,7 @@ public class MqttTopics {
     public static final String DAS_CLOUDPOINT = "/das/cloudpoint";
     public static final String DAS_REALTIME_POS = "/das/realtime_pos";
     public static final String DAS_EVENT = "/das/event";
-    public static final String DAS_EXIST_EVENT = "/das/existence_event";
+    public static final String DAS_EXIST_EVENT = "/das/exist";
     public static final String DAS_ALARM_EVENT = "/das/alarm_event";
     public static final String DAS_BEHAVIOR_ANALYSIS = "/das/behavior_analysis";
     public static final String DAS_SET_DEV_PARAM = "/das/set_device_param";

+ 3 - 2
device-service-domain/src/main/java/com/hfln/device/domain/entity/Device.java

@@ -522,8 +522,9 @@ public class Device {
         try {
             lock.lock();
             // 模拟Python版本的bug:设置局部变量而不是实例变量
-            @SuppressWarnings("unused")
-            boolean alarmAck = ack;  // 对应Python: alarm_ack_ = ack (缺少self.)
+//            @SuppressWarnings("unused")
+//            boolean alarmAck = ack;  // 对应Python: alarm_ack_ = ack (缺少self.)
+            alarmAck = ack;  // 对应Python: alarm_ack_ = ack (缺少self.)
             // 注意:这里故意不写 this.alarmAck = ack; 来模拟Python的bug
         } finally {
             lock.unlock();

+ 2 - 0
device-service-domain/src/main/java/com/hfln/device/domain/service/DeviceManagerService.java

@@ -24,6 +24,8 @@ public interface DeviceManagerService {
      * @param device 设备信息
      */
     void addDeviceToCache(Device device);
+
+    boolean existInCache(String deviceId);
     
     /**
      * 更新设备缓存

+ 23 - 9
device-service-domain/src/main/java/com/hfln/device/domain/service/impl/DeviceRedisManagerServiceImpl.java

@@ -243,6 +243,11 @@ public class DeviceRedisManagerServiceImpl implements DeviceManagerService {
     }
 
     @Override
+    public boolean existInCache(String deviceId) {
+        return redisService.sIsMember(RedisCacheConstant.KEY_DEVICE_ID, deviceId);
+    }
+
+    @Override
     public void updateDeviceInCache(Device device) {
 
         if (!redisService.sIsMember(RedisCacheConstant.KEY_DEVICE_ID, device.getDevId())) {
@@ -474,15 +479,24 @@ public class DeviceRedisManagerServiceImpl implements DeviceManagerService {
     @Override
     public void checkDeviceAlarmAck(long currentTimeMillis, long timeoutMillis) {
 
-//        Set<Object> devIdSet = redisService.sMembers(RedisCacheConstant.KEY_DEVICE_ID);
-//        if (CollectionUtils.isEmpty(devIdSet)) {
-//            log.info("No devices in cache");
-//            return;
-//        }
-//
-//        for (Object devId : devIdSet) {
-//            redisService.hGet(RedisCacheConstant.KEY_DEVICE_pre + devId, "alarmAck")
-//        }
+        Set<Object> devIdSet = redisService.sMembers(RedisCacheConstant.KEY_DEVICE_ID);
+        if (CollectionUtils.isEmpty(devIdSet)) {
+            log.info("No devices in cache");
+            return;
+        }
+
+        for (Object devId : devIdSet) {
+            Boolean alarmAck = Boolean.parseBoolean(String.valueOf(redisService.hGet(RedisCacheConstant.KEY_DEVICE_pre + devId, "alarmAck")));
+            Long lastAlarmAckTime = (Long) redisService.hGet(RedisCacheConstant.KEY_DEVICE_pre + devId, "lastAlarmAckTime");
+            if (Boolean.TRUE.equals(alarmAck) && lastAlarmAckTime != null) {
+                if (currentTimeMillis - lastAlarmAckTime > timeoutMillis) {
+                    log.info("告警确认超时,清除确认状态: deviceId={}, currentTime={}",
+                            devId, currentTimeMillis);
+                    redisService.hSet(RedisCacheConstant.KEY_DEVICE_pre + devId, "alarmAck", false);
+                }
+            }
+
+        }
 //
 //        deviceCache.forEach((devId, device) -> {
 //            // 只检查未确认的告警

+ 6 - 6
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/config/MqttConfig.java

@@ -120,13 +120,13 @@ public class MqttConfig {
             MqttTopics.DEV_LOGIN,
             MqttTopics.DEV_KEEPALIVE,
             MqttTopics.DEV_DSP_DATA,
-            MqttTopics.DEV_CLOUDPOINT,
-            MqttTopics.DEV_REP_DEV_INFO,
+//            MqttTopics.DEV_CLOUDPOINT,
+//            MqttTopics.DEV_REP_DEV_INFO,
             MqttTopics.DEV_REP_DEV_PARAM,
-            MqttTopics.DEV_REP_FALL_EVENT,
-            MqttTopics.DEV_REP_PRES_EVENT,
-            MqttTopics.DEV_SET_DEBUG,
-            MqttTopics.DEV_GET_DEBUG
+//            MqttTopics.DEV_REP_FALL_EVENT,
+//            MqttTopics.DEV_REP_PRES_EVENT,
+//            MqttTopics.DEV_SET_DEBUG,
+//            MqttTopics.DEV_GET_DEBUG
         };
         
         MqttPahoMessageDrivenChannelAdapter adapter =

+ 6 - 4
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/gateway/MqttGatewayImpl.java

@@ -163,6 +163,8 @@ public class MqttGatewayImpl implements MqttGateway {
     public void sendDeviceInfoUpdateNotification(Device device) {
         try {
             Map<String, Object> message = new HashMap<>();
+            message.put("message", "notify");
+            message.put("timestamp", System.currentTimeMillis());
             message.put("dev_id", device.getDevId());
             message.put("dev_type", device.getDevType());
             message.put("software", device.getSoftware());
@@ -193,10 +195,10 @@ public class MqttGatewayImpl implements MqttGateway {
                     installParam.put("tracking_region", trackingRegion);
                 }
 
-                message.put("install_param", installParam);
+                message.put("radar_param", installParam);
             }
 
-            String topic = "/mps/update_dev_info";
+            String topic = "/das/dev_status";
             publishJson(topic, message);
 
             log.info("Device info update notification sent: deviceId={}", device.getDevId());
@@ -775,9 +777,9 @@ public class MqttGatewayImpl implements MqttGateway {
 
             // 目标位置 (对应Python版本的targets)
             if (targets != null) {
-                message.put("targets", targets);
+                message.put("target_point", targets);
             } else {
-                message.put("targets", new ArrayList<>());
+                message.put("target_point", new ArrayList<>());
             }
 
             // 发送到实时位置主题 (对应Python版本的MQTT主题)

+ 24 - 20
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/mqtt/handler/DeviceMessageHandler.java

@@ -95,14 +95,18 @@ public class DeviceMessageHandler {
                         handleDeviceDspData(topic, payload);
                         break;
                     case "cloudpoint":
+                        // todo 目前没有 lna 设备,暂不考虑点云数据改造
                         handleDeviceCloudPoint(topic, payload);
                         break;
                     case "report_falling_event":
+                        // todo 这个 主题 待确认是否废弃
                         handleDeviceReportFallEvent(topic, payload);
                         break;
                     case "report_presence_event":
                         handleDeviceReportPresenceEvent(topic, payload);
                         break;
+
+                        // todo 待确认是否废弃
                     case "set_debug_param":
                         handleSetDebugParam(topic, payload);
                         break;
@@ -540,7 +544,7 @@ public class DeviceMessageHandler {
             
             // === 设备验证 (对应Python: with g_dev_map_lock: if dev_id not in g_dev_map) ===
             // 未注册的设备,不处理
-            if (!deviceManagerService.getDeviceFromCache(deviceId).isPresent()) {
+            if (!deviceManagerService.existInCache(deviceId)) {
                 log.debug("error, device not registed: {}", deviceId); // 对应Python: LOGDBG(f"error, device not registed: {dev_id}")
                 return;
             }
@@ -564,25 +568,25 @@ public class DeviceMessageHandler {
             log.info("Device {} presence event: type={}, event={}, time={}", 
                     deviceId, type, event, timestamp);
             
-            // 根据事件类型进行不同的处理
-            switch (event) {
-                case "presence_detected":
-                    log.info("Person entered monitoring area of device {}", deviceId);
-                    break;
-                case "presence_lost":
-                    log.info("Person left monitoring area of device {}", deviceId);
-                    break;
-                case "motion_detected":
-                    log.debug("Motion detected by device {}", deviceId);
-                    break;
-                case "motion_stopped":
-                    log.debug("Motion stopped at device {}", deviceId);
-                    break;
-                default:
-                    log.debug("Unknown presence event: {} from device {}", event, deviceId);
-                    break;
-            }
-            
+//            // 根据事件类型进行不同的处理
+//            switch (event) {
+//                case "presence_detected":
+//                    log.info("Person entered monitoring area of device {}", deviceId);
+//                    break;
+//                case "presence_lost":
+//                    log.info("Person left monitoring area of device {}", deviceId);
+//                    break;
+//                case "motion_detected":
+//                    log.debug("Motion detected by device {}", deviceId);
+//                    break;
+//                case "motion_stopped":
+//                    log.debug("Motion stopped at device {}", deviceId);
+//                    break;
+//                default:
+//                    log.debug("Unknown presence event: {} from device {}", event, deviceId);
+//                    break;
+//            }
+//
             // === 委托给应用层服务处理 (对应Python: mqtt_send.exist_msg(dev_id=dev_id, event=event)) ===
             // 应用层将进行设备验证,并处理:状态更新、活动统计、模式分析、异常检测,发送存在事件消息
             deviceEventPort.handleExistEvent(deviceId, timestamp, type, event);

+ 26 - 24
device-service-infrastructure/src/main/java/com/hfln/device/infrastructure/mqtt/handler/MpsMessageHandler.java

@@ -94,6 +94,7 @@ public class MpsMessageHandler {
                 handleDeviceReboot(topic, payload);
             } else {
                 switch (topic) {
+                    // todo 待确定是否废弃
                     case MqttTopics.MPS_GET_DEV_INFO:
                         handleGetDeviceInfo(topic, payload);
                         break;
@@ -103,6 +104,7 @@ public class MpsMessageHandler {
                     case MqttTopics.MPS_SET_DEV_PARAM:
                         handleSetDeviceParam(topic, payload);
                         break;
+                        // todo 待确定 添加删除 逻辑
                     case MqttTopics.MPS_ADD_DEVICE:
                         handleAddDevice(topic, payload);
                         break;
@@ -289,12 +291,12 @@ public class MpsMessageHandler {
             log.info("Processing set device param request: {}, payload: {}", devId, payload);
             
             // 获取操作者信息用于审计
-            String operatorId = (String) messageData.get("operator_id");
-            String operatorType = (String) messageData.get("operator_type");
-            String operationReason = (String) messageData.get("reason");
-            
-            log.info("Device parameters being set by: {} ({}), target device: {}, reason: {}", 
-                    operatorId, operatorType, devId, operationReason);
+//            String operatorId = (String) messageData.get("operator_id");
+//            String operatorType = (String) messageData.get("operator_type");
+//            String operationReason = (String) messageData.get("reason");
+//
+//            log.info("Device parameters being set by: {} ({}), target device: {}, reason: {}",
+//                    operatorId, operatorType, devId, operationReason);
             
             // 参数合理性验证
             Float heightValue = height.floatValue();
@@ -341,7 +343,7 @@ public class MpsMessageHandler {
             // 应用层将处理:设备通信、配置下发、确认等待、数据库更新、审计记录
             deviceCommandService.handleSetDeviceParam(devId, mountingPlain, areaStr, heightValue);
             
-            log.info("Device parameter setting request submitted by operator {}: {}", operatorId, devId);
+//            log.info("Device parameter setting request submitted by operator {}: {}", operatorId, devId);
             
         } catch (Exception e) {
             log.error("Error handling set device param request: {}", e.getMessage(), e);
@@ -391,23 +393,23 @@ public class MpsMessageHandler {
             String devId = matcher.group(1);
             log.warn("CRITICAL: Processing device reboot request for: {}", devId);
             
-            // 解析重启请求信息
-            Map<String, Object> messageData = JsonUtil.parseMap(payload);
-            String operatorId = (String) messageData.get("operator_id");
-            String operatorType = (String) messageData.get("operator_type");
-            String rebootReason = (String) messageData.get("reason");
-            String emergencyLevel = (String) messageData.get("emergency_level");
-            
-            // 记录重要的重启操作
-            log.warn("DEVICE REBOOT INITIATED - Device: {}, Operator: {} ({}), Reason: {}, Emergency Level: {}", 
-                    devId, operatorId, operatorType, rebootReason, emergencyLevel);
-            
-            // 验证重启权限(高风险操作)
-            if (!"admin".equals(operatorType) && !"technician".equals(operatorType)) {
-                log.error("UNAUTHORIZED REBOOT ATTEMPT - Device: {}, Operator: {} ({})", 
-                         devId, operatorId, operatorType);
-                return;
-            }
+//            // 解析重启请求信息
+//            Map<String, Object> messageData = JsonUtil.parseMap(payload);
+//            String operatorId = (String) messageData.get("operator_id");
+//            String operatorType = (String) messageData.get("operator_type");
+//            String rebootReason = (String) messageData.get("reason");
+//            String emergencyLevel = (String) messageData.get("emergency_level");
+//
+//            // 记录重要的重启操作
+//            log.warn("DEVICE REBOOT INITIATED - Device: {}, Operator: {} ({}), Reason: {}, Emergency Level: {}",
+//                    devId, operatorId, operatorType, rebootReason, emergencyLevel);
+//
+//            // 验证重启权限(高风险操作)
+//            if (!"admin".equals(operatorType) && !"technician".equals(operatorType)) {
+//                log.error("UNAUTHORIZED REBOOT ATTEMPT - Device: {}, Operator: {} ({})",
+//                         devId, operatorId, operatorType);
+//                return;
+//            }
             
             // 记录重启前的设备状态用于审计
             log.info("Recording device state before reboot: {}", devId);