alarm_plan.py 56 KB


  1. from typing import List, Tuple, Optional
  2. from datetime import datetime, time, date
  3. from threading import Thread, Lock
  4. from enum import Enum
  5. import uuid
  6. import json
  7. import traceback
  8. from datetime import datetime, timezone, timedelta
  9. from collections import deque
  10. import g_config
  11. from common.sys_comm import (
  12. LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
  13. get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
  14. utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
  15. )
  16. from core.time_plan import TimePlan
  17. from core.event_type import EventType, event_desc_map
  18. import core.g_LAS as g_las
  19. import core.alarm_plan_helper as helper
  20. from core.linkage_action import LinkageAction
  21. from db.db_process import (
  22. db_req_que, DBRequest_Async
  23. )
  24. from db.db_process import (
  25. db_execute_sync, db_execute_async
  26. )
  27. import db.db_sqls as sqls
  28. import mqtt.mqtt_send as mqtt_send
  29. import device.dev_mng as g_Dev
  30. from device.dev_mng import (
  31. Device, g_dev_mgr
  32. )
  33. class EventAttr_Base:
  34. def __init__(self):
  35. return
  36. # 事件属性 事件事件
  37. class EventAttr_StayDetection(EventAttr_Base):
  38. def __init__(self, event_type):
  39. self.event_type_ = event_type
  40. self.enter_ts_: int = -1 # 进入时间(s)
  41. self.leave_ts_: int = -1 # 离开时间(s)
  42. self.stay_time_: int = -1 # 停留时长(s)
  43. return
  44. def reset(self):
  45. self.enter_ts_ = -1
  46. self.leave_ts_ = -1
  47. self.stay_time_ = -1
  48. # 事件属性 滞留事件
  49. class EventAttr_RetentionDetection(EventAttr_Base):
  50. def __init__(self, event_type):
  51. self.event_type_ = event_type
  52. self.enter_ts_: int = -1 # 进入时间(s)
  53. self.leave_ts_: int = -1 # 离开时间(s)
  54. self.stay_time_: int = -1 # 停留时长(s)
  55. return
  56. def reset(self):
  57. self.enter_ts_ = -1
  58. self.leave_ts_ = -1
  59. self.stay_time_ = -1
  60. # 事件属性 如厕事件
  61. class EventAttr_ToiletingDetection(EventAttr_Base):
  62. def __init__(self, event_type):
  63. self.event_type_ = event_type
  64. self.enter_ts_: int = -1 # 进入时间(ms)
  65. self.leave_ts_: int = -1 # 离开时间(ms)
  66. self.stay_time_: int = -1 # 停留时长(ms)
  67. return
  68. def reset(self):
  69. self.enter_ts_ = -1
  70. self.leave_ts_ = -1
  71. self.stay_time_ = -1
  72. # 事件属性 如厕频次统计
  73. class EventAttr_ToiletingFrequency(EventAttr_Base):
  74. def __init__(self, event_type):
  75. self.event_type_ = event_type
  76. self.count_: int = 0 # 统计次数
  77. self.event_list: list = []
  78. return
  79. def reset(self):
  80. self.count_ = 0
  81. self.event_list = []
  82. # 事件属性 夜间如厕频次统计
  83. class EventAttr_NightToiletingFrequency(EventAttr_Base):
  84. def __init__(self, event_type):
  85. self.event_type_ = event_type
  86. self.count_: int = 0 # 统计次数
  87. self.event_list: list = []
  88. return
  89. def reset(self):
  90. self.count_ = 0
  91. self.event_list = []
  92. # 事件属性 如厕频次异常
  93. class EventAttr_ToiletingFrequencyAbnormal(EventAttr_Base):
  94. def __init__(self, event_type):
  95. self.event_type_ = event_type
  96. self.count_: int = 0 # 统计次数
  97. self.threshold_count_: int = 0 # 异常阈值
  98. self.event_list: list = []
  99. return
  100. def reset(self):
  101. self.count_ = 0
  102. self.event_list = []
  103. # 事件属性 起夜异常
  104. class EventAttr_NightToiletingFrequencyAbnormal(EventAttr_Base):
  105. def __init__(self, event_type):
  106. self.event_type_ = event_type
  107. self.count_: int = 0 # 统计次数
  108. self.threshold_count_: int = 0 # 异常阈值
  109. self.event_list: list = []
  110. return
  111. def reset(self):
  112. self.count_ = 0
  113. self.event_list = []
  114. # 事件属性 卫生间频次统计
  115. class EventAttr_BathroomStayFrequency(EventAttr_Base):
  116. def __init__(self, event_type):
  117. self.event_type_ = event_type
  118. self.count_: int = 0 # 统计次数
  119. self.event_list: list = []
  120. return
  121. def reset(self):
  122. self.count_ = 0
  123. self.event_list = []
  124. # 事件属性 异常消失
  125. class EventAttr_TargetAbsence(EventAttr_Base):
  126. def __init__(self, event_type):
  127. self.event_type_ = event_type
  128. self.leave_ts_: int = -1 # 离开时间(ms)
  129. self.enter_ts_: int = -1 # 进入时间(ms)
  130. self.absence_time_: int = -1 # 消失时长(ms)
  131. self.time_threshold_: int = 300 # 触发消失时间阈值(ms)
  132. return
  133. def reset(self):
  134. self.leave_ts_ = -1
  135. self.enter_ts_ = -1
  136. self.absence_time_ = -1
  137. # 事件属性 睡眠监测
  138. class EventAttr_SleepMonitoring(EventAttr_Base):
  139. motion_stat = ["peaceful", "micro", "active", "leave"] # 空间状态
  140. # 呼吸率划分:r0:[0-8) r1:[8,12) r2:[12,18) r3:[18,25] r4:25以上
  141. breathe_stat = ["r0", "r1", "r2", "r3", "r4"] # 呼吸状态
  142. # 睡眠状态矩阵
  143. # 深睡(deep),浅睡(light),REM,清醒(awake),离床(leave)
  144. sleep_stat = [
  145. ["deep", "deep", "light", "REM", "awake"], # peaceful
  146. ["deep", "light", "REM", "REM", "awake"], # micro
  147. ["awake", "awake", "awake", "awake", "awake"], # active
  148. ["leave", "leave", "leave", "leave", "leave"]] # leave
  149. # 异常:呼吸过慢(slow_breathe),呼吸过快(fast_breathe)
  150. def __init__(self, event_type):
  151. self.start_sleep_ts_: int = -1 # 睡眠开始时间
  152. self.end_sleep_ts_: int = -1 # 睡眠结束时间
  153. self.motion_stat_ = None # 当前运动状态
  154. self.breathe_stat_ = None # 当前呼吸状态
  155. self.sleep_stat_ = None # 当前睡眠状态
  156. self.sleep_segments_ = [] # 状态阶段记录 [{"ts":xxx, "stat":xxx}, ...]
  157. self.last_motion_ = 0.0 # 最新的运动状态
  158. self.last_breath_ = 0.0 # 最新的呼吸率
  159. self.last_pts_ = [] # 最后的点目标
  160. self.last_update_ts_ = -1
  161. self.miss_target_count_ = 0
  162. return
  163. # 事件属性 清理过期事件
  164. class EventAttr_CleanExpireEvents(EventAttr_Base):
  165. def __init__(self, event_type):
  166. self.event_type_ = event_type
  167. self.expire_range_ = 90
  168. # 事件属性表
  169. event_attr_map = {
  170. EventType.STAY_DETECTION.value : EventAttr_StayDetection,
  171. EventType.RETENTION_DETECTION.value : EventAttr_RetentionDetection,
  172. EventType.TOILETING_DETECTION.value : EventAttr_ToiletingDetection,
  173. EventType.TOILETING_FREQUENCY.value : EventAttr_ToiletingFrequency,
  174. EventType.NIGHT_TOILETING_FREQUENCY.value : EventAttr_NightToiletingFrequency,
  175. EventType.TOILETING_FREQUENCY_ABNORMAL.value : EventAttr_ToiletingFrequencyAbnormal,
  176. EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value: EventAttr_NightToiletingFrequencyAbnormal,
  177. EventType.BATHROOM_STAY_FREQUENCY.value : EventAttr_BathroomStayFrequency,
  178. EventType.TARGET_ABSENCE.value : EventAttr_TargetAbsence,
  179. EventType.SLEEP_MONITORING.value : EventAttr_SleepMonitoring,
  180. EventType.CLEAN_EXPIRE_EVENTS.value : EventAttr_CleanExpireEvents,
  181. }
  182. class Cron:
  183. def __init__(self, h, m, s):
  184. self.h_ = None
  185. self.m_ = None
  186. self.s_ = None
  187. class AlarmPlan:
  188. # 注册handle
  189. handles_map = {
  190. EventType.STAY_DETECTION.value : lambda plan: AlarmPlan.handle_stay_detection(plan),
  191. EventType.RETENTION_DETECTION.value : lambda plan: AlarmPlan.handle_retention_detection(plan),
  192. EventType.TOILETING_DETECTION.value : lambda plan: AlarmPlan.handle_toileting_detection(plan),
  193. EventType.TOILETING_FREQUENCY.value : lambda plan: AlarmPlan.handle_toileting_frequency(plan),
  194. EventType.NIGHT_TOILETING_FREQUENCY.value : lambda plan: AlarmPlan.handle_night_toileting_frequency(plan),
  195. EventType.TOILETING_FREQUENCY_ABNORMAL.value : lambda plan: AlarmPlan.handle_toileting_frequency_abnormal(plan),
  196. EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value : lambda plan: AlarmPlan.handle_night_toileting_frequency_abnormal(plan),
  197. EventType.BATHROOM_STAY_FREQUENCY.value : lambda plan: AlarmPlan.handle_bathroom_stay_frequency(plan),
  198. EventType.TARGET_ABSENCE.value : lambda plan: AlarmPlan.handle_target_absence(plan),
  199. EventType.SLEEP_MONITORING.value : lambda plan: AlarmPlan.handle_sleep_monitoring(plan),
  200. # 平台事件(任务)
  201. EventType.CLEAN_EXPIRE_EVENTS.value : lambda plan: AlarmPlan.handle_clear_expire_events(plan),
  202. }
  203. def __init__(self,
  204. plan_uuid: str,
  205. name: str,
  206. dev_id: str,
  207. dev_name: str,
  208. enable: bool,
  209. time_plan: TimePlan,
  210. rect: list,
  211. event_type: int,
  212. threshold_time: int,
  213. merge_time: int,
  214. param: dict,
  215. cron: Optional[dict] = None,
  216. linkage_action: LinkageAction = LinkageAction(),
  217. tenant_id:int = 0
  218. ):
  219. self.lock_ = Lock()
  220. self.plan_uuid_ = plan_uuid # 计划id
  221. self.name_ = name # 计划名称
  222. self.dev_id_ = dev_id # 设备id
  223. self.dev_name_ = dev_name # 设备名称
  224. self.enable_ = enable # 是否启用
  225. self.time_plan_ = time_plan # 时间计划
  226. self.param_ = param # 参数
  227. self.linkage_action_ = linkage_action # 联动动作
  228. self.tenant_id_ = tenant_id # 租户id
  229. # 维护状态(根据TimePlan判断)
  230. self.status_ = 0 # 0未激活,1激活,-1过期
  231. self.status_update_ts_ = -1 # 状态更新时间,初始值为-1
  232. # 事件触发参数
  233. self.event_type_ = event_type # 事件类型
  234. self.rect_ = rect # 检测区域 [left, top, width, height]
  235. self.threshold_time_ = threshold_time # 触发时间阈值
  236. self.merge_time_ = merge_time # 归并时间窗口
  237. self.event_attr_ = self.init_event_attr() # 事件属性
  238. if self.event_attr_ is None:
  239. raise ValueError(f"Invalid event_type: {event_type}")
  240. self.handle_func_ = self.handles_map.get(event_type)
  241. if self.handle_func_ is None:
  242. raise ValueError(f"Invalic event_type: {event_type}")
  243. # 计划任务的开始时间
  244. self.cron_ = cron # {“hour": 7, "minute": 0}
  245. def execute(self):
  246. if self.status_ != 1:
  247. return
  248. g_las.g_alarm_plan_disp.dispatch(self)
  249. # 初始化事件属性
  250. def init_event_attr(self):
  251. try:
  252. event_cls = event_attr_map.get(self.event_type_)
  253. if event_cls is None:
  254. return None
  255. event_attr = event_cls(self.event_type_)
  256. if ((self.event_type_ == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
  257. (self.event_type_ == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value)):
  258. event_attr.threshold_count_ = int(self.param_.get("count", 0))
  259. elif ((self.event_type_ == EventType.TARGET_ABSENCE.value)):
  260. event_attr.time_threshold_ = int(self.param_.get("time_threshold", 0))
  261. elif ((self.event_type_ == EventType.CLEAN_EXPIRE_EVENTS.value)):
  262. event_attr.expire_range_ = 90
  263. return event_attr
  264. except json.JSONDecodeError as e:
  265. tb_info = traceback.extract_tb(e.__traceback__)
  266. for frame in tb_info:
  267. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  268. except Exception as e:
  269. tb_info = traceback.extract_tb(e.__traceback__)
  270. for frame in tb_info:
  271. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  272. # 更新激活状态
  273. def update_status(self, now: Optional[datetime] = None) -> None:
  274. now = now or datetime.now()
  275. old_status = self.status_
  276. if not self.enable_:
  277. self.status_ = 0
  278. else:
  279. now_fmt = now.strftime("%Y-%m-%d")
  280. # 过期
  281. if now_fmt > self.time_plan_.stop_date_:
  282. self.status_ = -1
  283. elif now_fmt < self.time_plan_.start_date_:
  284. self.status_ = 0
  285. elif self.time_plan_.is_active_now(now):
  286. self.status_ = 1
  287. else:
  288. self.status_ = 0
  289. if self.status_ != old_status:
  290. self.status_update_ts = int(now.timestamp())
  291. LOGINFO(f"[Status] plan {self.plan_uuid_} status_ changed {old_status} -> {self.status_}")
  292. def is_point_in_rect(self, x:float, y:float, rect:list) -> bool:
  293. rx, ry, rw, rh = rect
  294. x_min = min(rx, rx + rw)
  295. x_max = max(rx, rx + rw)
  296. y_min = min(ry, ry - rh)
  297. y_max = max(ry, ry - rh)
  298. bRet: bool = x_min <= x <= x_max and y_min <= y <= y_max
  299. return bRet
  300. # 查找最近 t 秒内,最后一个落在 rect 内的 target_point 的 rtd_unit
  301. def find_latest_rtd_in_region(self, device: Device, rect: list, now: int=None, t: int=5):
  302. now_s = now if now else get_utc_time_s()
  303. rtd_que_copy = device.get_rtd_que_copy()
  304. with self.lock_:
  305. for rtd_unit in reversed(rtd_que_copy): # 倒序扫描
  306. ts_s = int(rtd_unit.get("timestamp", 0))
  307. if now_s - ts_s > t:
  308. break # 已经超过 t 秒,可以直接结束
  309. # 检查点是否在区域内
  310. for pt in rtd_unit.get("target_point", []):
  311. if len(pt) >= 2:
  312. x, y = pt[0], pt[1]
  313. if self.is_point_in_rect(x, y, rect):
  314. return rtd_unit
  315. return None
  316. # 检查关键时间点, 获取停留时间
  317. def get_stay_time(self, device: Device, t: int=5):
  318. now = get_utc_time_s()
  319. rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, t)
  320. with self.lock_:
  321. if rtd_unit:
  322. timestamp = rtd_unit["timestamp"]
  323. if self.event_attr_.enter_ts_ == -1:
  324. self.event_attr_.enter_ts_ = timestamp
  325. LOGINFO(f"detected target enter, plan_uuid: {self.plan_uuid_}")
  326. else:
  327. self.event_attr_.leave_ts_ = timestamp
  328. if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
  329. return EC.EC_FAILED
  330. # 归并时间内,不认为事件结束
  331. if now - self.event_attr_.leave_ts_ < self.merge_time_:
  332. return EC.EC_FAILED
  333. self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
  334. return self.event_attr_.stay_time_
  335. # 检查停留时间
  336. def check_stay_time(self, stay_time: int):
  337. with self.lock_:
  338. # 时间小于触发时间阈值,忽略并重置
  339. if stay_time < self.threshold_time_ :
  340. self.event_attr_.reset()
  341. LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
  342. return EC.EC_FAILED
  343. LOGINFO(f"detected target leave, plan_uuid: {self.plan_uuid_}")
  344. return 0
  345. # 检查关键时间点, 获取消失时间
  346. def get_absence_time(self, device: Device, t: int=5):
  347. now = get_utc_time_s()
  348. # 查找最新的落在检测区域的目标
  349. rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
  350. if rtd_unit:
  351. return EC.EC_FAILED
  352. with self.lock_:
  353. if self.event_attr_.leave_ts_ == -1:
  354. self.event_attr_.leave_ts_ = now
  355. LOGINFO(f"detected target leave, plan_uuid: {self.plan_uuid_}")
  356. else:
  357. self.event_attr_.enter_ts_ = now
  358. if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
  359. return EC.EC_FAILED
  360. # 归并时间内,不认为事件结束
  361. if now - self.event_attr_.enter_ts_ < self.merge_time_:
  362. return EC.EC_FAILED
  363. self.event_attr_.absence_time_ = self.event_attr_.enter_ts_ - self.event_attr_.leave_ts_
  364. return self.event_attr_.absence_time_
  365. # 检查消失时间
  366. def check_absence_time(self, absence_time: int):
  367. with self.lock_:
  368. # 时间小于触发时间阈值,忽略并重置
  369. if absence_time < self.event_attr_.time_threshold_:
  370. self.event_attr_.reset()
  371. LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
  372. return EC.EC_FAILED
  373. LOGINFO(f"detected target enter, plan_uuid: {self.plan_uuid_}")
  374. return 0
  375. # 停留事件
  376. def handle_stay_detection(self):
  377. try:
  378. dev_id = self.dev_id_
  379. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  380. if not device:
  381. return
  382. # 获取停留时间
  383. stay_time = self.get_stay_time(device)
  384. if stay_time == EC.EC_FAILED:
  385. return
  386. # 检查停留时间
  387. if self.check_stay_time(stay_time) == EC.EC_FAILED:
  388. return
  389. # 构造事件
  390. # 入库
  391. info = {
  392. "start_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  393. "end_time": utc_to_bj_s(self.event_attr_.leave_ts_),
  394. "stay_time": stay_time
  395. }
  396. event_uuid = str(uuid.uuid4())
  397. remark = {
  398. "sp_id": g_config.g_sys_conf["sp_id"]
  399. }
  400. params = {
  401. "dev_id": dev_id,
  402. "uuid": event_uuid,
  403. "plan_uuid": self.plan_uuid_,
  404. "event_type": self.event_type_,
  405. "info": json.dumps(info),
  406. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  407. "is_handle": 0,
  408. "create_time": get_bj_time_s(),
  409. "is_deleted": 0,
  410. "tenant_id": self.tenant_id_,
  411. "remark": json.dumps(remark)
  412. }
  413. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  414. # 通知
  415. linkage_action = {
  416. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  417. }
  418. mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
  419. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}, dev: {dev_id}, plan: {self.plan_uuid_}")
  420. self.event_attr_.reset()
  421. except json.JSONDecodeError as e:
  422. tb_info = traceback.extract_tb(e.__traceback__)
  423. for frame in tb_info:
  424. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  425. except Exception as e:
  426. tb_info = traceback.extract_tb(e.__traceback__)
  427. for frame in tb_info:
  428. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  429. # 滞留事件
  430. def handle_retention_detection(self):
  431. try:
  432. dev_id = self.dev_id_
  433. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  434. if not device:
  435. return
  436. # 获取停留时间
  437. stay_time = self.get_stay_time(device)
  438. if stay_time == EC.EC_FAILED:
  439. return
  440. # 检查停留时间
  441. if self.check_stay_time(stay_time) == EC.EC_FAILED:
  442. return
  443. # 构造事件
  444. # 入库
  445. info = {
  446. "start_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  447. "end_time": utc_to_bj_s(self.event_attr_.leave_ts_),
  448. "stay_time": stay_time
  449. }
  450. event_uuid = str(uuid.uuid4())
  451. remark = {
  452. "sp_id": g_config.g_sys_conf["sp_id"]
  453. }
  454. params = {
  455. "dev_id": dev_id,
  456. "uuid": event_uuid,
  457. "plan_uuid": self.plan_uuid_,
  458. "event_type": self.event_type_,
  459. "info": json.dumps(info),
  460. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  461. "is_handle": 0,
  462. "create_time": get_bj_time_s(),
  463. "is_deleted": 0,
  464. "tenant_id": self.tenant_id_,
  465. "remark": json.dumps(remark)
  466. }
  467. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  468. # 通知
  469. linkage_action = {
  470. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  471. }
  472. mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
  473. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}, dev: {dev_id}, plan: {self.plan_uuid_}")
  474. self.event_attr_.reset()
  475. except json.JSONDecodeError as e:
  476. tb_info = traceback.extract_tb(e.__traceback__)
  477. for frame in tb_info:
  478. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  479. except Exception as e:
  480. tb_info = traceback.extract_tb(e.__traceback__)
  481. for frame in tb_info:
  482. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  483. # 如厕事件
  484. def handle_toileting_detection(self):
  485. try:
  486. dev_id = self.dev_id_
  487. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  488. if not device:
  489. return
  490. # 获取停留时间
  491. stay_time = self.get_stay_time(device)
  492. if stay_time == EC.EC_FAILED:
  493. return
  494. # 检查停留时间
  495. if self.check_stay_time(stay_time) == EC.EC_FAILED:
  496. return
  497. # 构造事件
  498. # 入库
  499. info = {
  500. "start_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  501. "end_time": utc_to_bj_s(self.event_attr_.leave_ts_),
  502. "stay_time": stay_time
  503. }
  504. event_uuid = str(uuid.uuid4())
  505. remark = {
  506. "sp_id": g_config.g_sys_conf["sp_id"]
  507. }
  508. params = {
  509. "dev_id": dev_id,
  510. "uuid": event_uuid,
  511. "plan_uuid": self.plan_uuid_,
  512. "event_type": self.event_type_,
  513. "info": json.dumps(info),
  514. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  515. "is_handle": 0,
  516. "create_time": get_bj_time_s(),
  517. "is_deleted": 0,
  518. "tenant_id": self.tenant_id_,
  519. "remark": json.dumps(remark)
  520. }
  521. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  522. # 通知
  523. linkage_action = {
  524. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  525. }
  526. mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
  527. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}, dev: {dev_id}, plan: {self.plan_uuid_}")
  528. self.event_attr_.reset()
  529. except json.JSONDecodeError as e:
  530. tb_info = traceback.extract_tb(e.__traceback__)
  531. for frame in tb_info:
  532. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  533. except Exception as e:
  534. tb_info = traceback.extract_tb(e.__traceback__)
  535. for frame in tb_info:
  536. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  537. # 如厕频次统计
  538. def handle_toileting_frequency(self):
  539. try:
  540. dev_id = self.dev_id_
  541. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  542. if not device:
  543. return
  544. start_dt, end_dt = helper.get_query_time_range(self.param_)
  545. params = {
  546. "dev_id" : self.dev_id_,
  547. "event_type": EventType.TOILETING_DETECTION.value,
  548. "start_dt": start_dt,
  549. "end_dt": end_dt
  550. }
  551. userdata = {
  552. "start_dt" : start_dt,
  553. "end_dt" : end_dt
  554. }
  555. db_req_que.put(DBRequest_Async(
  556. sql=sqls.sql_query_events_by_datetime,
  557. params=params,
  558. callback=self.cb_toileting_frequency,
  559. userdata=userdata))
  560. except json.JSONDecodeError as e:
  561. tb_info = traceback.extract_tb(e.__traceback__)
  562. for frame in tb_info:
  563. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  564. except Exception as e:
  565. tb_info = traceback.extract_tb(e.__traceback__)
  566. for frame in tb_info:
  567. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  568. # 夜间如厕频次统计
  569. def handle_night_toileting_frequency(self):
  570. try:
  571. dev_id = self.dev_id_
  572. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  573. if not device:
  574. return
  575. start_dt, end_dt = helper.get_query_time_range(self.param_)
  576. params = {
  577. "dev_id" : self.dev_id_,
  578. "event_type": EventType.TOILETING_DETECTION.value,
  579. "start_dt": start_dt,
  580. "end_dt": end_dt
  581. }
  582. userdata = {
  583. "start_dt" : start_dt,
  584. "end_dt" : end_dt
  585. }
  586. db_req_que.put(DBRequest_Async(
  587. sql=sqls.sql_query_events_by_datetime,
  588. params=params,
  589. callback=self.cb_night_toileting_frequency,
  590. userdata=userdata))
  591. except json.JSONDecodeError as e:
  592. tb_info = traceback.extract_tb(e.__traceback__)
  593. for frame in tb_info:
  594. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  595. except Exception as e:
  596. tb_info = traceback.extract_tb(e.__traceback__)
  597. for frame in tb_info:
  598. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  599. # 如厕频次异常
  600. def handle_toileting_frequency_abnormal(self):
  601. try:
  602. dev_id = self.dev_id_
  603. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  604. if not device:
  605. return
  606. start_dt, end_dt = helper.get_query_time_range(self.param_)
  607. params = {
  608. "dev_id" : self.dev_id_,
  609. "event_type": EventType.TOILETING_DETECTION.value,
  610. "start_dt": start_dt,
  611. "end_dt": end_dt
  612. }
  613. userdata = {
  614. "start_dt" : start_dt,
  615. "end_dt" : end_dt
  616. }
  617. db_req_que.put(DBRequest_Async(
  618. sql=sqls.sql_query_events_by_datetime,
  619. params=params,
  620. callback=self.cb_toileting_frequency_abnormal,
  621. userdata=userdata))
  622. except json.JSONDecodeError as e:
  623. tb_info = traceback.extract_tb(e.__traceback__)
  624. for frame in tb_info:
  625. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  626. except Exception as e:
  627. tb_info = traceback.extract_tb(e.__traceback__)
  628. for frame in tb_info:
  629. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  630. # 起夜异常
  631. def handle_night_toileting_frequency_abnormal(self):
  632. try:
  633. dev_id = self.dev_id_
  634. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  635. if not device:
  636. return
  637. start_dt, end_dt = helper.get_query_time_range(self.param_)
  638. params = {
  639. "dev_id" : self.dev_id_,
  640. "event_type": EventType.TOILETING_DETECTION.value,
  641. "start_dt": start_dt,
  642. "end_dt": end_dt
  643. }
  644. userdata = {
  645. "start_dt" : start_dt,
  646. "end_dt" : end_dt
  647. }
  648. db_req_que.put(DBRequest_Async(
  649. sql=sqls.sql_query_events_by_datetime,
  650. params=params,
  651. callback=self.cb_night_toileting_frequency_abnormal,
  652. userdata=userdata))
  653. except json.JSONDecodeError as e:
  654. tb_info = traceback.extract_tb(e.__traceback__)
  655. for frame in tb_info:
  656. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  657. except Exception as e:
  658. tb_info = traceback.extract_tb(e.__traceback__)
  659. for frame in tb_info:
  660. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  661. # 卫生间频次统计
  662. def handle_bathroom_stay_frequency(self):
  663. try:
  664. dev_id = self.dev_id_
  665. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  666. if not device:
  667. return
  668. start_dt, end_dt = helper.get_query_time_range(self.param_)
  669. params = {
  670. "dev_id" : self.dev_id_,
  671. "event_type": EventType.STAY_DETECTION.value,
  672. "start_dt": start_dt,
  673. "end_dt": end_dt
  674. }
  675. userdata = {
  676. "start_dt" : start_dt,
  677. "end_dt" : end_dt
  678. }
  679. db_req_que.put(DBRequest_Async(
  680. sql=sqls.sql_query_events_by_datetime,
  681. params=params,
  682. callback=self.cb_bathroom_stay_frequency,
  683. userdata=userdata))
  684. except json.JSONDecodeError as e:
  685. tb_info = traceback.extract_tb(e.__traceback__)
  686. for frame in tb_info:
  687. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  688. except Exception as e:
  689. tb_info = traceback.extract_tb(e.__traceback__)
  690. for frame in tb_info:
  691. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  692. # 异常消失
  693. def handle_target_absence(self):
  694. try:
  695. dev_id = self.dev_id_
  696. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  697. if not device:
  698. return
  699. # 获取消失时间
  700. absence_time = self.get_absence_time(device)
  701. if absence_time == EC.EC_FAILED:
  702. return
  703. # 检查消失时间
  704. if self.check_absence_time(absence_time) == EC.EC_FAILED:
  705. return
  706. # 构造事件
  707. # 入库
  708. info = {
  709. "start_time": utc_to_bj_s(self.event_attr_.leave_ts_),
  710. "end_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  711. "absence_time": absence_time
  712. }
  713. event_uuid = str(uuid.uuid4())
  714. remark = {
  715. "sp_id": g_config.g_sys_conf["sp_id"]
  716. }
  717. params = {
  718. "dev_id": dev_id,
  719. "uuid": event_uuid,
  720. "plan_uuid": self.plan_uuid_,
  721. "event_type": self.event_type_,
  722. "info": json.dumps(info),
  723. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  724. "is_handle": 0,
  725. "create_time": get_bj_time_s(),
  726. "is_deleted": 0,
  727. "tenant_id": self.tenant_id_,
  728. "remark": json.dumps(remark)
  729. }
  730. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  731. # 通知
  732. linkage_action = {
  733. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  734. }
  735. mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
  736. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, absence_time: {absence_time}, dev: {dev_id}, plan: {self.plan_uuid_}")
  737. self.event_attr_.reset()
  738. except json.JSONDecodeError as e:
  739. tb_info = traceback.extract_tb(e.__traceback__)
  740. for frame in tb_info:
  741. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  742. except Exception as e:
  743. tb_info = traceback.extract_tb(e.__traceback__)
  744. for frame in tb_info:
  745. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  746. # 睡眠监测
  747. def handle_sleep_monitoring(self):
  748. # ---------------- 参数配置 ----------------
  749. NO_TARGET_TIMEOUT_S = 3 # 超过3秒无数据认为无目标
  750. NO_TARGET_MAX_COUNT = 3 # 连续3次无目标后确定离床
  751. MOTION_SMOOTH_WINDOW = 10 # 平滑窗口大小(取最近10帧计算平均)
  752. STAY_THRESHOLD_PEACEFUL = 0.05 # 静止阈值
  753. STAY_THRESHOLD_MICRO = 0.15 # 微动阈值
  754. LEAVE_BED_TS = 3 # 离床判定时间阈值
  755. try:
  756. dev_id = self.dev_id_
  757. device: Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  758. if not device or not self.rect_:
  759. return
  760. rtd_list = device.get_rtd_que_copy()
  761. if not rtd_list:
  762. return
  763. # 初始化状态机
  764. if not hasattr(self, "sleep_stat_"):
  765. self.sleep_stat_: str = "leave" # 当前睡眠状态
  766. self.update_sleep_stat_: str = "leave"
  767. self.avg_motion_: float = 0.0
  768. self.motion_stat_: str = "leave"
  769. self.avg_breath_: float = 0.0
  770. self.breathe_stat_: str = "r0"
  771. self.start_sleep_ts_: int = -1
  772. self.end_sleep_ts_: int = -1
  773. self.motion_window = deque(maxlen=MOTION_SMOOTH_WINDOW)
  774. self.sleep_segments_ = [] # [{"ts":xxx, "stat":xxx}, ...]
  775. self.last_pts_ = []
  776. self.last_update_ts_ = -1
  777. self.miss_target_count_ = 0
  778. self.last_leave_ts = -1 # 离床检测起点
  779. now_ts = get_utc_time_s()
  780. rtd_unit = rtd_list[-1]
  781. ts = rtd_unit["timestamp"]
  782. target_point = rtd_unit["target_point"]
  783. ## 1. 空间状态分析(motion)
  784. if now_ts - ts > NO_TARGET_TIMEOUT_S:
  785. # 长时间无数据 → 目标消失
  786. self.miss_target_count_ += 1
  787. if self.miss_target_count_ >= NO_TARGET_MAX_COUNT:
  788. motion = "leave"
  789. else:
  790. motion = self.motion_stat_
  791. else:
  792. self.miss_target_count_ = 0
  793. x, y, z, snr = target_point
  794. if not helper.is_point_in_rect(x, y, self.rect_):
  795. # 不在床上
  796. if self.last_leave_ts == -1:
  797. self.last_leave_ts = now_ts
  798. elif now_ts - self.last_leave_ts > LEAVE_BED_TS:
  799. motion = "leave"
  800. else:
  801. motion = "active"
  802. else:
  803. self.last_leave_ts = -1
  804. # 在床上,计算位移
  805. WINDOWS_S = 10
  806. recent_rtds = [r for r in rtd_list if now_ts - r["timestamp"] <= WINDOWS_S]
  807. if len(recent_rtds) < 2:
  808. motion = self.motion_stat_
  809. else:
  810. motions = []
  811. for i in range(1, len(recent_rtds)):
  812. x1, y1, z1, _ = recent_rtds[i - 1]["target_point"]
  813. x2, y2, z2, _ = recent_rtds[i]["target_point"]
  814. dist = ((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2)**0.5
  815. motions.append(dist)
  816. if motions:
  817. avg_motion = sum(motions) / len(motions)
  818. self.motion_window.append(avg_motion)
  819. motion_smooth = sum(self.motion_window) / len(self.motion_window)
  820. else:
  821. motion_smooth = 0.0
  822. # 判定运动状态
  823. if motion_smooth < STAY_THRESHOLD_PEACEFUL:
  824. motion = "peaceful"
  825. elif motion_smooth < STAY_THRESHOLD_MICRO:
  826. motion = "micro"
  827. else:
  828. motion = "active"
  829. self.motion_stat_ = motion
  830. ## 2. 呼吸状态分析(breathe)
  831. BREATHE_WINDOWS_S = 5
  832. recent_breaths = [
  833. r.get("breath_rpm", 0.0)
  834. for r in rtd_list
  835. if (now_ts - r["timestamp"] <= BREATHE_WINDOWS_S)
  836. and isinstance(r.get("breath_rpm"), (int, float))
  837. ]
  838. if not recent_breaths:
  839. avg_breath = 0.0
  840. breathe_stat = "r0"
  841. else:
  842. avg_breath = sum(recent_breaths) / len(recent_breaths)
  843. if avg_breath < 8:
  844. breathe_stat = "r0"
  845. elif avg_breath < 12:
  846. breathe_stat = "r1"
  847. elif avg_breath < 18:
  848. breathe_stat = "r2"
  849. elif avg_breath < 25:
  850. breathe_stat = "r3"
  851. else:
  852. breathe_stat = "r4"
  853. self.avg_breath_ = avg_breath
  854. self.breathe_stat_ = breathe_stat
  855. ## 3. 睡眠状态分析(sleep)
  856. try:
  857. i_motion = EventAttr_SleepMonitoring.motion_stat.index(self.motion_stat_)
  858. i_breath = EventAttr_SleepMonitoring.breathe_stat.index(breathe_stat)
  859. self.update_sleep_stat_ = EventAttr_SleepMonitoring.sleep_stat[i_motion][i_breath]
  860. except ValueError:
  861. LOGERR(f"[SleepMonitoring] invalid motion/breath index")
  862. return
  863. ## 4. 记录阶段变化
  864. if (not self.sleep_segments_) or (self.update_sleep_stat_ != self.sleep_segments_[-1]["sleep_stat"]):
  865. sleep_node = {
  866. "ts": now_ts,
  867. "sleep_stat": self.update_sleep_stat_
  868. }
  869. self.sleep_segments_.append(sleep_node)
  870. LOGINFO(f"update sleep_stat, current: {self.update_sleep_stat_}")
  871. self.sleep_stat_ = self.update_sleep_stat_
  872. ## 5. 调试输出(可选)
  873. LOGINFO(f"[SleepMonitoring] {dev_id} motion={self.motion_stat_}, breath={self.avg_breath_:.1f}, sleep={self.sleep_stat_}")
  874. return
  875. except json.JSONDecodeError as e:
  876. tb_info = traceback.extract_tb(e.__traceback__)
  877. for frame in tb_info:
  878. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  879. except Exception as e:
  880. tb_info = traceback.extract_tb(e.__traceback__)
  881. for frame in tb_info:
  882. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  883. # 清理过期事件
  def handle_clear_expire_events(self):
      """Request deletion of expired events.

      Hands ``sqls.sql_delete_expire_events`` to ``db_execute_async`` with the
      plan's ``event_attr_.expire_range_`` bound as the ``save_days`` parameter.
      Any exception raised while doing so is logged frame by frame through
      ``LOGERR`` and is not re-raised.
      """
      try:
          purge_args = {"save_days": self.event_attr_.expire_range_}
          db_execute_async(sqls.sql_delete_expire_events, params=purge_args)
      except Exception as e:
          # JSON decode errors are logged together with the offending document.
          is_json_error = isinstance(e, json.JSONDecodeError)
          for frame in traceback.extract_tb(e.__traceback__):
              if is_json_error:
                  LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
              else:
                  LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  898. # ========== 一些回调 ==========
  899. # 如厕频次统计回调
  def cb_toileting_frequency(self, result, userdata):
      """Callback for the toileting-frequency statistics query.

      Folds the queried rows into a single summary event, queues an insert of
      that event (``sqls.sql_insert_events``) and announces it through
      ``mqtt_send.alarm_event``. A falsy ``result`` only emits a debug log.

      Args:
          result: Iterable of rows supporting ``.get`` and item access. A row's
              ``"info"`` value, when truthy, is parsed as JSON.
          userdata: Mapping with ``"start_dt"`` and ``"end_dt"``; both values
              are copied into the summary as ``start_time`` / ``end_time``.

      Returns:
          None. Every exception is logged via ``LOGERR`` and swallowed.
      """
      try:
          if not result:
              LOGDBG("cb_toileting_frequency, empty result")
              return

          # Every row contributes exactly one entry (an empty dict when it has
          # no "info"), so len(event_list) equals the number of rows.
          event_list = []
          for row in result:
              dev_id = row.get("dev_id")  # the last row's device id is reported
              event_list.append(json.loads(row["info"]) if row.get("info") else {})

          this_event_uuid = str(uuid.uuid4())
          last_info = {
              "start_time": userdata["start_dt"],
              "end_time": userdata["end_dt"],
              "count": len(event_list),
              "event_list": event_list,
          }

          # Persist the summary event.
          remark = {"sp_id": g_config.g_sys_conf["sp_id"]}
          params = {
              "dev_id": dev_id,
              "uuid": this_event_uuid,
              "plan_uuid": self.plan_uuid_,
              "event_type": self.event_type_,
              "info": json.dumps(last_info, ensure_ascii=False),
              "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
              "is_handle": 0,
              "create_time": get_bj_time_s(),
              "is_deleted": 0,
              "tenant_id": self.tenant_id_,
              "remark": json.dumps(remark),
          }
          db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))

          # Notify. The plan uuid sent here must be the one stored with the
          # event above (this plan's own id), not the plan_uuid column of a
          # source row, which belongs to whatever plan produced that row.
          linkage_action = {
              "linkage_push_wechat_service": self.linkage_action_.wechat_service_
          }
          mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events")
          LOGINFO(f"new event: {event_desc_map[self.event_type_]}")
      except json.JSONDecodeError as e:
          # A malformed "info" payload aborts the whole aggregation.
          for frame in traceback.extract_tb(e.__traceback__):
              LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
      except Exception as e:
          for frame in traceback.extract_tb(e.__traceback__):
              LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  960. # 夜间如厕频次统计回调
  def cb_night_toileting_frequency(self, result, userdata):
      """Callback for the night-time toileting-frequency statistics query.

      Folds the queried rows into a single summary event, queues an insert of
      that event (``sqls.sql_insert_events``) and announces it through
      ``mqtt_send.alarm_event``. A falsy ``result`` only emits a debug log.

      Args:
          result: Iterable of rows supporting ``.get`` and item access. A row's
              ``"info"`` value, when truthy, is parsed as JSON.
          userdata: Mapping with ``"start_dt"`` and ``"end_dt"``; both values
              are copied into the summary as ``start_time`` / ``end_time``.

      Returns:
          None. Every exception is logged via ``LOGERR`` and swallowed.
      """
      try:
          if not result:
              LOGDBG("cb_night_toileting_frequency, empty result")
              return

          # Every row contributes exactly one entry (an empty dict when it has
          # no "info"), so len(event_list) equals the number of rows.
          event_list = []
          for row in result:
              dev_id = row.get("dev_id")  # the last row's device id is reported
              event_list.append(json.loads(row["info"]) if row.get("info") else {})

          this_event_uuid = str(uuid.uuid4())
          last_info = {
              "start_time": userdata["start_dt"],
              "end_time": userdata["end_dt"],
              "count": len(event_list),
              "event_list": event_list,
          }

          # Persist the summary event.
          remark = {"sp_id": g_config.g_sys_conf["sp_id"]}
          params = {
              "dev_id": dev_id,
              "uuid": this_event_uuid,
              "plan_uuid": self.plan_uuid_,
              "event_type": self.event_type_,
              "info": json.dumps(last_info, ensure_ascii=False),
              "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
              "is_handle": 0,
              "create_time": get_bj_time_s(),
              "is_deleted": 0,
              "tenant_id": self.tenant_id_,
              "remark": json.dumps(remark),
          }
          db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))

          # Notify. The plan uuid sent here must be the one stored with the
          # event above (this plan's own id), not the plan_uuid column of a
          # source row, which belongs to whatever plan produced that row.
          linkage_action = {
              "linkage_push_wechat_service": self.linkage_action_.wechat_service_
          }
          mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events")
          LOGINFO(f"new event: {event_desc_map[self.event_type_]}")
      except json.JSONDecodeError as e:
          # A malformed "info" payload aborts the whole aggregation.
          for frame in traceback.extract_tb(e.__traceback__):
              LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
      except Exception as e:
          for frame in traceback.extract_tb(e.__traceback__):
              LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  1021. # 如厕频次异常回调
  def cb_toileting_frequency_abnormal(self, result, userdata):
      """Callback for the abnormal toileting-frequency query.

      Counts the queried rows and, once the count reaches
      ``self.event_attr_.threshold_count_``, builds a summary event, queues an
      insert of it (``sqls.sql_insert_events``) and announces it through
      ``mqtt_send.alarm_event``. A falsy ``result`` only emits a debug log;
      a count below the threshold produces nothing at all.

      Args:
          result: Iterable of rows supporting ``.get`` and item access. A row's
              ``"info"`` value, when truthy, is parsed as JSON.
          userdata: Mapping with ``"start_dt"`` and ``"end_dt"``; both values
              are copied into the summary as ``start_time`` / ``end_time``.

      Returns:
          None. Every exception is logged via ``LOGERR`` and swallowed.
      """
      try:
          if not result:
              LOGDBG("cb_toileting_frequency_abnormal, empty result")
              return

          # Every row contributes exactly one entry (an empty dict when it has
          # no "info"), so len(event_list) equals the number of rows.
          event_list = []
          for row in result:
              dev_id = row.get("dev_id")  # the last row's device id is reported
              event_list.append(json.loads(row["info"]) if row.get("info") else {})

          # Below the configured threshold there is nothing abnormal to report.
          if len(event_list) < self.event_attr_.threshold_count_:
              return

          this_event_uuid = str(uuid.uuid4())
          last_info = {
              "start_time": userdata["start_dt"],
              "end_time": userdata["end_dt"],
              "count": len(event_list),
              "event_list": event_list,
          }

          # Persist the summary event.
          remark = {"sp_id": g_config.g_sys_conf["sp_id"]}
          params = {
              "dev_id": dev_id,
              "uuid": this_event_uuid,
              "plan_uuid": self.plan_uuid_,
              "event_type": self.event_type_,
              "info": json.dumps(last_info, ensure_ascii=False),
              "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
              "is_handle": 0,
              "create_time": get_bj_time_s(),
              "is_deleted": 0,
              "tenant_id": self.tenant_id_,
              "remark": json.dumps(remark),
          }
          db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))

          # Notify, using the same plan uuid that was stored with the event.
          linkage_action = {
              "linkage_push_wechat_service": self.linkage_action_.wechat_service_
          }
          mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events")
          LOGINFO(f"new event: {event_desc_map[self.event_type_]}")
      except json.JSONDecodeError as e:
          # A malformed "info" payload aborts the whole aggregation.
          for frame in traceback.extract_tb(e.__traceback__):
              LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
      except Exception as e:
          for frame in traceback.extract_tb(e.__traceback__):
              LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  1084. # 起夜异常回调
  def cb_night_toileting_frequency_abnormal(self, result, userdata):
      """Callback for the abnormal night-time toileting query.

      Counts the queried rows and, once the count reaches
      ``self.event_attr_.threshold_count_``, builds a summary event, queues an
      insert of it (``sqls.sql_insert_events``) and announces it through
      ``mqtt_send.alarm_event``. A falsy ``result`` only emits a debug log;
      a count below the threshold produces nothing at all.

      Args:
          result: Iterable of rows supporting ``.get`` and item access. A row's
              ``"info"`` value, when truthy, is parsed as JSON.
          userdata: Mapping with ``"start_dt"`` and ``"end_dt"``; both values
              are copied into the summary as ``start_time`` / ``end_time``.

      Returns:
          None. Every exception is logged via ``LOGERR`` and swallowed.
      """
      try:
          if not result:
              LOGDBG("cb_night_toileting_frequency_abnormal, empty result")
              return

          # Every row contributes exactly one entry (an empty dict when it has
          # no "info"), so len(event_list) equals the number of rows.
          event_list = []
          for row in result:
              dev_id = row.get("dev_id")  # the last row's device id is reported
              event_list.append(json.loads(row["info"]) if row.get("info") else {})

          # Below the configured threshold there is nothing abnormal to report.
          if len(event_list) < self.event_attr_.threshold_count_:
              return

          this_event_uuid = str(uuid.uuid4())
          last_info = {
              "start_time": userdata["start_dt"],
              "end_time": userdata["end_dt"],
              "count": len(event_list),
              "event_list": event_list,
          }

          # Persist the summary event.
          remark = {"sp_id": g_config.g_sys_conf["sp_id"]}
          params = {
              "dev_id": dev_id,
              "uuid": this_event_uuid,
              "plan_uuid": self.plan_uuid_,
              "event_type": self.event_type_,
              "info": json.dumps(last_info, ensure_ascii=False),
              "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
              "is_handle": 0,
              "create_time": get_bj_time_s(),
              "is_deleted": 0,
              "tenant_id": self.tenant_id_,
              "remark": json.dumps(remark),
          }
          db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))

          # Notify, using the same plan uuid that was stored with the event.
          linkage_action = {
              "linkage_push_wechat_service": self.linkage_action_.wechat_service_
          }
          mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events")
          LOGINFO(f"new event: {event_desc_map[self.event_type_]}")
      except json.JSONDecodeError as e:
          # A malformed "info" payload aborts the whole aggregation.
          for frame in traceback.extract_tb(e.__traceback__):
              LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
      except Exception as e:
          for frame in traceback.extract_tb(e.__traceback__):
              LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  1147. # 卫生间频次统计回调
  def cb_bathroom_stay_frequency(self, result, userdata):
      """Callback for the bathroom-stay frequency statistics query.

      Folds the queried rows into a single summary event, queues an insert of
      that event (``sqls.sql_insert_events``) and announces it through
      ``mqtt_send.alarm_event``. A falsy ``result`` only emits a debug log.

      Args:
          result: Iterable of rows supporting ``.get`` and item access. A row's
              ``"info"`` value, when truthy, is parsed as JSON.
          userdata: Mapping with ``"start_dt"`` and ``"end_dt"``; both values
              are copied into the summary as ``start_time`` / ``end_time``.

      Returns:
          None. Every exception is logged via ``LOGERR`` and swallowed.
      """
      try:
          if not result:
              LOGDBG("cb_bathroom_stay_frequency, empty result")
              return

          # Every row contributes exactly one entry (an empty dict when it has
          # no "info"), so len(event_list) equals the number of rows.
          event_list = []
          for row in result:
              dev_id = row.get("dev_id")  # the last row's device id is reported
              event_list.append(json.loads(row["info"]) if row.get("info") else {})

          this_event_uuid = str(uuid.uuid4())
          last_info = {
              "start_time": userdata["start_dt"],
              "end_time": userdata["end_dt"],
              "count": len(event_list),
              "event_list": event_list,
          }

          # Persist the summary event.
          remark = {"sp_id": g_config.g_sys_conf["sp_id"]}
          params = {
              "dev_id": dev_id,
              "uuid": this_event_uuid,
              "plan_uuid": self.plan_uuid_,
              "event_type": self.event_type_,
              "info": json.dumps(last_info, ensure_ascii=False),
              "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
              "is_handle": 0,
              "create_time": get_bj_time_s(),
              "is_deleted": 0,
              "tenant_id": self.tenant_id_,
              "remark": json.dumps(remark),
          }
          db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))

          # Notify. The plan uuid sent here must be the one stored with the
          # event above (this plan's own id), not the plan_uuid column of a
          # source row, which belongs to whatever plan produced that row.
          linkage_action = {
              "linkage_push_wechat_service": self.linkage_action_.wechat_service_
          }
          mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events")
          LOGINFO(f"new event: {event_desc_map[self.event_type_]}")
      except json.JSONDecodeError as e:
          # A malformed "info" payload aborts the whole aggregation.
          for frame in traceback.extract_tb(e.__traceback__):
              LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
      except Exception as e:
          for frame in traceback.extract_tb(e.__traceback__):
              LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")