alarm_plan_dispatcher.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import queue
  2. import threading
  3. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  4. from core.alarm_plan import AlarmPlan
  5. from core.event_type import EventType
  6. from core.alarm_plan_manager import AlarmPlanManager
  7. import core.g_LAS as g_las
  8. # 分发器
  9. class AlarmPlanDispatcher:
  10. def __init__(self, num_workers=4):
  11. self.task_queue = queue.Queue()
  12. self.threads = []
  13. self.running = False
  14. self.num_workers = num_workers
  15. def start(self):
  16. self.running = True
  17. for i in range(self.num_workers):
  18. t = threading.Thread(
  19. target=self.worker,
  20. daemon=True,
  21. name=f"APDispatcherWorker-{i}"
  22. )
  23. self.threads.append(t)
  24. t.start()
  25. def stop(self):
  26. self.running = False
  27. for _ in self.threads:
  28. self.task_queue.put(None)
  29. def dispatch(self, plan: "AlarmPlan"):
  30. self.task_queue.put(plan)
  31. def worker(self):
  32. while self.running:
  33. try:
  34. plan = self.task_queue.get(timeout=1)
  35. if plan is None:
  36. break
  37. if plan.handle_func_:
  38. plan.handle_func_(plan)
  39. else:
  40. LOGINFO(f"[Dispatcher] Plan {plan.plan_uuid_} has no handler")
  41. except queue.Empty:
  42. continue
  43. except Exception as e:
  44. LOGERR(f"[Dispatcher] Worker error: {e}")
  45. def init_alarm_plan_disp():
  46. g_las.g_alarm_plan_disp = AlarmPlanDispatcher(num_workers=4)
  47. def start_alarm_plan_dispatcher():
  48. g_las.g_alarm_plan_disp.start()
  49. LOGINFO(f"start g_alarm_plan_disp succeed")