SleepReplayAnalyzer.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import json
  2. from collections import deque
  3. import os
  4. import sys
  5. from datetime import datetime
  6. # 将 LAS 根目录加入 sys.path
  7. sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
  8. import core.alarm_plan_helper as helper
  9. from common.sys_comm import (
  10. LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
  11. get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
  12. utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
  13. )
  14. from core.alarm_plan import EventAttr_SleepMonitoring
  15. class SleepReplayAnalyzer:
  16. def __init__(self, file_path, bed_rect_ltwh):
  17. """
  18. file_path: 录像文件路径
  19. bed_rect_ltwh: 床区域 [left, top, width, height]
  20. """
  21. self.file_path = file_path
  22. self.bed_rect_ltwh = bed_rect_ltwh
  23. # 窗口和状态
  24. self.motion_buffer = deque() # 每条: (ts, motion_value)
  25. self.sleep_segments_ = []
  26. self.last_target_point = None
  27. self.last_target_ts = None
  28. self.motion_stat_ = "leave"
  29. self.sleep_stat_ = "leave"
  30. # 判断目标点是否在床区域
  31. def _is_point_in_bed(self, target_point):
  32. x, y, *_ = target_point
  33. l, t, w, h = self.bed_rect_ltwh
  34. return l <= x <= l + w and t <= y <= t + h
  35. # 根据窗口计算平滑运动状态
  36. def _calc_motion_state_window(self, current_ts, threshold_peaceful=5, threshold_micro=10):
  37. try:
  38. # 清理超过60秒的数据
  39. while self.motion_buffer and current_ts - self.motion_buffer[0][0] > 60:
  40. self.motion_buffer.popleft()
  41. # 平滑运动
  42. if self.motion_buffer:
  43. smooth_motion = sum(v for _, v in self.motion_buffer) / len(self.motion_buffer)
  44. else:
  45. smooth_motion = 0
  46. # 运动状态
  47. if smooth_motion < threshold_peaceful:
  48. motion = "peaceful"
  49. elif smooth_motion < threshold_micro:
  50. motion = "micro"
  51. else:
  52. motion = "active"
  53. return motion, smooth_motion
  54. except Exception as e:
  55. LOGERR(f"_calc_motion_state_window error: error: {e}")
  56. def analyze(self):
  57. if not os.path.exists(self.file_path):
  58. LOGERR(f"录像文件不存在: {self.file_path}")
  59. return
  60. last_saved_minute = None
  61. with open(self.file_path, "r", encoding="utf-8") as f:
  62. lines = f.readlines()
  63. for line in lines:
  64. line = line.strip()
  65. if not line:
  66. continue
  67. try:
  68. entry = json.loads(line)
  69. ts = entry["ts"] / 1000.0 # 转秒
  70. payload = entry["payload"]
  71. targets = payload.get("tracker_targets", [])
  72. breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0)
  73. # --- 运动状态分析 ---
  74. motion = self.motion_stat_
  75. if targets:
  76. if not self._is_point_in_bed(targets[0]):
  77. motion_value = 0
  78. smooth_motion = 0
  79. motion = "leave"
  80. LOGDBG(f"motion: {motion}, with target and not in bed")
  81. else:
  82. motion_value = 0
  83. if self.last_target_point is not None:
  84. x1, y1, z1, _ = self.last_target_point
  85. x2, y2, z2, _ = targets[0]
  86. motion_value = ((x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2) ** 0.5
  87. self.motion_buffer.append((ts, motion_value))
  88. motion, smooth_motion = self._calc_motion_state_window(ts)
  89. self.last_target_point = targets[0]
  90. self.last_target_ts = ts
  91. else:
  92. smooth_motion = 0
  93. # 无目标时,根据最后目标位置判断
  94. if self.last_target_point and self._is_point_in_bed(self.last_target_point):
  95. # 目标消失但仍在床上,认为静止
  96. motion = "peaceful"
  97. else:
  98. # 目标消失且不在床上,认为离床
  99. motion = "leave"
  100. LOGDBG(f"motion: {motion}, without target and not in bed")
  101. # 为保持平滑趋势,补入0
  102. self.motion_buffer.append((ts, 0))
  103. self.motion_stat_ = motion
  104. # --- 睡眠状态分析 ---
  105. if breath_rpm < 8: breathe_stat = "r0"
  106. elif breath_rpm < 12: breathe_stat = "r1"
  107. elif breath_rpm < 18: breathe_stat = "r2"
  108. elif breath_rpm < 25: breathe_stat = "r3"
  109. else: breathe_stat = "r4"
  110. i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_)
  111. i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat)
  112. sleep_stat = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath]
  113. # --- 按分钟保存 sleep_segment ---
  114. dt = datetime.fromtimestamp(ts)
  115. current_minute = dt.replace(second=0, microsecond=0)
  116. if last_saved_minute != current_minute:
  117. sleep_segment = {
  118. "ts": ts,
  119. "sleep_stat": sleep_stat,
  120. "smooth_motion": round(smooth_motion, 3),
  121. "breath_rpm": breath_rpm,
  122. "breathe_stat": breathe_stat
  123. }
  124. self.sleep_segments_.append(sleep_segment)
  125. last_saved_minute = current_minute
  126. except Exception as e:
  127. LOGERR(f"解析行失败: {line}, error: {e}")
  128. def export_report(self, out_file=None):
  129. if not out_file:
  130. out_file = os.path.splitext(self.file_path)[0] + "_sleep_report.json"
  131. try:
  132. with open(out_file, "w", encoding="utf-8") as f:
  133. json.dump(self.sleep_segments_, f, ensure_ascii=False, indent=2)
  134. LOGINFO(f"睡眠报告已生成: {out_file}")
  135. print(f"睡眠报告已生成: {out_file}")
  136. except Exception as e:
  137. LOGERR(f"保存睡眠报告失败: {e}")
  138. print(f"保存睡眠报告失败: {e}")
  139. # ---------------- 使用示例 ----------------
  140. if __name__ == "__main__":
  141. if 0:
  142. dev_id = "94A9900B0B38"
  143. bed_rect = [0, 115, 200, 115]
  144. else:
  145. dev_id = "94A9900B0B80"
  146. bed_rect = [-10, 130, 120, 210]
  147. file_path = f"demos/record/{dev_id}.raw.json"
  148. analyzer = SleepReplayAnalyzer(file_path, bed_rect)
  149. analyzer.analyze()
  150. ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
  151. analyzer.export_report(f"./demos/record/report_{dev_id}_{ts}.json")