from typing import List, Tuple, Optional from datetime import datetime, time, date from threading import Thread, Lock from enum import Enum import uuid import json import traceback from datetime import datetime, timezone, timedelta from collections import deque import common.sys_comm as sys_comm from common.sys_comm import ( LOGDBG, LOGINFO, LOGWARN, LOGERR, EC, get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s, utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s ) from core.time_plan import TimePlan from core.event_type import EventType, event_desc_map import core.g_LAS as g_las import core.alarm_plan_helper as helper from core.linkage_action import LinkageAction from db.db_process import ( db_req_que, DBRequest_Async ) from db.db_process import ( db_execute_sync, db_execute_async ) import db.db_sqls as sqls import mqtt.mqtt_send as mqtt_send import device.dev_mng as g_Dev from device.dev_mng import ( Device, g_dev_mgr ) class EventAttr_Base: def __init__(self): return # 事件属性 事件事件 class EventAttr_StayDetection(EventAttr_Base): def __init__(self, event_type): self.event_type_ = event_type self.enter_ts_: int = -1 # 进入时间(s) self.leave_ts_: int = -1 # 离开时间(s) self.stay_time_: int = -1 # 停留时长(s) return def reset(self): self.enter_ts_ = -1 self.leave_ts_ = -1 self.stay_time_ = -1 # 事件属性 滞留事件 class EventAttr_RetentionDetection(EventAttr_Base): def __init__(self, event_type): self.event_type_ = event_type self.enter_ts_: int = -1 # 进入时间(s) self.leave_ts_: int = -1 # 离开时间(s) self.stay_time_: int = -1 # 停留时长(s) return def reset(self): self.enter_ts_ = -1 self.leave_ts_ = -1 self.stay_time_ = -1 # 事件属性 如厕事件 class EventAttr_ToiletingDetection(EventAttr_Base): def __init__(self, event_type): self.event_type_ = event_type self.enter_ts_: int = -1 # 进入时间(ms) self.leave_ts_: int = -1 # 离开时间(ms) self.stay_time_: int = -1 # 停留时长(ms) return def reset(self): self.enter_ts_ = -1 self.leave_ts_ = -1 self.stay_time_ = -1 # 事件属性 如厕频次统计 class EventAttr_ToiletingFrequency(EventAttr_Base): def __init__(self, event_type): self.event_type_ = event_type self.count_: int = 0 # 统计次数 self.event_list: list = [] return def reset(self): self.count_ = 0 self.event_list = [] # 事件属性 夜间如厕频次统计 class EventAttr_NightToiletingFrequency(EventAttr_Base): def __init__(self, event_type): self.event_type_ = event_type self.count_: int = 0 # 统计次数 self.event_list: list = [] return def reset(self): self.count_ = 0 self.event_list = [] # 事件属性 如厕频次异常 class EventAttr_ToiletingFrequencyAbnormal(EventAttr_Base): def __init__(self, event_type): self.event_type_ = event_type self.count_: int = 0 # 统计次数 self.threshold_count_: int = 0 # 异常阈值 self.event_list: list = [] return def reset(self): self.count_ = 0 self.event_list = [] # 事件属性 起夜异常 class EventAttr_NightToiletingFrequencyAbnormal(EventAttr_Base): def __init__(self, event_type): self.event_type_ = event_type self.count_: int = 0 # 统计次数 self.threshold_count_: int = 0 # 异常阈值 self.event_list: list = [] return def reset(self): self.count_ = 0 self.event_list = [] # 事件属性 卫生间频次统计 class EventAttr_BathroomStayFrequency(EventAttr_Base): def __init__(self, event_type): self.event_type_ = event_type self.count_: int = 0 # 统计次数 self.event_list: list = [] return def reset(self): self.count_ = 0 self.event_list = [] # 事件属性 异常消失 class EventAttr_TargetAbsence(EventAttr_Base): def __init__(self, event_type): self.event_type_ = event_type self.leave_ts_: int = -1 # 离开时间(ms) self.enter_ts_: int = -1 # 进入时间(ms) self.absence_time_: int = -1 # 消失时长(ms) self.time_threshold_: int = 300 # 触发消失时间阈值(ms) return def reset(self): self.leave_ts_ = -1 self.enter_ts_ = -1 self.absence_time_ = -1 # 事件属性 睡眠监测 class EventAttr_SleepMonitoring(EventAttr_Base): motion_stat = ["peaceful", "micro", "active", "leave"] # 空间状态 # 呼吸率划分:r0:[0-8) r1:[8,12) r2:[12,18) r3:[18,25] r4:25以上 breathe_stat = ["r0", "r1", "r2", "r3", "r4"] # 呼吸状态:呼吸过慢/异常,深睡,浅睡,REM/清醒,呼吸过快/清醒 # 深睡(deep),浅睡(light),REM,清醒(awake),离床(leave) sleep_stat = [ ["deep", "deep", "light", "REM", "awake"], # peaceful ["deep", "light", "light", "REM", "awake"], # micro ["awake", "awake", "awake", "awake", "awake"], # active ["leave", "leave", "leave", "leave", "leave"]] # leave # 异常:呼吸过慢(slow_breathe),呼吸过快(fast_breathe) def __init__(self, event_type): self.start_sleep_ts_: int = -1 # 睡眠开始时间 self.end_sleep_ts_: int = -1 # 睡眠结束时间 self.motion_stat_ = None # 当前运动状态 self.breathe_stat_ = None # 当前呼吸状态 self.sleep_stat_ = None # 当前睡眠状态 self.sleep_segments_ = [] # 状态阶段记录 [{"ts":xxx, "stat":xxx}, ...] self.last_motion_ = 0.0 # 最新的运动状态 self.last_breath_ = 0.0 # 最新的呼吸率 self.last_pts_ = [] # 最后的点目标 self.last_update_ts_ = -1 self.miss_target_count_ = 0 return # 事件属性 清理过期事件 class EventAttr_CleanExpireEvents(EventAttr_Base): def __init__(self, event_type): self.event_type_ = event_type self.expire_range_ = 90 # 事件属性表 event_attr_map = { EventType.STAY_DETECTION.value : EventAttr_StayDetection, EventType.RETENTION_DETECTION.value : EventAttr_RetentionDetection, EventType.TOILETING_DETECTION.value : EventAttr_ToiletingDetection, EventType.TOILETING_FREQUENCY.value : EventAttr_ToiletingFrequency, EventType.NIGHT_TOILETING_FREQUENCY.value : EventAttr_NightToiletingFrequency, EventType.TOILETING_FREQUENCY_ABNORMAL.value : EventAttr_ToiletingFrequencyAbnormal, EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value: EventAttr_NightToiletingFrequencyAbnormal, EventType.BATHROOM_STAY_FREQUENCY.value : EventAttr_BathroomStayFrequency, EventType.TARGET_ABSENCE.value : EventAttr_TargetAbsence, EventType.SLEEP_MONITORING.value : EventAttr_SleepMonitoring, EventType.CLEAN_EXPIRE_EVENTS.value : EventAttr_CleanExpireEvents, } class Cron: def __init__(self, h, m, s): self.h_ = None self.m_ = None self.s_ = None class AlarmPlan: # 注册handle handles_map = { EventType.STAY_DETECTION.value : lambda plan: AlarmPlan.handle_stay_detection(plan), EventType.RETENTION_DETECTION.value : lambda plan: AlarmPlan.handle_retention_detection(plan), EventType.TOILETING_DETECTION.value : lambda plan: AlarmPlan.handle_toileting_detection(plan), EventType.TOILETING_FREQUENCY.value : lambda plan: AlarmPlan.handle_toileting_frequency(plan), EventType.NIGHT_TOILETING_FREQUENCY.value : lambda plan: AlarmPlan.handle_night_toileting_frequency(plan), EventType.TOILETING_FREQUENCY_ABNORMAL.value : lambda plan: AlarmPlan.handle_toileting_frequency_abnormal(plan), EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value : lambda plan: AlarmPlan.handle_night_toileting_frequency_abnormal(plan), EventType.BATHROOM_STAY_FREQUENCY.value : lambda plan: AlarmPlan.handle_bathroom_stay_frequency(plan), EventType.TARGET_ABSENCE.value : lambda plan: AlarmPlan.handle_target_absence(plan), EventType.SLEEP_MONITORING.value : lambda plan: AlarmPlan.handle_sleep_monitoring(plan), # 平台事件(任务) EventType.CLEAN_EXPIRE_EVENTS.value : lambda plan: AlarmPlan.handle_clear_expire_events(plan), } def __init__(self, plan_uuid: str, name: str, dev_id: str, dev_name: str, enable: bool, time_plan: TimePlan, rect: list, event_type: int, threshold_time: int, merge_time: int, param: dict, cron: Optional[dict] = None, linkage_action: LinkageAction = LinkageAction(), tenant_id:int = 0 ): self.lock_ = Lock() self.plan_uuid_ = plan_uuid # 计划id self.name_ = name # 计划名称 self.dev_id_ = dev_id # 设备id self.dev_name_ = dev_name # 设备名称 self.enable_ = enable # 是否启用 self.time_plan_ = time_plan # 时间计划 self.param_ = param # 参数 self.linkage_action_ = linkage_action # 联动动作 self.tenant_id_ = tenant_id # 租户id # 维护状态(根据TimePlan判断) self.status_ = 0 # 0未激活,1激活,-1过期 self.status_update_ts_ = -1 # 状态更新时间,初始值为-1 # 事件触发参数 self.event_type_ = event_type # 事件类型 self.rect_ = rect # 检测区域 [left, top, width, height] self.threshold_time_ = threshold_time # 触发时间阈值 self.merge_time_ = merge_time # 归并时间窗口 self.event_attr_ = self.init_event_attr() # 事件属性 if self.event_attr_ is None: raise ValueError(f"Invalid event_type: {event_type}") self.handle_func_ = self.handles_map.get(event_type) if self.handle_func_ is None: raise ValueError(f"Invalic event_type: {event_type}") # 计划任务的开始时间 self.cron_ = cron # {“hour": 7, "minute": 0} def execute(self): if self.status_ != 1: return g_las.g_alarm_plan_disp.dispatch(self) # 初始化事件属性 def init_event_attr(self): try: event_cls = event_attr_map.get(self.event_type_) if event_cls is None: return None event_attr = event_cls(self.event_type_) if ((self.event_type_ == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or (self.event_type_ == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value)): event_attr.threshold_count_ = int(self.param_.get("count", 0)) elif ((self.event_type_ == EventType.TARGET_ABSENCE.value)): event_attr.time_threshold_ = int(self.param_.get("time_threshold", 0)) elif ((self.event_type_ == EventType.CLEAN_EXPIRE_EVENTS.value)): event_attr.expire_range_ = 90 return event_attr except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 更新激活状态 def update_status(self, now: Optional[datetime] = None) -> None: now = now or datetime.now() old_status = self.status_ if not self.enable_: self.status_ = 0 else: now_fmt = now.strftime("%Y-%m-%d") # 过期 if now_fmt > self.time_plan_.stop_date_: self.status_ = -1 elif now_fmt < self.time_plan_.start_date_: self.status_ = 0 elif self.time_plan_.is_active_now(now): self.status_ = 1 else: self.status_ = 0 if self.status_ != old_status: self.status_update_ts = int(now.timestamp()) LOGINFO(f"[Status] plan {self.plan_uuid_} status_ changed {old_status} -> {self.status_}") def is_point_in_rect(self, x:float, y:float, rect:list) -> bool: rx, ry, rw, rh = rect x_min = min(rx, rx + rw) x_max = max(rx, rx + rw) y_min = min(ry, ry - rh) y_max = max(ry, ry - rh) bRet: bool = x_min <= x <= x_max and y_min <= y <= y_max return bRet # 查找最近 t 秒内,最后一个落在 rect 内的 target_point 的 rtd_unit def find_latest_rtd_in_region(self, device: Device, rect: list, now: int=None, t: int=5): now_s = now if now else get_utc_time_s() rtd_que_copy = device.get_rtd_que_copy() with self.lock_: for rtd_unit in reversed(rtd_que_copy): # 倒序扫描 ts_s = int(rtd_unit.get("timestamp", 0)) if now_s - ts_s > t: break # 已经超过 t 秒,可以直接结束 # 检查点是否在区域内 for pt in rtd_unit.get("target_point", []): if len(pt) >= 2: x, y = pt[0], pt[1] if self.is_point_in_rect(x, y, rect): return rtd_unit return None # 检查关键时间点, 获取停留时间 def get_stay_time(self, device: Device, t: int=5): now = get_utc_time_s() rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, t) with self.lock_: if rtd_unit: timestamp = rtd_unit["timestamp"] if self.event_attr_.enter_ts_ == -1: self.event_attr_.enter_ts_ = timestamp LOGINFO(f"detected target enter, plan_uuid: {self.plan_uuid_}") else: self.event_attr_.leave_ts_ = timestamp if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1: return EC.EC_FAILED # 归并时间内,不认为事件结束 if now - self.event_attr_.leave_ts_ < self.merge_time_: return EC.EC_FAILED self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_ return self.event_attr_.stay_time_ # 检查停留时间 def check_stay_time(self, stay_time: int): with self.lock_: # 时间小于触发时间阈值,忽略并重置 if stay_time < self.threshold_time_ : self.event_attr_.reset() LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}") return EC.EC_FAILED LOGINFO(f"detected target leave, plan_uuid: {self.plan_uuid_}") return 0 # 检查关键时间点, 获取消失时间 def get_absence_time(self, device: Device, t: int=5): now = get_utc_time_s() # 查找最新的落在检测区域的目标 rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3) if rtd_unit: return EC.EC_FAILED with self.lock_: if self.event_attr_.leave_ts_ == -1: self.event_attr_.leave_ts_ = now LOGINFO(f"detected target leave, plan_uuid: {self.plan_uuid_}") else: self.event_attr_.enter_ts_ = now if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1: return EC.EC_FAILED # 归并时间内,不认为事件结束 if now - self.event_attr_.enter_ts_ < self.merge_time_: return EC.EC_FAILED self.event_attr_.absence_time_ = self.event_attr_.enter_ts_ - self.event_attr_.leave_ts_ return self.event_attr_.absence_time_ # 检查消失时间 def check_absence_time(self, absence_time: int): with self.lock_: # 时间小于触发时间阈值,忽略并重置 if absence_time < self.event_attr_.time_threshold_: self.event_attr_.reset() LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}") return EC.EC_FAILED LOGINFO(f"detected target enter, plan_uuid: {self.plan_uuid_}") return 0 # 停留事件 def handle_stay_detection(self): try: dev_id = self.dev_id_ device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not device: return # 获取停留时间 stay_time = self.get_stay_time(device) if stay_time == EC.EC_FAILED: return # 检查停留时间 if self.check_stay_time(stay_time) == EC.EC_FAILED: return # 构造事件 # 入库 info = { "start_time": utc_to_bj_s(self.event_attr_.enter_ts_), "end_time": utc_to_bj_s(self.event_attr_.leave_ts_), "stay_time": stay_time } event_uuid = str(uuid.uuid4()) remark = { "sp_id": sys_comm.g_sys_conf["sp_id"] } params = { "dev_id": dev_id, "uuid": event_uuid, "plan_uuid": self.plan_uuid_, "event_type": self.event_type_, "info": json.dumps(info), "linkage_push_wechat_service": self.linkage_action_.wechat_service_, "is_handle": 0, "create_time": get_bj_time_s(), "is_deleted": 0, "tenant_id": self.tenant_id_, "remark": json.dumps(remark) } db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None)) # 通知 linkage_action = { "linkage_push_wechat_service": self.linkage_action_.wechat_service_ } mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events") LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}, dev: {dev_id}, plan: {self.plan_uuid_}") self.event_attr_.reset() except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 滞留事件 def handle_retention_detection(self): try: dev_id = self.dev_id_ device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not device: return # 获取停留时间 stay_time = self.get_stay_time(device) if stay_time == EC.EC_FAILED: return # 检查停留时间 if self.check_stay_time(stay_time) == EC.EC_FAILED: return # 构造事件 # 入库 info = { "start_time": utc_to_bj_s(self.event_attr_.enter_ts_), "end_time": utc_to_bj_s(self.event_attr_.leave_ts_), "stay_time": stay_time } event_uuid = str(uuid.uuid4()) remark = { "sp_id": sys_comm.g_sys_conf["sp_id"] } params = { "dev_id": dev_id, "uuid": event_uuid, "plan_uuid": self.plan_uuid_, "event_type": self.event_type_, "info": json.dumps(info), "linkage_push_wechat_service": self.linkage_action_.wechat_service_, "is_handle": 0, "create_time": get_bj_time_s(), "is_deleted": 0, "tenant_id": self.tenant_id_, "remark": json.dumps(remark) } db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None)) # 通知 linkage_action = { "linkage_push_wechat_service": self.linkage_action_.wechat_service_ } mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events") LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}, dev: {dev_id}, plan: {self.plan_uuid_}") self.event_attr_.reset() except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 如厕事件 def handle_toileting_detection(self): try: dev_id = self.dev_id_ device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not device: return # 获取停留时间 stay_time = self.get_stay_time(device) if stay_time == EC.EC_FAILED: return # 检查停留时间 if self.check_stay_time(stay_time) == EC.EC_FAILED: return # 构造事件 # 入库 info = { "start_time": utc_to_bj_s(self.event_attr_.enter_ts_), "end_time": utc_to_bj_s(self.event_attr_.leave_ts_), "stay_time": stay_time } event_uuid = str(uuid.uuid4()) remark = { "sp_id": sys_comm.g_sys_conf["sp_id"] } params = { "dev_id": dev_id, "uuid": event_uuid, "plan_uuid": self.plan_uuid_, "event_type": self.event_type_, "info": json.dumps(info), "linkage_push_wechat_service": self.linkage_action_.wechat_service_, "is_handle": 0, "create_time": get_bj_time_s(), "is_deleted": 0, "tenant_id": self.tenant_id_, "remark": json.dumps(remark) } db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None)) # 通知 linkage_action = { "linkage_push_wechat_service": self.linkage_action_.wechat_service_ } mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events") LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}, dev: {dev_id}, plan: {self.plan_uuid_}") self.event_attr_.reset() except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 如厕频次统计 def handle_toileting_frequency(self): try: dev_id = self.dev_id_ device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not device: return start_dt, end_dt = helper.get_query_time_range(self.param_) params = { "dev_id" : self.dev_id_, "event_type": EventType.TOILETING_DETECTION.value, "start_dt": start_dt, "end_dt": end_dt } userdata = { "start_dt" : start_dt, "end_dt" : end_dt } db_req_que.put(DBRequest_Async( sql=sqls.sql_query_events_by_datetime, params=params, callback=self.cb_toileting_frequency, userdata=userdata)) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 夜间如厕频次统计 def handle_night_toileting_frequency(self): try: dev_id = self.dev_id_ device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not device: return start_dt, end_dt = helper.get_query_time_range(self.param_) params = { "dev_id" : self.dev_id_, "event_type": EventType.TOILETING_DETECTION.value, "start_dt": start_dt, "end_dt": end_dt } userdata = { "start_dt" : start_dt, "end_dt" : end_dt } db_req_que.put(DBRequest_Async( sql=sqls.sql_query_events_by_datetime, params=params, callback=self.cb_night_toileting_frequency, userdata=userdata)) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 如厕频次异常 def handle_toileting_frequency_abnormal(self): try: dev_id = self.dev_id_ device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not device: return start_dt, end_dt = helper.get_query_time_range(self.param_) params = { "dev_id" : self.dev_id_, "event_type": EventType.TOILETING_DETECTION.value, "start_dt": start_dt, "end_dt": end_dt } userdata = { "start_dt" : start_dt, "end_dt" : end_dt } db_req_que.put(DBRequest_Async( sql=sqls.sql_query_events_by_datetime, params=params, callback=self.cb_toileting_frequency_abnormal, userdata=userdata)) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 起夜异常 def handle_night_toileting_frequency_abnormal(self): try: dev_id = self.dev_id_ device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not device: return start_dt, end_dt = helper.get_query_time_range(self.param_) params = { "dev_id" : self.dev_id_, "event_type": EventType.TOILETING_DETECTION.value, "start_dt": start_dt, "end_dt": end_dt } userdata = { "start_dt" : start_dt, "end_dt" : end_dt } db_req_que.put(DBRequest_Async( sql=sqls.sql_query_events_by_datetime, params=params, callback=self.cb_night_toileting_frequency_abnormal, userdata=userdata)) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 卫生间频次统计 def handle_bathroom_stay_frequency(self): try: dev_id = self.dev_id_ device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not device: return start_dt, end_dt = helper.get_query_time_range(self.param_) params = { "dev_id" : self.dev_id_, "event_type": EventType.STAY_DETECTION.value, "start_dt": start_dt, "end_dt": end_dt } userdata = { "start_dt" : start_dt, "end_dt" : end_dt } db_req_que.put(DBRequest_Async( sql=sqls.sql_query_events_by_datetime, params=params, callback=self.cb_bathroom_stay_frequency, userdata=userdata)) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 异常消失 def handle_target_absence(self): try: dev_id = self.dev_id_ device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not device: return # 获取消失时间 absence_time = self.get_absence_time(device) if absence_time == EC.EC_FAILED: return # 检查消失时间 if self.check_absence_time(absence_time) == EC.EC_FAILED: return # 构造事件 # 入库 info = { "start_time": utc_to_bj_s(self.event_attr_.leave_ts_), "end_time": utc_to_bj_s(self.event_attr_.enter_ts_), "absence_time": absence_time } event_uuid = str(uuid.uuid4()) remark = { "sp_id": sys_comm.g_sys_conf["sp_id"] } params = { "dev_id": dev_id, "uuid": event_uuid, "plan_uuid": self.plan_uuid_, "event_type": self.event_type_, "info": json.dumps(info), "linkage_push_wechat_service": self.linkage_action_.wechat_service_, "is_handle": 0, "create_time": get_bj_time_s(), "is_deleted": 0, "tenant_id": self.tenant_id_, "remark": json.dumps(remark) } db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None)) # 通知 linkage_action = { "linkage_push_wechat_service": self.linkage_action_.wechat_service_ } mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events") LOGDBG(f"new event: {event_desc_map[self.event_type_]}, absence_time: {absence_time}, dev: {dev_id}, plan: {self.plan_uuid_}") self.event_attr_.reset() except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 睡眠监测 def handle_sleep_monitoring(self): # ---------------- 参数配置 ---------------- NO_TARGET_TIMEOUT_S = 3 # 超过3秒无数据认为无目标 NO_TARGET_MAX_COUNT = 3 # 连续3次无目标后确定离床 MOTION_SMOOTH_WINDOW = 10 # 平滑窗口大小(取最近10帧计算平均) STAY_THRESHOLD_PEACEFUL = 0.05 # 静止阈值 STAY_THRESHOLD_MICRO = 0.15 # 微动阈值 LEAVE_BED_TS = 3 # 离床判定时间阈值 try: dev_id = self.dev_id_ device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not device: return if (not self.rect_): return rtd_list = device.get_rtd_que_copy() if not rtd_list: return # 初始化状态机 if not hasattr(self, "sleep_stat_"): self.sleep_stat_: str = "leave" # 当前睡眠状态 self.update_sleep_stat_: str = "leave" # 要更新的睡眠状态 self.avg_motion_: float = 0.0 # 当前运动幅值 self.motion_stat_: str = "leave" # 当前运动状态 self.avg_breath_: float = 0.0 # 当前呼吸率 self.breathe_stat_: str = "r0" # 当前呼吸状态 self.start_sleep_ts_: int = -1 # 睡眠开始时间 self.end_sleep_ts_: int = -1 # 睡眠结束时间 self.motion_window = deque(maxlen=MOTION_SMOOTH_WINDOW) # 近 N 次运动距离均值 self.sleep_segments_ = [] # 状态阶段记录 [{"ts":xxx, "stat":xxx}, ...] self.last_pts_ = [] # 最后的点目标 self.last_update_ts_ = -1 self.miss_target_count_ = 0 self.last_leave_ts = -1 # 上次离床判定时间,离床判定时间超过5秒视为离床事件 now_ts = get_utc_time_s() rtd_unit = rtd_list[-1] ts = rtd_unit["timestamp"] target_point = rtd_unit["target_point"] ## 1. 空间状态分析 if now_ts - ts > NO_TARGET_TIMEOUT_S: ## 目标不存在 # 起夜 x, y, z, snr = target_point if not helper.is_point_in_rect(x, y, self.rect_): motion = "leave" # 检测不到体动 if self.start_sleep_ts_ == -1: return else: motion = "peaceful" else: ## 目标存在 x = target_point[0] y = target_point[1] if not helper.is_point_in_rect(x, y, self.rect_): # 不在床上 motion = "leave" else: # 在床上 # 计算体动 WINDOWS_S = 10 # 滑动事件窗口 recent_rtds = [r for r in rtd_list if now_ts - r["timestamp"] <= WINDOWS_S] if len(recent_rtds) < 5: return # 取最新点 x,y,z,snr = recent_rtds[-1]["target_point"] # 检查是否在床上 if not helper.is_point_in_rect(x, y, self.rect_): if self.last_leave_ts == -1: self.last_leave_ts = now_ts elif now_ts - self.last_leave_ts > LEAVE_BED_TS: motion = "leave" else: self.last_leave_ts = -1 # 计算位移序列 motions = [] for i in range(1, len(recent_rtds)): x1, y1, z1, _ = recent_rtds[i - 1]["target_point"] x2, y2, z2, _ = recent_rtds[i]["target_point"] dist = ((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2)**0.5 motions.append(dist) if motions: avg_motion = sum(motions) / len(motions) self.motion_window.append(avg_motion) motion_smooth = sum(self.motion_window) / len(self.motion_window) else: motion_smooth = 0 # 状态判定 if motion_smooth < STAY_THRESHOLD_PEACEFUL: motion = "peaceful" elif motion_smooth < STAY_THRESHOLD_MICRO: motion = "mocro" else: motion = "active" self.motion_stat_ = motion ## 2. 呼吸率分析 BREATHE_WINDOWS_S = 5 # 呼吸滑动时间窗口 recent_breaths =[r["breath_rpm"] for r in rtd_list if (now_ts - r["timestamp"] <= BREATHE_WINDOWS_S) and ("breath_rpm" in r and isinstance(r["breath_rpm"], (int, float)))] if not recent_breaths: breathe_stat = "r0" # 无数据时视为异常/无呼吸 avg_breath = 0.0 else: avg_breath = sum(recent_breaths) / len(recent_breaths) if avg_breath < 8: breathe_stat = "r0" elif avg_breath < 12: breathe_stat = "r1" elif avg_breath < 18: breathe_stat = "r1" else: breathe_stat = "r1" self.avg_breath_ = avg_breath self.breathe_stat_ = breathe_stat ## 3. 睡眠状态分析 try: i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_) i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat) self.update_sleep_stat_ = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath] except ValueError: LOGERR(f"infalid i_montion or i_breath") # 3.1 更新睡眠报告 sleep_node = { "ts": now_ts, "sleep_stat": self.update_sleep_stat_ } self.sleep_segments_.append(sleep_node) self.sleep_stat_ = self.update_sleep_stat_ # 1. 分析空间状态 # 2. 分析呼吸率 # 3. 分析睡眠状态 return except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 清理过期事件 def handle_clear_expire_events(self): try: params = { "save_days": self.event_attr_.expire_range_ } db_execute_async(sqls.sql_delete_expire_events, params=params) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # ========== 一些回调 ========== # 如厕频次统计回调 def cb_toileting_frequency(self, result, userdata): try: if result: count = 0 event_list = [] for row in result: dev_id: str = row.get("dev_id") event_uuid: str = row.get("uuid") plan_uuid: str = row.get("plan_uuid") event_type: int = row.get("event_type") info: dict = json.loads(row["info"]) if row.get("info") else {} is_handle: str = row.get("is_handle") create_time: str = row.get("create_time") update_time: str = row.get("update_time") is_deleted: str = row.get("is_deleted") remark: str = row.get("remark") event_list.append(info) this_event_uuid = str(uuid.uuid4()) last_info = { "start_time" : userdata["start_dt"], "end_time" : userdata["end_dt"], "count" : len(event_list), "event_list" : event_list } # 入库 event_uuid = str(uuid.uuid4()) remark = { "sp_id": sys_comm.g_sys_conf["sp_id"] } params = { "dev_id": dev_id, "uuid": this_event_uuid, "plan_uuid": self.plan_uuid_, "event_type": self.event_type_, "info": json.dumps(last_info, ensure_ascii=False), "linkage_push_wechat_service": self.linkage_action_.wechat_service_, "is_handle": 0, "create_time": get_bj_time_s(), "is_deleted": 0, "tenant_id": self.tenant_id_, "remark": json.dumps(remark) } db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None)) # 通知 linkage_action = { "linkage_push_wechat_service": self.linkage_action_.wechat_service_ } mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, plan_uuid, event_desc_map[self.event_type_], last_info, linkage_action, "events") LOGINFO(f"new event: {event_desc_map[self.event_type_]}") else: LOGDBG("cb_toileting_frequency, empty result") except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") return # 夜间如厕频次统计回调 def cb_night_toileting_frequency(self, result, userdata): try: if result: count = 0 event_list = [] for row in result: dev_id: str = row.get("dev_id") event_uuid: str = row.get("uuid") plan_uuid: str = row.get("plan_uuid") event_type: int = row.get("event_type") info: dict = json.loads(row["info"]) if row.get("info") else {} is_handle: str = row.get("is_handle") create_time: str = row.get("create_time") update_time: str = row.get("update_time") is_deleted: str = row.get("is_deleted") remark: str = row.get("remark") event_list.append(info) this_event_uuid = str(uuid.uuid4()) last_info = { "start_time" : userdata["start_dt"], "end_time" : userdata["end_dt"], "count" : len(event_list), "event_list" : event_list } # 入库 event_uuid = str(uuid.uuid4()) remark = { "sp_id": sys_comm.g_sys_conf["sp_id"] } params = { "dev_id": dev_id, "uuid": this_event_uuid, "plan_uuid": self.plan_uuid_, "event_type": self.event_type_, "info": json.dumps(last_info, ensure_ascii=False), "linkage_push_wechat_service": self.linkage_action_.wechat_service_, "is_handle": 0, "create_time": get_bj_time_s(), "is_deleted": 0, "tenant_id": self.tenant_id_, "remark": json.dumps(remark) } db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None)) # 通知 linkage_action = { "linkage_push_wechat_service": self.linkage_action_.wechat_service_ } mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, plan_uuid, event_desc_map[self.event_type_], last_info, linkage_action, "events") LOGINFO(f"new event: {event_desc_map[self.event_type_]}") else: LOGDBG("cb_night_toileting_frequency, empty result") except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") return # 如厕频次异常回调 def cb_toileting_frequency_abnormal(self, result, userdata): try: if result: count = 0 event_list = [] for row in result: dev_id: str = row.get("dev_id") event_uuid: str = row.get("uuid") plan_uuid: str = row.get("plan_uuid") event_type: int = row.get("event_type") info: dict = json.loads(row["info"]) if row.get("info") else {} is_handle: str = row.get("is_handle") create_time: str = row.get("create_time") update_time: str = row.get("update_time") is_deleted: str = row.get("is_deleted") remark: str = row.get("remark") event_list.append(info) if len(event_list) < self.event_attr_.threshold_count_: return this_event_uuid = str(uuid.uuid4()) last_info = { "start_time" : userdata["start_dt"], "end_time" : userdata["end_dt"], "count" : len(event_list), "event_list" : event_list } # 入库 event_uuid = str(uuid.uuid4()) remark = { "sp_id": sys_comm.g_sys_conf["sp_id"] } params = { "dev_id": dev_id, "uuid": this_event_uuid, "plan_uuid": self.plan_uuid_, "event_type": self.event_type_, "info": json.dumps(last_info, ensure_ascii=False), "linkage_push_wechat_service": self.linkage_action_.wechat_service_, "is_handle": 0, "create_time": get_bj_time_s(), "is_deleted": 0, "tenant_id": self.tenant_id_, "remark": json.dumps(remark) } db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None)) # 通知 linkage_action = { "linkage_push_wechat_service": self.linkage_action_.wechat_service_ } mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events") LOGINFO(f"new event: {event_desc_map[self.event_type_]}") else: LOGDBG("cb_toileting_frequency_abnormal, empty result") except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") return # 起夜异常回调 def cb_night_toileting_frequency_abnormal(self, result, userdata): try: if result: count = 0 event_list = [] for row in result: dev_id: str = row.get("dev_id") event_uuid: str = row.get("uuid") plan_uuid: str = row.get("plan_uuid") event_type: int = row.get("event_type") info: dict = json.loads(row["info"]) if row.get("info") else {} is_handle: str = row.get("is_handle") create_time: str = row.get("create_time") update_time: str = row.get("update_time") is_deleted: str = row.get("is_deleted") remark: str = row.get("remark") event_list.append(info) if len(event_list) < self.event_attr_.threshold_count_: return this_event_uuid = str(uuid.uuid4()) last_info = { "start_time" : userdata["start_dt"], "end_time" : userdata["end_dt"], "count" : len(event_list), "event_list" : event_list } # 入库 event_uuid = str(uuid.uuid4()) remark = { "sp_id": sys_comm.g_sys_conf["sp_id"] } params = { "dev_id": dev_id, "uuid": this_event_uuid, "plan_uuid": self.plan_uuid_, "event_type": self.event_type_, "info": json.dumps(last_info, ensure_ascii=False), "linkage_push_wechat_service": self.linkage_action_.wechat_service_, "is_handle": 0, "create_time": get_bj_time_s(), "is_deleted": 0, "tenant_id": self.tenant_id_, "remark": json.dumps(remark) } db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None)) # 通知 linkage_action = { "linkage_push_wechat_service": self.linkage_action_.wechat_service_ } mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events") LOGINFO(f"new event: {event_desc_map[self.event_type_]}") else: LOGDBG("cb_night_toileting_frequency_abnormal, empty result") except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") return # 卫生间频次统计回调 def cb_bathroom_stay_frequency(self, result, userdata): try: if result: count = 0 event_list = [] for row in result: dev_id: str = row.get("dev_id") event_uuid: str = row.get("uuid") plan_uuid: str = row.get("plan_uuid") event_type: int = row.get("event_type") info: dict = json.loads(row["info"]) if row.get("info") else {} is_handle: str = row.get("is_handle") create_time: str = row.get("create_time") update_time: str = row.get("update_time") is_deleted: str = row.get("is_deleted") remark: str = row.get("remark") event_list.append(info) this_event_uuid = str(uuid.uuid4()) last_info = { "start_time" : userdata["start_dt"], "end_time" : userdata["end_dt"], "count" : len(event_list), "event_list" : event_list } # 入库 event_uuid = str(uuid.uuid4()) remark = { "sp_id": sys_comm.g_sys_conf["sp_id"] } params = { "dev_id": dev_id, "uuid": this_event_uuid, "plan_uuid": self.plan_uuid_, "event_type": self.event_type_, "info": json.dumps(last_info, ensure_ascii=False), "linkage_push_wechat_service": self.linkage_action_.wechat_service_, "is_handle": 0, "create_time": get_bj_time_s(), "is_deleted": 0, "tenant_id": self.tenant_id_, "remark": json.dumps(remark) } db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None)) # 通知 linkage_action = { "linkage_push_wechat_service": self.linkage_action_.wechat_service_ } mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, plan_uuid, event_desc_map[self.event_type_], last_info, linkage_action, "events") LOGINFO(f"new event: {event_desc_map[self.event_type_]}") else: LOGDBG("cb_bathroom_stay_frequency, empty result") except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") return