Procházet zdrojové kódy

如厕频次统计

nifangxu před 1 měsícem
rodič
revize
b9bfc30ced
5 změnil soubory, kde provedl 187 přidání a 89 odebrání
  1. 94 69
      core/alarm_plan.py
  2. 55 0
      core/alarm_plan_helper.py
  3. 25 17
      core/alarm_plan_manager.py
  4. 10 0
      db/db_sqls.py
  5. 3 3
      device/dev_mng.py

+ 94 - 69
core/alarm_plan.py

@@ -16,6 +16,8 @@ from common.sys_comm import (
 from core.time_plan import TimePlan
 from core.event_type import EventType, event_desc_map
 import core.g_LAS as g_las
+import core.alarm_plan_helper as helper
+
 from device.dev_mng import (
     Device,
     dev_map_push, dev_map_pop, dev_map_find, dev_map_delete
@@ -35,7 +37,8 @@ class EventAttr_Base:
 
 # 事件属性 事件事件
 class EventAttr_StayDetection(EventAttr_Base):
-    def __init__(self):
+    def __init__(self, event_type):
+        self.event_type_ = event_type
         self.enter_ts_: int  = -1    # 进入时间(s)
         self.leave_ts_: int  = -1    # 离开时间(s)
         self.stay_time_: int = -1    # 停留时长(s)
@@ -49,7 +52,8 @@ class EventAttr_StayDetection(EventAttr_Base):
 
 # 事件属性 滞留事件
 class EventAttr_RetentionDetection(EventAttr_Base):
-    def __init__(self):
+    def __init__(self, event_type):
+        self.event_type_ = event_type
         self.enter_ts_: int  = -1    # 进入时间(s)
         self.leave_ts_: int  = -1    # 离开时间(s)
         self.stay_time_: int = -1    # 停留时长(s)
@@ -63,7 +67,8 @@ class EventAttr_RetentionDetection(EventAttr_Base):
 
 # 事件属性 如厕事件
 class EventAttr_ToiletingDetection(EventAttr_Base):
-    def __init__(self):
+    def __init__(self, event_type):
+        self.event_type_ = event_type
         self.enter_ts_: int  = -1    # 进入时间(ms)
         self.leave_ts_: int  = -1    # 离开时间(ms)
         self.stay_time_: int = -1    # 停留时长(ms)
@@ -76,7 +81,8 @@ class EventAttr_ToiletingDetection(EventAttr_Base):
 
 # 事件属性 如厕频次统计
 class EventAttr_ToiletingFrequency(EventAttr_Base):
-    def __init__(self):
+    def __init__(self, event_type):
+        self.event_type_ = event_type
         self.count_: int        = 0
         self.event_list: list   = []
         return
@@ -85,9 +91,11 @@ class EventAttr_ToiletingFrequency(EventAttr_Base):
         self.count_  = 0
         self.event_list  = []
 
+
 # 事件属性 夜间如厕频次统计
 class EventAttr_NightToiletingFrequency(EventAttr_Base):
-    def __init__(self):
+    def __init__(self, event_type):
+        self.event_type_ = event_type
         self.enter_ts_: int  = -1    # 进入时间(ms)
         self.leave_ts_: int  = -1    # 离开时间(ms)
         self.stay_time_: int = -1    # 停留时长(ms)
@@ -100,7 +108,8 @@ class EventAttr_NightToiletingFrequency(EventAttr_Base):
 
 # 事件属性 如厕频次异常
 class EventAttr_ToiletingFrequencyAbnormal(EventAttr_Base):
-    def __init__(self):
+    def __init__(self, event_type):
+        self.event_type_ = event_type
         self.enter_ts_: int  = -1    # 进入时间(ms)
         self.leave_ts_: int  = -1    # 离开时间(ms)
         self.stay_time_: int = -1    # 停留时长(ms)
@@ -113,7 +122,8 @@ class EventAttr_ToiletingFrequencyAbnormal(EventAttr_Base):
 
 # 事件属性 起夜异常
 class EventAttr_NightToiletingFrequencyAbnormal(EventAttr_Base):
-    def __init__(self):
+    def __init__(self, event_type):
+        self.event_type_ = event_type
         self.enter_ts_: int  = -1    # 进入时间(ms)
         self.leave_ts_: int  = -1    # 离开时间(ms)
         self.stay_time_: int = -1    # 停留时长(ms)
@@ -126,7 +136,8 @@ class EventAttr_NightToiletingFrequencyAbnormal(EventAttr_Base):
 
 # 事件属性 卫生间频次统计
 class EventAttr_BathroomStayFrequency(EventAttr_Base):
-    def __init__(self):
+    def __init__(self, event_type):
+        self.event_type_ = event_type
         self.enter_ts_: int  = -1    # 进入时间(ms)
         self.leave_ts_: int  = -1    # 离开时间(ms)
         self.stay_time_: int = -1    # 停留时长(ms)
@@ -139,7 +150,8 @@ class EventAttr_BathroomStayFrequency(EventAttr_Base):
 
 # 事件属性 异常消失
 class EventAttr_TargetAbsence(EventAttr_Base):
-    def __init__(self):
+    def __init__(self, event_type):
+        self.event_type_ = event_type
         self.enter_ts_: int  = -1    # 进入时间(ms)
         self.leave_ts_: int  = -1    # 离开时间(ms)
         self.stay_time_: int = -1    # 停留时长(ms)
@@ -211,8 +223,7 @@ class AlarmPlan:
     def execute(self):
         if self.status_ != 1:
             return
-        if not self.cron_:
-            g_las.g_event_dispatcher.dispatch(self.event_type_, self)
+        g_las.g_event_dispatcher.dispatch(self.event_type_, self)
 
 
     # 初始化事件属性
@@ -221,7 +232,7 @@ class AlarmPlan:
         if event_cls is None:
             return None
 
-        event_attr    = event_cls()
+        event_attr    = event_cls(self.event_type_)
         return event_attr
 
 
@@ -318,7 +329,6 @@ class AlarmPlan:
                 "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
                 "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
                 "stay_time": stay_time
-
             }
             event_uuid = str(uuid.uuid4())
             params = {
@@ -330,7 +340,7 @@ class AlarmPlan:
                 "is_handle": 0,
                 "create_time": get_bj_time_s(),
                 "is_deleted": 0,
-                "remark": {}
+                "remark": json.dumps({}, ensure_ascii=False) 
             }
             db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -392,7 +402,6 @@ class AlarmPlan:
                 "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
                 "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
                 "stay_time": stay_time
-
             }
             event_uuid = str(uuid.uuid4())
             params = {
@@ -404,7 +413,7 @@ class AlarmPlan:
                 "is_handle": 0,
                 "create_time": get_bj_time_s(),
                 "is_deleted": 0,
-                "remark": {}
+                "remark": json.dumps({}, ensure_ascii=False) 
             }
             db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -466,7 +475,6 @@ class AlarmPlan:
                 "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
                 "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
                 "stay_time": stay_time
-
             }
             event_uuid = str(uuid.uuid4())
             params = {
@@ -478,7 +486,7 @@ class AlarmPlan:
                 "is_handle": 0,
                 "create_time": get_bj_time_s(),
                 "is_deleted": 0,
-                "remark": {}
+                "remark": json.dumps({}, ensure_ascii=False) 
             }
             db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -506,62 +514,78 @@ class AlarmPlan:
             device:Device = dev_map_find(dev_id)
             if not device:
                 return
-            now = get_utc_time_s()
-
-            # 查找最新的落在检测区域的目标
-            rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
-            if rtd_unit:
-                timestamp = rtd_unit["timestamp"]
-                pose = rtd_unit["pose"]
-                target_point = rtd_unit["target_point"]
-
-                if self.event_attr_.enter_ts_ == -1:
-                    self.event_attr_.enter_ts_ = timestamp
-                else:
-                    self.event_attr_.leave_ts_ = timestamp
-
-            if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
-                return
-
-            # 归并时间内,不认为事件结束
-            if now - self.event_attr_.leave_ts_  < self.merge_time_:
-                return
-
-            self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
-            stay_time =self.event_attr_.stay_time_
-            # 时间小于触发时间阈值,忽略并重置
-            if stay_time < self.threshold_time_ :
-                self.event_attr_.reset()
-                LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
-                return
 
-            # 构造事件
-            # 入库
-            info = {
-                "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
-                "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
-                "stay_time": stay_time
-
-            }
-            event_uuid = str(uuid.uuid4())
+            start_dt, end_dt = helper.get_query_time_range(self.param_)
             params = {
-                "dev_id": dev_id,
-                "uuid": event_uuid,
-                "plan_uuid": self.plan_uuid_,
-                "event_type": event_desc_map[self.event_type_],
-                "info": json.dumps(info),
-                "is_handle": 0,
-                "create_time": get_bj_time_s(),
-                "is_deleted": 0,
-                "remark": {}
+                "event_type": event_desc_map[EventType.TOILETING_DETECTION.value],
+                "start_dt": start_dt,
+                "end_dt": end_dt
             }
-            db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+            db_req_que.put(DBRequest(
+                sql=sqls.sql_query_events_by_datetime,
+                params=params,
+                callback=self.cb_toileting_frequency))
 
-            # 通知
-            mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
-            LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
 
-            self.event_attr_.reset()
+
+    def cb_toileting_frequency(self, result):
+        try:
+            if result:
+                count = 0
+                event_list = []
+
+                for row in result:
+                    dev_id: str         = row.get("dev_id")
+                    event_uuid: str           = row.get("uuid")
+                    plan_uuid: str      = row.get("plan_uuid")
+                    event_type: int     = row.get("event_type")
+                    info: dict           = json.loads(row["info"]) if row.get("info") else {}
+                    is_handle: str      = row.get("is_handle")
+                    create_time: str    = row.get("create_time")
+                    update_time: str    = row.get("update_time")
+                    is_deleted: str     = row.get("is_deleted")
+                    remark: str         = row.get("remark")
+
+                    event_list.append(info)
+
+                this_event_uuid = str(uuid.uuid4())
+                new_param = helper.normalize_param_time(self.param_)
+                last_info = {
+                    "start_time"    : new_param["start_time"],
+                    "end_time"      : new_param["end_time"],
+                    "count"         : len(event_list),
+                    "event_list"    : event_list
+                }
+
+                # 入库
+                event_uuid = str(uuid.uuid4())
+                params = {
+                    "dev_id": dev_id,
+                    "uuid": this_event_uuid,
+                    "plan_uuid": self.plan_uuid_,
+                    "event_type": event_desc_map[self.event_type_],
+                    "info": json.dumps(last_info, ensure_ascii=False),
+                    "is_handle": 0,
+                    "create_time": get_bj_time_s(),
+                    "is_deleted": 0,
+                    "remark": json.dumps({}, ensure_ascii=False) 
+                }
+                db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+
+                # 通知
+                mqtt_send.alarm_event(dev_id, this_event_uuid, plan_uuid, event_desc_map[self.event_type_], last_info, "events")
+
+                LOGINFO(f"EventAttr_ToiletingFrequency succeed")
+            else:
+                LOGDBG("EventAttr_ToiletingFrequency, empty result")
 
         except json.JSONDecodeError as e:
             tb_info = traceback.extract_tb(e.__traceback__)
@@ -571,6 +595,7 @@ class AlarmPlan:
             tb_info = traceback.extract_tb(e.__traceback__)
             for frame in tb_info:
                 LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+            return
 
 
     # 夜间如厕频次统计

+ 55 - 0
core/alarm_plan_helper.py

@@ -0,0 +1,55 @@
+from datetime import datetime, timedelta
+
+
+def get_query_time_range(param: dict, now: datetime = None):
+    """
+    根据 param 的 start_time / end_time 生成查询区间,返回格式化后的字符串
+
+    规则:
+    - 如果 start_time < end_time,则区间为 [昨天.start_time, 昨天.end_time]
+    - 否则,区间为 [昨天.end_time, 今天.start_time]
+    """
+
+    now = now or datetime.now()
+
+    # 解析时间
+    start_time = datetime.strptime(param["start_time"], "%H:%M").time()
+    end_time = datetime.strptime(param["end_time"], "%H:%M").time()
+
+    today = now.date()
+    yesterday = today - timedelta(days=1)
+
+    if start_time < end_time:
+        # 区间在同一天 (昨天)
+        start_dt = datetime.combine(yesterday, start_time)
+        end_dt = datetime.combine(yesterday, end_time)
+    else:
+        # 跨天
+        start_dt = datetime.combine(yesterday, end_time)
+        end_dt = datetime.combine(today, start_time)
+
+    # 返回字符串
+    return start_dt.strftime("%Y-%m-%d %H:%M:%S"), end_dt.strftime("%Y-%m-%d %H:%M:%S")
+
+
+def normalize_param_time(param: dict, now: datetime = None):
+    """
+    把 param['start_time'], param['end_time'] (HH:MM) 转换成昨天的完整日期时间字符串
+    格式:YYYY-MM-DD HH:MM:SS
+    """
+    now = now or datetime.now()
+    yesterday = (now - timedelta(days=1)).date()
+
+    def to_datetime_str(hhmm: str) -> str:
+        t = datetime.strptime(hhmm, "%H:%M").time()
+        dt = datetime.combine(yesterday, t)
+        return dt.strftime("%Y-%m-%d %H:%M:%S")
+
+    return {
+        "start_time": to_datetime_str(param["start_time"]),
+        "end_time": to_datetime_str(param["end_time"]),
+    }
+
+
+
+

+ 25 - 17
core/alarm_plan_manager.py

@@ -108,9 +108,15 @@ class AlarmPlanManager:
 
     # 调度定时任务
     def _cron_scheduler(self):
-        last_run = {}  # {plan_uuid: date} 记录任务上次执行日期,避免重复跑
+        import datetime
+        last_run_map = {}  # {plan_uuid: date} 记录任务上次执行日期,避免重复跑
         while self.running:
-            now = datetime.now()
+            now = datetime.datetime.now()
+            today = now.date()
+
+            # 每天 00:00 重置执行标记
+            # todo
+
             plans = self.list_all_cron()
             plan: AlarmPlan = None
             for plan in plans:
@@ -121,23 +127,24 @@ class AlarmPlanManager:
 
                     hour = cron.get("hour", None)
                     minute = cron.get("minute", 0)
+                    run_time = datetime.datetime.combine(today, datetime.time(hour or 0, minute or 0))
 
-                    # 判断是否到达执行点
-                    if ((hour is not None and now.hour != hour) or
-                        (minute is not None and now.minute != minute)):
-                        continue
-
-                    # 防止一分钟内重复执行
-                    if last_run.get(plan.plan_uuid_) == now.date():
-                        continue
+                     # 判断是否到达执行点
+                    if now >= run_time:
+                        # 检查今天是否已经执行过
+                        if last_run_map.get(plan.plan_uuid_) == today:
+                            continue
 
-                    plan.execute()
-                    last_run[plan.plan_uuid_] = now.date()
-                    LOGINFO(f"[CronScheduler] executed cron plan {plan.plan_uuid_}")
+                        # 执行任务
+                        plan.update_status()       # 更新状态
+                        if plan.status_ == 1:       # 激活状态才执行
+                            plan.execute()
+                            last_run_map[plan.plan_uuid_] = today
+                            LOGINFO(f"[CronScheduler] executed cron plan {plan.plan_uuid_} at {now}")
 
                 except Exception as e:
                     LOGERR(f"[CronScheduler] plan {plan.plan_uuid_} cron error: {e}")
-            time.sleep(30)  # 每 30 秒检查一次
+            time.sleep(5)  # 每 30 秒检查一次
 
 # 分发器
 class EventDispatcher:
@@ -229,7 +236,7 @@ def cb_handle_query_all_alarm_plan_info(result):
                 enable: int     = bool(row["enable"])
                 # region          = row["region"]
                 # rect            = json.loads(region_to_rect(region))
-                rect: list      = json.loads(row["region"])
+                rect: list      = json.loads(row["region"]) if row.get("region") else []
                 threshold_time: int = row["threshold_time"]
                 merge_time: int     = row["merge_time"]
                 param: dict     = json.loads(row["param"])
@@ -245,7 +252,7 @@ def cb_handle_query_all_alarm_plan_info(result):
                 time_range  = json.loads(row["time_range"])
                 month_days = None
                 if row["month_days"]:
-                    month_days = ast.literal_eval(row["mozhenth_days"])
+                    month_days = ast.literal_eval(row["month_days"])
                 weekdays = None
                 if row["weekdays"]:
                     weekdays = ast.literal_eval(row["weekdays"])
@@ -257,7 +264,7 @@ def cb_handle_query_all_alarm_plan_info(result):
                     (event_type == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value) or
                     (event_type == EventType.BATHROOM_STAY_FREQUENCY.value)):
                     cron = {
-                        "hour": 6,
+                        "hour": 1,
                         "minute": 0
                     }
 
@@ -282,6 +289,7 @@ def cb_handle_query_all_alarm_plan_info(result):
                     param       = param,
                     cron        = cron
                 )
+
                 if alarm_plan.event_attr_ is None:
                     LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
                     continue

+ 10 - 0
db/db_sqls.py

@@ -113,4 +113,14 @@ VALUES (
 );
 """
 
+# 查询events
+sql_query_events_by_datetime = """
+SELECT *
+FROM lnxx_dev.events
+WHERE event_type = %(event_type)s
+  AND create_time BETWEEN %(start_dt)s AND %(end_dt)s
+  AND is_deleted = 0;
+"""
+
+
 

+ 3 - 3
device/dev_mng.py

@@ -164,9 +164,9 @@ def update_dev_info(dev_id:str, dev_instance:Device):
     with g_dev_map_lock:
         if dev_id in g_dev_map:
             g_dev_map[dev_id] = None
-            LOGDBG(f"update dev: {dev_id}")
-        else:
-            LOGDBG(f"new dev: {dev_id}")
+        #     LOGDBG(f"update dev: {dev_id}")
+        # else:
+        #     LOGDBG(f"new dev: {dev_id}")
 
         # todo 更新设备保活时间(伪)
         # dev_instance.set_keepalive(get_utc_time_ms())