SleepReplayAnalyzer.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. import json
  2. from collections import deque, Counter
  3. import os
  4. import sys
  5. from datetime import datetime
  6. # 将 LAS 根目录加入 sys.path
  7. sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
  8. import core.alarm_plan_helper as helper
  9. from common.sys_comm import (
  10. LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
  11. get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
  12. utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
  13. )
  14. from core.alarm_plan import EventAttr_SleepMonitoring
  15. class SleepReplayAnalyzer:
  16. def __init__(self, file_path, bed_rect_ltwh):
  17. self.file_path = file_path
  18. self.bed_rect_ltwh = bed_rect_ltwh
  19. # 窗口和状态
  20. self.motion_buffer = deque() # 每条: (ts, motion_value)
  21. self.sleep_segments_raw_ = []
  22. self.sleep_segments_ = []
  23. self.last_target_point = None
  24. self.last_target_ts = None
  25. self.motion_stat_ = "leave"
  26. self.sleep_stat_ = "leave"
  27. # 判断目标点是否在床区域
  28. def _is_point_in_bed(self, target_point):
  29. x, y, *_ = target_point
  30. l, t, w, h = self.bed_rect_ltwh
  31. return l <= x <= l + w and (t - h) <= y <= t
  32. # 判断目标点是否在床区域
  33. def _is_point_in_bed_ex(self, target_point, expand=10):
  34. x, y, *_ = target_point
  35. l, t, w, h = self.bed_rect_ltwh
  36. l_expanded = l - expand
  37. r_expanded = l + w + expand
  38. top_expanded = t + expand
  39. bottom_expanded = t - h - expand
  40. return l_expanded <= x <= r_expanded and bottom_expanded <= y <= top_expanded
  41. # 根据窗口计算平滑运动状态
  42. def _calc_motion_state_window(self, current_ts, threshold_peaceful=2, threshold_micro=4):
  43. try:
  44. # 清理超过60秒的数据
  45. while self.motion_buffer and current_ts - self.motion_buffer[0][0] > 60:
  46. self.motion_buffer.popleft()
  47. # 平滑运动
  48. if self.motion_buffer:
  49. smooth_motion = sum(v for _, v in self.motion_buffer) / len(self.motion_buffer)
  50. else:
  51. smooth_motion = 0
  52. # 运动状态
  53. if smooth_motion < threshold_peaceful:
  54. motion = "peaceful"
  55. elif smooth_motion < threshold_micro:
  56. motion = "micro"
  57. else:
  58. motion = "active"
  59. return motion, smooth_motion
  60. except Exception as e:
  61. LOGERR(f"_calc_motion_state_window error: error: {e}")
  62. # 🔹 新版本:根据时间窗口(10分钟)分析最近数据
  63. def _analyze_recent_window(self, ts, smooth_motion, breath_rpm, breathe_stat):
  64. try:
  65. if not self.sleep_segments_raw_:
  66. return None
  67. # 保留最近10分钟(600秒)的数据
  68. time_window = 600 # 秒
  69. start_ts = ts - time_window
  70. recent_data = [d for d in self.sleep_segments_raw_ if d["ts"] >= start_ts]
  71. if not recent_data:
  72. return None
  73. stats = [d["sleep_stat"] for d in recent_data]
  74. counter = Counter(stats)
  75. total = len(stats)
  76. major_stat, count = counter.most_common(1)[0]
  77. # 若占比超过50%,用当前主要状态,否则保持上次状态
  78. if count / total >= 0.5:
  79. final_sleep_stat = major_stat
  80. else:
  81. final_sleep_stat = self.sleep_stat_
  82. sleep_segment = {
  83. "ts": ts,
  84. "sleep_stat": final_sleep_stat,
  85. "smooth_motion": round(smooth_motion, 3),
  86. "breath_rpm": breath_rpm,
  87. "breathe_stat": breathe_stat
  88. }
  89. return sleep_segment
  90. except Exception as e:
  91. LOGERR(f"_analyze_recent_window error: {e}")
  92. return None
  93. # 主分析逻辑
  94. def analyze(self):
  95. if not os.path.exists(self.file_path):
  96. LOGERR(f"录像文件不存在: {self.file_path}")
  97. return
  98. last_analyze_ts = None # 上次分析时间
  99. last_saved_minute = None
  100. with open(self.file_path, "r", encoding="utf-8") as f:
  101. lines = f.readlines()
  102. for line in lines:
  103. line = line.strip()
  104. if not line:
  105. continue
  106. try:
  107. entry = json.loads(line)
  108. ts = entry["ts"] / 1000.0 # 转秒
  109. payload = entry["payload"]
  110. targets = payload.get("tracker_targets", [])
  111. breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0)
  112. # --- 运动状态 ---
  113. motion = self.motion_stat_
  114. if targets:
  115. if not self._is_point_in_bed(targets[0]):
  116. motion_value = 0
  117. smooth_motion = 0
  118. motion = "leave"
  119. else:
  120. motion_value = 0
  121. if self.last_target_point is not None:
  122. x1, y1, z1, _ = self.last_target_point
  123. x2, y2, z2, _ = targets[0]
  124. motion_value = ((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2)**0.5
  125. self.motion_buffer.append((ts, motion_value))
  126. motion, smooth_motion = self._calc_motion_state_window(ts)
  127. self.last_target_point = targets[0]
  128. self.last_target_ts = ts
  129. else:
  130. smooth_motion = 0
  131. if self.last_target_point and self._is_point_in_bed(self.last_target_point):
  132. motion = "peaceful"
  133. else:
  134. motion = "leave"
  135. self.motion_buffer.append((ts, 0))
  136. self.motion_stat_ = motion
  137. # --- 呼吸状态 ---
  138. if breath_rpm < 8: breathe_stat = "r0"
  139. elif breath_rpm < 12: breathe_stat = "r1"
  140. elif breath_rpm < 18: breathe_stat = "r2"
  141. elif breath_rpm < 25: breathe_stat = "r3"
  142. else: breathe_stat = "r4"
  143. i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_)
  144. i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat)
  145. sleep_stat_raw = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath]
  146. # --- 保存原始实时片段 ---
  147. self.sleep_segments_raw_.append({
  148. "ts": ts,
  149. "sleep_stat": sleep_stat_raw,
  150. "smooth_motion": round(smooth_motion, 3),
  151. "breath_rpm": breath_rpm,
  152. "breathe_stat": breathe_stat
  153. })
  154. # --- 每隔30秒分析一次 ---
  155. if (last_analyze_ts is None) or (ts - last_analyze_ts >= 30):
  156. analyzed_segment = self._analyze_recent_window(ts, smooth_motion, breath_rpm, breathe_stat)
  157. if analyzed_segment:
  158. self.sleep_segments_.append(analyzed_segment)
  159. self.sleep_stat_ = analyzed_segment["sleep_stat"]
  160. last_analyze_ts = ts # 更新时间戳
  161. # --- 可选:按分钟对齐输出日志(非必要)
  162. dt = datetime.fromtimestamp(ts)
  163. current_minute = dt.replace(second=0, microsecond=0)
  164. if last_saved_minute != current_minute:
  165. LOGDBG(f"[{current_minute.strftime('%H:%M')}] 当前状态: {self.sleep_stat_}")
  166. last_saved_minute = current_minute
  167. except Exception as e:
  168. LOGERR(f"解析行失败: {line}, error: {e}")
  169. def export_report(self, out_file=None):
  170. if not out_file:
  171. out_file = os.path.splitext(self.file_path)[0] + "_sleep_report.json"
  172. try:
  173. with open(out_file, "w", encoding="utf-8") as f:
  174. json.dump(self.sleep_segments_, f, ensure_ascii=False, indent=2)
  175. LOGINFO(f"睡眠报告已生成: {out_file}")
  176. print(f"睡眠报告已生成: {out_file}")
  177. except Exception as e:
  178. LOGERR(f"保存睡眠报告失败: {e}")
  179. print(f"保存睡眠报告失败: {e}")
  180. # ---------------- 使用示例 ----------------
  181. if __name__ == "__main__":
  182. start_ts = get_utc_time_s()
  183. if 1:
  184. dev_id = "94A9900B0B38"
  185. bed_rect = [0, 250, 200, 115]
  186. else:
  187. dev_id = "94A9900B0B80"
  188. bed_rect = [80, 130, 120, 210]
  189. file_path = f"demos/record/{dev_id}.raw.json"
  190. # file_path = f"demos/record/2025-10-13_21-03-13_94A9900B0B80.raw.json"
  191. analyzer = SleepReplayAnalyzer(file_path, bed_rect)
  192. analyzer.analyze()
  193. ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
  194. analyzer.export_report(f"./demos/record/report_{dev_id}_{ts}.json")
  195. end_ts = get_utc_time_s()
  196. total_time = end_ts - start_ts
  197. print(f" 耗时:{total_time}秒")
  198. LOGINFO(f" 耗时:{total_time}秒")