12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- import queue
- import threading
- from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
- from core.alarm_plan import AlarmPlan
- from core.event_type import EventType
- from core.alarm_plan_manager import AlarmPlanManager
- import core.g_LAS as g_las
- # 分发器
- class AlarmPlanDispatcher:
- def __init__(self, num_workers=4):
- self.task_queue = queue.Queue()
- self.threads = []
- self.running = False
- self.num_workers = num_workers
- def start(self):
- self.running = True
- for i in range(self.num_workers):
- t = threading.Thread(
- target=self.worker,
- daemon=True,
- name=f"APDispatcherWorker-{i}"
- )
- self.threads.append(t)
- t.start()
- def stop(self):
- self.running = False
- for _ in self.threads:
- self.task_queue.put(None)
- def dispatch(self, plan: "AlarmPlan"):
- self.task_queue.put(plan)
- def worker(self):
- while self.running:
- try:
- plan = self.task_queue.get(timeout=1)
- if plan is None:
- break
- if plan.handle_func:
- plan.handle_func(plan)
- else:
- LOGINFO(f"[Dispatcher] Plan {plan.plan_uuid_} has no handler")
- except queue.Empty:
- continue
- except Exception as e:
- LOGERR(f"[Dispatcher] Worker error: {e}")
- def init_alarm_plan_disp():
- g_las.g_alarm_plan_disp = AlarmPlanDispatcher(num_workers=4)
- def start_alarm_plan_dispatcher():
- g_las.g_alarm_plan_disp.start()
|