Procházet zdrojové kódy

新增测试程序

nifangxu před 1 týdnem
rodič
revize
5ff2451ca3
5 změnil soubory, kde provedl 377 přidání a 83 odebrání
  1. 1 0
      .gitignore
  2. 74 82
      core/alarm_plan.py
  3. 174 0
      demos/SleepReplayAnalyzer.py
  4. 126 0
      demos/sleep_monitor.py
  5. 2 1
      mqtt/mqtt_recv.py

+ 1 - 0
.gitignore

@@ -13,6 +13,7 @@ bin-release/
 *.pyd
 
 log/
+demos/record/
 
 # C extensions
 *.so

+ 74 - 82
core/alarm_plan.py

@@ -877,14 +877,12 @@ class AlarmPlan:
         MOTION_SMOOTH_WINDOW = 10        # 平滑窗口大小(取最近10帧计算平均)
         STAY_THRESHOLD_PEACEFUL = 0.05   # 静止阈值
         STAY_THRESHOLD_MICRO = 0.15      # 微动阈值
-        LEAVE_BED_TS        = 3         # 离床判定时间阈值
+        LEAVE_BED_TS = 3                 # 离床判定时间阈值
 
         try:
             dev_id = self.dev_id_
-            device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
-            if not device:
-                return
-            if (not self.rect_):
+            device: Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
+            if not device or not self.rect_:
                 return
 
             rtd_list = device.get_rtd_que_copy()
@@ -893,136 +891,130 @@ class AlarmPlan:
 
             # 初始化状态机
             if not hasattr(self, "sleep_stat_"):
-                self.sleep_stat_: str   = "leave"   # 当前睡眠状态
-                self.update_sleep_stat_: str   = "leave" # 要更新的睡眠状态
-                self.avg_motion_: float = 0.0       # 当前运动幅值
-                self.motion_stat_: str  = "leave"   # 当前运动状态
-                self.avg_breath_: float = 0.0       # 当前呼吸率
-                self.breathe_stat_: str = "r0"      # 当前呼吸状态
-
-                self.start_sleep_ts_: int   = -1    # 睡眠开始时间
-                self.end_sleep_ts_: int     = -1    # 睡眠结束时间
-
-                self.motion_window = deque(maxlen=MOTION_SMOOTH_WINDOW) # 近 N 次运动距离均值
-                self.sleep_segments_    = []    # 状态阶段记录 [{"ts":xxx, "stat":xxx}, ...]
-                self.last_pts_          = []        # 最后的点目标
-                self.last_update_ts_    = -1
+                self.sleep_stat_: str = "leave"   # 当前睡眠状态
+                self.update_sleep_stat_: str = "leave"
+                self.avg_motion_: float = 0.0
+                self.motion_stat_: str = "leave"
+                self.avg_breath_: float = 0.0
+                self.breathe_stat_: str = "r0"
+
+                self.start_sleep_ts_: int = -1
+                self.end_sleep_ts_: int = -1
+
+                self.motion_window = deque(maxlen=MOTION_SMOOTH_WINDOW)
+                self.sleep_segments_ = []    # [{"ts":xxx, "stat":xxx}, ...]
+                self.last_pts_ = []
+                self.last_update_ts_ = -1
                 self.miss_target_count_ = 0
-                self.last_leave_ts = -1 # 上次离床判定时间,离床判定时间超过5秒视为离床事件
+                self.last_leave_ts = -1  # 离床检测起点
 
-            now_ts  = get_utc_time_s()
+            now_ts = get_utc_time_s()
             rtd_unit = rtd_list[-1]
             ts = rtd_unit["timestamp"]
             target_point = rtd_unit["target_point"]
 
-            ## 1. 空间状态分析
+            ## 1. 空间状态分析(motion)
             if now_ts - ts > NO_TARGET_TIMEOUT_S:
-                ## 目标不存在
-                # 起夜
-                x, y, z, snr = target_point
-                if not helper.is_point_in_rect(x, y, self.rect_):
+                # 长时间无数据 → 目标消失
+                self.miss_target_count_ += 1
+                if self.miss_target_count_ >= NO_TARGET_MAX_COUNT:
                     motion = "leave"
-
-                # 检测不到体动
-                if self.start_sleep_ts_ == -1:
-                    return
                 else:
-                    motion = "peaceful"
+                    motion = self.motion_stat_
             else:
-                ## 目标存在
-                x = target_point[0]
-                y = target_point[1]
+                self.miss_target_count_ = 0
+                x, y, z, snr = target_point
                 if not helper.is_point_in_rect(x, y, self.rect_):
                     # 不在床上
-                    motion = "leave"
+                    if self.last_leave_ts == -1:
+                        self.last_leave_ts = now_ts
+                    elif now_ts - self.last_leave_ts > LEAVE_BED_TS:
+                        motion = "leave"
+                    else:
+                        motion = "active"
                 else:
-                    # 在床上
-                    # 计算体动
-                    WINDOWS_S = 10    # 滑动事件窗口
+                    self.last_leave_ts = -1
+                    # 在床上,计算位移
+                    WINDOWS_S = 10
                     recent_rtds = [r for r in rtd_list if now_ts - r["timestamp"] <= WINDOWS_S]
-                    if len(recent_rtds) < 5:
-                        return
-                    
-                    # 取最新点
-                    x,y,z,snr = recent_rtds[-1]["target_point"]
-                    # 检查是否在床上
-                    if not helper.is_point_in_rect(x, y, self.rect_):
-                        if self.last_leave_ts == -1:
-                            self.last_leave_ts = now_ts
-                        elif now_ts - self.last_leave_ts > LEAVE_BED_TS:
-                            motion = "leave"
+                    if len(recent_rtds) < 2:
+                        motion = self.motion_stat_
                     else:
-                        self.last_leave_ts = -1
-                        # 计算位移序列
                         motions = []
                         for i in range(1, len(recent_rtds)):
                             x1, y1, z1, _ = recent_rtds[i - 1]["target_point"]
                             x2, y2, z2, _ = recent_rtds[i]["target_point"]
                             dist = ((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2)**0.5
                             motions.append(dist)
+
                         if motions:
                             avg_motion = sum(motions) / len(motions)
                             self.motion_window.append(avg_motion)
                             motion_smooth = sum(self.motion_window) / len(self.motion_window)
                         else:
-                            motion_smooth = 0
+                            motion_smooth = 0.0
 
-                    # 状态判定
-                    if motion_smooth < STAY_THRESHOLD_PEACEFUL:
-                        motion = "peaceful"
-                    elif motion_smooth < STAY_THRESHOLD_MICRO:
-                        motion = "mocro"
-                    else:
-                        motion = "active"
+                        # 判定运动状态
+                        if motion_smooth < STAY_THRESHOLD_PEACEFUL:
+                            motion = "peaceful"
+                        elif motion_smooth < STAY_THRESHOLD_MICRO:
+                            motion = "micro"
+                        else:
+                            motion = "active"
+
+            self.motion_stat_ = motion
 
-                self.motion_stat_ = motion
+            ## 2. 呼吸状态分析(breathe)
+            BREATHE_WINDOWS_S = 5
+            recent_breaths = [
+                r.get("breath_rpm", 0.0)
+                for r in rtd_list
+                if (now_ts - r["timestamp"] <= BREATHE_WINDOWS_S)
+                and isinstance(r.get("breath_rpm"), (int, float))
+            ]
 
-            ## 2. 呼吸率分析
-            BREATHE_WINDOWS_S = 5   # 呼吸滑动时间窗口
-            recent_breaths =[r["breath_rpm"] for r in rtd_list
-                             if (now_ts - r["timestamp"] <= BREATHE_WINDOWS_S) and
-                             ("breath_rpm" in r and isinstance(r["breath_rpm"], (int, float)))]
-            
             if not recent_breaths:
-                breathe_stat = "r0" # 无数据时视为异常/无呼吸
                 avg_breath = 0.0
+                breathe_stat = "r0"
             else:
                 avg_breath = sum(recent_breaths) / len(recent_breaths)
-
                 if avg_breath < 8:
                     breathe_stat = "r0"
                 elif avg_breath < 12:
                     breathe_stat = "r1"
                 elif avg_breath < 18:
-                    breathe_stat = "r1"
+                    breathe_stat = "r2"
+                elif avg_breath < 25:
+                    breathe_stat = "r3"
                 else:
-                    breathe_stat = "r1"
+                    breathe_stat = "r4"
 
             self.avg_breath_ = avg_breath
             self.breathe_stat_ = breathe_stat
 
-            ## 3. 睡眠状态分析
+            ## 3. 睡眠状态分析(sleep)
             try:
                 i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_)
                 i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat)
                 self.update_sleep_stat_ = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath]
             except ValueError:
-                LOGERR(f"infalid i_montion or i_breath")
-
+                LOGERR(f"[SleepMonitoring] invalid motion/breath index")
+                return
 
-            # 3.1 更新睡眠报告
-            sleep_node = {
-                "ts": now_ts,
-                "sleep_stat": self.update_sleep_stat_
-            }
-            self.sleep_segments_.append(sleep_node)
+            ## 4. 记录阶段变化
+            if (not self.sleep_segments_) or (self.update_sleep_stat_ != self.sleep_segments_[-1]["sleep_stat"]):
+                sleep_node = {
+                    "ts": now_ts,
+                    "sleep_stat": self.update_sleep_stat_
+                    }
+                self.sleep_segments_.append(sleep_node)
+                LOGINFO(f"update sleep_stat, current: {self.update_sleep_stat_}")
 
             self.sleep_stat_ = self.update_sleep_stat_
 
+            ## 5. 调试输出(可选)
+            LOGINFO(f"[SleepMonitoring] {dev_id} motion={self.motion_stat_}, breath={self.avg_breath_:.1f}, sleep={self.sleep_stat_}")
 
-            # 1. 分析空间状态
-            # 2. 分析呼吸率
-            # 3. 分析睡眠状态
             return
 
         except json.JSONDecodeError as e:

+ 174 - 0
demos/SleepReplayAnalyzer.py

@@ -0,0 +1,174 @@
+import json
+from collections import deque
+import os
+import sys
+from datetime import datetime
+
+# 将 LAS 根目录加入 sys.path
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+
+import core.alarm_plan_helper as helper
+from common.sys_comm import (
+    LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
+    get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
+    utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
+)
+from core.alarm_plan import EventAttr_SleepMonitoring
+
+
+class SleepReplayAnalyzer:
+    def __init__(self, file_path, bed_rect_ltwh):
+        """
+        file_path: 录像文件路径
+        bed_rect_ltwh: 床区域 [left, top, width, height]
+        """
+        self.file_path = file_path
+        self.bed_rect_ltwh = bed_rect_ltwh
+
+        # 窗口和状态
+        self.motion_buffer = deque()  # 每条: (ts, motion_value)
+        self.sleep_segments_ = []
+
+        self.last_target_point = None
+        self.last_target_ts = None
+        self.motion_stat_ = "leave"
+        self.sleep_stat_ = "leave"
+
+    # 判断目标点是否在床区域
+    def _is_point_in_bed(self, target_point):
+        x, y, *_ = target_point
+        l, t, w, h = self.bed_rect_ltwh
+        return l <= x <= l + w and t <= y <= t + h
+
+    # 根据窗口计算平滑运动状态
+    def _calc_motion_state_window(self, current_ts, threshold_peaceful=5, threshold_micro=10):
+        try:
+            # 清理超过60秒的数据
+            while self.motion_buffer and current_ts - self.motion_buffer[0][0] > 60:
+                self.motion_buffer.popleft()
+            # 平滑运动
+            if self.motion_buffer:
+                smooth_motion = sum(v for _, v in self.motion_buffer) / len(self.motion_buffer)
+            else:
+                smooth_motion = 0
+            # 运动状态
+            if smooth_motion < threshold_peaceful:
+                motion = "peaceful"
+            elif smooth_motion < threshold_micro:
+                motion = "micro"
+            else:
+                motion = "active"
+            return motion, smooth_motion
+        except Exception as e:
+            LOGERR(f"_calc_motion_state_window error: error: {e}")
+
+    def analyze(self):
+        if not os.path.exists(self.file_path):
+            LOGERR(f"录像文件不存在: {self.file_path}")
+            return
+
+        last_saved_minute = None
+
+        with open(self.file_path, "r", encoding="utf-8") as f:
+            lines = f.readlines()
+
+        for line in lines:
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                entry = json.loads(line)
+                ts = entry["ts"] / 1000.0  # 转秒
+                payload = entry["payload"]
+                targets = payload.get("tracker_targets", [])
+                breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0)
+
+                # --- 运动状态分析 ---
+                motion = self.motion_stat_
+                if targets:
+                    if not self._is_point_in_bed(targets[0]):
+                        motion_value = 0
+                        smooth_motion = 0
+                        motion = "leave"
+                        LOGDBG(f"motion: {motion}, with target and not in bed")
+                    else:
+                        motion_value = 0
+                        if self.last_target_point is not None:
+                            x1, y1, z1, _ = self.last_target_point
+                            x2, y2, z2, _ = targets[0]
+                            motion_value = ((x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2) ** 0.5
+                        self.motion_buffer.append((ts, motion_value))
+                        motion, smooth_motion = self._calc_motion_state_window(ts)
+                    self.last_target_point = targets[0]
+                    self.last_target_ts = ts
+                else:
+                    smooth_motion = 0
+                    # 无目标时,根据最后目标位置判断
+                    if self.last_target_point and self._is_point_in_bed(self.last_target_point):
+                        # 目标消失但仍在床上,认为静止
+                        motion = "peaceful"
+                    else:
+                        # 目标消失且不在床上,认为离床
+                        motion = "leave"
+                        LOGDBG(f"motion: {motion}, without target and not in bed")
+
+                    # 为保持平滑趋势,补入0
+                    self.motion_buffer.append((ts, 0))
+
+                self.motion_stat_ = motion
+
+                # --- 睡眠状态分析 ---
+                if breath_rpm < 8: breathe_stat = "r0"
+                elif breath_rpm < 12: breathe_stat = "r1"
+                elif breath_rpm < 18: breathe_stat = "r2"
+                elif breath_rpm < 25: breathe_stat = "r3"
+                else: breathe_stat = "r4"
+
+                i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_)
+                i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat)
+                sleep_stat = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath]
+
+                # --- 按分钟保存 sleep_segment ---
+                dt = datetime.fromtimestamp(ts)
+                current_minute = dt.replace(second=0, microsecond=0)
+                if last_saved_minute != current_minute:
+                    sleep_segment = {
+                        "ts": ts,
+                        "sleep_stat": sleep_stat,
+                        "smooth_motion": round(smooth_motion, 3),
+                        "breath_rpm": breath_rpm,
+                        "breathe_stat": breathe_stat
+                    }
+                    self.sleep_segments_.append(sleep_segment)
+                    last_saved_minute = current_minute
+
+            except Exception as e:
+                LOGERR(f"解析行失败: {line}, error: {e}")
+
+    def export_report(self, out_file=None):
+        if not out_file:
+            out_file = os.path.splitext(self.file_path)[0] + "_sleep_report.json"
+        try:
+            with open(out_file, "w", encoding="utf-8") as f:
+                json.dump(self.sleep_segments_, f, ensure_ascii=False, indent=2)
+            LOGINFO(f"睡眠报告已生成: {out_file}")
+            print(f"睡眠报告已生成: {out_file}")
+        except Exception as e:
+            LOGERR(f"保存睡眠报告失败: {e}")
+            print(f"保存睡眠报告失败: {e}")
+
+
+# ---------------- 使用示例 ----------------
+if __name__ == "__main__":
+    if 0:
+        dev_id = "94A9900B0B38"
+        bed_rect = [0, 115, 200, 115]
+    else:
+        dev_id = "94A9900B0B80"
+        bed_rect = [-10, 130, 120, 210]
+    file_path = f"demos/record/{dev_id}.raw.json"
+    analyzer = SleepReplayAnalyzer(file_path, bed_rect)
+    analyzer.analyze()
+
+    ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+    analyzer.export_report(f"./demos/record/report_{dev_id}_{ts}.json")

+ 126 - 0
demos/sleep_monitor.py

@@ -0,0 +1,126 @@
+import json
+from collections import deque
+import os
+import sys
+
+# 将 LAS 根目录加入 sys.path
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+
+import core.alarm_plan_helper as helper
+from common.sys_comm import (
+    LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
+    get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
+    utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
+)
+import device.dev_mng as g_Dev
+from device.dev_mng import (
+    Device, g_dev_mgr, init_dev_mng
+)
+from core.alarm_plan import AlarmPlan
+from core.time_plan import TimePlan
+from core.event_type import EventType
+from core.linkage_action import LinkageAction
+
+# 假设你已经有 AlarmPlan 方法绑定在 AlarmPlan 实例上
+# 这里我们写一个回放测试程序
+class SleepMonitorTester:
+    def __init__(self, file_path, dev_id, rect):
+        self.file_path = file_path
+        init_dev_mng()
+        self.dev_id = dev_id
+        self.device = Device(dev_id)
+        g_Dev.g_dev_mgr.start()
+        g_Dev.g_dev_mgr.push_dev_map(dev_id, self.device)
+
+        # 初始化算法实例
+        linkage_action  = LinkageAction()
+        cron = {
+            "hour": 1,
+            "minute": 0
+        }
+        time_plan = TimePlan(
+            time_range  = [{"start_time": "00:00","end_time": "23:59"}],
+            start_date  = '2025-09-01',
+            stop_date   = '2099-12-31',
+            weekdays    = [1,2,3,4,5,6,7],
+            month_days  = []
+        )
+        self.sleep_monitor = AlarmPlan(
+            plan_uuid   = "plan_uuid",
+            name        = "plan_name",
+            dev_id      = 'LAS',
+            dev_name    = '告警联动服务',
+            enable      = 1,
+            time_plan   = time_plan,
+            rect        = [],
+            event_type  = EventType.SLEEP_MONITORING.value,
+            threshold_time  = 300,
+            merge_time  = 30,
+            param       = {},
+            cron        = cron,
+            linkage_action=linkage_action,
+            tenant_id   = 0
+        )
+        self.sleep_monitor.dev_id_ = dev_id
+        # 设置监测区域
+        self.sleep_monitor.rect_ = rect
+
+    def run(self):
+        with open(self.file_path, "r", encoding="utf-8") as f:
+            lines = f.readlines()
+
+        for line in lines:
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                entry = json.loads(line)
+                payload = entry["payload"]
+                ts = entry["ts"]
+
+                # 构造 rtd_unit
+                tracker_targets = payload.get("tracker_targets", [[0, 0, 0, 0]])
+                breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0)
+                pose = 4  # 固定值,参考原逻辑
+                rtd_unit = {
+                    "timestamp": ts / 1000.0,   # 转成秒
+                    "target_point": tracker_targets[0],
+                    "breath_rpm": breath_rpm,
+                    "pose": pose
+                }
+
+                # 写入队列
+                self.device.put_rtd_unit(rtd_unit)
+                self.device.update_keepalive(rtd_unit["timestamp"])
+
+                # 调用睡眠监测算法
+                self.sleep_monitor.handle_sleep_monitoring()
+
+            except json.JSONDecodeError:
+                print(f"跳过非法行: {line}")
+            except Exception as e:
+                print(f"处理记录失败: {e}")
+
+        # 回放结束,打印睡眠状态序列
+        print("回放结束,睡眠状态序列:")
+        for seg in self.sleep_monitor.sleep_segments_:
+            print(f"ts={seg['ts']:.3f}, sleep_stat={seg['sleep_stat']}")
+            
+        # 保存 JSON 报告
+        report_file = os.path.splitext(os.path.basename(self.file_path))[0] + "_sleep_report.json"
+        report_path = os.path.join("demos/record", report_file)  # 可根据需要修改目录
+        try:
+            with open(report_path, "w", encoding="utf-8") as f:
+                json.dump(self.sleep_monitor.sleep_segments_, f, ensure_ascii=False, indent=2)
+            print(f"睡眠报告已生成: {report_path}")
+        except Exception as e:
+            print(f"生成睡眠报告失败: {e}")
+
+
+# 使用示例
+if __name__ == "__main__":
+    file_path = "demos/record/94A9900B0B38.raw.json"
+    dev_id = "94A9900B0B80"
+    rect = [0, 115, 200, 115]
+    tester = SleepMonitorTester(file_path, dev_id, rect)
+    tester.run()

+ 2 - 1
mqtt/mqtt_recv.py

@@ -24,7 +24,7 @@ import device.dev_mng as g_Dev
 from device.dev_mng import (
     Device,
 )
-from core.alarm_plan import AlarmPlan
+
 
 import core.g_LAS as g_las
 
@@ -221,6 +221,7 @@ def deal_las_test(msg:mqtt.MQTTMessage):
 
 # 获取调试信息
 def deal_get_debug_info(msg:mqtt.MQTTMessage):
+    from core.alarm_plan import AlarmPlan
     try:
         plans_info: list = []
         plan: AlarmPlan = None