Pārlūkot izejas kodu

支持动态维护告警计划

nifangxu 1 mēnesi atpakaļ
vecāks
revīzija
82ea18f447
2 mainītis faili ar 120 papildinājumiem un 8 dzēšanām
  1. 117 3
      core/alarm_plan_manager.py
  2. 3 5
      mqtt/mqtt_recv.py

+ 117 - 3
core/alarm_plan_manager.py

@@ -10,14 +10,15 @@ import ast
 import traceback
 
 from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
-
 from core.alarm_event import AlarmEvent
 from core.alarm_plan import AlarmPlan
 from core.time_plan import TimePlan
 from core.event_type import EventType
 import core.g_LAS as g_las
 
-
+from db.db_process import db_req_que
+from db.db_process import DBRequest
+import db.db_sqls as sqls
 
 class AlarmPlanManager:
     def __init__(self,
@@ -77,6 +78,17 @@ class AlarmPlanManager:
         with self.lock_:
             return list(self.alarm_plan_cron_map_.values())
 
+    def remove_one_plan(self, plan_uuid: str) -> bool:
+        with self.lock_:
+            removed = False
+            if plan_uuid in self.alarm_plan_map_:
+                del self.alarm_plan_map_[plan_uuid]
+                removed = True
+            if plan_uuid in self.alarm_plan_cron_map_:
+                del self.alarm_plan_cron_map_[plan_uuid]
+                removed = True
+            return removed
+
 
     def start_scheduler(self, interval=5):
         if self.running:
@@ -147,6 +159,103 @@ class AlarmPlanManager:
             time.sleep(5)  # 每 30 秒检查一次
 
 
+    def query_one_alarm_plan(self, plan_uuid: str):
+        # 查询单个告警计划
+        params = {
+            "plan_uuid": plan_uuid
+        }
+
+        db_req_que.put(DBRequest(sql=sqls.sql_query_one_alarm_plan, params=params,
+                                 callback=self.cb_query_one_alarm_plan))
+
+
+# 如厕频次统计回调
+    def cb_query_one_alarm_plan(self, result, userdata):
+        try:
+            if not result:
+                LOGDBG("cb_handle_query_all_alarm_plan_info, invalid result")
+
+            for row in result:
+                plan_uuid: str  = row["plan_uuid"]
+                plan_name: str  = row["plan_name"]
+                dev_id: str     = row["dev_id"]
+                dev_name: str   = row["dev_name"]
+                enable: int     = bool(row["enable"])
+                rect: list      = json.loads(row["region"]) if row.get("region") else []
+                threshold_time: int = row["threshold_time"]
+                merge_time: int     = row["merge_time"]
+                param: dict     = json.loads(row["param"])
+
+                event_val: int   = row["event_val"]
+                event_type      = event_val
+                event_str: str   = row["event_str"]
+                event_desc: str  = row["event_desc"]
+
+                start_date  = row["start_date"]
+                stop_date   = row["stop_date"]
+                time_range  = json.loads(row["time_range"])
+                month_days = None
+                if row["month_days"]:
+                    month_days = ast.literal_eval(row["month_days"])
+                weekdays = None
+                if row["weekdays"]:
+                    weekdays = ast.literal_eval(row["weekdays"])
+
+                cron = None
+                if ((event_type == EventType.TOILETING_FREQUENCY.value) or
+                    (event_type == EventType.NIGHT_TOILETING_FREQUENCY.value) or
+                    (event_type == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
+                    (event_type == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value) or
+                    (event_type == EventType.BATHROOM_STAY_FREQUENCY.value)):
+                    cron = {
+                        "hour": 6,
+                        "minute": 0
+                    }
+
+                time_plan = TimePlan(
+                    time_range  = time_range,
+                    start_date  = start_date,
+                    stop_date   = stop_date,
+                    weekdays    = weekdays,
+                    month_days  = month_days
+                )
+
+                alarm_plan = AlarmPlan(
+                    plan_uuid   = plan_uuid,
+                    name        = plan_name,
+                    dev_id      = dev_id,
+                    dev_name    = dev_name,
+                    enable      = enable,
+                    time_plan   = time_plan,
+                    rect        = rect,
+                    event_type  = event_type,
+                    threshold_time  = threshold_time,
+                    merge_time  = merge_time,
+                    param       = param,
+                    cron        = cron
+                )
+                if alarm_plan.event_attr_ is None:
+                    LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
+                    continue
+                # 塞入告警计划
+                if not alarm_plan.cron_:
+                    self.push(plan_uuid, alarm_plan)
+                else:
+                    self.push_cron(plan_uuid, alarm_plan)
+            LOGDBG(f"cb_handle_query_all_alarm_plan_info succeed")
+
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+            return
+
+
+
 def init_alarm_plan_mgr():
     g_las.g_alarm_plan_mgr = AlarmPlanManager()
 
@@ -155,6 +264,10 @@ def start_alarm_plan_mgr():
     g_las.g_alarm_plan_mgr.start_scheduler(interval=1)
 
 
+# --------------------------------------
+# 数据库响应的回调函数
+# --------------------------------------
+
 # 将region字典转换为xy平面的rect [left, top, width, height]
 def region_to_rect(region: dict) -> list:
     x_start, x_stop = region["x_cm_start"], region["x_cm_stop"]
@@ -170,6 +283,7 @@ def region_to_rect(region: dict) -> list:
 
     return [left, top, width, height]
 
+
 # 回调函数,处理查询结果:查询所有的告警计划信息
 def cb_handle_query_all_alarm_plan_info(result):
     try:
@@ -194,7 +308,6 @@ def cb_handle_query_all_alarm_plan_info(result):
 
                 start_date  = row["start_date"]
                 stop_date   = row["stop_date"]
-
                 time_range  = json.loads(row["time_range"])
                 month_days = None
                 if row["month_days"]:
@@ -226,6 +339,7 @@ def cb_handle_query_all_alarm_plan_info(result):
                     plan_uuid   = plan_uuid,
                     name        = plan_name,
                     dev_id      = dev_id,
+                    dev_name    = dev_name,
                     enable      = enable,
                     time_plan   = time_plan,
                     rect        = rect,

+ 3 - 5
mqtt/mqtt_recv.py

@@ -24,6 +24,7 @@ import mqtt.mqtt_send as mqtt_send
 import device.dev_mng as dev_mng
 from device.dev_mng import g_dev_map, g_dev_map_lock, Device
 
+import core.g_LAS as g_las
 
 
 # 数据队列
@@ -132,8 +133,6 @@ def deal_realtime_pos(msg:mqtt.MQTTMessage):
             LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
 
 
-
-
 # 告警计划更新
 def deal_alarm_plan_update(msg:mqtt.MQTTMessage):
     try:
@@ -141,12 +140,11 @@ def deal_alarm_plan_update(msg:mqtt.MQTTMessage):
         plan_uuid   = payload.get("plan_uuid")
         operation   = payload.get("operation")
 
-        # todo
         if operation == "update":
+            g_las.g_alarm_plan_mgr.query_one_alarm_plan(plan_uuid)
             return
         elif operation == "delete":
-            return
-
+            g_las.g_alarm_plan_mgr.remove_one_plan(plan_uuid)
 
     except json.JSONDecodeError as e:
         tb_info = traceback.extract_tb(e.__traceback__)