12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421 |
- from typing import List, Tuple, Optional
- from datetime import datetime, time, date
- from threading import Thread, Lock
- from enum import Enum
- import uuid
- import json
- import traceback
- from datetime import datetime, timezone, timedelta
- from collections import deque
- import common.sys_comm as sys_comm
- from common.sys_comm import (
- LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
- get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
- utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
- )
- from core.time_plan import TimePlan
- from core.event_type import EventType, event_desc_map
- import core.g_LAS as g_las
- import core.alarm_plan_helper as helper
- from core.linkage_action import LinkageAction
- from db.db_process import (
- db_req_que, DBRequest_Async
- )
- from db.db_process import (
- db_execute_sync, db_execute_async
- )
- import db.db_sqls as sqls
- import mqtt.mqtt_send as mqtt_send
- import device.dev_mng as g_Dev
- from device.dev_mng import (
- Device, g_dev_mgr
- )
- class EventAttr_Base:
- def __init__(self):
- return
- # 事件属性 事件事件
- class EventAttr_StayDetection(EventAttr_Base):
- def __init__(self, event_type):
- self.event_type_ = event_type
- self.enter_ts_: int = -1 # 进入时间(s)
- self.leave_ts_: int = -1 # 离开时间(s)
- self.stay_time_: int = -1 # 停留时长(s)
- return
- def reset(self):
- self.enter_ts_ = -1
- self.leave_ts_ = -1
- self.stay_time_ = -1
- # 事件属性 滞留事件
- class EventAttr_RetentionDetection(EventAttr_Base):
- def __init__(self, event_type):
- self.event_type_ = event_type
- self.enter_ts_: int = -1 # 进入时间(s)
- self.leave_ts_: int = -1 # 离开时间(s)
- self.stay_time_: int = -1 # 停留时长(s)
- return
- def reset(self):
- self.enter_ts_ = -1
- self.leave_ts_ = -1
- self.stay_time_ = -1
- # 事件属性 如厕事件
- class EventAttr_ToiletingDetection(EventAttr_Base):
- def __init__(self, event_type):
- self.event_type_ = event_type
- self.enter_ts_: int = -1 # 进入时间(ms)
- self.leave_ts_: int = -1 # 离开时间(ms)
- self.stay_time_: int = -1 # 停留时长(ms)
- return
- def reset(self):
- self.enter_ts_ = -1
- self.leave_ts_ = -1
- self.stay_time_ = -1
- # 事件属性 如厕频次统计
- class EventAttr_ToiletingFrequency(EventAttr_Base):
- def __init__(self, event_type):
- self.event_type_ = event_type
- self.count_: int = 0 # 统计次数
- self.event_list: list = []
- return
- def reset(self):
- self.count_ = 0
- self.event_list = []
- # 事件属性 夜间如厕频次统计
- class EventAttr_NightToiletingFrequency(EventAttr_Base):
- def __init__(self, event_type):
- self.event_type_ = event_type
- self.count_: int = 0 # 统计次数
- self.event_list: list = []
- return
- def reset(self):
- self.count_ = 0
- self.event_list = []
- # 事件属性 如厕频次异常
- class EventAttr_ToiletingFrequencyAbnormal(EventAttr_Base):
- def __init__(self, event_type):
- self.event_type_ = event_type
- self.count_: int = 0 # 统计次数
- self.threshold_count_: int = 0 # 异常阈值
- self.event_list: list = []
- return
- def reset(self):
- self.count_ = 0
- self.event_list = []
- # 事件属性 起夜异常
- class EventAttr_NightToiletingFrequencyAbnormal(EventAttr_Base):
- def __init__(self, event_type):
- self.event_type_ = event_type
- self.count_: int = 0 # 统计次数
- self.threshold_count_: int = 0 # 异常阈值
- self.event_list: list = []
- return
- def reset(self):
- self.count_ = 0
- self.event_list = []
- # 事件属性 卫生间频次统计
- class EventAttr_BathroomStayFrequency(EventAttr_Base):
- def __init__(self, event_type):
- self.event_type_ = event_type
- self.count_: int = 0 # 统计次数
- self.event_list: list = []
- return
- def reset(self):
- self.count_ = 0
- self.event_list = []
- # 事件属性 异常消失
- class EventAttr_TargetAbsence(EventAttr_Base):
- def __init__(self, event_type):
- self.event_type_ = event_type
- self.leave_ts_: int = -1 # 离开时间(ms)
- self.enter_ts_: int = -1 # 进入时间(ms)
- self.absence_time_: int = -1 # 消失时长(ms)
- self.time_threshold_: int = 300 # 触发消失时间阈值(ms)
- return
- def reset(self):
- self.leave_ts_ = -1
- self.enter_ts_ = -1
- self.absence_time_ = -1
- # 事件属性 睡眠监测
- class EventAttr_SleepMonitoring(EventAttr_Base):
- motion_stat = ["peaceful", "micro", "active", "leave"] # 空间状态
- # 呼吸率划分:r0:[0-8) r1:[8,12) r2:[12,18) r3:[18,25] r4:25以上
- breathe_stat = ["r0", "r1", "r2", "r3", "r4"] # 呼吸状态:呼吸过慢/异常,深睡,浅睡,REM/清醒,呼吸过快/清醒
- # 深睡(deep),浅睡(light),REM,清醒(awake),离床(leave)
- sleep_stat = [
- ["deep", "deep", "light", "REM", "awake"], # peaceful
- ["deep", "light", "light", "REM", "awake"], # micro
- ["awake", "awake", "awake", "awake", "awake"], # active
- ["leave", "leave", "leave", "leave", "leave"]] # leave
- # 异常:呼吸过慢(slow_breathe),呼吸过快(fast_breathe)
- def __init__(self, event_type):
- self.start_sleep_ts_: int = -1 # 睡眠开始时间
- self.end_sleep_ts_: int = -1 # 睡眠结束时间
- self.motion_stat_ = None # 当前运动状态
- self.breathe_stat_ = None # 当前呼吸状态
- self.sleep_stat_ = None # 当前睡眠状态
- self.sleep_segments_ = [] # 状态阶段记录 [{"ts":xxx, "stat":xxx}, ...]
- self.last_motion_ = 0.0 # 最新的运动状态
- self.last_breath_ = 0.0 # 最新的呼吸率
- self.last_pts_ = [] # 最后的点目标
- self.last_update_ts_ = -1
- self.miss_target_count_ = 0
- return
- # 事件属性 清理过期事件
- class EventAttr_CleanExpireEvents(EventAttr_Base):
- def __init__(self, event_type):
- self.event_type_ = event_type
- self.expire_range_ = 90
- # 事件属性表
- event_attr_map = {
- EventType.STAY_DETECTION.value : EventAttr_StayDetection,
- EventType.RETENTION_DETECTION.value : EventAttr_RetentionDetection,
- EventType.TOILETING_DETECTION.value : EventAttr_ToiletingDetection,
- EventType.TOILETING_FREQUENCY.value : EventAttr_ToiletingFrequency,
- EventType.NIGHT_TOILETING_FREQUENCY.value : EventAttr_NightToiletingFrequency,
- EventType.TOILETING_FREQUENCY_ABNORMAL.value : EventAttr_ToiletingFrequencyAbnormal,
- EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value: EventAttr_NightToiletingFrequencyAbnormal,
- EventType.BATHROOM_STAY_FREQUENCY.value : EventAttr_BathroomStayFrequency,
- EventType.TARGET_ABSENCE.value : EventAttr_TargetAbsence,
- EventType.SLEEP_MONITORING.value : EventAttr_SleepMonitoring,
- EventType.CLEAN_EXPIRE_EVENTS.value : EventAttr_CleanExpireEvents,
- }
- class Cron:
- def __init__(self, h, m, s):
- self.h_ = None
- self.m_ = None
- self.s_ = None
- class AlarmPlan:
- # 注册handle
- handles_map = {
- EventType.STAY_DETECTION.value : lambda plan: AlarmPlan.handle_stay_detection(plan),
- EventType.RETENTION_DETECTION.value : lambda plan: AlarmPlan.handle_retention_detection(plan),
- EventType.TOILETING_DETECTION.value : lambda plan: AlarmPlan.handle_toileting_detection(plan),
- EventType.TOILETING_FREQUENCY.value : lambda plan: AlarmPlan.handle_toileting_frequency(plan),
- EventType.NIGHT_TOILETING_FREQUENCY.value : lambda plan: AlarmPlan.handle_night_toileting_frequency(plan),
- EventType.TOILETING_FREQUENCY_ABNORMAL.value : lambda plan: AlarmPlan.handle_toileting_frequency_abnormal(plan),
- EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value : lambda plan: AlarmPlan.handle_night_toileting_frequency_abnormal(plan),
- EventType.BATHROOM_STAY_FREQUENCY.value : lambda plan: AlarmPlan.handle_bathroom_stay_frequency(plan),
- EventType.TARGET_ABSENCE.value : lambda plan: AlarmPlan.handle_target_absence(plan),
- EventType.SLEEP_MONITORING.value : lambda plan: AlarmPlan.handle_sleep_monitoring(plan),
- # 平台事件(任务)
- EventType.CLEAN_EXPIRE_EVENTS.value : lambda plan: AlarmPlan.handle_clear_expire_events(plan),
- }
- def __init__(self,
- plan_uuid: str,
- name: str,
- dev_id: str,
- dev_name: str,
- enable: bool,
- time_plan: TimePlan,
- rect: list,
- event_type: int,
- threshold_time: int,
- merge_time: int,
- param: dict,
- cron: Optional[dict] = None,
- linkage_action: LinkageAction = LinkageAction(),
- tenant_id:int = 0
- ):
- self.lock_ = Lock()
- self.plan_uuid_ = plan_uuid # 计划id
- self.name_ = name # 计划名称
- self.dev_id_ = dev_id # 设备id
- self.dev_name_ = dev_name # 设备名称
- self.enable_ = enable # 是否启用
- self.time_plan_ = time_plan # 时间计划
- self.param_ = param # 参数
- self.linkage_action_ = linkage_action # 联动动作
- self.tenant_id_ = tenant_id # 租户id
- # 维护状态(根据TimePlan判断)
- self.status_ = 0 # 0未激活,1激活,-1过期
- self.status_update_ts_ = -1 # 状态更新时间,初始值为-1
- # 事件触发参数
- self.event_type_ = event_type # 事件类型
- self.rect_ = rect # 检测区域 [left, top, width, height]
- self.threshold_time_ = threshold_time # 触发时间阈值
- self.merge_time_ = merge_time # 归并时间窗口
- self.event_attr_ = self.init_event_attr() # 事件属性
- if self.event_attr_ is None:
- raise ValueError(f"Invalid event_type: {event_type}")
- self.handle_func_ = self.handles_map.get(event_type)
- if self.handle_func_ is None:
- raise ValueError(f"Invalic event_type: {event_type}")
- # 计划任务的开始时间
- self.cron_ = cron # {“hour": 7, "minute": 0}
- def execute(self):
- if self.status_ != 1:
- return
- g_las.g_alarm_plan_disp.dispatch(self)
- # 初始化事件属性
- def init_event_attr(self):
- try:
- event_cls = event_attr_map.get(self.event_type_)
- if event_cls is None:
- return None
- event_attr = event_cls(self.event_type_)
- if ((self.event_type_ == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
- (self.event_type_ == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value)):
- event_attr.threshold_count_ = int(self.param_.get("count", 0))
- elif ((self.event_type_ == EventType.TARGET_ABSENCE.value)):
- event_attr.time_threshold_ = int(self.param_.get("time_threshold", 0))
- elif ((self.event_type_ == EventType.CLEAN_EXPIRE_EVENTS.value)):
- event_attr.expire_range_ = 90
- return event_attr
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 更新激活状态
- def update_status(self, now: Optional[datetime] = None) -> None:
- now = now or datetime.now()
- old_status = self.status_
- if not self.enable_:
- self.status_ = 0
- else:
- now_fmt = now.strftime("%Y-%m-%d")
- # 过期
- if now_fmt > self.time_plan_.stop_date_:
- self.status_ = -1
- elif now_fmt < self.time_plan_.start_date_:
- self.status_ = 0
- elif self.time_plan_.is_active_now(now):
- self.status_ = 1
- else:
- self.status_ = 0
- if self.status_ != old_status:
- self.status_update_ts = int(now.timestamp())
- LOGINFO(f"[Status] plan {self.plan_uuid_} status_ changed {old_status} -> {self.status_}")
- def is_point_in_rect(self, x:float, y:float, rect:list) -> bool:
- rx, ry, rw, rh = rect
- x_min = min(rx, rx + rw)
- x_max = max(rx, rx + rw)
- y_min = min(ry, ry - rh)
- y_max = max(ry, ry - rh)
- bRet: bool = x_min <= x <= x_max and y_min <= y <= y_max
- return bRet
- # 查找最近 t 秒内,最后一个落在 rect 内的 target_point 的 rtd_unit
- def find_latest_rtd_in_region(self, device: Device, rect: list, now: int=None, t: int=5):
- now_s = now if now else get_utc_time_s()
- rtd_que_copy = device.get_rtd_que_copy()
- with self.lock_:
- for rtd_unit in reversed(rtd_que_copy): # 倒序扫描
- ts_s = int(rtd_unit.get("timestamp", 0))
- if now_s - ts_s > t:
- break # 已经超过 t 秒,可以直接结束
- # 检查点是否在区域内
- for pt in rtd_unit.get("target_point", []):
- if len(pt) >= 2:
- x, y = pt[0], pt[1]
- if self.is_point_in_rect(x, y, rect):
- return rtd_unit
- return None
- # 检查关键时间点, 获取停留时间
- def get_stay_time(self, device: Device, t: int=5):
- now = get_utc_time_s()
- rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, t)
- with self.lock_:
- if rtd_unit:
- timestamp = rtd_unit["timestamp"]
- if self.event_attr_.enter_ts_ == -1:
- self.event_attr_.enter_ts_ = timestamp
- LOGINFO(f"detected target enter, plan_uuid: {self.plan_uuid_}")
- else:
- self.event_attr_.leave_ts_ = timestamp
- if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
- return EC.EC_FAILED
- # 归并时间内,不认为事件结束
- if now - self.event_attr_.leave_ts_ < self.merge_time_:
- return EC.EC_FAILED
- self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
- return self.event_attr_.stay_time_
- # 检查停留时间
- def check_stay_time(self, stay_time: int):
- with self.lock_:
- # 时间小于触发时间阈值,忽略并重置
- if stay_time < self.threshold_time_ :
- self.event_attr_.reset()
- LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
- return EC.EC_FAILED
- LOGINFO(f"detected target leave, plan_uuid: {self.plan_uuid_}")
- return 0
- # 检查关键时间点, 获取消失时间
- def get_absence_time(self, device: Device, t: int=5):
- now = get_utc_time_s()
- # 查找最新的落在检测区域的目标
- rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
- if rtd_unit:
- return EC.EC_FAILED
- with self.lock_:
- if self.event_attr_.leave_ts_ == -1:
- self.event_attr_.leave_ts_ = now
- LOGINFO(f"detected target leave, plan_uuid: {self.plan_uuid_}")
- else:
- self.event_attr_.enter_ts_ = now
- if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
- return EC.EC_FAILED
- # 归并时间内,不认为事件结束
- if now - self.event_attr_.enter_ts_ < self.merge_time_:
- return EC.EC_FAILED
- self.event_attr_.absence_time_ = self.event_attr_.enter_ts_ - self.event_attr_.leave_ts_
- return self.event_attr_.absence_time_
- # 检查消失时间
- def check_absence_time(self, absence_time: int):
- with self.lock_:
- # 时间小于触发时间阈值,忽略并重置
- if absence_time < self.event_attr_.time_threshold_:
- self.event_attr_.reset()
- LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
- return EC.EC_FAILED
- LOGINFO(f"detected target enter, plan_uuid: {self.plan_uuid_}")
- return 0
- # 停留事件
- def handle_stay_detection(self):
- try:
- dev_id = self.dev_id_
- device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
- if not device:
- return
- # 获取停留时间
- stay_time = self.get_stay_time(device)
- if stay_time == EC.EC_FAILED:
- return
- # 检查停留时间
- if self.check_stay_time(stay_time) == EC.EC_FAILED:
- return
- # 构造事件
- # 入库
- info = {
- "start_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "end_time": utc_to_bj_s(self.event_attr_.leave_ts_),
- "stay_time": stay_time
- }
- event_uuid = str(uuid.uuid4())
- remark = {
- "sp_id": sys_comm.g_sys_conf["sp_id"]
- }
- params = {
- "dev_id": dev_id,
- "uuid": event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": self.event_type_,
- "info": json.dumps(info),
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
- "is_handle": 0,
- "create_time": get_bj_time_s(),
- "is_deleted": 0,
- "tenant_id": self.tenant_id_,
- "remark": json.dumps(remark)
- }
- db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- linkage_action = {
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_
- }
- mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
- LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}, dev: {dev_id}, plan: {self.plan_uuid_}")
- self.event_attr_.reset()
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 滞留事件
- def handle_retention_detection(self):
- try:
- dev_id = self.dev_id_
- device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
- if not device:
- return
- # 获取停留时间
- stay_time = self.get_stay_time(device)
- if stay_time == EC.EC_FAILED:
- return
- # 检查停留时间
- if self.check_stay_time(stay_time) == EC.EC_FAILED:
- return
- # 构造事件
- # 入库
- info = {
- "start_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "end_time": utc_to_bj_s(self.event_attr_.leave_ts_),
- "stay_time": stay_time
- }
- event_uuid = str(uuid.uuid4())
- remark = {
- "sp_id": sys_comm.g_sys_conf["sp_id"]
- }
- params = {
- "dev_id": dev_id,
- "uuid": event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": self.event_type_,
- "info": json.dumps(info),
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
- "is_handle": 0,
- "create_time": get_bj_time_s(),
- "is_deleted": 0,
- "tenant_id": self.tenant_id_,
- "remark": json.dumps(remark)
- }
- db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- linkage_action = {
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_
- }
- mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
- LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}, dev: {dev_id}, plan: {self.plan_uuid_}")
- self.event_attr_.reset()
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 如厕事件
- def handle_toileting_detection(self):
- try:
- dev_id = self.dev_id_
- device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
- if not device:
- return
- # 获取停留时间
- stay_time = self.get_stay_time(device)
- if stay_time == EC.EC_FAILED:
- return
- # 检查停留时间
- if self.check_stay_time(stay_time) == EC.EC_FAILED:
- return
- # 构造事件
- # 入库
- info = {
- "start_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "end_time": utc_to_bj_s(self.event_attr_.leave_ts_),
- "stay_time": stay_time
- }
- event_uuid = str(uuid.uuid4())
- remark = {
- "sp_id": sys_comm.g_sys_conf["sp_id"]
- }
- params = {
- "dev_id": dev_id,
- "uuid": event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": self.event_type_,
- "info": json.dumps(info),
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
- "is_handle": 0,
- "create_time": get_bj_time_s(),
- "is_deleted": 0,
- "tenant_id": self.tenant_id_,
- "remark": json.dumps(remark)
- }
- db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- linkage_action = {
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_
- }
- mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
- LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}, dev: {dev_id}, plan: {self.plan_uuid_}")
- self.event_attr_.reset()
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 如厕频次统计
- def handle_toileting_frequency(self):
- try:
- dev_id = self.dev_id_
- device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
- if not device:
- return
- start_dt, end_dt = helper.get_query_time_range(self.param_)
- params = {
- "dev_id" : self.dev_id_,
- "event_type": EventType.TOILETING_DETECTION.value,
- "start_dt": start_dt,
- "end_dt": end_dt
- }
- userdata = {
- "start_dt" : start_dt,
- "end_dt" : end_dt
- }
- db_req_que.put(DBRequest_Async(
- sql=sqls.sql_query_events_by_datetime,
- params=params,
- callback=self.cb_toileting_frequency,
- userdata=userdata))
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 夜间如厕频次统计
- def handle_night_toileting_frequency(self):
- try:
- dev_id = self.dev_id_
- device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
- if not device:
- return
- start_dt, end_dt = helper.get_query_time_range(self.param_)
- params = {
- "dev_id" : self.dev_id_,
- "event_type": EventType.TOILETING_DETECTION.value,
- "start_dt": start_dt,
- "end_dt": end_dt
- }
- userdata = {
- "start_dt" : start_dt,
- "end_dt" : end_dt
- }
- db_req_que.put(DBRequest_Async(
- sql=sqls.sql_query_events_by_datetime,
- params=params,
- callback=self.cb_night_toileting_frequency,
- userdata=userdata))
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 如厕频次异常
- def handle_toileting_frequency_abnormal(self):
- try:
- dev_id = self.dev_id_
- device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
- if not device:
- return
- start_dt, end_dt = helper.get_query_time_range(self.param_)
- params = {
- "dev_id" : self.dev_id_,
- "event_type": EventType.TOILETING_DETECTION.value,
- "start_dt": start_dt,
- "end_dt": end_dt
- }
- userdata = {
- "start_dt" : start_dt,
- "end_dt" : end_dt
- }
- db_req_que.put(DBRequest_Async(
- sql=sqls.sql_query_events_by_datetime,
- params=params,
- callback=self.cb_toileting_frequency_abnormal,
- userdata=userdata))
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 起夜异常
- def handle_night_toileting_frequency_abnormal(self):
- try:
- dev_id = self.dev_id_
- device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
- if not device:
- return
- start_dt, end_dt = helper.get_query_time_range(self.param_)
- params = {
- "dev_id" : self.dev_id_,
- "event_type": EventType.TOILETING_DETECTION.value,
- "start_dt": start_dt,
- "end_dt": end_dt
- }
- userdata = {
- "start_dt" : start_dt,
- "end_dt" : end_dt
- }
- db_req_que.put(DBRequest_Async(
- sql=sqls.sql_query_events_by_datetime,
- params=params,
- callback=self.cb_night_toileting_frequency_abnormal,
- userdata=userdata))
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 卫生间频次统计
- def handle_bathroom_stay_frequency(self):
- try:
- dev_id = self.dev_id_
- device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
- if not device:
- return
- start_dt, end_dt = helper.get_query_time_range(self.param_)
- params = {
- "dev_id" : self.dev_id_,
- "event_type": EventType.STAY_DETECTION.value,
- "start_dt": start_dt,
- "end_dt": end_dt
- }
- userdata = {
- "start_dt" : start_dt,
- "end_dt" : end_dt
- }
- db_req_que.put(DBRequest_Async(
- sql=sqls.sql_query_events_by_datetime,
- params=params,
- callback=self.cb_bathroom_stay_frequency,
- userdata=userdata))
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 异常消失
- def handle_target_absence(self):
- try:
- dev_id = self.dev_id_
- device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
- if not device:
- return
- # 获取消失时间
- absence_time = self.get_absence_time(device)
- if absence_time == EC.EC_FAILED:
- return
- # 检查消失时间
- if self.check_absence_time(absence_time) == EC.EC_FAILED:
- return
- # 构造事件
- # 入库
- info = {
- "start_time": utc_to_bj_s(self.event_attr_.leave_ts_),
- "end_time": utc_to_bj_s(self.event_attr_.enter_ts_),
- "absence_time": absence_time
- }
- event_uuid = str(uuid.uuid4())
- remark = {
- "sp_id": sys_comm.g_sys_conf["sp_id"]
- }
- params = {
- "dev_id": dev_id,
- "uuid": event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": self.event_type_,
- "info": json.dumps(info),
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
- "is_handle": 0,
- "create_time": get_bj_time_s(),
- "is_deleted": 0,
- "tenant_id": self.tenant_id_,
- "remark": json.dumps(remark)
- }
- db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- linkage_action = {
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_
- }
- mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
- LOGDBG(f"new event: {event_desc_map[self.event_type_]}, absence_time: {absence_time}, dev: {dev_id}, plan: {self.plan_uuid_}")
- self.event_attr_.reset()
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 睡眠监测
- def handle_sleep_monitoring(self):
- # ---------------- 参数配置 ----------------
- NO_TARGET_TIMEOUT_S = 3 # 超过3秒无数据认为无目标
- NO_TARGET_MAX_COUNT = 3 # 连续3次无目标后确定离床
- MOTION_SMOOTH_WINDOW = 10 # 平滑窗口大小(取最近10帧计算平均)
- STAY_THRESHOLD_PEACEFUL = 0.05 # 静止阈值
- STAY_THRESHOLD_MICRO = 0.15 # 微动阈值
- LEAVE_BED_TS = 3 # 离床判定时间阈值
- try:
- dev_id = self.dev_id_
- device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
- if not device:
- return
- if (not self.rect_):
- return
- rtd_list = device.get_rtd_que_copy()
- if not rtd_list:
- return
- # 初始化状态机
- if not hasattr(self, "sleep_stat_"):
- self.sleep_stat_: str = "leave" # 当前睡眠状态
- self.update_sleep_stat_: str = "leave" # 要更新的睡眠状态
- self.avg_motion_: float = 0.0 # 当前运动幅值
- self.motion_stat_: str = "leave" # 当前运动状态
- self.avg_breath_: float = 0.0 # 当前呼吸率
- self.breathe_stat_: str = "r0" # 当前呼吸状态
- self.start_sleep_ts_: int = -1 # 睡眠开始时间
- self.end_sleep_ts_: int = -1 # 睡眠结束时间
- self.motion_window = deque(maxlen=MOTION_SMOOTH_WINDOW) # 近 N 次运动距离均值
- self.sleep_segments_ = [] # 状态阶段记录 [{"ts":xxx, "stat":xxx}, ...]
- self.last_pts_ = [] # 最后的点目标
- self.last_update_ts_ = -1
- self.miss_target_count_ = 0
- self.last_leave_ts = -1 # 上次离床判定时间,离床判定时间超过5秒视为离床事件
- now_ts = get_utc_time_s()
- rtd_unit = rtd_list[-1]
- ts = rtd_unit["timestamp"]
- target_point = rtd_unit["target_point"]
- ## 1. 空间状态分析
- if now_ts - ts > NO_TARGET_TIMEOUT_S:
- ## 目标不存在
- # 起夜
- x, y, z, snr = target_point
- if not helper.is_point_in_rect(x, y, self.rect_):
- motion = "leave"
- # 检测不到体动
- if self.start_sleep_ts_ == -1:
- return
- else:
- motion = "peaceful"
- else:
- ## 目标存在
- x = target_point[0]
- y = target_point[1]
- if not helper.is_point_in_rect(x, y, self.rect_):
- # 不在床上
- motion = "leave"
- else:
- # 在床上
- # 计算体动
- WINDOWS_S = 10 # 滑动事件窗口
- recent_rtds = [r for r in rtd_list if now_ts - r["timestamp"] <= WINDOWS_S]
- if len(recent_rtds) < 5:
- return
-
- # 取最新点
- x,y,z,snr = recent_rtds[-1]["target_point"]
- # 检查是否在床上
- if not helper.is_point_in_rect(x, y, self.rect_):
- if self.last_leave_ts == -1:
- self.last_leave_ts = now_ts
- elif now_ts - self.last_leave_ts > LEAVE_BED_TS:
- motion = "leave"
- else:
- self.last_leave_ts = -1
- # 计算位移序列
- motions = []
- for i in range(1, len(recent_rtds)):
- x1, y1, z1, _ = recent_rtds[i - 1]["target_point"]
- x2, y2, z2, _ = recent_rtds[i]["target_point"]
- dist = ((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2)**0.5
- motions.append(dist)
- if motions:
- avg_motion = sum(motions) / len(motions)
- self.motion_window.append(avg_motion)
- motion_smooth = sum(self.motion_window) / len(self.motion_window)
- else:
- motion_smooth = 0
- # 状态判定
- if motion_smooth < STAY_THRESHOLD_PEACEFUL:
- motion = "peaceful"
- elif motion_smooth < STAY_THRESHOLD_MICRO:
- motion = "mocro"
- else:
- motion = "active"
- self.motion_stat_ = motion
- ## 2. 呼吸率分析
- BREATHE_WINDOWS_S = 5 # 呼吸滑动时间窗口
- recent_breaths =[r["breath_rpm"] for r in rtd_list
- if (now_ts - r["timestamp"] <= BREATHE_WINDOWS_S) and
- ("breath_rpm" in r and isinstance(r["breath_rpm"], (int, float)))]
-
- if not recent_breaths:
- breathe_stat = "r0" # 无数据时视为异常/无呼吸
- avg_breath = 0.0
- else:
- avg_breath = sum(recent_breaths) / len(recent_breaths)
- if avg_breath < 8:
- breathe_stat = "r0"
- elif avg_breath < 12:
- breathe_stat = "r1"
- elif avg_breath < 18:
- breathe_stat = "r1"
- else:
- breathe_stat = "r1"
- self.avg_breath_ = avg_breath
- self.breathe_stat_ = breathe_stat
- ## 3. 睡眠状态分析
- try:
- i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_)
- i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat)
- self.update_sleep_stat_ = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath]
- except ValueError:
- LOGERR(f"infalid i_montion or i_breath")
- # 3.1 更新睡眠报告
- sleep_node = {
- "ts": now_ts,
- "sleep_stat": self.update_sleep_stat_
- }
- self.sleep_segments_.append(sleep_node)
- self.sleep_stat_ = self.update_sleep_stat_
- # 1. 分析空间状态
- # 2. 分析呼吸率
- # 3. 分析睡眠状态
- return
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # 清理过期事件
- def handle_clear_expire_events(self):
- try:
- params = {
- "save_days": self.event_attr_.expire_range_
- }
- db_execute_async(sqls.sql_delete_expire_events, params=params)
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- # ========== 一些回调 ==========
- # 如厕频次统计回调
- def cb_toileting_frequency(self, result, userdata):
- try:
- if result:
- count = 0
- event_list = []
- for row in result:
- dev_id: str = row.get("dev_id")
- event_uuid: str = row.get("uuid")
- plan_uuid: str = row.get("plan_uuid")
- event_type: int = row.get("event_type")
- info: dict = json.loads(row["info"]) if row.get("info") else {}
- is_handle: str = row.get("is_handle")
- create_time: str = row.get("create_time")
- update_time: str = row.get("update_time")
- is_deleted: str = row.get("is_deleted")
- remark: str = row.get("remark")
- event_list.append(info)
- this_event_uuid = str(uuid.uuid4())
- last_info = {
- "start_time" : userdata["start_dt"],
- "end_time" : userdata["end_dt"],
- "count" : len(event_list),
- "event_list" : event_list
- }
- # 入库
- event_uuid = str(uuid.uuid4())
- remark = {
- "sp_id": sys_comm.g_sys_conf["sp_id"]
- }
- params = {
- "dev_id": dev_id,
- "uuid": this_event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": self.event_type_,
- "info": json.dumps(last_info, ensure_ascii=False),
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
- "is_handle": 0,
- "create_time": get_bj_time_s(),
- "is_deleted": 0,
- "tenant_id": self.tenant_id_,
- "remark": json.dumps(remark)
- }
- db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- linkage_action = {
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_
- }
- mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, plan_uuid, event_desc_map[self.event_type_], last_info, linkage_action, "events")
- LOGINFO(f"new event: {event_desc_map[self.event_type_]}")
- else:
- LOGDBG("cb_toileting_frequency, empty result")
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- return
- # 夜间如厕频次统计回调
- def cb_night_toileting_frequency(self, result, userdata):
- try:
- if result:
- count = 0
- event_list = []
- for row in result:
- dev_id: str = row.get("dev_id")
- event_uuid: str = row.get("uuid")
- plan_uuid: str = row.get("plan_uuid")
- event_type: int = row.get("event_type")
- info: dict = json.loads(row["info"]) if row.get("info") else {}
- is_handle: str = row.get("is_handle")
- create_time: str = row.get("create_time")
- update_time: str = row.get("update_time")
- is_deleted: str = row.get("is_deleted")
- remark: str = row.get("remark")
- event_list.append(info)
- this_event_uuid = str(uuid.uuid4())
- last_info = {
- "start_time" : userdata["start_dt"],
- "end_time" : userdata["end_dt"],
- "count" : len(event_list),
- "event_list" : event_list
- }
- # 入库
- event_uuid = str(uuid.uuid4())
- remark = {
- "sp_id": sys_comm.g_sys_conf["sp_id"]
- }
- params = {
- "dev_id": dev_id,
- "uuid": this_event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": self.event_type_,
- "info": json.dumps(last_info, ensure_ascii=False),
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
- "is_handle": 0,
- "create_time": get_bj_time_s(),
- "is_deleted": 0,
- "tenant_id": self.tenant_id_,
- "remark": json.dumps(remark)
- }
- db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- linkage_action = {
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_
- }
- mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, plan_uuid, event_desc_map[self.event_type_], last_info, linkage_action, "events")
- LOGINFO(f"new event: {event_desc_map[self.event_type_]}")
- else:
- LOGDBG("cb_night_toileting_frequency, empty result")
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- return
- # 如厕频次异常回调
- def cb_toileting_frequency_abnormal(self, result, userdata):
- try:
- if result:
- count = 0
- event_list = []
- for row in result:
- dev_id: str = row.get("dev_id")
- event_uuid: str = row.get("uuid")
- plan_uuid: str = row.get("plan_uuid")
- event_type: int = row.get("event_type")
- info: dict = json.loads(row["info"]) if row.get("info") else {}
- is_handle: str = row.get("is_handle")
- create_time: str = row.get("create_time")
- update_time: str = row.get("update_time")
- is_deleted: str = row.get("is_deleted")
- remark: str = row.get("remark")
- event_list.append(info)
- if len(event_list) < self.event_attr_.threshold_count_:
- return
- this_event_uuid = str(uuid.uuid4())
- last_info = {
- "start_time" : userdata["start_dt"],
- "end_time" : userdata["end_dt"],
- "count" : len(event_list),
- "event_list" : event_list
- }
- # 入库
- event_uuid = str(uuid.uuid4())
- remark = {
- "sp_id": sys_comm.g_sys_conf["sp_id"]
- }
- params = {
- "dev_id": dev_id,
- "uuid": this_event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": self.event_type_,
- "info": json.dumps(last_info, ensure_ascii=False),
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
- "is_handle": 0,
- "create_time": get_bj_time_s(),
- "is_deleted": 0,
- "tenant_id": self.tenant_id_,
- "remark": json.dumps(remark)
- }
- db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- linkage_action = {
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_
- }
- mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events")
- LOGINFO(f"new event: {event_desc_map[self.event_type_]}")
- else:
- LOGDBG("cb_toileting_frequency_abnormal, empty result")
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- return
- # 起夜异常回调
- def cb_night_toileting_frequency_abnormal(self, result, userdata):
- try:
- if result:
- count = 0
- event_list = []
- for row in result:
- dev_id: str = row.get("dev_id")
- event_uuid: str = row.get("uuid")
- plan_uuid: str = row.get("plan_uuid")
- event_type: int = row.get("event_type")
- info: dict = json.loads(row["info"]) if row.get("info") else {}
- is_handle: str = row.get("is_handle")
- create_time: str = row.get("create_time")
- update_time: str = row.get("update_time")
- is_deleted: str = row.get("is_deleted")
- remark: str = row.get("remark")
- event_list.append(info)
- if len(event_list) < self.event_attr_.threshold_count_:
- return
- this_event_uuid = str(uuid.uuid4())
- last_info = {
- "start_time" : userdata["start_dt"],
- "end_time" : userdata["end_dt"],
- "count" : len(event_list),
- "event_list" : event_list
- }
- # 入库
- event_uuid = str(uuid.uuid4())
- remark = {
- "sp_id": sys_comm.g_sys_conf["sp_id"]
- }
- params = {
- "dev_id": dev_id,
- "uuid": this_event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": self.event_type_,
- "info": json.dumps(last_info, ensure_ascii=False),
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
- "is_handle": 0,
- "create_time": get_bj_time_s(),
- "is_deleted": 0,
- "tenant_id": self.tenant_id_,
- "remark": json.dumps(remark)
- }
- db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- linkage_action = {
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_
- }
- mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events")
- LOGINFO(f"new event: {event_desc_map[self.event_type_]}")
- else:
- LOGDBG("cb_night_toileting_frequency_abnormal, empty result")
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- return
- # 卫生间频次统计回调
- def cb_bathroom_stay_frequency(self, result, userdata):
- try:
- if result:
- count = 0
- event_list = []
- for row in result:
- dev_id: str = row.get("dev_id")
- event_uuid: str = row.get("uuid")
- plan_uuid: str = row.get("plan_uuid")
- event_type: int = row.get("event_type")
- info: dict = json.loads(row["info"]) if row.get("info") else {}
- is_handle: str = row.get("is_handle")
- create_time: str = row.get("create_time")
- update_time: str = row.get("update_time")
- is_deleted: str = row.get("is_deleted")
- remark: str = row.get("remark")
- event_list.append(info)
- this_event_uuid = str(uuid.uuid4())
- last_info = {
- "start_time" : userdata["start_dt"],
- "end_time" : userdata["end_dt"],
- "count" : len(event_list),
- "event_list" : event_list
- }
- # 入库
- event_uuid = str(uuid.uuid4())
- remark = {
- "sp_id": sys_comm.g_sys_conf["sp_id"]
- }
- params = {
- "dev_id": dev_id,
- "uuid": this_event_uuid,
- "plan_uuid": self.plan_uuid_,
- "event_type": self.event_type_,
- "info": json.dumps(last_info, ensure_ascii=False),
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
- "is_handle": 0,
- "create_time": get_bj_time_s(),
- "is_deleted": 0,
- "tenant_id": self.tenant_id_,
- "remark": json.dumps(remark)
- }
- db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
- # 通知
- linkage_action = {
- "linkage_push_wechat_service": self.linkage_action_.wechat_service_
- }
- mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, plan_uuid, event_desc_map[self.event_type_], last_info, linkage_action, "events")
- LOGINFO(f"new event: {event_desc_map[self.event_type_]}")
- else:
- LOGDBG("cb_bathroom_stay_frequency, empty result")
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- return
|