from typing import List, Tuple, Optional
from datetime import datetime, time, date
from threading import Thread, Lock
from enum import Enum

from core.event_type import EventType
from core.alarm_event import AlarmEvent
from core.alarm_plan_manager import AlarmPlanManager


class LinkageAlarmService:
    """Match alarm events against linkage plans and merge rapid repeats.

    For each plan, the timestamp of the last event that triggered it is
    remembered. A later event that matches the same plan fewer than
    ``plan.merge_time_`` seconds after that timestamp is reported as
    merged and does not refresh the stored timestamp.
    """

    def __init__(self, plan_mgr: AlarmPlanManager):
        """Store the plan manager and start with no recorded triggers.

        Args:
            plan_mgr: Object whose ``match_plans(event)`` yields the plans
                that apply to an event.
        """
        self.plan_mgr_ = plan_mgr
        # plan_uuid -> timestamp of the last event that triggered the plan
        self.last_trigger_times_ = {}

    def process_event(self, event: AlarmEvent):
        """Trigger or merge *event* for every plan it matches.

        Args:
            event: Event providing ``timestamp_`` and ``event_id_``.
                Subtracting two timestamps must yield an object with
                ``total_seconds()`` (presumably ``datetime`` values --
                confirm against ``AlarmEvent``).
        """
        plans = self.plan_mgr_.match_plans(event)
        event_time = event.timestamp_

        for plan in plans:
            key = plan.plan_uuid_
            previous = self.last_trigger_times_.get(key)

            # Merge-window check: a plan with no recorded trigger is never
            # merged; otherwise compare the elapsed seconds to the window.
            within_window = (
                bool(previous)
                and (event_time - previous).total_seconds() < plan.merge_time_
            )

            if within_window:
                print(f"归并")
            else:
                print(f"触发事件 {event.event_id_}, 命中计划[{plan.name_}]")
                # Only a real trigger moves the window start forward.
                self.last_trigger_times_[key] = event_time


def simulate_event_input(service: LinkageAlarmService):
    # Sample toileting-detection event reported by device 00FFAABBCC11.
    event = AlarmEvent(
        event_id="ev001",
        dev_id="00FFAABBCC11",
        event_type=EventType.TOILETING_DETECTION,
    )