123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- import json
- from collections import deque
- import os
- import sys
- from datetime import datetime
- # 将 LAS 根目录加入 sys.path
- sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
- import core.alarm_plan_helper as helper
- from common.sys_comm import (
- LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
- get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
- utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
- )
- from core.alarm_plan import EventAttr_SleepMonitoring
- class SleepReplayAnalyzer:
- def __init__(self, file_path, bed_rect_ltwh):
- """
- file_path: 录像文件路径
- bed_rect_ltwh: 床区域 [left, top, width, height]
- """
- self.file_path = file_path
- self.bed_rect_ltwh = bed_rect_ltwh
- # 窗口和状态
- self.motion_buffer = deque() # 每条: (ts, motion_value)
- self.sleep_segments_ = []
- self.last_target_point = None
- self.last_target_ts = None
- self.motion_stat_ = "leave"
- self.sleep_stat_ = "leave"
- # 判断目标点是否在床区域
- def _is_point_in_bed(self, target_point):
- x, y, *_ = target_point
- l, t, w, h = self.bed_rect_ltwh
- return l <= x <= l + w and t <= y <= t + h
- # 根据窗口计算平滑运动状态
- def _calc_motion_state_window(self, current_ts, threshold_peaceful=5, threshold_micro=10):
- try:
- # 清理超过60秒的数据
- while self.motion_buffer and current_ts - self.motion_buffer[0][0] > 60:
- self.motion_buffer.popleft()
- # 平滑运动
- if self.motion_buffer:
- smooth_motion = sum(v for _, v in self.motion_buffer) / len(self.motion_buffer)
- else:
- smooth_motion = 0
- # 运动状态
- if smooth_motion < threshold_peaceful:
- motion = "peaceful"
- elif smooth_motion < threshold_micro:
- motion = "micro"
- else:
- motion = "active"
- return motion, smooth_motion
- except Exception as e:
- LOGERR(f"_calc_motion_state_window error: error: {e}")
- def analyze(self):
- if not os.path.exists(self.file_path):
- LOGERR(f"录像文件不存在: {self.file_path}")
- return
- last_saved_minute = None
- with open(self.file_path, "r", encoding="utf-8") as f:
- lines = f.readlines()
- for line in lines:
- line = line.strip()
- if not line:
- continue
- try:
- entry = json.loads(line)
- ts = entry["ts"] / 1000.0 # 转秒
- payload = entry["payload"]
- targets = payload.get("tracker_targets", [])
- breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0)
- # --- 运动状态分析 ---
- motion = self.motion_stat_
- if targets:
- if not self._is_point_in_bed(targets[0]):
- motion_value = 0
- smooth_motion = 0
- motion = "leave"
- LOGDBG(f"motion: {motion}, with target and not in bed")
- else:
- motion_value = 0
- if self.last_target_point is not None:
- x1, y1, z1, _ = self.last_target_point
- x2, y2, z2, _ = targets[0]
- motion_value = ((x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2) ** 0.5
- self.motion_buffer.append((ts, motion_value))
- motion, smooth_motion = self._calc_motion_state_window(ts)
- self.last_target_point = targets[0]
- self.last_target_ts = ts
- else:
- smooth_motion = 0
- # 无目标时,根据最后目标位置判断
- if self.last_target_point and self._is_point_in_bed(self.last_target_point):
- # 目标消失但仍在床上,认为静止
- motion = "peaceful"
- else:
- # 目标消失且不在床上,认为离床
- motion = "leave"
- LOGDBG(f"motion: {motion}, without target and not in bed")
- # 为保持平滑趋势,补入0
- self.motion_buffer.append((ts, 0))
- self.motion_stat_ = motion
- # --- 睡眠状态分析 ---
- if breath_rpm < 8: breathe_stat = "r0"
- elif breath_rpm < 12: breathe_stat = "r1"
- elif breath_rpm < 18: breathe_stat = "r2"
- elif breath_rpm < 25: breathe_stat = "r3"
- else: breathe_stat = "r4"
- i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_)
- i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat)
- sleep_stat = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath]
- # --- 按分钟保存 sleep_segment ---
- dt = datetime.fromtimestamp(ts)
- current_minute = dt.replace(second=0, microsecond=0)
- if last_saved_minute != current_minute:
- sleep_segment = {
- "ts": ts,
- "sleep_stat": sleep_stat,
- "smooth_motion": round(smooth_motion, 3),
- "breath_rpm": breath_rpm,
- "breathe_stat": breathe_stat
- }
- self.sleep_segments_.append(sleep_segment)
- last_saved_minute = current_minute
- except Exception as e:
- LOGERR(f"解析行失败: {line}, error: {e}")
- def export_report(self, out_file=None):
- if not out_file:
- out_file = os.path.splitext(self.file_path)[0] + "_sleep_report.json"
- try:
- with open(out_file, "w", encoding="utf-8") as f:
- json.dump(self.sleep_segments_, f, ensure_ascii=False, indent=2)
- LOGINFO(f"睡眠报告已生成: {out_file}")
- print(f"睡眠报告已生成: {out_file}")
- except Exception as e:
- LOGERR(f"保存睡眠报告失败: {e}")
- print(f"保存睡眠报告失败: {e}")
- # ---------------- 使用示例 ----------------
- if __name__ == "__main__":
- if 0:
- dev_id = "94A9900B0B38"
- bed_rect = [0, 115, 200, 115]
- else:
- dev_id = "94A9900B0B80"
- bed_rect = [-10, 130, 120, 210]
- file_path = f"demos/record/{dev_id}.raw.json"
- analyzer = SleepReplayAnalyzer(file_path, bed_rect)
- analyzer.analyze()
- ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
- analyzer.export_report(f"./demos/record/report_{dev_id}_{ts}.json")
|