Parcourir la source

优化睡眠监测算法demo

nifangxu il y a 1 semaine
Parent
commit
a430ffc27c
3 fichiers modifiés avec 97 ajouts et 35 suppressions
  1. 3 2
      core/alarm_plan.py
  2. 91 31
      demos/SleepReplayAnalyzer.py
  3. 3 2
      device/dev_mng.py

+ 3 - 2
core/alarm_plan.py

@@ -166,11 +166,12 @@ class EventAttr_TargetAbsence(EventAttr_Base):
 class EventAttr_SleepMonitoring(EventAttr_Base):
     motion_stat = ["peaceful", "micro", "active", "leave"]  # 空间状态
     # 呼吸率划分:r0:[0-8) r1:[8,12) r2:[12,18) r3:[18,25] r4:25以上
-    breathe_stat = ["r0", "r1", "r2", "r3", "r4"]   # 呼吸状态:呼吸过慢/异常,深睡,浅睡,REM/清醒,呼吸过快/清醒
+    breathe_stat = ["r0", "r1", "r2", "r3", "r4"]   # 呼吸状态
+    # 睡眠状态矩阵
     # 深睡(deep),浅睡(light),REM,清醒(awake),离床(leave)
     sleep_stat = [
         ["deep", "deep", "light", "REM", "awake"],      # peaceful
-        ["deep", "light", "light", "REM", "awake"],     # micro
+        ["deep", "light", "REM", "REM", "awake"],     # micro
         ["awake", "awake", "awake", "awake", "awake"],  # active
         ["leave", "leave", "leave", "leave", "leave"]]  # leave
     # 异常:呼吸过慢(slow_breathe),呼吸过快(fast_breathe)

+ 91 - 31
demos/SleepReplayAnalyzer.py

@@ -1,5 +1,5 @@
 import json
-from collections import deque
+from collections import deque, Counter
 import os
 import sys
 from datetime import datetime
@@ -18,15 +18,12 @@ from core.alarm_plan import EventAttr_SleepMonitoring
 
 class SleepReplayAnalyzer:
     def __init__(self, file_path, bed_rect_ltwh):
-        """
-        file_path: 录像文件路径
-        bed_rect_ltwh: 床区域 [left, top, width, height]
-        """
         self.file_path = file_path
         self.bed_rect_ltwh = bed_rect_ltwh
 
         # 窗口和状态
         self.motion_buffer = deque()  # 每条: (ts, motion_value)
+        self.sleep_segments_raw_ = []
         self.sleep_segments_ = []
 
         self.last_target_point = None
@@ -38,10 +35,21 @@ class SleepReplayAnalyzer:
     def _is_point_in_bed(self, target_point):
         x, y, *_ = target_point
         l, t, w, h = self.bed_rect_ltwh
-        return l <= x <= l + w and t <= y <= t + h
+        return l <= x <= l + w and (t - h) <= y <= t
+
+    # 判断目标点是否在床区域
+    def _is_point_in_bed_ex(self, target_point, expand=10):
+        x, y, *_ = target_point
+        l, t, w, h = self.bed_rect_ltwh
+        l_expanded = l - expand
+        r_expanded = l + w + expand
+        top_expanded = t + expand 
+        bottom_expanded = t - h - expand
+        return l_expanded <= x <= r_expanded and bottom_expanded <= y <= top_expanded
+
 
     # 根据窗口计算平滑运动状态
-    def _calc_motion_state_window(self, current_ts, threshold_peaceful=5, threshold_micro=10):
+    def _calc_motion_state_window(self, current_ts, threshold_peaceful=2, threshold_micro=4):
         try:
             # 清理超过60秒的数据
             while self.motion_buffer and current_ts - self.motion_buffer[0][0] > 60:
@@ -62,11 +70,53 @@ class SleepReplayAnalyzer:
         except Exception as e:
             LOGERR(f"_calc_motion_state_window error: error: {e}")
 
+
+    # 🔹 新版本:根据时间窗口(10分钟)分析最近数据
+    def _analyze_recent_window(self, ts, smooth_motion, breath_rpm, breathe_stat):
+        try:
+            if not self.sleep_segments_raw_:
+                return None
+
+            # 保留最近10分钟(600秒)的数据
+            time_window = 600  # 秒
+            start_ts = ts - time_window
+            recent_data = [d for d in self.sleep_segments_raw_ if d["ts"] >= start_ts]
+
+            if not recent_data:
+                return None
+
+            stats = [d["sleep_stat"] for d in recent_data]
+            counter = Counter(stats)
+            total = len(stats)
+            major_stat, count = counter.most_common(1)[0]
+
+            # 若占比超过50%,用当前主要状态,否则保持上次状态
+            if count / total >= 0.5:
+                final_sleep_stat = major_stat
+            else:
+                final_sleep_stat = self.sleep_stat_
+
+            sleep_segment = {
+                "ts": ts,
+                "sleep_stat": final_sleep_stat,
+                "smooth_motion": round(smooth_motion, 3),
+                "breath_rpm": breath_rpm,
+                "breathe_stat": breathe_stat
+            }
+            return sleep_segment
+
+        except Exception as e:
+            LOGERR(f"_analyze_recent_window error: {e}")
+            return None
+
+
+    # 主分析逻辑
     def analyze(self):
         if not os.path.exists(self.file_path):
             LOGERR(f"录像文件不存在: {self.file_path}")
             return
 
+        last_analyze_ts = None  # 上次分析时间
         last_saved_minute = None
 
         with open(self.file_path, "r", encoding="utf-8") as f:
@@ -83,41 +133,34 @@ class SleepReplayAnalyzer:
                 targets = payload.get("tracker_targets", [])
                 breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0)
 
-                # --- 运动状态分析 ---
+                # --- 运动状态 ---
                 motion = self.motion_stat_
                 if targets:
                     if not self._is_point_in_bed(targets[0]):
                         motion_value = 0
                         smooth_motion = 0
                         motion = "leave"
-                        LOGDBG(f"motion: {motion}, with target and not in bed")
                     else:
                         motion_value = 0
                         if self.last_target_point is not None:
                             x1, y1, z1, _ = self.last_target_point
                             x2, y2, z2, _ = targets[0]
-                            motion_value = ((x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2) ** 0.5
+                            motion_value = ((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2)**0.5
                         self.motion_buffer.append((ts, motion_value))
                         motion, smooth_motion = self._calc_motion_state_window(ts)
                     self.last_target_point = targets[0]
                     self.last_target_ts = ts
                 else:
                     smooth_motion = 0
-                    # 无目标时,根据最后目标位置判断
                     if self.last_target_point and self._is_point_in_bed(self.last_target_point):
-                        # 目标消失但仍在床上,认为静止
                         motion = "peaceful"
                     else:
-                        # 目标消失且不在床上,认为离床
                         motion = "leave"
-                        LOGDBG(f"motion: {motion}, without target and not in bed")
-
-                    # 为保持平滑趋势,补入0
                     self.motion_buffer.append((ts, 0))
 
                 self.motion_stat_ = motion
 
-                # --- 睡眠状态分析 ---
+                # --- 呼吸状态 ---
                 if breath_rpm < 8: breathe_stat = "r0"
                 elif breath_rpm < 12: breathe_stat = "r1"
                 elif breath_rpm < 18: breathe_stat = "r2"
@@ -126,25 +169,36 @@ class SleepReplayAnalyzer:
 
                 i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_)
                 i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat)
-                sleep_stat = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath]
-
-                # --- 按分钟保存 sleep_segment ---
+                sleep_stat_raw = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath]
+
+                # --- 保存原始实时片段 ---
+                self.sleep_segments_raw_.append({
+                    "ts": ts,
+                    "sleep_stat": sleep_stat_raw,
+                    "smooth_motion": round(smooth_motion, 3),
+                    "breath_rpm": breath_rpm,
+                    "breathe_stat": breathe_stat
+                })
+
+                # --- 每隔30秒分析一次 ---
+                if (last_analyze_ts is None) or (ts - last_analyze_ts >= 30):
+                    analyzed_segment = self._analyze_recent_window(ts, smooth_motion, breath_rpm, breathe_stat)
+                    if analyzed_segment:
+                        self.sleep_segments_.append(analyzed_segment)
+                        self.sleep_stat_ = analyzed_segment["sleep_stat"]
+                    last_analyze_ts = ts  # 更新时间戳
+
+                # --- 可选:按分钟对齐输出日志(非必要)
                 dt = datetime.fromtimestamp(ts)
                 current_minute = dt.replace(second=0, microsecond=0)
                 if last_saved_minute != current_minute:
-                    sleep_segment = {
-                        "ts": ts,
-                        "sleep_stat": sleep_stat,
-                        "smooth_motion": round(smooth_motion, 3),
-                        "breath_rpm": breath_rpm,
-                        "breathe_stat": breathe_stat
-                    }
-                    self.sleep_segments_.append(sleep_segment)
+                    LOGDBG(f"[{current_minute.strftime('%H:%M')}] 当前状态: {self.sleep_stat_}")
                     last_saved_minute = current_minute
 
             except Exception as e:
                 LOGERR(f"解析行失败: {line}, error: {e}")
 
+
     def export_report(self, out_file=None):
         if not out_file:
             out_file = os.path.splitext(self.file_path)[0] + "_sleep_report.json"
@@ -160,15 +214,21 @@ class SleepReplayAnalyzer:
 
 # ---------------- 使用示例 ----------------
 if __name__ == "__main__":
-    if 0:
+    start_ts = get_utc_time_s()
+    if 1:
         dev_id = "94A9900B0B38"
-        bed_rect = [0, 115, 200, 115]
+        bed_rect = [0, 250, 200, 115]
     else:
         dev_id = "94A9900B0B80"
-        bed_rect = [-10, 130, 120, 210]
+        bed_rect = [80, 130, 120, 210]
     file_path = f"demos/record/{dev_id}.raw.json"
+    # file_path = f"demos/record/2025-10-13_21-03-13_94A9900B0B80.raw.json"
     analyzer = SleepReplayAnalyzer(file_path, bed_rect)
     analyzer.analyze()
 
     ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
     analyzer.export_report(f"./demos/record/report_{dev_id}_{ts}.json")
+    end_ts = get_utc_time_s()
+    total_time = end_ts - start_ts
+    print(f"    耗时:{total_time}秒")
+    LOGINFO(f"    耗时:{total_time}秒")

+ 3 - 2
device/dev_mng.py

@@ -251,8 +251,9 @@ class DeviceManager():
                     y2 = row["stop_y"]
                     z2 = row["stop_z"]
                     mount_plain = row["mount_plain"]
-                    height = row["height"]
-                    install_param = InstallParam(mount_plain, round(height,3), TrackingRegion(x1, y1, z1, x2, y2, z2))
+                    height = row.get("height", 0.0)
+                    height = round(height, 3) if isinstance(height, (int, float)) else 0.0
+                    install_param = InstallParam(mount_plain, height, TrackingRegion(x1, y1, z1, x2, y2, z2))
                     dev_instance = Device(
                         dev_id=row["client_id"],
                         dev_name=row["dev_name"],