"""Replay a recorded radar data file through the sleep-monitoring alarm plan.

Each non-empty line of the recording is parsed as a JSON object with a
``payload`` and a ``ts`` field.  Every record is converted into an
``rtd_unit`` dict, pushed into a ``Device`` instance, and then
``AlarmPlan.handle_sleep_monitoring()`` is invoked.  When the replay is
finished, the collected ``sleep_segments_`` are printed and dumped to a JSON
report file.
"""
import json
from collections import deque
import os
import sys

# Add the LAS root directory to sys.path so the project packages below resolve.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import core.alarm_plan_helper as helper
from common.sys_comm import (
    LOGDBG, LOGINFO, LOGWARN, LOGERR,
    EC,
    get_utc_time_ms, get_utc_time_s,
    get_bj_time_ms, get_bj_time_s,
    utc_to_bj_ms, bj_to_utc_ms,
    utc_to_bj_s, bj_to_utc_s
)

import device.dev_mng as g_Dev
from device.dev_mng import (
    Device,
    g_dev_mgr,
    init_dev_mng
)

from core.alarm_plan import AlarmPlan
from core.time_plan import TimePlan
from core.event_type import EventType
from core.linkage_action import LinkageAction


# Assumes the AlarmPlan handler methods are already bound to AlarmPlan
# instances; this module is a replay test program built on top of them.
class SleepMonitorTester:
    """Feed a recorded data file into a sleep-monitoring ``AlarmPlan``.

    Attributes:
        file_path: Path of the recording to replay (one JSON object per line).
        dev_id: Device id registered with the device manager.
        device: ``Device`` instance that receives the replayed records.
        sleep_monitor: ``AlarmPlan`` configured with the
            ``EventType.SLEEP_MONITORING`` event type.
    """

    def __init__(self, file_path, dev_id, rect):
        """Set up the device manager, the device and the alarm plan.

        Args:
            file_path: Path of the recording to replay.
            dev_id: Id under which the device is pushed into the device map;
                also assigned to the plan's ``dev_id_``.
            rect: Monitoring area, assigned to the plan's ``rect_``.
        """
        self.file_path = file_path

        init_dev_mng()
        self.dev_id = dev_id
        self.device = Device(dev_id)
        g_Dev.g_dev_mgr.start()
        g_Dev.g_dev_mgr.push_dev_map(dev_id, self.device)

        # Build the algorithm instance.
        linkage_action = LinkageAction()
        cron = {
            "hour": 1,
            "minute": 0
        }
        time_plan = TimePlan(
            time_range = [{"start_time": "00:00","end_time": "23:59"}],
            start_date = '2025-09-01',
            stop_date = '2099-12-31',
            weekdays = [1,2,3,4,5,6,7],
            month_days = []
        )
        self.sleep_monitor = AlarmPlan(
            plan_uuid = "plan_uuid",
            name = "plan_name",
            dev_id = 'LAS',
            dev_name = '告警联动服务',
            enable = 1,
            time_plan = time_plan,
            rect = [],
            event_type = EventType.SLEEP_MONITORING.value,
            threshold_time = 300,
            merge_time = 30,
            param = {},
            cron = cron,
            linkage_action=linkage_action,
            tenant_id = 0
        )
        # The plan is constructed with placeholder dev_id/rect values; the
        # real ones are overwritten on the instance afterwards.
        self.sleep_monitor.dev_id_ = dev_id
        # Set the monitoring area.
        self.sleep_monitor.rect_ = rect

    @staticmethod
    def _build_rtd_unit(entry):
        """Convert one parsed recording entry into an ``rtd_unit`` dict.

        Raises ``KeyError`` when ``payload`` or ``ts`` is missing and
        ``IndexError`` when ``tracker_targets`` is present but empty; the
        caller reports those as a failed record.
        """
        payload = entry["payload"]
        ts = entry["ts"]

        tracker_targets = payload.get("tracker_targets", [[0, 0, 0, 0]])
        breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0)
        pose = 4  # fixed value, following the original logic

        return {
            # Converted to seconds (presumably ``ts`` is in milliseconds).
            "timestamp": ts / 1000.0,
            "target_point": tracker_targets[0],
            "breath_rpm": breath_rpm,
            "pose": pose
        }

    def _save_report(self, report_dir):
        """Dump ``sleep_segments_`` as JSON into *report_dir* (best effort)."""
        report_file = os.path.splitext(os.path.basename(self.file_path))[0] + "_sleep_report.json"
        report_path = os.path.join(report_dir, report_file)
        try:
            # Create the target directory so a missing folder does not make
            # the whole replay result unusable.
            if report_dir:
                os.makedirs(report_dir, exist_ok=True)
            with open(report_path, "w", encoding="utf-8") as f:
                json.dump(self.sleep_monitor.sleep_segments_, f, ensure_ascii=False, indent=2)
            print(f"睡眠报告已生成: {report_path}")
        except Exception as e:
            print(f"生成睡眠报告失败: {e}")

    def run(self, report_dir="demos/record"):
        """Replay the recording, then print and save the sleep segments.

        Blank lines are skipped.  Lines that are not valid JSON and records
        that fail during processing are reported on stdout and skipped, so a
        single bad record does not abort the replay.

        Args:
            report_dir: Directory the JSON report is written to.  Defaults to
                the previously hard-coded ``"demos/record"``.
        """
        with open(self.file_path, "r", encoding="utf-8") as f:
            # Iterate the file lazily instead of loading every line up front.
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue

                try:
                    entry = json.loads(line)
                    rtd_unit = self._build_rtd_unit(entry)

                    # Push the record into the device queue.
                    self.device.put_rtd_unit(rtd_unit)
                    self.device.update_keepalive(rtd_unit["timestamp"])

                    # Run the sleep-monitoring algorithm on the new state.
                    self.sleep_monitor.handle_sleep_monitoring()

                except json.JSONDecodeError:
                    print(f"跳过非法行: {line}")
                except Exception as e:
                    print(f"处理记录失败: {e}")

        # Replay finished: print the sequence of sleep states.
        print("回放结束,睡眠状态序列:")
        for seg in self.sleep_monitor.sleep_segments_:
            print(f"ts={seg['ts']:.3f}, sleep_stat={seg['sleep_stat']}")

        # Save the JSON report.
        self._save_report(report_dir)


# Usage example
if __name__ == "__main__":
    # NOTE(review): the recording name (…0B38) and dev_id (…0B80) differ —
    # confirm this is intentional.
    file_path = "demos/record/94A9900B0B38.raw.json"
    dev_id = "94A9900B0B80"
    rect = [0, 115, 200, 115]
    tester = SleepMonitorTester(file_path, dev_id, rect)
    tester.run()