import queue import threading from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR from core.alarm_plan import AlarmPlan from core.event_type import EventType from core.alarm_plan_manager import AlarmPlanManager import core.g_LAS as g_las # 分发器 class AlarmPlanDispatcher: def __init__(self, num_workers=4): self.task_queue = queue.Queue() self.threads = [] self.running = False self.num_workers = num_workers def start(self): self.running = True for i in range(self.num_workers): t = threading.Thread( target=self.worker, daemon=True, name=f"APDispatcherWorker-{i}" ) self.threads.append(t) t.start() def stop(self): self.running = False for _ in self.threads: self.task_queue.put(None) def dispatch(self, plan: "AlarmPlan"): self.task_queue.put(plan) def worker(self): while self.running: try: plan = self.task_queue.get(timeout=1) if plan is None: break if plan.handle_func: plan.handle_func(plan) else: LOGINFO(f"[Dispatcher] Plan {plan.plan_uuid_} has no handler") except queue.Empty: continue except Exception as e: LOGERR(f"[Dispatcher] Worker error: {e}") def init_alarm_plan_disp(): g_las.g_alarm_plan_disp = AlarmPlanDispatcher(num_workers=4) def start_alarm_plan_dispatcher(): g_las.g_alarm_plan_disp.start()