import json
from collections import deque, Counter
import os
import sys
from datetime import datetime

# Add the LAS root directory to sys.path so the project packages below resolve.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import core.alarm_plan_helper as helper
from common.sys_comm import (
    LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
    get_utc_time_ms, get_utc_time_s,
    get_bj_time_ms, get_bj_time_s,
    utc_to_bj_ms, bj_to_utc_ms,
    utc_to_bj_s, bj_to_utc_s
)
from core.alarm_plan import EventAttr_SleepMonitoring


class SleepReplayAnalyzer:
    """Replay a recorded JSON-lines file and derive sleep-state segments.

    Each non-empty line of the recording is a JSON object with a ``ts`` field
    (divided by 1000 to obtain seconds) and a ``payload`` dict that may hold
    ``tracker_targets`` (a list of points) and ``health.breath_rpm``.

    Attributes:
        file_path: Path of the recording to replay.
        bed_rect_ltwh: Bed rectangle as ``(left, top, width, height)``; the bed
            covers ``left <= x <= left + width`` and ``top - height <= y <= top``.
        motion_buffer: Sliding window of ``(ts, motion_value)`` samples.
        sleep_segments_raw_: One raw per-frame segment dict per processed line.
        sleep_segments_: Smoothed segments, produced roughly every 30 seconds.
        motion_stat_: Most recent motion state.
        sleep_stat_: Most recent smoothed sleep state.
    """

    # Upper bounds (exclusive) of the breathing-rate bands, in ascending order.
    # Anything at or above the last bound is classified as "r4".
    _BREATH_BANDS = ((8, "r0"), (12, "r1"), (18, "r2"), (25, "r3"))

    def __init__(self, file_path, bed_rect_ltwh):
        self.file_path = file_path
        self.bed_rect_ltwh = bed_rect_ltwh

        # Window and state
        self.motion_buffer = deque()  # each item: (ts, motion_value)
        self.sleep_segments_raw_ = []
        self.sleep_segments_ = []

        self.last_target_point = None
        self.last_target_ts = None
        self.motion_stat_ = "leave"
        self.sleep_stat_ = "leave"

    def _is_point_in_bed(self, target_point):
        """Return True when the point's (x, y) lies inside the bed rectangle.

        Only the first two components of ``target_point`` are used; any extra
        components are ignored.
        """
        x, y, *_ = target_point
        l, t, w, h = self.bed_rect_ltwh
        return l <= x <= l + w and (t - h) <= y <= t

    def _is_point_in_bed_ex(self, target_point, expand=10):
        """Return True when the point lies inside the bed rectangle grown by
        ``expand`` on every side.
        """
        x, y, *_ = target_point
        l, t, w, h = self.bed_rect_ltwh
        l_expanded = l - expand
        r_expanded = l + w + expand
        top_expanded = t + expand
        bottom_expanded = t - h - expand
        return l_expanded <= x <= r_expanded and bottom_expanded <= y <= top_expanded

    def _calc_motion_state_window(self, current_ts, threshold_peaceful=2, threshold_micro=4):
        """Compute the smoothed motion state over the last 60 seconds.

        Samples older than 60 seconds (relative to ``current_ts``) are dropped
        from ``motion_buffer``; the remaining motion values are averaged.

        Args:
            current_ts: Timestamp of the current frame, in seconds.
            threshold_peaceful: Averages below this are "peaceful".
            threshold_micro: Averages below this (and not peaceful) are
                "micro"; anything else is "active".

        Returns:
            ``(motion, smooth_motion)``. On an unexpected error the failure is
            logged and ``(self.motion_stat_, 0)`` is returned so callers can
            always unpack a 2-tuple.
        """
        try:
            # Drop samples older than 60 seconds.
            while self.motion_buffer and current_ts - self.motion_buffer[0][0] > 60:
                self.motion_buffer.popleft()

            # Smoothed motion = mean of the values still in the window.
            if self.motion_buffer:
                smooth_motion = sum(v for _, v in self.motion_buffer) / len(self.motion_buffer)
            else:
                smooth_motion = 0

            if smooth_motion < threshold_peaceful:
                motion = "peaceful"
            elif smooth_motion < threshold_micro:
                motion = "micro"
            else:
                motion = "active"
            return motion, smooth_motion
        except Exception as e:
            LOGERR(f"_calc_motion_state_window error: {e}")
            # Previously this path fell through and returned None, which made
            # the caller's tuple unpacking fail with an unrelated TypeError.
            return self.motion_stat_, 0

    def _analyze_recent_window(self, ts, smooth_motion, breath_rpm, breathe_stat):
        """Build a smoothed segment from the raw segments of the last 10 minutes.

        The most common raw ``sleep_stat`` within the window is used when it
        accounts for at least half of the samples; otherwise the previous
        smoothed state (``self.sleep_stat_``) is kept.

        Returns:
            The segment dict, or ``None`` when there is no raw data in the
            window or an error occurred (the error is logged).
        """
        try:
            if not self.sleep_segments_raw_:
                return None

            # Keep only the last 10 minutes (600 seconds) of raw data.
            time_window = 600
            start_ts = ts - time_window
            recent_stats = [
                d["sleep_stat"] for d in self.sleep_segments_raw_ if d["ts"] >= start_ts
            ]
            if not recent_stats:
                return None

            major_stat, count = Counter(recent_stats).most_common(1)[0]

            # Adopt the dominant state only when it covers at least 50% of the
            # window; otherwise keep the previous state.
            if count / len(recent_stats) >= 0.5:
                final_sleep_stat = major_stat
            else:
                final_sleep_stat = self.sleep_stat_

            return {
                "ts": ts,
                "sleep_stat": final_sleep_stat,
                "smooth_motion": round(smooth_motion, 3),
                "breath_rpm": breath_rpm,
                "breathe_stat": breathe_stat
            }
        except Exception as e:
            LOGERR(f"_analyze_recent_window error: {e}")
            return None

    @staticmethod
    def _point_distance(p1, p2):
        """Return the Euclidean distance between the (x, y, z) parts of two points.

        Extra trailing components are ignored, matching how
        ``_is_point_in_bed`` treats target points.
        """
        x1, y1, z1, *_ = p1
        x2, y2, z2, *_ = p2
        return ((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2)**0.5

    @classmethod
    def _classify_breath(cls, breath_rpm):
        """Map a breathing rate to its band label ("r0" .. "r4")."""
        for upper_bound, label in cls._BREATH_BANDS:
            if breath_rpm < upper_bound:
                return label
        return "r4"

    def _update_motion_state(self, ts, targets):
        """Update motion tracking for one frame and return ``(motion, smooth_motion)``.

        NOTE(review): the statement nesting here was reconstructed from a
        whitespace-collapsed source; confirm that ``last_target_point`` is
        meant to be updated for every visible target and that the zero-motion
        sample is meant to be appended for every target-less frame.
        """
        if not targets:
            # No target this frame: assume the person is still lying quietly
            # if the last known position was inside the bed.
            if self.last_target_point and self._is_point_in_bed(self.last_target_point):
                motion = "peaceful"
            else:
                motion = "leave"
            self.motion_buffer.append((ts, 0))
            return motion, 0

        target = targets[0]  # only the first tracked target is considered
        if not self._is_point_in_bed(target):
            motion, smooth_motion = "leave", 0
        else:
            motion_value = 0
            if self.last_target_point is not None:
                motion_value = self._point_distance(self.last_target_point, target)
            self.motion_buffer.append((ts, motion_value))
            motion, smooth_motion = self._calc_motion_state_window(ts)

        self.last_target_point = target
        self.last_target_ts = ts
        return motion, smooth_motion

    def analyze(self):
        """Replay the recording and populate the raw and smoothed segments.

        Lines that fail to parse or process are logged and skipped. If the
        recording file does not exist, an error is logged and nothing is done.
        """
        if not os.path.exists(self.file_path):
            LOGERR(f"录像文件不存在: {self.file_path}")
            return

        last_analyze_ts = None  # timestamp of the last smoothing pass
        last_saved_minute = None

        with open(self.file_path, "r", encoding="utf-8") as f:
            # Stream the file instead of loading every line into memory.
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue

                try:
                    entry = json.loads(line)
                    ts = entry["ts"] / 1000.0  # convert to seconds
                    payload = entry["payload"]
                    targets = payload.get("tracker_targets", [])
                    breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0)

                    # --- motion state ---
                    motion, smooth_motion = self._update_motion_state(ts, targets)
                    self.motion_stat_ = motion

                    # --- breathing state ---
                    breathe_stat = self._classify_breath(breath_rpm)

                    i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_)
                    i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat)
                    sleep_stat_raw = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath]

                    # --- store the raw real-time segment ---
                    self.sleep_segments_raw_.append({
                        "ts": ts,
                        "sleep_stat": sleep_stat_raw,
                        "smooth_motion": round(smooth_motion, 3),
                        "breath_rpm": breath_rpm,
                        "breathe_stat": breathe_stat
                    })

                    # --- run the smoothing pass every 30 seconds ---
                    if (last_analyze_ts is None) or (ts - last_analyze_ts >= 30):
                        analyzed_segment = self._analyze_recent_window(
                            ts, smooth_motion, breath_rpm, breathe_stat
                        )
                        if analyzed_segment:
                            self.sleep_segments_.append(analyzed_segment)
                            self.sleep_stat_ = analyzed_segment["sleep_stat"]
                        last_analyze_ts = ts

                    # --- optional: minute-aligned debug log ---
                    dt = datetime.fromtimestamp(ts)
                    current_minute = dt.replace(second=0, microsecond=0)
                    if last_saved_minute != current_minute:
                        LOGDBG(f"[{current_minute.strftime('%H:%M')}] 当前状态: {self.sleep_stat_}")
                        last_saved_minute = current_minute

                except Exception as e:
                    LOGERR(f"解析行失败: {line}, error: {e}")

    def export_report(self, out_file=None):
        """Write the smoothed segments to ``out_file`` as indented JSON.

        When ``out_file`` is falsy, the report is written next to the
        recording as ``<recording stem>_sleep_report.json``. Failures are
        logged and printed rather than raised.
        """
        if not out_file:
            out_file = os.path.splitext(self.file_path)[0] + "_sleep_report.json"

        try:
            with open(out_file, "w", encoding="utf-8") as f:
                json.dump(self.sleep_segments_, f, ensure_ascii=False, indent=2)
            LOGINFO(f"睡眠报告已生成: {out_file}")
            print(f"睡眠报告已生成: {out_file}")
        except Exception as e:
            LOGERR(f"保存睡眠报告失败: {e}")
            print(f"保存睡眠报告失败: {e}")


# ---------------- Usage example ----------------
def main():
    """Replay one recorded device file and export a timestamped report."""
    start_ts = get_utc_time_s()

    dev_id = "94A9900B0B38"
    bed_rect = [0, 250, 200, 115]
    # Alternative device: dev_id = "94A9900B0B80", bed_rect = [80, 130, 120, 210]

    file_path = f"demos/record/{dev_id}.raw.json"
    # Alternative recording: demos/record/2025-10-13_21-03-13_94A9900B0B80.raw.json

    analyzer = SleepReplayAnalyzer(file_path, bed_rect)
    analyzer.analyze()

    ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    analyzer.export_report(f"./demos/record/report_{dev_id}_{ts}.json")

    end_ts = get_utc_time_s()
    total_time = end_ts - start_ts
    print(f" 耗时:{total_time}秒")
    LOGINFO(f" 耗时:{total_time}秒")


if __name__ == "__main__":
    main()