sleep_monitor.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import json
  2. from collections import deque
  3. import os
  4. import sys
  5. # 将 LAS 根目录加入 sys.path
  6. sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
  7. import core.alarm_plan_helper as helper
  8. from common.sys_comm import (
  9. LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
  10. get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
  11. utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
  12. )
  13. import device.dev_mng as g_Dev
  14. from device.dev_mng import (
  15. Device, g_dev_mgr, init_dev_mng
  16. )
  17. from core.alarm_plan import AlarmPlan
  18. from core.time_plan import TimePlan
  19. from core.event_type import EventType
  20. from core.linkage_action import LinkageAction
  21. # 假设你已经有 AlarmPlan 方法绑定在 AlarmPlan 实例上
  22. # 这里我们写一个回放测试程序
  23. class SleepMonitorTester:
  24. def __init__(self, file_path, dev_id, rect):
  25. self.file_path = file_path
  26. init_dev_mng()
  27. self.dev_id = dev_id
  28. self.device = Device(dev_id)
  29. g_Dev.g_dev_mgr.start()
  30. g_Dev.g_dev_mgr.push_dev_map(dev_id, self.device)
  31. # 初始化算法实例
  32. linkage_action = LinkageAction()
  33. cron = {
  34. "hour": 1,
  35. "minute": 0
  36. }
  37. time_plan = TimePlan(
  38. time_range = [{"start_time": "00:00","end_time": "23:59"}],
  39. start_date = '2025-09-01',
  40. stop_date = '2099-12-31',
  41. weekdays = [1,2,3,4,5,6,7],
  42. month_days = []
  43. )
  44. self.sleep_monitor = AlarmPlan(
  45. plan_uuid = "plan_uuid",
  46. name = "plan_name",
  47. dev_id = 'LAS',
  48. dev_name = '告警联动服务',
  49. enable = 1,
  50. time_plan = time_plan,
  51. rect = [],
  52. event_type = EventType.SLEEP_MONITORING.value,
  53. threshold_time = 300,
  54. merge_time = 30,
  55. param = {},
  56. cron = cron,
  57. linkage_action=linkage_action,
  58. tenant_id = 0
  59. )
  60. self.sleep_monitor.dev_id_ = dev_id
  61. # 设置监测区域
  62. self.sleep_monitor.rect_ = rect
  63. def run(self):
  64. with open(self.file_path, "r", encoding="utf-8") as f:
  65. lines = f.readlines()
  66. for line in lines:
  67. line = line.strip()
  68. if not line:
  69. continue
  70. try:
  71. entry = json.loads(line)
  72. payload = entry["payload"]
  73. ts = entry["ts"]
  74. # 构造 rtd_unit
  75. tracker_targets = payload.get("tracker_targets", [[0, 0, 0, 0]])
  76. breath_rpm = payload.get("health", {}).get("breath_rpm", 0.0)
  77. pose = 4 # 固定值,参考原逻辑
  78. rtd_unit = {
  79. "timestamp": ts / 1000.0, # 转成秒
  80. "target_point": tracker_targets[0],
  81. "breath_rpm": breath_rpm,
  82. "pose": pose
  83. }
  84. # 写入队列
  85. self.device.put_rtd_unit(rtd_unit)
  86. self.device.update_keepalive(rtd_unit["timestamp"])
  87. # 调用睡眠监测算法
  88. self.sleep_monitor.handle_sleep_monitoring()
  89. except json.JSONDecodeError:
  90. print(f"跳过非法行: {line}")
  91. except Exception as e:
  92. print(f"处理记录失败: {e}")
  93. # 回放结束,打印睡眠状态序列
  94. print("回放结束,睡眠状态序列:")
  95. for seg in self.sleep_monitor.sleep_segments_:
  96. print(f"ts={seg['ts']:.3f}, sleep_stat={seg['sleep_stat']}")
  97. # 保存 JSON 报告
  98. report_file = os.path.splitext(os.path.basename(self.file_path))[0] + "_sleep_report.json"
  99. report_path = os.path.join("demos/record", report_file) # 可根据需要修改目录
  100. try:
  101. with open(report_path, "w", encoding="utf-8") as f:
  102. json.dump(self.sleep_monitor.sleep_segments_, f, ensure_ascii=False, indent=2)
  103. print(f"睡眠报告已生成: {report_path}")
  104. except Exception as e:
  105. print(f"生成睡眠报告失败: {e}")
  106. # 使用示例
  107. if __name__ == "__main__":
  108. file_path = "demos/record/94A9900B0B38.raw.json"
  109. dev_id = "94A9900B0B80"
  110. rect = [0, 115, 200, 115]
  111. tester = SleepMonitorTester(file_path, dev_id, rect)
  112. tester.run()