Sfoglia il codice sorgente

1. 支持如厕频次异常告警;2. 支持起夜异常告警

nifangxu 1 mese fa
parent
commit
13915b29d9
3 ha cambiato i file con 236 aggiunte e 37 eliminazioni
  1. 229 32
      core/alarm_plan.py
  2. 2 2
      core/g_LAS.py
  3. 5 3
      db/db_sqls.py

+ 229 - 32
core/alarm_plan.py

@@ -83,7 +83,7 @@ class EventAttr_ToiletingDetection(EventAttr_Base):
 class EventAttr_ToiletingFrequency(EventAttr_Base):
     def __init__(self, event_type):
         self.event_type_ = event_type
-        self.count_: int        = 0
+        self.count_: int        = 0     # 统计次数
         self.event_list: list   = []
         return
 
@@ -96,57 +96,51 @@ class EventAttr_ToiletingFrequency(EventAttr_Base):
 class EventAttr_NightToiletingFrequency(EventAttr_Base):
     def __init__(self, event_type):
         self.event_type_ = event_type
-        self.enter_ts_: int  = -1    # 进入时间(ms)
-        self.leave_ts_: int  = -1    # 离开时间(ms)
-        self.stay_time_: int = -1    # 停留时长(ms)
+        self.count_: int        = 0     # 统计次数
+        self.event_list: list   = []
         return
 
     def reset(self):
-        self.enter_ts_  = -1
-        self.leave_ts_  = -1
-        self.stay_time_ = -1
+        self.count_  = 0
+        self.event_list  = []
 
 # 事件属性 如厕频次异常
 class EventAttr_ToiletingFrequencyAbnormal(EventAttr_Base):
     def __init__(self, event_type):
         self.event_type_ = event_type
-        self.enter_ts_: int  = -1    # 进入时间(ms)
-        self.leave_ts_: int  = -1    # 离开时间(ms)
-        self.stay_time_: int = -1    # 停留时长(ms)
+        self.count_: int        = 0     # 统计次数
+        self.threshold_count_: int  = 0     # 异常阈值
+        self.event_list: list   = []
         return
 
     def reset(self):
-        self.enter_ts_  = -1
-        self.leave_ts_  = -1
-        self.stay_time_ = -1
+        self.count_  = 0
+        self.event_list  = []
 
 # 事件属性 起夜异常
 class EventAttr_NightToiletingFrequencyAbnormal(EventAttr_Base):
     def __init__(self, event_type):
         self.event_type_ = event_type
-        self.enter_ts_: int  = -1    # 进入时间(ms)
-        self.leave_ts_: int  = -1    # 离开时间(ms)
-        self.stay_time_: int = -1    # 停留时长(ms)
+        self.count_: int        = 0     # 统计次数
+        self.threshold_count_: int  = 0     # 异常阈值
+        self.event_list: list   = []
         return
 
     def reset(self):
-        self.enter_ts_  = -1
-        self.leave_ts_  = -1
-        self.stay_time_ = -1
+        self.count_  = 0
+        self.event_list  = []
 
 # 事件属性 卫生间频次统计
 class EventAttr_BathroomStayFrequency(EventAttr_Base):
     def __init__(self, event_type):
         self.event_type_ = event_type
-        self.enter_ts_: int  = -1    # 进入时间(ms)
-        self.leave_ts_: int  = -1    # 离开时间(ms)
-        self.stay_time_: int = -1    # 停留时长(ms)
+        self.count_: int        = 0     # 统计次数
+        self.event_list: list   = []
         return
 
     def reset(self):
-        self.enter_ts_  = -1
-        self.leave_ts_  = -1
-        self.stay_time_ = -1
+        self.count_  = 0
+        self.event_list  = []
 
 # 事件属性 异常消失
 class EventAttr_TargetAbsence(EventAttr_Base):
@@ -228,12 +222,26 @@ class AlarmPlan:
 
     # 初始化事件属性
     def init_event_attr(self):
-        event_cls = event_attr_map.get(self.event_type_)
-        if event_cls is None:
-            return None
+        try:
+            event_cls = event_attr_map.get(self.event_type_)
+            if event_cls is None:
+                return None
 
-        event_attr    = event_cls(self.event_type_)
-        return event_attr
+            event_attr    = event_cls(self.event_type_)
+            if ((self.event_type_ == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
+                (self.event_type_ == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value)):
+                event_attr.threshold_count_ = int(self.param_.get("count", 0))
+
+            return event_attr
+
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
 
 
     # 更新激活状态
@@ -517,6 +525,7 @@ class AlarmPlan:
 
             start_dt, end_dt = helper.get_query_time_range(self.param_)
             params = {
+                "dev_id"    : self.dev_id_,
                 "event_type": event_desc_map[EventType.TOILETING_DETECTION.value],
                 "start_dt": start_dt,
                 "end_dt": end_dt
@@ -552,6 +561,7 @@ class AlarmPlan:
 
             start_dt, end_dt = helper.get_query_time_range(self.param_)
             params = {
+                "dev_id"    : self.dev_id_,
                 "event_type": event_desc_map[EventType.TOILETING_DETECTION.value],
                 "start_dt": start_dt,
                 "end_dt": end_dt
@@ -579,12 +589,73 @@ class AlarmPlan:
 
     # 如厕频次异常
     def handle_toileting_frequency_abnormal(self):
-        return
+        try:
+            dev_id = self.dev_id_
+            device:Device = dev_map_find(dev_id)
+            if not device:
+                return
+
+            start_dt, end_dt = helper.get_query_time_range(self.param_)
+            params = {
+                "dev_id"    : self.dev_id_,
+                "event_type": event_desc_map[EventType.TOILETING_DETECTION.value],
+                "start_dt": start_dt,
+                "end_dt": end_dt
+            }
+            userdata = {
+                "start_dt"  : start_dt,
+                "end_dt"  : end_dt
+            }
+            db_req_que.put(DBRequest(
+                sql=sqls.sql_query_events_by_datetime,
+                params=params,
+                callback=self.cb_toileting_frequency_abnormal,
+                userdata=userdata))
+
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
 
 
     # 起夜异常
     def handle_night_toileting_frequency_abnormal(self):
-        return
+        try:
+            dev_id = self.dev_id_
+            device:Device = dev_map_find(dev_id)
+            if not device:
+                return
+
+            start_dt, end_dt = helper.get_query_time_range(self.param_)
+            params = {
+                "dev_id"    : self.dev_id_,
+                "event_type": event_desc_map[EventType.TOILETING_DETECTION.value],
+                "start_dt": start_dt,
+                "end_dt": end_dt
+            }
+            userdata = {
+                "start_dt"  : start_dt,
+                "end_dt"  : end_dt
+            }
+            db_req_que.put(DBRequest(
+                sql=sqls.sql_query_events_by_datetime,
+                params=params,
+                callback=self.cb_toileting_frequency_abnormal,
+                userdata=userdata))
+
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
 
 
     # 卫生间频次统计
@@ -724,6 +795,132 @@ class AlarmPlan:
             return
 
 
+    # 如厕频次异常回调
+    def cb_toileting_frequency_abnormal(self, result, userdata):
+        try:
+            if result:
+                count = 0
+                event_list = []
+
+                for row in result:
+                    dev_id: str         = row.get("dev_id")
+                    event_uuid: str     = row.get("uuid")
+                    plan_uuid: str      = row.get("plan_uuid")
+                    event_type: int     = row.get("event_type")
+                    info: dict          = json.loads(row["info"]) if row.get("info") else {}
+                    is_handle: str      = row.get("is_handle")
+                    create_time: str    = row.get("create_time")
+                    update_time: str    = row.get("update_time")
+                    is_deleted: str     = row.get("is_deleted")
+                    remark: str         = row.get("remark")
+
+                    event_list.append(info)
+
+                if len(event_list) < self.event_attr_.threshold_count_:
+                    return
+
+                this_event_uuid = str(uuid.uuid4())
+                last_info = {
+                    "start_time"    : userdata["start_dt"],
+                    "end_time"      : userdata["end_dt"],
+                    "count"         : len(event_list),
+                    "event_list"    : event_list
+                }
+
+                # 入库
+                event_uuid = str(uuid.uuid4())
+                params = {
+                    "dev_id": dev_id,
+                    "uuid": this_event_uuid,
+                    "plan_uuid": self.plan_uuid_,
+                    "event_type": event_desc_map[self.event_type_],
+                    "info": json.dumps(last_info, ensure_ascii=False),
+                    "is_handle": 0,
+                    "create_time": get_bj_time_s(),
+                    "is_deleted": 0,
+                    "remark": json.dumps({}, ensure_ascii=False) 
+                }
+                db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+
+                # 通知
+                mqtt_send.alarm_event(dev_id, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, "events")
+
+                LOGINFO(f"EventAttr_ToiletingFrequency succeed")
+            else:
+                LOGDBG("EventAttr_ToiletingFrequency, empty result")
+
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+            return
+
+
+    # 起夜异常回调
+    def cb_toileting_frequency_abnormal(self, result, userdata):
+        try:
+            if result:
+                count = 0
+                event_list = []
+
+                for row in result:
+                    dev_id: str         = row.get("dev_id")
+                    event_uuid: str     = row.get("uuid")
+                    plan_uuid: str      = row.get("plan_uuid")
+                    event_type: int     = row.get("event_type")
+                    info: dict          = json.loads(row["info"]) if row.get("info") else {}
+                    is_handle: str      = row.get("is_handle")
+                    create_time: str    = row.get("create_time")
+                    update_time: str    = row.get("update_time")
+                    is_deleted: str     = row.get("is_deleted")
+                    remark: str         = row.get("remark")
+
+                    event_list.append(info)
+
+                if len(event_list) < self.event_attr_.threshold_count_:
+                    return
 
+                this_event_uuid = str(uuid.uuid4())
+                last_info = {
+                    "start_time"    : userdata["start_dt"],
+                    "end_time"      : userdata["end_dt"],
+                    "count"         : len(event_list),
+                    "event_list"    : event_list
+                }
 
+                # 入库
+                event_uuid = str(uuid.uuid4())
+                params = {
+                    "dev_id": dev_id,
+                    "uuid": this_event_uuid,
+                    "plan_uuid": self.plan_uuid_,
+                    "event_type": event_desc_map[self.event_type_],
+                    "info": json.dumps(last_info, ensure_ascii=False),
+                    "is_handle": 0,
+                    "create_time": get_bj_time_s(),
+                    "is_deleted": 0,
+                    "remark": json.dumps({}, ensure_ascii=False) 
+                }
+                db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+
+                # 通知
+                mqtt_send.alarm_event(dev_id, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, "events")
+
+                LOGINFO(f"EventAttr_ToiletingFrequency succeed")
+            else:
+                LOGDBG("EventAttr_ToiletingFrequency, empty result")
+
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+            return
 

+ 2 - 2
core/g_LAS.py

@@ -3,5 +3,5 @@
 # from core.alarm_plan_manager import  EventDispatcher
 
 # 全局变量
-g_alarm_plan_mgr    = None
-g_event_dispatcher  = None
+g_alarm_plan_mgr    = None  # 告警计划管理器
+g_event_dispatcher  = None  # 告警计划分发器

+ 5 - 3
db/db_sqls.py

@@ -117,9 +117,11 @@ VALUES (
 sql_query_events_by_datetime = """
 SELECT *
 FROM lnxx_dev.events
-WHERE event_type = %(event_type)s
-  AND create_time BETWEEN %(start_dt)s AND %(end_dt)s
-  AND is_deleted = 0;
+WHERE
+    dev_id = %(dev_id)s AND
+    event_type = %(event_type)s AND
+    create_time BETWEEN %(start_dt)s AND %(end_dt)s AND
+    is_deleted = 0;
 """