linkage_alarm_service.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. from typing import List, Tuple, Optional
  2. from datetime import datetime, time, date
  3. from threading import Thread, Lock
  4. from enum import Enum
  5. from core.event_type import EventType
  6. from core.alarm_event import AlarmEvent
  7. from core.alarm_plan_manager import AlarmPlanManager
  8. class LinkageAlarmService:
  9. def __init__(self, plan_mgr: AlarmPlanManager):
  10. self.plan_mgr_ = plan_mgr
  11. self.last_trigger_times_ = {} # plan_uuid -> datetime
  12. def process_event(self, event: AlarmEvent):
  13. matched_plans = self.plan_mgr_.match_plans(event)
  14. now = event.timestamp_
  15. for plan in matched_plans:
  16. plan_uuid = plan.plan_uuid_
  17. last_time = self.last_trigger_times_.get(plan_uuid)
  18. # 归并事件判断
  19. if last_time and (now - last_time).total_seconds() < plan.merge_time_:
  20. print(f"归并")
  21. continue
  22. print(f"触发事件 {event.event_id_}, 命中计划[{plan.name_}]")
  23. self.last_trigger_times_[plan_uuid] = now
  24. def simulate_event_input(service: LinkageAlarmService):
  25. event = AlarmEvent(
  26. event_id = "ev001",
  27. dev_id = "00FFAABBCC11",
  28. event_type = EventType.TOILETING_DETECTION
  29. )