Explorar el Código

优化对监测区域的防御性检查

nifangxu hace 1 semana
padre
commit
0d4749a8a9
Se han modificado 2 ficheros con 87 adiciones y 5 borrados
  1. 79 0
      core/alarm_plan_helper.py
  2. 8 5
      core/alarm_plan_manager.py

+ 79 - 0
core/alarm_plan_helper.py

@@ -1,4 +1,14 @@
 from datetime import datetime, timedelta
+import json
+
+import common.sys_comm as sys_comm
+from common.sys_comm import (
+    LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
+    get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
+    utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
+)
+
+from core.event_type import EventType
 
 
 def get_query_time_range(param: dict, now: datetime = None):
@@ -65,3 +75,72 @@ def is_point_in_rect(x, y, rect):
 
     return left <= x <= right and top <= y <= bottom
 
+
+
+def region_to_rect(region_str):
+    """
+    安全解析 region 字段:
+    - 期望输入为 JSON 字符串
+    - 合法返回:list[float] (长度为4) 或 []
+    - 非法返回:[]
+    """
+    if not region_str:
+        return []  # None 或空串
+
+    try:
+        # 尝试 JSON 解析
+        region = json.loads(region_str)
+
+        # 允许空列表 []
+        if region == []:
+            return []
+
+        # 必须是列表
+        if not isinstance(region, list):
+            LOGWARN(f"region_to_rect: not a list, got {region_str}")
+            return []
+
+        # 检查长度
+        if len(region) != 4:
+            LOGWARN(f"region_to_rect: invalid length {len(region)}, data={region_str}")
+            return []
+
+        # 检查数值类型
+        for i, v in enumerate(region):
+            if not isinstance(v, (int, float)):
+                LOGWARN(f"region_to_rect: invalid type at index {i}: {v}")
+                return []
+
+        return region  # ✅ 合法结果
+
+    except json.JSONDecodeError:
+        LOGWARN(f"region_to_rect: JSON decode failed for region={region_str}")
+        return []
+    except Exception as e:
+        LOGERR(f"region_to_rect: unexpected error: {e}, data={region_str}")
+        return []
+
+
+def check_plan_rect_valid(event_type: int, rect: list) -> bool:
+    """
+    检查指定事件类型是否要求有有效的检测区域。
+    返回 True 表示 rect 合法;False 表示应跳过。
+    """
+    no_rect_event_types = {
+        EventType.CLEAN_EXPIRE_EVENTS.value,
+        EventType.TOILETING_FREQUENCY.value,
+        EventType.NIGHT_TOILETING_FREQUENCY.value,
+        EventType.TOILETING_FREQUENCY_ABNORMAL.value,
+        EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value,
+        EventType.BATHROOM_STAY_FREQUENCY.value,
+    }
+
+    # 对不依赖检测区域的类型,直接放行
+    if event_type in no_rect_event_types:
+        return True
+
+    # 其他事件需要有合法的 rect
+    if not rect or len(rect) != 4:
+        return False
+
+    return True

+ 8 - 5
core/alarm_plan_manager.py

@@ -15,6 +15,7 @@ from core.alarm_plan import AlarmPlan
 from core.time_plan import TimePlan
 from core.event_type import EventType
 from core.linkage_action import LinkageAction
+import core.alarm_plan_helper as helper
 import core.g_LAS as g_las
 
 from db.db_process import db_req_que
@@ -263,7 +264,11 @@ class AlarmPlanManager:
                 dev_id: str     = row["dev_id"]
                 dev_name: str   = row["dev_name"]
                 enable: int     = bool(row["enable"])
-                rect: list      = json.loads(row["region"]) if row.get("region") else []
+                region          = row["region"]
+                rect            = helper.region_to_rect(region)
+                if not helper.check_plan_rect_valid(event_type, rect):
+                    LOGWARN(f"skip plan {plan_uuid}: invalid rect={rect} for event_type={event_type}")
+                    continue
                 threshold_time: int = row["threshold_time"]
                 merge_time: int = row["merge_time"]
                 param: dict     = json.loads(row["param"])
@@ -369,7 +374,6 @@ def region_to_rect(region: dict) -> list:
 
     return [left, top, width, height]
 
-
 # 回调函数,处理查询结果:查询所有的告警计划信息
 def cb_handle_query_all_alarm_plan_info(result, userdata):
     try:
@@ -381,9 +385,8 @@ def cb_handle_query_all_alarm_plan_info(result, userdata):
             dev_id: str     = row["dev_id"]
             dev_name: str   = row["dev_name"]
             enable: int     = bool(row["enable"])
-            # region          = row["region"]
-            # rect            = json.loads(region_to_rect(region))
-            rect: list      = json.loads(row["region"]) if row.get("region") else []
+            region          = row["region"]
+            rect = helper.region_to_rect(region)
             threshold_time: int = row["threshold_time"]
             merge_time: int     = row["merge_time"]
             param: dict = json.loads(row["param"]) if row.get("param") else {}