import json from collections import deque import os import sys from datetime import datetime # 将 LAS 根目录加入 sys.path sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) import core.alarm_plan_helper as helper from common.sys_comm import ( LOGDBG, LOGINFO, LOGWARN, LOGERR, EC, get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s, utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s ) from core.alarm_plan import EventAttr_SleepMonitoring class SleepReplayAnalyzer: def __init__(self, file_path, bed_rect_ltwh): """ file_path: 录像文件路径 bed_rect_ltwh: 床区域 [left, top, width, height] """ self.file_path = file_path self.bed_rect_ltwh = bed_rect_ltwh # 窗口和状态 self.motion_buffer = deque() # 每条: (ts, motion_value) self.sleep_segments_ = [] self.last_target_point = None self.last_target_ts = None self.motion_stat_ = "leave" self.sleep_stat_ = "leave" # 判断目标点是否在床区域 def _is_point_in_bed(self, target_point): x, y, *_ = target_point l, t, w, h = self.bed_rect_ltwh return l <= x <= l + w and t <= y <= t + h # 根据窗口计算平滑运动状态 def _calc_motion_state_window(self, current_ts, threshold_peaceful=5, threshold_micro=10): try: # 清理超过60秒的数据 while self.motion_buffer and current_ts - self.motion_buffer[0][0] > 60: self.motion_buffer.popleft() # 平滑运动 if self.motion_buffer: smooth_motion = sum(v for _, v in self.motion_buffer) / len(self.motion_buffer) else: smooth_motion = 0 # 运动状态 if smooth_motion < threshold_peaceful: motion = "peaceful" elif smooth_motion < threshold_micro: motion = "micro" else: motion = "active" return motion, smooth_motion except Exception as e: LOGERR(f"_calc_motion_state_window error: error: {e}") def analyze(self): if not os.path.exists(self.file_path): LOGERR(f"录像文件不存在: {self.file_path}") return last_saved_minute = None with open(self.file_path, "r", encoding="utf-8") as f: lines = f.readlines() for line in lines: line = line.strip() if not line: continue try: entry = json.loads(line) ts = entry["ts"] / 1000.0 # 转秒 payload = entry["payload"] targets = payload.get("tracker_targets", []) breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0) # --- 运动状态分析 --- motion = self.motion_stat_ if targets: if not self._is_point_in_bed(targets[0]): motion_value = 0 smooth_motion = 0 motion = "leave" LOGDBG(f"motion: {motion}, with target and not in bed") else: motion_value = 0 if self.last_target_point is not None: x1, y1, z1, _ = self.last_target_point x2, y2, z2, _ = targets[0] motion_value = ((x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2) ** 0.5 self.motion_buffer.append((ts, motion_value)) motion, smooth_motion = self._calc_motion_state_window(ts) self.last_target_point = targets[0] self.last_target_ts = ts else: smooth_motion = 0 # 无目标时,根据最后目标位置判断 if self.last_target_point and self._is_point_in_bed(self.last_target_point): # 目标消失但仍在床上,认为静止 motion = "peaceful" else: # 目标消失且不在床上,认为离床 motion = "leave" LOGDBG(f"motion: {motion}, without target and not in bed") # 为保持平滑趋势,补入0 self.motion_buffer.append((ts, 0)) self.motion_stat_ = motion # --- 睡眠状态分析 --- if breath_rpm < 8: breathe_stat = "r0" elif breath_rpm < 12: breathe_stat = "r1" elif breath_rpm < 18: breathe_stat = "r2" elif breath_rpm < 25: breathe_stat = "r3" else: breathe_stat = "r4" i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_) i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat) sleep_stat = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath] # --- 按分钟保存 sleep_segment --- dt = datetime.fromtimestamp(ts) current_minute = dt.replace(second=0, microsecond=0) if last_saved_minute != current_minute: sleep_segment = { "ts": ts, "sleep_stat": sleep_stat, "smooth_motion": round(smooth_motion, 3), "breath_rpm": breath_rpm, "breathe_stat": breathe_stat } self.sleep_segments_.append(sleep_segment) last_saved_minute = current_minute except Exception as e: LOGERR(f"解析行失败: {line}, error: {e}") def export_report(self, out_file=None): if not out_file: out_file = os.path.splitext(self.file_path)[0] + "_sleep_report.json" try: with open(out_file, "w", encoding="utf-8") as f: json.dump(self.sleep_segments_, f, ensure_ascii=False, indent=2) LOGINFO(f"睡眠报告已生成: {out_file}") print(f"睡眠报告已生成: {out_file}") except Exception as e: LOGERR(f"保存睡眠报告失败: {e}") print(f"保存睡眠报告失败: {e}") # ---------------- 使用示例 ---------------- if __name__ == "__main__": if 0: dev_id = "94A9900B0B38" bed_rect = [0, 115, 200, 115] else: dev_id = "94A9900B0B80" bed_rect = [-10, 130, 120, 210] file_path = f"demos/record/{dev_id}.raw.json" analyzer = SleepReplayAnalyzer(file_path, bed_rect) analyzer.analyze() ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") analyzer.export_report(f"./demos/record/report_{dev_id}_{ts}.json")