Forráskód Böngészése

优化分发器线程池

nifangxu 1 hónapja
szülő
commit
5db4b79e9f
2 módosított fájl, 42 hozzáadás és 38 törlés
  1. 18 2
      core/alarm_plan.py
  2. 24 36
      core/alarm_plan_dispatcher.py

+ 18 - 2
core/alarm_plan.py

@@ -161,7 +161,7 @@ class EventAttr_TargetAbsence(EventAttr_Base):
         self.enter_ts_  = -1
         self.absence_time_ = -1
 
-# 事件属性 清理过期事件(无用)
+# 事件属性 清理过期事件
 class EventAttr_CleanExpireEvents(EventAttr_Base):
     def __init__(self, event_type):
         self.event_type_ = event_type
@@ -191,6 +191,19 @@ class Cron:
 
 
 class AlarmPlan:
+    handles_map = {
+        EventType.STAY_DETECTION.value      : lambda plan: AlarmPlan.handle_stay_detection(plan),
+        EventType.RETENTION_DETECTION.value : lambda plan: AlarmPlan.handle_retention_detection(plan),
+        EventType.TOILETING_DETECTION.value : lambda plan: AlarmPlan.handle_toileting_detection(plan),
+        EventType.TOILETING_FREQUENCY.value : lambda plan: AlarmPlan.handle_toileting_frequency(plan),
+        EventType.NIGHT_TOILETING_FREQUENCY.value       : lambda plan: AlarmPlan.handle_night_toileting_frequency(plan),
+        EventType.TOILETING_FREQUENCY_ABNORMAL.value    : lambda plan: AlarmPlan.handle_toileting_frequency_abnormal(plan),
+        EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value : lambda plan: AlarmPlan.handle_night_toileting_frequency_abnormal(plan),
+        EventType.BATHROOM_STAY_FREQUENCY.value         : lambda plan: AlarmPlan.handle_bathroom_stay_frequency(plan),
+        EventType.TARGET_ABSENCE.value                  : lambda plan: AlarmPlan.handle_target_absence(plan),
+        EventType.CLEAN_EXPIRE_EVENTS.value             : lambda plan: AlarmPlan.handle_clear_expire_events(plan),
+    }
+
     def __init__(self,
                  plan_uuid: str,
                  name: str,
@@ -230,6 +243,9 @@ class AlarmPlan:
         self.event_attr_    = self.init_event_attr()    # 事件属性
         if self.event_attr_ is None:
             raise ValueError(f"Invalid event_type: {event_type}")
+        self.handle_func_   = self.handles_map.get(event_type)
+        if self.handle_func_ is None:
+            raise ValueError(f"Invalic event_type: {event_type}")
 
         # 计划任务的开始时间
         self.cron_ = cron   # {“hour": 7, "minute": 0}
@@ -237,7 +253,7 @@ class AlarmPlan:
     def execute(self):
         if self.status_ != 1:
             return
-        g_las.g_alarm_plan_disp.dispatch(self.event_type_, self)
+        g_las.g_alarm_plan_disp.dispatch(self)
 
 
     # 初始化事件属性

+ 24 - 36
core/alarm_plan_dispatcher.py

@@ -11,63 +11,51 @@ import core.g_LAS as g_las
 
 # 分发器
 class AlarmPlanDispatcher:
-    def __init__(self):
-        self.queues = {}  # event_type -> Queue
-        self.threads = {}
+    def __init__(self, num_workers=4):
+        self.task_queue = queue.Queue()
+        self.threads = []
         self.running = False
+        self.num_workers = num_workers
 
-    def start(self, handlers: dict):
-        """handlers: {event_type: handler_func}"""
+    def start(self):
         self.running = True
-        for event_type, handler in handlers.items():
-            q = queue.Queue()
-            self.queues[event_type] = q
-
+        for i in range(self.num_workers):
             t = threading.Thread(
                 target=self.worker,
-                args=(event_type, q, handler),
                 daemon=True,
-                name=f"APDispatcherThread-{event_type}")
-            self.threads[event_type] = t
+                name=f"APDispatcherWorker-{i}"
+            )
+            self.threads.append(t)
             t.start()
 
     def stop(self):
         self.running = False
+        for _ in self.threads:
+            self.task_queue.put(None)
 
-    def dispatch(self, event_type: int, plan):
-        if event_type in self.queues:
-            self.queues[event_type].put(plan)
-        else:
-            LOGINFO(f"[Dispatcher] No queue for event_type={event_type}")
+    def dispatch(self, plan: "AlarmPlan"):
+        self.task_queue.put(plan)
 
-    def worker(self, event_type, q: queue.Queue, handler):
+    def worker(self):
         while self.running:
             try:
-                plan = q.get(timeout=1)
-                handler(plan)
+                plan = self.task_queue.get(timeout=1)
+                if plan is None:
+                    break
+                if plan.handle_func:
+                    plan.handle_func(plan)
+                else:
+                    LOGINFO(f"[Dispatcher] Plan {plan.plan_uuid_} has no handler")
             except queue.Empty:
                 continue
             except Exception as e:
-                LOGERR(f"[Dispatcher] Error in event_type={event_type}: {e}")
+                LOGERR(f"[Dispatcher] Worker error: {e}")
 
 
 def init_alarm_plan_disp():
-    g_las.g_alarm_plan_disp = AlarmPlanDispatcher()
+    g_las.g_alarm_plan_disp = AlarmPlanDispatcher(num_workers=4)
 
 
 def start_alarm_plan_dispatcher():
-    # 注册事件处理函数
-    handles = {
-        EventType.STAY_DETECTION.value      : AlarmPlan.handle_stay_detection,
-        EventType.RETENTION_DETECTION.value : AlarmPlan.handle_retention_detection,
-        EventType.TOILETING_DETECTION.value : AlarmPlan.handle_toileting_detection,
-        EventType.TOILETING_FREQUENCY.value : AlarmPlan.handle_toileting_frequency,
-        EventType.NIGHT_TOILETING_FREQUENCY.value       : AlarmPlan.handle_night_toileting_frequency,
-        EventType.TOILETING_FREQUENCY_ABNORMAL.value    : AlarmPlan.handle_toileting_frequency_abnormal,
-        EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value : AlarmPlan.handle_night_toileting_frequency_abnormal,
-        EventType.BATHROOM_STAY_FREQUENCY.value         : AlarmPlan.handle_bathroom_stay_frequency,
-        EventType.TARGET_ABSENCE.value                  : AlarmPlan.handle_target_absence,
+    g_las.g_alarm_plan_disp.start()
 
-        EventType.CLEAN_EXPIRE_EVENTS.value             : AlarmPlan.handle_clear_expire_events
-    }
-    g_las.g_alarm_plan_disp.start(handles)