12345678910111213141516171819202122232425262728293031323334353637 |
- from typing import List, Tuple, Optional
- from datetime import datetime, time, date
- from threading import Thread, Lock
- from enum import Enum
- from core.event_type import EventType
- from core.alarm_event import AlarmEvent
- from core.alarm_plan_manager import AlarmPlanManager
class LinkageAlarmService:
    """Match alarm events against linkage plans and merge rapid repeats.

    For every plan returned by the plan manager, the service remembers the
    timestamp of the event that last triggered it. A later event hitting
    the same plan is merged (suppressed) when it arrives fewer than
    ``plan.merge_time_`` seconds after that remembered timestamp.
    """

    def __init__(self, plan_mgr: "AlarmPlanManager"):
        """Store the plan manager and start with no recorded triggers.

        Args:
            plan_mgr: Object whose ``match_plans(event)`` returns the plans
                that an event hits.
        """
        self.plan_mgr_ = plan_mgr
        # plan_uuid -> timestamp of the event that last triggered the plan
        self.last_trigger_times_ = {}

    def process_event(self, event: "AlarmEvent"):
        """Trigger every matching plan that is outside its merge window.

        Prints one line per matched plan: a merge notice when the event is
        suppressed, otherwise a trigger notice naming the event and plan.
        Only a triggering event updates the plan's remembered timestamp, so
        the merge window is measured from the last *triggered* event, not
        from the last merged one.

        Args:
            event: Event exposing ``timestamp_`` and ``event_id_``.
                ``timestamp_`` must support subtraction yielding an object
                with ``total_seconds()`` (e.g. ``datetime``).
        """
        matched_plans = self.plan_mgr_.match_plans(event)
        now = event.timestamp_
        for plan in matched_plans:
            plan_uuid = plan.plan_uuid_
            last_time = self.last_trigger_times_.get(plan_uuid)
            # Merge check: a plan that has triggered before and is still
            # inside its merge window (in seconds) swallows this event.
            # An explicit None test expresses "never triggered" directly
            # instead of relying on the truthiness of the timestamp.
            if (
                last_time is not None
                and (now - last_time).total_seconds() < plan.merge_time_
            ):
                print("归并")
                continue
            print(f"触发事件 {event.event_id_}, 命中计划[{plan.name_}]")
            self.last_trigger_times_[plan_uuid] = now
def simulate_event_input(service: LinkageAlarmService):
    """Build a sample toileting-detection alarm event with fixed ids.

    NOTE(review): in the visible code the event is constructed but never
    handed to ``service`` -- confirm whether a dispatch call is missing or
    lives beyond this excerpt.
    """
    sample_fields = {
        "event_id": "ev001",
        "dev_id": "00FFAABBCC11",
        "event_type": EventType.TOILETING_DETECTION,
    }
    sample_event = AlarmEvent(**sample_fields)
|