123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- import json
- from collections import deque
- import os
- import sys
- # 将 LAS 根目录加入 sys.path
- sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
- import core.alarm_plan_helper as helper
- from common.sys_comm import (
- LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
- get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
- utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
- )
- import device.dev_mng as g_Dev
- from device.dev_mng import (
- Device, g_dev_mgr, init_dev_mng
- )
- from core.alarm_plan import AlarmPlan
- from core.time_plan import TimePlan
- from core.event_type import EventType
- from core.linkage_action import LinkageAction
- # 假设你已经有 AlarmPlan 方法绑定在 AlarmPlan 实例上
- # 这里我们写一个回放测试程序
class SleepMonitorTester:
    """Replay a recorded JSON-lines log through the sleep-monitoring plan.

    Each non-empty line of the recording is decoded, converted into an
    ``rtd_unit`` dict, pushed into the device, and followed by one call to
    ``AlarmPlan.handle_sleep_monitoring``. After the replay the collected
    ``sleep_segments_`` are printed and dumped to a JSON report.
    """

    # Directory the JSON report is written to (relative to the working dir).
    _REPORT_DIR = "demos/record"
    # Fixed pose value attached to every replayed frame (kept from the
    # original logic).
    _REPLAY_POSE = 4

    def __init__(self, file_path, dev_id, rect):
        """Set up the device and the alarm plan used for the replay.

        Args:
            file_path: Path of the recording; one JSON object per line.
            dev_id: Device ID to create, register and bind the plan to.
            rect: Monitoring area assigned to the plan's ``rect_``.
        """
        self.file_path = file_path

        # Initialise the device manager, then create and register the device.
        init_dev_mng()
        self.dev_id = dev_id
        self.device = Device(dev_id)
        g_Dev.g_dev_mgr.start()
        g_Dev.g_dev_mgr.push_dev_map(dev_id, self.device)

        # Build the algorithm instance.
        linkage_action = LinkageAction()
        cron = {
            "hour": 1,
            "minute": 0,
        }
        time_plan = TimePlan(
            time_range=[{"start_time": "00:00", "end_time": "23:59"}],
            start_date='2025-09-01',
            stop_date='2099-12-31',
            weekdays=[1, 2, 3, 4, 5, 6, 7],
            month_days=[],
        )
        self.sleep_monitor = AlarmPlan(
            plan_uuid="plan_uuid",
            name="plan_name",
            dev_id='LAS',
            dev_name='告警联动服务',
            enable=1,
            time_plan=time_plan,
            rect=[],
            event_type=EventType.SLEEP_MONITORING.value,
            threshold_time=300,
            merge_time=30,
            param={},
            cron=cron,
            linkage_action=linkage_action,
            tenant_id=0,
        )
        # The plan is constructed with placeholder values; bind it to the
        # replayed device and monitoring area afterwards.
        self.sleep_monitor.dev_id_ = dev_id
        self.sleep_monitor.rect_ = rect

    @classmethod
    def _build_rtd_unit(cls, entry):
        """Convert one decoded log record into the rtd_unit dict."""
        payload = entry["payload"]
        ts = entry["ts"]
        # Fall back to a zero target when the key is missing *or* the list is
        # empty/null; indexing an empty list would otherwise drop the frame.
        tracker_targets = payload.get("tracker_targets") or [[0, 0, 0, 0]]
        breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0)
        return {
            "timestamp": ts / 1000.0,  # milliseconds -> seconds
            "target_point": tracker_targets[0],
            "breath_rpm": breath_rpm,
            "pose": cls._REPLAY_POSE,
        }

    def _write_report(self):
        """Print the sleep segments and dump them to a JSON report file."""
        print("回放结束，睡眠状态序列：")
        for seg in self.sleep_monitor.sleep_segments_:
            print(f"ts={seg['ts']:.3f}, sleep_stat={seg['sleep_stat']}")

        # Report name is derived from the recording's base name.
        stem = os.path.splitext(os.path.basename(self.file_path))[0]
        report_path = os.path.join(self._REPORT_DIR, stem + "_sleep_report.json")
        try:
            # Make sure the target directory exists before writing.
            os.makedirs(self._REPORT_DIR, exist_ok=True)
            with open(report_path, "w", encoding="utf-8") as f:
                json.dump(
                    self.sleep_monitor.sleep_segments_,
                    f,
                    ensure_ascii=False,
                    indent=2,
                )
            print(f"睡眠报告已生成: {report_path}")
        except Exception as e:
            print(f"生成睡眠报告失败: {e}")

    def run(self):
        """Replay the recording line by line, then emit the sleep report.

        Lines that are not valid JSON are reported and skipped; any other
        per-record failure is reported and the replay continues.
        """
        with open(self.file_path, "r", encoding="utf-8") as f:
            # Stream the file instead of loading every line into memory.
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    rtd_unit = self._build_rtd_unit(json.loads(line))
                    # Feed the frame into the device queue.
                    self.device.put_rtd_unit(rtd_unit)
                    self.device.update_keepalive(rtd_unit["timestamp"])
                    # Run the sleep-monitoring algorithm on the new frame.
                    self.sleep_monitor.handle_sleep_monitoring()
                except json.JSONDecodeError:
                    print(f"跳过非法行: {line}")
                except Exception as e:
                    print(f"处理记录失败: {e}")

        self._write_report()
# Example usage: replay one recorded capture through the sleep monitor.
if __name__ == "__main__":
    # NOTE(review): the recording's file name and dev_id carry different
    # device IDs (…0B38 vs …0B80) — confirm this is intended.
    replay = SleepMonitorTester(
        file_path="demos/record/94A9900B0B38.raw.json",
        dev_id="94A9900B0B80",
        rect=[0, 115, 200, 115],
    )
    replay.run()
|