nifangxu 1 місяць тому
батько
коміт
4a29f58e54
6 змінених файлів з 120 додано та 66 видалено
  1. 7 3
      LAS.py
  2. 1 1
      core/alarm_plan.py
  3. 67 0
      core/alarm_plan_dispatcher.py
  4. 0 55
      core/alarm_plan_manager.py
  5. 2 2
      core/g_LAS.py
  6. 43 5
      db/db_sqls.py

+ 7 - 3
LAS.py

@@ -27,13 +27,16 @@ from device.dev_mng import (
 
 
 import core.alarm_plan_manager as ap_mgr
-from core.alarm_plan_manager import(
+from core.alarm_plan_manager import (
     AlarmPlanManager,
     init_alarm_plan_mgr,
-    start_event_dispatcher,
     start_alarm_plan_mgr,
     cb_handle_query_all_alarm_plan_info
 )
+from core.alarm_plan_dispatcher import (
+    init_alarm_plan_disp,
+    start_alarm_plan_dispatcher
+)
 
 import core.g_LAS as g_las
 
@@ -191,6 +194,7 @@ def main():
 
     # 初始化LAS
     init_alarm_plan_mgr()
+    init_alarm_plan_disp()
 
     # 数据库处理线程
     db_process.create_db_process().start()
@@ -202,7 +206,7 @@ def main():
     mqtt_consumer.start()
 
     # 事件分发器
-    start_event_dispatcher()
+    start_alarm_plan_dispatcher()
     # 告警计划管理器
     start_alarm_plan_mgr()
 

+ 1 - 1
core/alarm_plan.py

@@ -220,7 +220,7 @@ class AlarmPlan:
     def execute(self):
         if self.status_ != 1:
             return
-        g_las.g_event_dispatcher.dispatch(self.event_type_, self)
+        g_las.g_alarm_plan_disp.dispatch(self.event_type_, self)
 
 
     # 初始化事件属性

+ 67 - 0
core/alarm_plan_dispatcher.py

@@ -0,0 +1,67 @@
+import queue
+import threading
+
+from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
+from core.alarm_plan import AlarmPlan
+from core.event_type import EventType
+from core.alarm_plan_manager import AlarmPlanManager
+import core.g_LAS as g_las
+
+
+
+# 分发器
+class AlarmPlanDispatcher:
+    def __init__(self):
+        self.queues = {}  # event_type -> Queue
+        self.threads = {}
+        self.running = False
+
+    def start(self, handlers: dict):
+        """handlers: {event_type: handler_func}"""
+        self.running = True
+        for event_type, handler in handlers.items():
+            q = queue.Queue()
+            self.queues[event_type] = q
+
+            t = threading.Thread(target=self.worker, args=(event_type, q, handler), daemon=True)
+            self.threads[event_type] = t
+            t.start()
+
+    def stop(self):
+        self.running = False
+
+    def dispatch(self, event_type: int, plan):
+        if event_type in self.queues:
+            self.queues[event_type].put(plan)
+        else:
+            LOGINFO(f"[Dispatcher] No queue for event_type={event_type}")
+
+    def worker(self, event_type, q: queue.Queue, handler):
+        while self.running:
+            try:
+                plan = q.get(timeout=1)
+                handler(plan)
+            except queue.Empty:
+                continue
+            except Exception as e:
+                LOGERR(f"[Dispatcher] Error in event_type={event_type}: {e}")
+
+
+def init_alarm_plan_disp():
+    g_las.g_alarm_plan_disp = AlarmPlanDispatcher()
+
+
+def start_alarm_plan_dispatcher():
+    # 注册事件处理函数
+    handles = {
+        EventType.STAY_DETECTION.value      : AlarmPlan.handle_stay_detection,
+        EventType.RETENTION_DETECTION.value : AlarmPlan.handle_retention_detection,
+        EventType.TOILETING_DETECTION.value : AlarmPlan.handle_toileting_detection,
+        EventType.TOILETING_FREQUENCY.value : AlarmPlan.handle_toileting_frequency,
+        EventType.NIGHT_TOILETING_FREQUENCY.value       : AlarmPlan.handle_night_toileting_frequency,
+        EventType.TOILETING_FREQUENCY_ABNORMAL.value    : AlarmPlan.handle_toileting_frequency_abnormal,
+        EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value : AlarmPlan.handle_night_toileting_frequency_abnormal,
+        EventType.BATHROOM_STAY_FREQUENCY.value         : AlarmPlan.handle_bathroom_stay_frequency,
+        EventType.TARGET_ABSENCE.value                  : AlarmPlan.handle_target_absence,
+    }
+    g_las.g_alarm_plan_disp.start(handles)

+ 0 - 55
core/alarm_plan_manager.py

@@ -146,70 +146,15 @@ class AlarmPlanManager:
                     LOGERR(f"[CronScheduler] plan {plan.plan_uuid_} cron error: {e}")
             time.sleep(5)  # 每 30 秒检查一次
 
-# 分发器
-class EventDispatcher:
-    def __init__(self):
-        self.queues = {}  # event_type -> Queue
-        self.threads = {}
-        self.running = False
-
-    def start(self, handlers: dict):
-        """handlers: {event_type: handler_func}"""
-        self.running = True
-        for event_type, handler in handlers.items():
-            q = queue.Queue()
-            self.queues[event_type] = q
-
-            t = threading.Thread(target=self.worker, args=(event_type, q, handler), daemon=True)
-            self.threads[event_type] = t
-            t.start()
-
-    def stop(self):
-        self.running = False
-
-    def dispatch(self, event_type: int, plan):
-        if event_type in self.queues:
-            self.queues[event_type].put(plan)
-        else:
-            LOGINFO(f"[Dispatcher] No queue for event_type={event_type}")
-
-    def worker(self, event_type, q: queue.Queue, handler):
-        while self.running:
-            try:
-                plan = q.get(timeout=1)
-                handler(plan)
-            except queue.Empty:
-                continue
-            except Exception as e:
-                LOGERR(f"[Dispatcher] Error in event_type={event_type}: {e}")
-
 
 def init_alarm_plan_mgr():
     g_las.g_alarm_plan_mgr = AlarmPlanManager()
-    g_las.g_event_dispatcher = EventDispatcher()
-
 
 
 def start_alarm_plan_mgr():
     g_las.g_alarm_plan_mgr.start_scheduler(interval=1)
 
 
-def start_event_dispatcher():
-    # 注册事件处理函数
-    handles = {
-        EventType.STAY_DETECTION.value      : AlarmPlan.handle_stay_detection,
-        EventType.RETENTION_DETECTION.value : AlarmPlan.handle_retention_detection,
-        EventType.TOILETING_DETECTION.value : AlarmPlan.handle_toileting_detection,
-        EventType.TOILETING_FREQUENCY.value : AlarmPlan.handle_toileting_frequency,
-        EventType.NIGHT_TOILETING_FREQUENCY.value       : AlarmPlan.handle_night_toileting_frequency,
-        EventType.TOILETING_FREQUENCY_ABNORMAL.value    : AlarmPlan.handle_toileting_frequency_abnormal,
-        EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value : AlarmPlan.handle_night_toileting_frequency_abnormal,
-        EventType.BATHROOM_STAY_FREQUENCY.value         : AlarmPlan.handle_bathroom_stay_frequency,
-        EventType.TARGET_ABSENCE.value                  : AlarmPlan.handle_target_absence,
-    }
-    g_las.g_event_dispatcher.start(handles)
-
-
 # 将region字典转换为xy平面的rect [left, top, width, height]
 def region_to_rect(region: dict) -> list:
     x_start, x_stop = region["x_cm_start"], region["x_cm_stop"]

+ 2 - 2
core/g_LAS.py

@@ -1,7 +1,7 @@
 # from core.alarm_plan import  AlarmPlan
 # from core.alarm_plan_manager import  AlarmPlanManager
-# from core.alarm_plan_manager import  EventDispatcher
+# from core.alarm_plan_manager import  AlarmPlanDispatcher
 
 # 全局变量
 g_alarm_plan_mgr    = None  # 告警计划管理器
-g_event_dispatcher  = None  # 告警计划分发器
+g_alarm_plan_disp  = None  # 告警计划分发器

+ 43 - 5
db/db_sqls.py

@@ -74,9 +74,9 @@ SELECT
     atp.stop_date       AS stop_date,
     atp.time_range      AS time_range,
     atp.month_days      AS month_days,
-    atp.weekdays        AS weekdays
+    atp.weekdays        AS weekdays,
 
-    di.dev_name         AS dev_name,    -- 设备名称
+    di.dev_name         AS dev_name
 
 FROM alarm_plan ap
 LEFT JOIN event_type et 
@@ -85,11 +85,49 @@ LEFT JOIN alarm_time_plan atp
        ON ap.alarm_time_plan_id = atp.id
 LEFT JOIN dev_info di
        ON ap.dev_id = di.client_id  -- 匹配设备id
-WHERE ap.enable = 1;
+WHERE
+    ap.enable = 1;
 """
 
 # 查询单个告警计划
-# todo
+sql_query_one_alarm_plan = """
+SELECT
+    ap.id               AS plan_id,
+    ap.uuid             AS plan_uuid,
+    ap.name             AS plan_name,
+    ap.dev_id           AS dev_id,
+    ap.enable           AS enable,
+    ap.region           AS region,
+    ap.threshold_time   AS threshold_time,
+    ap.merge_time       AS merge_time,
+    ap.param            AS param,
+    ap.create_time      AS create_time,
+    ap.update_time      AS update_time,
+
+    et.event_val        AS event_val,
+    et.event_str        AS event_str,
+    et.event_desc       AS event_desc,
+
+    atp.id              AS time_plan_id,
+    atp.start_date      AS start_date,
+    atp.stop_date       AS stop_date,
+    atp.time_range      AS time_range,
+    atp.month_days      AS month_days,
+    atp.weekdays        AS weekdays,
+
+    di.dev_name         AS dev_name    -- 设备名称
+FROM alarm_plan ap
+LEFT JOIN event_type et 
+       ON ap.event_val = et.event_val
+LEFT JOIN alarm_time_plan atp 
+       ON ap.alarm_time_plan_id = atp.id
+LEFT JOIN dev_info di
+       ON ap.dev_id = di.client_id  -- 匹配设备id
+WHERE
+    ap.enable = 1 AND
+    ap.uuid = %(plan_uuid)s;
+"""
+
 
 
 # 插入events
@@ -120,7 +158,7 @@ VALUES (
 # 查询events
 sql_query_events_by_datetime = """
 SELECT *
-FROM lnxx_dev.events
+    FROM lnxx_dev.events
 WHERE
     dev_id = %(dev_id)s AND
     event_type = %(event_type)s AND