import queue
import threading

from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
from core.alarm_plan import AlarmPlan
from core.event_type import EventType
from core.alarm_plan_manager import AlarmPlanManager
import core.g_LAS as g_las


# Dispatcher
class AlarmPlanDispatcher:
    """Route alarm plans to per-event-type worker threads.

    Each registered event type owns one queue and one daemon thread. Plans
    passed to ``dispatch`` are queued and later handed to that event type's
    handler on its worker thread.
    """

    def __init__(self):
        self.queues = {}        # event_type -> queue.Queue of pending plans
        self.threads = {}       # event_type -> worker threading.Thread
        self._stop_events = {}  # event_type -> threading.Event retiring that worker
        self.running = False

    def start(self, handlers: dict):
        """Start one worker thread per entry in *handlers*.

        Args:
            handlers: ``{event_type: handler_func}``; each handler is called
                with a single argument, the dispatched plan.

        If a worker is already registered for an event type it is signalled
        to exit before being replaced, so repeated ``start`` calls (or a
        quick ``stop``/``start`` cycle) do not leave stale threads running.
        """
        self.running = True
        for event_type, handler in handlers.items():
            previous_stop = self._stop_events.get(event_type)
            if previous_stop is not None:
                if not previous_stop.is_set():
                    LOGWARN(
                        f"[Dispatcher] Replacing running worker for event_type={event_type}"
                    )
                # Retire the old worker explicitly: it must not rely on
                # self.running, which has just been set back to True.
                previous_stop.set()

            stop_event = threading.Event()
            pending = queue.Queue()
            self._stop_events[event_type] = stop_event
            self.queues[event_type] = pending

            thread = threading.Thread(
                target=self.worker,
                args=(event_type, pending, handler, stop_event),
                daemon=True,
            )
            self.threads[event_type] = thread
            thread.start()

    def stop(self):
        """Signal every worker to exit; does not wait for them to finish.

        A worker notices the request after its current handler call returns
        or its one-second queue wait times out.
        """
        self.running = False
        for stop_event in self._stop_events.values():
            stop_event.set()

    def dispatch(self, event_type: int, plan):
        """Queue *plan* for the worker of *event_type*.

        Plans for an event type without a queue are logged and dropped.
        """
        target_queue = self.queues.get(event_type)
        if target_queue is None:
            LOGINFO(f"[Dispatcher] No queue for event_type={event_type}")
            return
        target_queue.put(plan)

    def worker(self, event_type, q: queue.Queue, handler, stop_event=None):
        """Worker loop: take plans from *q* and pass each to *handler*.

        Args:
            event_type: Event type served by this worker; used in error logs.
            q: Queue this worker consumes.
            handler: Callable invoked with each plan taken from *q*.
            stop_event: Optional ``threading.Event``; once set, the loop
                exits even if ``self.running`` is True again.

        Exceptions raised while handling a plan are logged and the loop
        continues with the next plan.
        """
        while self.running and not (stop_event is not None and stop_event.is_set()):
            try:
                # Bounded wait so the stop conditions are re-checked at
                # least once per second while the queue is idle.
                plan = q.get(timeout=1)
                handler(plan)
            except queue.Empty:
                continue
            except Exception as e:
                LOGERR(f"[Dispatcher] Error in event_type={event_type}: {e}")


def init_alarm_plan_disp():
    """Create the dispatcher and store it in ``g_las.g_alarm_plan_disp``."""
    g_las.g_alarm_plan_disp = AlarmPlanDispatcher()


def start_alarm_plan_dispatcher():
    """Start the global dispatcher with the AlarmPlan handler of each event type.

    Expects ``g_las.g_alarm_plan_disp`` to already hold a dispatcher
    (``init_alarm_plan_disp`` sets it).
    """
    # Register the event handler functions
    handles = {
        EventType.STAY_DETECTION.value: AlarmPlan.handle_stay_detection,
        EventType.RETENTION_DETECTION.value: AlarmPlan.handle_retention_detection,
        EventType.TOILETING_DETECTION.value: AlarmPlan.handle_toileting_detection,
        EventType.TOILETING_FREQUENCY.value: AlarmPlan.handle_toileting_frequency,
        EventType.NIGHT_TOILETING_FREQUENCY.value: AlarmPlan.handle_night_toileting_frequency,
        EventType.TOILETING_FREQUENCY_ABNORMAL.value: AlarmPlan.handle_toileting_frequency_abnormal,
        EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value: AlarmPlan.handle_night_toileting_frequency_abnormal,
        EventType.BATHROOM_STAY_FREQUENCY.value: AlarmPlan.handle_bathroom_stay_frequency,
        EventType.TARGET_ABSENCE.value: AlarmPlan.handle_target_absence,
    }
    g_las.g_alarm_plan_disp.start(handles)