from typing import List, Tuple, Optional
from datetime import datetime, time, date
from threading import Thread, Lock
from enum import Enum
import uuid
import json
import traceback
from datetime import datetime, timezone, timedelta

from common.sys_comm import (
    LOGDBG, LOGINFO, LOGWARN, LOGERR,
    get_utc_time_ms, get_utc_time_s,
    get_bj_time_ms, get_bj_time_s,
    utc_to_bj_ms, bj_to_utc_ms,
    utc_to_bj_s, bj_to_utc_s
)
from core.time_plan import TimePlan
from core.event_type import EventType, event_desc_map
import core.g_LAS as g_las
from device.dev_mng import (
    Device,
    dev_map_push, dev_map_pop, dev_map_find, dev_map_delete
)
from db.db_process import (
    db_req_que, DBRequest
)
import db.db_sqls as sqls
import mqtt.mqtt_send as mqtt_send
from device.dev_mng import g_dev_map, g_dev_map_lock


class EventAttr_Base:
    """Common base type for the per-event-type runtime attributes."""

    def __init__(self):
        return


class _EventAttr_StayWindow(EventAttr_Base):
    """Enter/leave/stay bookkeeping shared by every concrete event attribute.

    All three fields use -1 as the "not set" marker.
    """

    def __init__(self):
        self.enter_ts_: int = -1   # time the target entered the region
        self.leave_ts_: int = -1   # time the target was last seen in the region
        self.stay_time_: int = -1  # leave_ts_ - enter_ts_
        return

    def reset(self):
        """Put every field back to the "not set" marker (-1)."""
        self.enter_ts_ = -1
        self.leave_ts_ = -1
        self.stay_time_ = -1


# Event attributes: stay event (times in seconds)
class EventAttr_StayDetection(_EventAttr_StayWindow):
    """Runtime attributes of a stay-detection event."""


# Event attributes: retention event (times in seconds)
class EventAttr_RetentionDetection(_EventAttr_StayWindow):
    """Runtime attributes of a retention-detection event."""


# NOTE(review): the original comments on the classes below labelled their
# fields as milliseconds, but AlarmPlan fills them through the same
# second-based code path as the two classes above -- confirm the unit.

# Event attributes: toileting event
class EventAttr_ToiletingDetection(_EventAttr_StayWindow):
    """Runtime attributes of a toileting-detection event."""


# Event attributes: toileting frequency statistics
class EventAttr_ToiletingFrequency(_EventAttr_StayWindow):
    """Runtime attributes of a toileting-frequency event."""


# Event attributes: night-time toileting frequency statistics
class EventAttr_NightToiletingFrequency(_EventAttr_StayWindow):
    """Runtime attributes of a night-toileting-frequency event."""


# Event attributes: abnormal toileting frequency
class EventAttr_ToiletingFrequencyAbnormal(_EventAttr_StayWindow):
    """Runtime attributes of an abnormal-toileting-frequency event."""


# Event attributes: abnormal night-time toileting
class EventAttr_NightToiletingFrequencyAbnormal(_EventAttr_StayWindow):
    """Runtime attributes of an abnormal-night-toileting event."""


# Event attributes: bathroom stay frequency statistics
class EventAttr_BathroomStayFrequency(_EventAttr_StayWindow):
    """Runtime attributes of a bathroom-stay-frequency event."""


# Event attributes: abnormal disappearance of the target
class EventAttr_TargetAbsence(_EventAttr_StayWindow):
    """Runtime attributes of a target-absence event."""


class AlarmPlan:
    """One alarm plan bound to a device, a detection region and an event type.

    The plan keeps an activation status derived from its TimePlan and, while
    active, forwards itself to the global event dispatcher on ``execute()``.
    """

    def __init__(self,
                 plan_uuid: str,
                 name: str,
                 dev_id: str,
                 enable: bool,
                 time_plan: TimePlan,
                 rect: list,
                 event_type: int,
                 threshold_time: int,
                 merge_time: int,
                 param: dict
                 ):
        """Store the plan configuration and create its event attribute object.

        Raises:
            ValueError: if ``event_type`` has no entry in the attribute table.
        """
        self.lock_ = Lock()
        self.plan_uuid_ = plan_uuid     # plan id
        self.name_ = name               # plan name
        self.dev_id_ = dev_id           # device id
        self.enable_ = enable           # whether the plan is enabled
        self.time_plan_ = time_plan     # time plan
        self.param_ = param             # extra parameters

        # Maintained state (decided from the TimePlan)
        self.status_ = 0                # 0 inactive, 1 active, -1 expired
        self.status_update_ts_ = -1     # time of last status change, -1 initially

        # Event type value -> attribute class
        self.event_attr_map = {
            EventType.STAY_DETECTION.value: EventAttr_StayDetection,
            EventType.RETENTION_DETECTION.value: EventAttr_RetentionDetection,
            EventType.TOILETING_DETECTION.value: EventAttr_ToiletingDetection,
            EventType.TOILETING_FREQUENCY.value: EventAttr_ToiletingFrequency,
            EventType.NIGHT_TOILETING_FREQUENCY.value: EventAttr_NightToiletingFrequency,
            EventType.TOILETING_FREQUENCY_ABNORMAL.value: EventAttr_ToiletingFrequencyAbnormal,
            EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value: EventAttr_NightToiletingFrequencyAbnormal,
            EventType.BATHROOM_STAY_FREQUENCY.value: EventAttr_BathroomStayFrequency,
            EventType.TARGET_ABSENCE.value: EventAttr_TargetAbsence,
        }

        # Event trigger parameters
        self.rect_ = rect                       # detection region [left, top, width, height]
        self.threshold_time_ = threshold_time   # minimum stay time that triggers an event
        self.merge_time_ = merge_time           # merge window
        self.event_type_ = event_type           # event type
        self.event_attr_ = self.init_event_attr()   # event attributes
        if self.event_attr_ is None:
            raise ValueError(f"Invalid event_type: {event_type}")

    def execute(self):
        """Dispatch this plan to the global event dispatcher if it is active."""
        if self.status_ != 1:
            return
        g_las.g_event_dispatcher.dispatch(self.event_type_, self)

    # Refresh the activation status
    def update_status(self, now: Optional[datetime] = None) -> None:
        """Recompute ``status_`` for ``now`` (defaults to ``datetime.now()``).

        A disabled plan is inactive (0). Otherwise the plan is expired (-1)
        after the stop date, inactive (0) before the start date, and active
        (1) only when the time plan reports it is active now. A status change
        updates ``status_update_ts_`` and is logged.
        """
        now = now or datetime.now()
        old_status = self.status_

        if not self.enable_:
            self.status_ = 0
        else:
            # Dates are compared as strings; assumes start_date_/stop_date_
            # use the same "%Y-%m-%d" layout -- TODO confirm.
            now_fmt = now.strftime("%Y-%m-%d")
            if now_fmt > self.time_plan_.stop_date_:
                self.status_ = -1   # expired
            elif now_fmt < self.time_plan_.start_date_:
                self.status_ = 0
            elif self.time_plan_.is_active_now(now):
                self.status_ = 1
            else:
                self.status_ = 0

        if self.status_ != old_status:
            # Write the attribute declared in __init__ (with the trailing
            # underscore); the previous code created a differently named one.
            self.status_update_ts_ = int(now.timestamp())
            LOGINFO(f"[Status] plan {self.plan_uuid_} status_ changed {old_status} -> {self.status_}")

    def is_point_in_rect(self, x: float, y: float, rect: list) -> bool:
        """Return True when (x, y) lies inside ``rect`` (borders included).

        ``rect`` is [left, top, width, height]; the rectangle spans
        ``left .. left + width`` on x and ``top - height .. top`` on y.
        min/max are used so negative extents still give a valid range.
        """
        rx, ry, rw, rh = rect
        x_min = min(rx, rx + rw)
        x_max = max(rx, rx + rw)
        y_min = min(ry, ry - rh)
        y_max = max(ry, ry - rh)
        return x_min <= x <= x_max and y_min <= y <= y_max

    # Find, within the last t seconds, the newest rtd_unit that has a
    # target_point inside rect
    def find_latest_rtd_in_region(self, device: Device, rect: list,
                                  now: Optional[int] = None, t: int = 1):
        """Return the newest rtd unit with a target point inside ``rect``.

        Args:
            device: device whose rtd queue copy is scanned.
            rect: detection region [left, top, width, height].
            now: reference time in seconds; current UTC seconds when None.
            t: look-back window in seconds.

        Returns:
            The matching rtd unit, or None when nothing matches in the window.
        """
        now_s = now if now is not None else get_utc_time_s()
        rtd_que_copy = device.get_rtd_que_copy()
        with self.lock_:
            # Scan newest-first; assumes the queue is ordered oldest -> newest.
            for rtd_unit in reversed(rtd_que_copy):
                ts_s = int(rtd_unit.get("timestamp", 0))
                if now_s - ts_s > t:
                    break   # older than t seconds, nothing further can match

                # Is any target point inside the region?
                for pt in rtd_unit.get("target_point", []):
                    if len(pt) >= 2 and self.is_point_in_rect(pt[0], pt[1], rect):
                        return rtd_unit
        return None

    # Create the event attribute object
    def init_event_attr(self):
        """Instantiate the attribute class for ``event_type_``; None if unknown."""
        event_cls = self.event_attr_map.get(self.event_type_)
        if event_cls is None:
            return None
        return event_cls()

    def _handle_region_stay_event(self):
        """Track presence in the region and emit one event per finished stay.

        Shared implementation of the stay / retention / toileting handlers,
        which were identical copies. Errors are logged frame by frame and
        never propagate to the caller.
        """
        try:
            dev_id = self.dev_id_
            device: Device = dev_map_find(dev_id)
            if not device:
                return

            attr = self.event_attr_
            now = get_utc_time_s()

            # Newest target inside the detection region (last 3 seconds)
            rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
            if rtd_unit:
                timestamp = rtd_unit["timestamp"]
                if attr.enter_ts_ == -1:
                    attr.enter_ts_ = timestamp
                else:
                    attr.leave_ts_ = timestamp

            # NOTE(review): a target matched in only one call leaves leave_ts_
            # at -1, so enter_ts_ is kept until the next detection -- confirm
            # this is intended.
            if attr.enter_ts_ == -1 or attr.leave_ts_ == -1:
                return

            # Still inside the merge window: the event is not over yet
            if now - attr.leave_ts_ < self.merge_time_:
                return

            attr.stay_time_ = attr.leave_ts_ - attr.enter_ts_
            stay_time = attr.stay_time_

            # Shorter than the trigger threshold: ignore and reset
            if stay_time < self.threshold_time_:
                attr.reset()
                LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
                return

            # Build the event
            event_desc = event_desc_map[self.event_type_]
            info = {
                "enter_time": utc_to_bj_s(attr.enter_ts_),
                # Fixed: this previously reported enter_ts_ as the leave time.
                "leave_time": utc_to_bj_s(attr.leave_ts_),
                "stay_time": stay_time
            }
            event_uuid = str(uuid.uuid4())
            params = {
                "dev_id": dev_id,
                "uuid": event_uuid,
                "plan_uuid": self.plan_uuid_,
                "event_type": event_desc,
                "info": json.dumps(info),
                "is_handle": 0,
                "create_time": get_bj_time_s(),
                "is_deleted": 0,
                "remark": {}
            }

            # Store in the database
            db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))

            # Notify
            mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc, info, "events")
            LOGDBG(f"new event: {event_desc}, stay_time: {stay_time}")

            attr.reset()

        except json.JSONDecodeError as e:
            tb_info = traceback.extract_tb(e.__traceback__)
            for frame in tb_info:
                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
        except Exception as e:
            tb_info = traceback.extract_tb(e.__traceback__)
            for frame in tb_info:
                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")

    # Stay event
    def handle_stay_detection(self):
        """Handle the stay-detection event type."""
        self._handle_region_stay_event()

    # Retention event
    def handle_retention_detection(self):
        """Handle the retention-detection event type."""
        self._handle_region_stay_event()

    # Toileting event
    def handle_toileting_detection(self):
        """Handle the toileting-detection event type."""
        self._handle_region_stay_event()

    # Toileting frequency statistics
    def handle_toileting_frequency(self):
        """Handle the toileting-frequency event type."""
        self._handle_region_stay_event()

    # Night-time toileting frequency statistics
    def handle_night_toileting_frequency(self):
        """Not implemented yet; does nothing."""
        return

    # Abnormal toileting frequency
    def handle_toileting_frequency_abnormal(self):
        """Not implemented yet; does nothing."""
        return

    # Abnormal night-time toileting
    def handle_night_toileting_frequency_abnormal(self):
        """Not implemented yet; does nothing."""
        return

    # Bathroom stay frequency statistics
    def handle_bathroom_stay_frequency(self):
        """Not implemented yet; does nothing."""
        return

    # Abnormal disappearance of the target
    def handle_target_absence(self):
        """Not implemented yet; does nothing."""
        return