123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476 |
- from typing import List, Tuple, Optional
- from datetime import datetime, time, date
- from threading import Thread, Lock
- from enum import Enum
- import uuid
- import json
- import traceback
- from datetime import datetime, timezone, timedelta
- from common.sys_comm import (
- LOGDBG, LOGINFO, LOGWARN, LOGERR,
- get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
- utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
- )
- from core.time_plan import TimePlan
- from core.event_type import EventType, event_desc_map
- import core.g_LAS as g_las
- from device.dev_mng import (
- Device,
- dev_map_push, dev_map_pop, dev_map_find, dev_map_delete
- )
- from db.db_process import (
- db_req_que, DBRequest
- )
- import db.db_sqls as sqls
- import mqtt.mqtt_send as mqtt_send
- from device.dev_mng import g_dev_map, g_dev_map_lock
- class EventAttr_Base:
- def __init__(self):
- return
- # 事件属性 事件事件
- class EventAttr_StayDetection(EventAttr_Base):
- def __init__(self):
- self.enter_ts_: int = -1 # 进入时间(s)
- self.leave_ts_: int = -1 # 离开时间(s)
- self.stay_time_: int = -1 # 停留时长(s)
- return
- def reset(self):
- self.enter_ts_ = -1
- self.leave_ts_ = -1
- self.stay_time_ = -1
- # 事件属性 滞留事件
- class EventAttr_RetentionDetection(EventAttr_Base):
- def __init__(self):
- self.enter_ts_: int = -1 # 进入时间(s)
- self.leave_ts_: int = -1 # 离开时间(s)
- self.stay_time_: int = -1 # 停留时长(s)
- return
- def reset(self):
- self.enter_ts_ = -1
- self.leave_ts_ = -1
- self.stay_time_ = -1
- # 事件属性 如厕事件
- class EventAttr_ToiletingDetection(EventAttr_Base):
- def __init__(self):
- self.enter_ts_: int = -1 # 进入时间(ms)
- self.leave_ts_: int = -1 # 离开时间(ms)
- self.stay_time_: int = -1 # 停留时长(ms)
- return
- def reset(self):
- self.enter_ts_ = -1
- self.leave_ts_ = -1
- self.stay_time_ = -1
- class AlarmPlan:
- def __init__(self,
- plan_uuid: str,
- name: str,
- dev_id: str,
- enable: bool,
- time_plan: TimePlan,
- rect: list,
- event_type: int,
- threshold_time: int,
- merge_time: int
- ):
- self.lock_ = Lock()
- self.plan_uuid_ = plan_uuid # 计划id
- self.name_ = name # 计划名称
- self.dev_id_ = dev_id # 设备id
- self.enable_ = enable # 是否启用
- self.time_plan_ = time_plan # 时间计划
- # 维护状态(根据TimePlanu判断)
- self.status_ = 0 # 0未激活,1激活,-1过期
- self.status_update_ts_ = -1 # 状态更新时间,初始值为-1
- # 事件属性表
- self.event_attr_map = {
- EventType.STAY_DETECTION.value: EventAttr_StayDetection,
- EventType.RETENTION_DETECTION.value: EventAttr_RetentionDetection,
- EventType.TOILETING_DETECTION.value: EventAttr_ToiletingDetection
- }
- # 事件触发参数
- self.rect_ = rect # 检测区域 [left, top, width, height]
- self.threshold_time_ = threshold_time # 触发时间阈值
- self.merge_time_ = merge_time # 归并时间窗口
- self.event_type_ = event_type # 事件类型
- self.event_attr_ = self.init_event_attr() # 事件属性
- if self.event_attr_ is None:
- raise ValueError(f"Invalid event_type: {event_type}")
- def execute(self):
- if self.status_ != 1:
- return
- g_las.g_event_dispatcher.dispatch(self.event_type_, self)
- # 更新激活状态
- def update_status(self, now: Optional[datetime] = None) -> None:
- now = now or datetime.now()
- old_status = self.status_
- if not self.enable_:
- self.status_ = 0
- else:
- now_fmt = now.strftime("%Y-%m-%d")
- # 过期
- if now_fmt > self.time_plan_.stop_date_:
- self.status_ = -1
- elif now_fmt < self.time_plan_.start_date_:
- self.status_ = 0
- elif self.time_plan_.is_active_now(now):
- self.status_ = 1
- else:
- self.status_ = 0
- if self.status_ != old_status:
- self.status_update_ts = int(now.timestamp())
- LOGINFO(f"[Status] plan {self.plan_uuid_} status_ changed {old_status} -> {self.status_}")
- def is_point_in_rect(self, x:float, y:float, rect:list) -> bool:
- rx, ry, rw, rh = rect
- x_min = min(rx, rx + rw)
- x_max = max(rx, rx + rw)
- y_min = min(ry, ry - rh)
- y_max = max(ry, ry - rh)
- bRet: bool = x_min <= x <= x_max and y_min <= y <= y_max
- return bRet
- # 查找最近 t 秒内,最后一个落在 rect 内的 target_point 的 rtd_unit
- def find_latest_rtd_in_region(self, device: Device, rect: list, now: int=None, t: int=1):
- now_s = now if now else get_utc_time_s()
- rtd_que_copy = device.get_rtd_que_copy()
- with self.lock_:
- for rtd_unit in reversed(rtd_que_copy): # 倒序扫描
- ts_s = int(rtd_unit.get("timestamp", 0))
- if now_s - ts_s > t:
- break # 已经超过 t 秒,可以直接结束
- # 检查点是否在区域内
- for pt in rtd_unit.get("target_point", []):
- if len(pt) >= 2:
- x, y = pt[0], pt[1]
- if self.is_point_in_rect(x, y, rect):
- return rtd_unit
- return None
- # 初始化事件属性
- def init_event_attr(self):
- event_cls = self.event_attr_map.get(self.event_type_)
- if event_cls is None:
- return None
- event_attr = event_cls()
- return event_attr
- # 停留事件
- def handle_stay_detection(self):
- try:
- dev_id = self.dev_id_
- device:Device = dev_map_find(dev_id)
- if not device:
- return
- now = get_utc_time_s()
- # 查找最新的落在检测区域的目标
- rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
- if rtd_unit:
- timestamp = rtd_unit["timestamp"]
- pose = rtd_unit["pose"]
- target_point = rtd_unit["target_point"]
- if self.event_attr_.enter_ts_ == -1:
- self.event_attr_.enter_ts_ = timestamp
- else:
- self.event_attr_.leave_ts_ = timestamp
- if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
- return
- # 归并时间内,不认为事件结束
- if now - self.event_attr_.leave_ts_ < self.merge_time_:
- return
- self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
- stay_time =self.event_attr_.stay_time_
- # 时间小于触发时间阈值,忽略并重置
- if stay_time < self.threshold_time_ :
- self.event_attr_.reset()
- LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
- return
- # 构造事件
- # 入库
- info = {
- "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "stay_time": stay_time
- }
- event_uuid = str(uuid.uuid4())
- params = {
- "dev_id": dev_id,
- "uuid": event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": event_desc_map[self.event_type_],
- "info": json.dumps(info),
- "is_handle": 0,
- "is_deleted": 0
- }
- db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
- LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
- self.event_attr_.reset()
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 滞留事件
- def handle_retention_detection(self):
- try:
- dev_id = self.dev_id_
- device:Device = dev_map_find(dev_id)
- if not device:
- return
- now = get_utc_time_s()
- # 查找最新的落在检测区域的目标
- rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
- if rtd_unit:
- timestamp = rtd_unit["timestamp"]
- pose = rtd_unit["pose"]
- target_point = rtd_unit["target_point"]
- if self.event_attr_.enter_ts_ == -1:
- self.event_attr_.enter_ts_ = timestamp
- else:
- self.event_attr_.leave_ts_ = timestamp
- if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
- return
- # 归并时间内,不认为事件结束
- if now - self.event_attr_.leave_ts_ < self.merge_time_:
- return
- self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
- stay_time =self.event_attr_.stay_time_
- # 时间小于触发时间阈值,忽略并重置
- if stay_time < self.threshold_time_ :
- self.event_attr_.reset()
- LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
- return
- # 构造事件
- # 入库
- info = {
- "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "stay_time": stay_time
- }
- event_uuid = str(uuid.uuid4())
- params = {
- "dev_id": dev_id,
- "uuid": event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": event_desc_map[self.event_type_],
- "info": json.dumps(info),
- "is_handle": 0,
- "is_deleted": 0
- }
- db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
- LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
- self.event_attr_.reset()
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 如厕事件
- def handle_toileting_detection(self):
- try:
- dev_id = self.dev_id_
- device:Device = dev_map_find(dev_id)
- if not device:
- return
- now = get_utc_time_s()
- # 查找最新的落在检测区域的目标
- rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
- if rtd_unit:
- timestamp = rtd_unit["timestamp"]
- pose = rtd_unit["pose"]
- target_point = rtd_unit["target_point"]
- if self.event_attr_.enter_ts_ == -1:
- self.event_attr_.enter_ts_ = timestamp
- else:
- self.event_attr_.leave_ts_ = timestamp
- if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
- return
- # 归并时间内,不认为事件结束
- if now - self.event_attr_.leave_ts_ < self.merge_time_:
- return
- self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
- stay_time =self.event_attr_.stay_time_
- # 时间小于触发时间阈值,忽略并重置
- if stay_time < self.threshold_time_ :
- self.event_attr_.reset()
- LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
- return
- # 构造事件
- # 入库
- info = {
- "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "stay_time": stay_time
- }
- event_uuid = str(uuid.uuid4())
- params = {
- "dev_id": dev_id,
- "uuid": event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": event_desc_map[self.event_type_],
- "info": json.dumps(info),
- "is_handle": 0,
- "is_deleted": 0
- }
- db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
- LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
- self.event_attr_.reset()
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 如厕频次统计
- def handle_toileting_frequency(self):
- try:
- dev_id = self.dev_id_
- device:Device = dev_map_find(dev_id)
- if not device:
- return
- now = get_utc_time_s()
- # 查找最新的落在检测区域的目标
- rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
- if rtd_unit:
- timestamp = rtd_unit["timestamp"]
- pose = rtd_unit["pose"]
- target_point = rtd_unit["target_point"]
- if self.event_attr_.enter_ts_ == -1:
- self.event_attr_.enter_ts_ = timestamp
- else:
- self.event_attr_.leave_ts_ = timestamp
- if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
- return
- # 归并时间内,不认为事件结束
- if now - self.event_attr_.leave_ts_ < self.merge_time_:
- return
- self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
- stay_time =self.event_attr_.stay_time_
- # 时间小于触发时间阈值,忽略并重置
- if stay_time < self.threshold_time_ :
- self.event_attr_.reset()
- LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
- return
- # 构造事件
- # 入库
- info = {
- "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "stay_time": stay_time
- }
- event_uuid = str(uuid.uuid4())
- params = {
- "dev_id": dev_id,
- "uuid": event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": event_desc_map[self.event_type_],
- "info": json.dumps(info),
- "is_handle": 0,
- "is_deleted": 0
- }
- db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
- LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
- self.event_attr_.reset()
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
|