alarm_plan.py 46 KB


  1. from typing import List, Tuple, Optional
  2. from datetime import datetime, time, date
  3. from threading import Thread, Lock
  4. from enum import Enum
  5. import uuid
  6. import json
  7. import traceback
  8. from datetime import datetime, timezone, timedelta
  9. from common.sys_comm import (
  10. LOGDBG, LOGINFO, LOGWARN, LOGERR,
  11. get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
  12. utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
  13. )
  14. from core.time_plan import TimePlan
  15. from core.event_type import EventType, event_desc_map
  16. import core.g_LAS as g_las
  17. import core.alarm_plan_helper as helper
  18. from core.linkage_action import LinkageAction
  19. from db.db_process import (
  20. db_req_que, DBRequest_Async
  21. )
  22. from db.db_process import (
  23. db_execute_sync, db_execute_async
  24. )
  25. import db.db_sqls as sqls
  26. import mqtt.mqtt_send as mqtt_send
  27. import device.dev_mng as g_Dev
  28. from device.dev_mng import (
  29. Device, g_dev_mgr
  30. )
  31. class EventAttr_Base:
  32. def __init__(self):
  33. return
  34. # 事件属性 事件事件
  35. class EventAttr_StayDetection(EventAttr_Base):
  36. def __init__(self, event_type):
  37. self.event_type_ = event_type
  38. self.enter_ts_: int = -1 # 进入时间(s)
  39. self.leave_ts_: int = -1 # 离开时间(s)
  40. self.stay_time_: int = -1 # 停留时长(s)
  41. return
  42. def reset(self):
  43. self.enter_ts_ = -1
  44. self.leave_ts_ = -1
  45. self.stay_time_ = -1
  46. # 事件属性 滞留事件
  47. class EventAttr_RetentionDetection(EventAttr_Base):
  48. def __init__(self, event_type):
  49. self.event_type_ = event_type
  50. self.enter_ts_: int = -1 # 进入时间(s)
  51. self.leave_ts_: int = -1 # 离开时间(s)
  52. self.stay_time_: int = -1 # 停留时长(s)
  53. return
  54. def reset(self):
  55. self.enter_ts_ = -1
  56. self.leave_ts_ = -1
  57. self.stay_time_ = -1
  58. # 事件属性 如厕事件
  59. class EventAttr_ToiletingDetection(EventAttr_Base):
  60. def __init__(self, event_type):
  61. self.event_type_ = event_type
  62. self.enter_ts_: int = -1 # 进入时间(ms)
  63. self.leave_ts_: int = -1 # 离开时间(ms)
  64. self.stay_time_: int = -1 # 停留时长(ms)
  65. return
  66. def reset(self):
  67. self.enter_ts_ = -1
  68. self.leave_ts_ = -1
  69. self.stay_time_ = -1
  70. # 事件属性 如厕频次统计
  71. class EventAttr_ToiletingFrequency(EventAttr_Base):
  72. def __init__(self, event_type):
  73. self.event_type_ = event_type
  74. self.count_: int = 0 # 统计次数
  75. self.event_list: list = []
  76. return
  77. def reset(self):
  78. self.count_ = 0
  79. self.event_list = []
  80. # 事件属性 夜间如厕频次统计
  81. class EventAttr_NightToiletingFrequency(EventAttr_Base):
  82. def __init__(self, event_type):
  83. self.event_type_ = event_type
  84. self.count_: int = 0 # 统计次数
  85. self.event_list: list = []
  86. return
  87. def reset(self):
  88. self.count_ = 0
  89. self.event_list = []
  90. # 事件属性 如厕频次异常
  91. class EventAttr_ToiletingFrequencyAbnormal(EventAttr_Base):
  92. def __init__(self, event_type):
  93. self.event_type_ = event_type
  94. self.count_: int = 0 # 统计次数
  95. self.threshold_count_: int = 0 # 异常阈值
  96. self.event_list: list = []
  97. return
  98. def reset(self):
  99. self.count_ = 0
  100. self.event_list = []
  101. # 事件属性 起夜异常
  102. class EventAttr_NightToiletingFrequencyAbnormal(EventAttr_Base):
  103. def __init__(self, event_type):
  104. self.event_type_ = event_type
  105. self.count_: int = 0 # 统计次数
  106. self.threshold_count_: int = 0 # 异常阈值
  107. self.event_list: list = []
  108. return
  109. def reset(self):
  110. self.count_ = 0
  111. self.event_list = []
  112. # 事件属性 卫生间频次统计
  113. class EventAttr_BathroomStayFrequency(EventAttr_Base):
  114. def __init__(self, event_type):
  115. self.event_type_ = event_type
  116. self.count_: int = 0 # 统计次数
  117. self.event_list: list = []
  118. return
  119. def reset(self):
  120. self.count_ = 0
  121. self.event_list = []
  122. # 事件属性 异常消失
  123. class EventAttr_TargetAbsence(EventAttr_Base):
  124. def __init__(self, event_type):
  125. self.event_type_ = event_type
  126. self.leave_ts_: int = -1 # 离开时间(ms)
  127. self.enter_ts_: int = -1 # 进入时间(ms)
  128. self.absence_time_: int = -1 # 消失时长(ms)
  129. self.time_threshold_: int = 300 # 触发消失时间阈值(ms)
  130. return
  131. def reset(self):
  132. self.leave_ts_ = -1
  133. self.enter_ts_ = -1
  134. self.absence_time_ = -1
  135. # 事件属性 清理过期事件(无用)
  136. class EventAttr_CleanExpiredEvents(EventAttr_Base):
  137. def __init__(self, event_type):
  138. self.event_type_ = event_type
  139. self.leave_ts_: int = -1 # 离开时间(ms)
  140. self.enter_ts_: int = -1 # 进入时间(ms)
  141. self.absence_time_: int = -1 # 消失时长(ms)
  142. self.time_threshold_: int = 300 # 触发消失时间阈值(ms)
  143. return
  144. def reset(self):
  145. self.leave_ts_ = -1
  146. self.enter_ts_ = -1
  147. self.absen
  148. # 事件属性表
  149. event_attr_map = {
  150. EventType.STAY_DETECTION.value : EventAttr_StayDetection,
  151. EventType.RETENTION_DETECTION.value : EventAttr_RetentionDetection,
  152. EventType.TOILETING_DETECTION.value : EventAttr_ToiletingDetection,
  153. EventType.TOILETING_FREQUENCY.value : EventAttr_ToiletingFrequency,
  154. EventType.NIGHT_TOILETING_FREQUENCY.value : EventAttr_NightToiletingFrequency,
  155. EventType.TOILETING_FREQUENCY_ABNORMAL.value : EventAttr_ToiletingFrequencyAbnormal,
  156. EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value: EventAttr_NightToiletingFrequencyAbnormal,
  157. EventType.BATHROOM_STAY_FREQUENCY.value : EventAttr_BathroomStayFrequency,
  158. EventType.TARGET_ABSENCE.value : EventAttr_TargetAbsence,
  159. EventType.CLEAN_EXPIRED_EVENTS.value : EventAttr_CleanExpiredEvents,
  160. }
  161. class Cron:
  162. def __init__(self, h, m, s):
  163. self.h_ = None
  164. self.m_ = None
  165. self.s_ = None
  166. class AlarmPlan:
  167. def __init__(self,
  168. plan_uuid: str,
  169. name: str,
  170. dev_id: str,
  171. dev_name: str,
  172. enable: bool,
  173. time_plan: TimePlan,
  174. rect: list,
  175. event_type: int,
  176. threshold_time: int,
  177. merge_time: int,
  178. param: dict,
  179. cron: Optional[dict] = None,
  180. linkage_action: LinkageAction = LinkageAction(),
  181. tenant_id:int = 0
  182. ):
  183. self.lock_ = Lock()
  184. self.plan_uuid_ = plan_uuid # 计划id
  185. self.name_ = name # 计划名称
  186. self.dev_id_ = dev_id # 设备id
  187. self.dev_name_ = dev_name # 设备名称
  188. self.enable_ = enable # 是否启用
  189. self.time_plan_ = time_plan # 时间计划
  190. self.param_ = param # 参数
  191. self.linkage_action_ = linkage_action # 联动动作
  192. self.tenant_id_ = tenant_id # 租户id
  193. # 维护状态(根据TimePlanu判断)
  194. self.status_ = 0 # 0未激活,1激活,-1过期
  195. self.status_update_ts_ = -1 # 状态更新时间,初始值为-1
  196. # 事件触发参数
  197. self.rect_ = rect # 检测区域 [left, top, width, height]
  198. self.threshold_time_ = threshold_time # 触发时间阈值
  199. self.merge_time_ = merge_time # 归并时间窗口
  200. self.event_type_ = event_type # 事件类型
  201. self.event_attr_ = self.init_event_attr() # 事件属性
  202. if self.event_attr_ is None:
  203. raise ValueError(f"Invalid event_type: {event_type}")
  204. # 计划任务的开始时间
  205. self.cron_ = cron # {“hour": 7, "minute": 0}
  206. def execute(self):
  207. if self.status_ != 1:
  208. return
  209. g_las.g_alarm_plan_disp.dispatch(self.event_type_, self)
  210. # 初始化事件属性
  211. def init_event_attr(self):
  212. try:
  213. event_cls = event_attr_map.get(self.event_type_)
  214. if event_cls is None:
  215. return None
  216. event_attr = event_cls(self.event_type_)
  217. if ((self.event_type_ == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
  218. (self.event_type_ == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value)):
  219. event_attr.threshold_count_ = int(self.param_.get("count", 0))
  220. if ((self.event_type_ == EventType.TARGET_ABSENCE.value)):
  221. event_attr.time_threshold_ = int(self.param_.get("time_threshold", 0))
  222. return event_attr
  223. except json.JSONDecodeError as e:
  224. tb_info = traceback.extract_tb(e.__traceback__)
  225. for frame in tb_info:
  226. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  227. except Exception as e:
  228. tb_info = traceback.extract_tb(e.__traceback__)
  229. for frame in tb_info:
  230. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  231. # 更新激活状态
  232. def update_status(self, now: Optional[datetime] = None) -> None:
  233. now = now or datetime.now()
  234. old_status = self.status_
  235. if not self.enable_:
  236. self.status_ = 0
  237. else:
  238. now_fmt = now.strftime("%Y-%m-%d")
  239. # 过期
  240. if now_fmt > self.time_plan_.stop_date_:
  241. self.status_ = -1
  242. elif now_fmt < self.time_plan_.start_date_:
  243. self.status_ = 0
  244. elif self.time_plan_.is_active_now(now):
  245. self.status_ = 1
  246. else:
  247. self.status_ = 0
  248. if self.status_ != old_status:
  249. self.status_update_ts = int(now.timestamp())
  250. LOGINFO(f"[Status] plan {self.plan_uuid_} status_ changed {old_status} -> {self.status_}")
  251. def is_point_in_rect(self, x:float, y:float, rect:list) -> bool:
  252. rx, ry, rw, rh = rect
  253. x_min = min(rx, rx + rw)
  254. x_max = max(rx, rx + rw)
  255. y_min = min(ry, ry - rh)
  256. y_max = max(ry, ry - rh)
  257. bRet: bool = x_min <= x <= x_max and y_min <= y <= y_max
  258. return bRet
  259. # 查找最近 t 秒内,最后一个落在 rect 内的 target_point 的 rtd_unit
  260. def find_latest_rtd_in_region(self, device: Device, rect: list, now: int=None, t: int=1):
  261. now_s = now if now else get_utc_time_s()
  262. rtd_que_copy = device.get_rtd_que_copy()
  263. with self.lock_:
  264. for rtd_unit in reversed(rtd_que_copy): # 倒序扫描
  265. ts_s = int(rtd_unit.get("timestamp", 0))
  266. if now_s - ts_s > t:
  267. break # 已经超过 t 秒,可以直接结束
  268. # 检查点是否在区域内
  269. for pt in rtd_unit.get("target_point", []):
  270. if len(pt) >= 2:
  271. x, y = pt[0], pt[1]
  272. if self.is_point_in_rect(x, y, rect):
  273. return rtd_unit
  274. return None
  275. # 停留事件
  276. def handle_stay_detection(self):
  277. try:
  278. dev_id = self.dev_id_
  279. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  280. if not device:
  281. return
  282. now = get_utc_time_s()
  283. # 查找最新的落在检测区域的目标
  284. rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
  285. if rtd_unit:
  286. timestamp = rtd_unit["timestamp"]
  287. pose = rtd_unit["pose"]
  288. target_point = rtd_unit["target_point"]
  289. if self.event_attr_.enter_ts_ == -1:
  290. self.event_attr_.enter_ts_ = timestamp
  291. else:
  292. self.event_attr_.leave_ts_ = timestamp
  293. if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
  294. return
  295. # 归并时间内,不认为事件结束
  296. if now - self.event_attr_.leave_ts_ < self.merge_time_:
  297. return
  298. self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
  299. stay_time =self.event_attr_.stay_time_
  300. # 时间小于触发时间阈值,忽略并重置
  301. if stay_time < self.threshold_time_ :
  302. self.event_attr_.reset()
  303. LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
  304. return
  305. # 构造事件
  306. # 入库
  307. info = {
  308. "start_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  309. "end_time": utc_to_bj_s(self.event_attr_.leave_ts_),
  310. "stay_time": stay_time
  311. }
  312. event_uuid = str(uuid.uuid4())
  313. params = {
  314. "dev_id": dev_id,
  315. "uuid": event_uuid,
  316. "plan_uuid": self.plan_uuid_,
  317. "event_type": self.event_type_,
  318. "info": json.dumps(info),
  319. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  320. "is_handle": 0,
  321. "create_time": get_bj_time_s(),
  322. "is_deleted": 0,
  323. "tenant_id": self.tenant_id_,
  324. "remark": json.dumps({}, ensure_ascii=False)
  325. }
  326. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  327. # 通知
  328. linkage_action = {
  329. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  330. }
  331. mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
  332. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
  333. self.event_attr_.reset()
  334. except json.JSONDecodeError as e:
  335. tb_info = traceback.extract_tb(e.__traceback__)
  336. for frame in tb_info:
  337. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  338. except Exception as e:
  339. tb_info = traceback.extract_tb(e.__traceback__)
  340. for frame in tb_info:
  341. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  342. # 滞留事件
  343. def handle_retention_detection(self):
  344. try:
  345. dev_id = self.dev_id_
  346. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  347. if not device:
  348. return
  349. now = get_utc_time_s()
  350. # 查找最新的落在检测区域的目标
  351. rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
  352. if rtd_unit:
  353. timestamp = rtd_unit["timestamp"]
  354. pose = rtd_unit["pose"]
  355. target_point = rtd_unit["target_point"]
  356. if self.event_attr_.enter_ts_ == -1:
  357. self.event_attr_.enter_ts_ = timestamp
  358. else:
  359. self.event_attr_.leave_ts_ = timestamp
  360. if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
  361. return
  362. # 归并时间内,不认为事件结束
  363. if now - self.event_attr_.leave_ts_ < self.merge_time_:
  364. return
  365. self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
  366. stay_time =self.event_attr_.stay_time_
  367. # 时间小于触发时间阈值,忽略并重置
  368. if stay_time < self.threshold_time_ :
  369. self.event_attr_.reset()
  370. LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
  371. return
  372. # 构造事件
  373. # 入库
  374. info = {
  375. "start_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  376. "end_time": utc_to_bj_s(self.event_attr_.leave_ts_),
  377. "stay_time": stay_time
  378. }
  379. event_uuid = str(uuid.uuid4())
  380. params = {
  381. "dev_id": dev_id,
  382. "uuid": event_uuid,
  383. "plan_uuid": self.plan_uuid_,
  384. "event_type": self.event_type_,
  385. "info": json.dumps(info),
  386. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  387. "is_handle": 0,
  388. "create_time": get_bj_time_s(),
  389. "is_deleted": 0,
  390. "tenant_id": self.tenant_id_,
  391. "remark": json.dumps({}, ensure_ascii=False)
  392. }
  393. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  394. # 通知
  395. linkage_action = {
  396. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  397. }
  398. mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
  399. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
  400. self.event_attr_.reset()
  401. except json.JSONDecodeError as e:
  402. tb_info = traceback.extract_tb(e.__traceback__)
  403. for frame in tb_info:
  404. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  405. except Exception as e:
  406. tb_info = traceback.extract_tb(e.__traceback__)
  407. for frame in tb_info:
  408. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  409. # 如厕事件
  410. def handle_toileting_detection(self):
  411. try:
  412. dev_id = self.dev_id_
  413. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  414. if not device:
  415. return
  416. now = get_utc_time_s()
  417. # 查找最新的落在检测区域的目标
  418. rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
  419. if rtd_unit:
  420. timestamp = rtd_unit["timestamp"]
  421. pose = rtd_unit["pose"]
  422. target_point = rtd_unit["target_point"]
  423. if self.event_attr_.enter_ts_ == -1:
  424. self.event_attr_.enter_ts_ = timestamp
  425. else:
  426. self.event_attr_.leave_ts_ = timestamp
  427. if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
  428. return
  429. # 归并时间内,不认为事件结束
  430. if now - self.event_attr_.leave_ts_ < self.merge_time_:
  431. return
  432. self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
  433. stay_time =self.event_attr_.stay_time_
  434. # 时间小于触发时间阈值,忽略并重置
  435. if stay_time < self.threshold_time_ :
  436. self.event_attr_.reset()
  437. LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
  438. return
  439. # 构造事件
  440. # 入库
  441. info = {
  442. "start_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  443. "end_time": utc_to_bj_s(self.event_attr_.leave_ts_),
  444. "stay_time": stay_time
  445. }
  446. event_uuid = str(uuid.uuid4())
  447. params = {
  448. "dev_id": dev_id,
  449. "uuid": event_uuid,
  450. "plan_uuid": self.plan_uuid_,
  451. "event_type": self.event_type_,
  452. "info": json.dumps(info),
  453. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  454. "is_handle": 0,
  455. "create_time": get_bj_time_s(),
  456. "is_deleted": 0,
  457. "tenant_id": self.tenant_id_,
  458. "remark": json.dumps({}, ensure_ascii=False)
  459. }
  460. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  461. # 通知
  462. linkage_action = {
  463. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  464. }
  465. mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
  466. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
  467. self.event_attr_.reset()
  468. except json.JSONDecodeError as e:
  469. tb_info = traceback.extract_tb(e.__traceback__)
  470. for frame in tb_info:
  471. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  472. except Exception as e:
  473. tb_info = traceback.extract_tb(e.__traceback__)
  474. for frame in tb_info:
  475. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  476. # 如厕频次统计
  477. def handle_toileting_frequency(self):
  478. try:
  479. dev_id = self.dev_id_
  480. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  481. if not device:
  482. return
  483. start_dt, end_dt = helper.get_query_time_range(self.param_)
  484. params = {
  485. "dev_id" : self.dev_id_,
  486. "event_type": EventType.TOILETING_DETECTION.value,
  487. "start_dt": start_dt,
  488. "end_dt": end_dt
  489. }
  490. userdata = {
  491. "start_dt" : start_dt,
  492. "end_dt" : end_dt
  493. }
  494. db_req_que.put(DBRequest_Async(
  495. sql=sqls.sql_query_events_by_datetime,
  496. params=params,
  497. callback=self.cb_toileting_frequency,
  498. userdata=userdata))
  499. except json.JSONDecodeError as e:
  500. tb_info = traceback.extract_tb(e.__traceback__)
  501. for frame in tb_info:
  502. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  503. except Exception as e:
  504. tb_info = traceback.extract_tb(e.__traceback__)
  505. for frame in tb_info:
  506. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  507. # 夜间如厕频次统计
  508. def handle_night_toileting_frequency(self):
  509. try:
  510. dev_id = self.dev_id_
  511. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  512. if not device:
  513. return
  514. start_dt, end_dt = helper.get_query_time_range(self.param_)
  515. params = {
  516. "dev_id" : self.dev_id_,
  517. "event_type": EventType.TOILETING_DETECTION.value,
  518. "start_dt": start_dt,
  519. "end_dt": end_dt
  520. }
  521. userdata = {
  522. "start_dt" : start_dt,
  523. "end_dt" : end_dt
  524. }
  525. db_req_que.put(DBRequest_Async(
  526. sql=sqls.sql_query_events_by_datetime,
  527. params=params,
  528. callback=self.cb_night_toileting_frequency,
  529. userdata=userdata))
  530. except json.JSONDecodeError as e:
  531. tb_info = traceback.extract_tb(e.__traceback__)
  532. for frame in tb_info:
  533. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  534. except Exception as e:
  535. tb_info = traceback.extract_tb(e.__traceback__)
  536. for frame in tb_info:
  537. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  538. # 如厕频次异常
  539. def handle_toileting_frequency_abnormal(self):
  540. try:
  541. dev_id = self.dev_id_
  542. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  543. if not device:
  544. return
  545. start_dt, end_dt = helper.get_query_time_range(self.param_)
  546. params = {
  547. "dev_id" : self.dev_id_,
  548. "event_type": EventType.TOILETING_DETECTION.value,
  549. "start_dt": start_dt,
  550. "end_dt": end_dt
  551. }
  552. userdata = {
  553. "start_dt" : start_dt,
  554. "end_dt" : end_dt
  555. }
  556. db_req_que.put(DBRequest_Async(
  557. sql=sqls.sql_query_events_by_datetime,
  558. params=params,
  559. callback=self.cb_toileting_frequency_abnormal,
  560. userdata=userdata))
  561. except json.JSONDecodeError as e:
  562. tb_info = traceback.extract_tb(e.__traceback__)
  563. for frame in tb_info:
  564. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  565. except Exception as e:
  566. tb_info = traceback.extract_tb(e.__traceback__)
  567. for frame in tb_info:
  568. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  569. # 起夜异常
  570. def handle_night_toileting_frequency_abnormal(self):
  571. try:
  572. dev_id = self.dev_id_
  573. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  574. if not device:
  575. return
  576. start_dt, end_dt = helper.get_query_time_range(self.param_)
  577. params = {
  578. "dev_id" : self.dev_id_,
  579. "event_type": EventType.TOILETING_DETECTION.value,
  580. "start_dt": start_dt,
  581. "end_dt": end_dt
  582. }
  583. userdata = {
  584. "start_dt" : start_dt,
  585. "end_dt" : end_dt
  586. }
  587. db_req_que.put(DBRequest_Async(
  588. sql=sqls.sql_query_events_by_datetime,
  589. params=params,
  590. callback=self.cb_night_toileting_frequency_abnormal,
  591. userdata=userdata))
  592. except json.JSONDecodeError as e:
  593. tb_info = traceback.extract_tb(e.__traceback__)
  594. for frame in tb_info:
  595. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  596. except Exception as e:
  597. tb_info = traceback.extract_tb(e.__traceback__)
  598. for frame in tb_info:
  599. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  600. # 卫生间频次统计
  601. def handle_bathroom_stay_frequency(self):
  602. try:
  603. dev_id = self.dev_id_
  604. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  605. if not device:
  606. return
  607. start_dt, end_dt = helper.get_query_time_range(self.param_)
  608. params = {
  609. "dev_id" : self.dev_id_,
  610. "event_type": EventType.STAY_DETECTION.value,
  611. "start_dt": start_dt,
  612. "end_dt": end_dt
  613. }
  614. userdata = {
  615. "start_dt" : start_dt,
  616. "end_dt" : end_dt
  617. }
  618. db_req_que.put(DBRequest_Async(
  619. sql=sqls.sql_query_events_by_datetime,
  620. params=params,
  621. callback=self.cb_bathroom_stay_frequency,
  622. userdata=userdata))
  623. except json.JSONDecodeError as e:
  624. tb_info = traceback.extract_tb(e.__traceback__)
  625. for frame in tb_info:
  626. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  627. except Exception as e:
  628. tb_info = traceback.extract_tb(e.__traceback__)
  629. for frame in tb_info:
  630. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  631. # 异常消失
  632. def handle_target_absence(self):
  633. try:
  634. dev_id = self.dev_id_
  635. device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
  636. if not device:
  637. return
  638. now = get_utc_time_s()
  639. threshold: int = self.event_attr_.time_threshold_
  640. # 查找最新的落在检测区域的目标
  641. rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
  642. if rtd_unit:
  643. return
  644. timestamp = get_utc_time_s()
  645. if self.event_attr_.leave_ts_ == -1:
  646. self.event_attr_.leave_ts_ = timestamp
  647. else:
  648. self.event_attr_.enter_ts_ = timestamp
  649. if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
  650. return
  651. # 归并时间内,不认为事件结束
  652. if now - self.event_attr_.enter_ts_ < self.merge_time_:
  653. return
  654. self.event_attr_.absence_time_ = self.event_attr_.enter_ts_ - self.event_attr_.leave_ts_
  655. absence_time = self.event_attr_.absence_time_
  656. # 时间小于触发时间阈值,忽略并重置
  657. if absence_time < threshold :
  658. self.event_attr_.reset()
  659. LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
  660. return
  661. # 构造事件
  662. # 入库
  663. info = {
  664. "start_time": utc_to_bj_s(self.event_attr_.leave_ts_),
  665. "end_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  666. "absence_time": absence_time
  667. }
  668. event_uuid = str(uuid.uuid4())
  669. params = {
  670. "dev_id": dev_id,
  671. "uuid": event_uuid,
  672. "plan_uuid": self.plan_uuid_,
  673. "event_type": self.event_type_,
  674. "info": json.dumps(info),
  675. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  676. "is_handle": 0,
  677. "create_time": get_bj_time_s(),
  678. "is_deleted": 0,
  679. "tenant_id": self.tenant_id_,
  680. "remark": json.dumps({}, ensure_ascii=False)
  681. }
  682. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  683. # 通知
  684. linkage_action = {
  685. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  686. }
  687. mqtt_send.alarm_event(dev_id, self.dev_name_, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, linkage_action, "events")
  688. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, absence_time: {absence_time}")
  689. self.event_attr_.reset()
  690. except json.JSONDecodeError as e:
  691. tb_info = traceback.extract_tb(e.__traceback__)
  692. for frame in tb_info:
  693. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  694. except Exception as e:
  695. tb_info = traceback.extract_tb(e.__traceback__)
  696. for frame in tb_info:
  697. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  698. # 清理过期事件
  699. def handle_clear_expired_events(self):
  700. try:
  701. db_execute_async(sqls.sql_delete_expired_events)
  702. except json.JSONDecodeError as e:
  703. tb_info = traceback.extract_tb(e.__traceback__)
  704. for frame in tb_info:
  705. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  706. except Exception as e:
  707. tb_info = traceback.extract_tb(e.__traceback__)
  708. for frame in tb_info:
  709. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  710. # ========== 一些回调 ==========
  711. # 如厕频次统计回调
  712. def cb_toileting_frequency(self, result, userdata):
  713. try:
  714. if result:
  715. count = 0
  716. event_list = []
  717. for row in result:
  718. dev_id: str = row.get("dev_id")
  719. event_uuid: str = row.get("uuid")
  720. plan_uuid: str = row.get("plan_uuid")
  721. event_type: int = row.get("event_type")
  722. info: dict = json.loads(row["info"]) if row.get("info") else {}
  723. is_handle: str = row.get("is_handle")
  724. create_time: str = row.get("create_time")
  725. update_time: str = row.get("update_time")
  726. is_deleted: str = row.get("is_deleted")
  727. remark: str = row.get("remark")
  728. event_list.append(info)
  729. this_event_uuid = str(uuid.uuid4())
  730. last_info = {
  731. "start_time" : userdata["start_dt"],
  732. "end_time" : userdata["end_dt"],
  733. "count" : len(event_list),
  734. "event_list" : event_list
  735. }
  736. # 入库
  737. event_uuid = str(uuid.uuid4())
  738. params = {
  739. "dev_id": dev_id,
  740. "uuid": this_event_uuid,
  741. "plan_uuid": self.plan_uuid_,
  742. "event_type": self.event_type_,
  743. "info": json.dumps(last_info, ensure_ascii=False),
  744. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  745. "is_handle": 0,
  746. "create_time": get_bj_time_s(),
  747. "is_deleted": 0,
  748. "tenant_id": self.tenant_id_,
  749. "remark": json.dumps({}, ensure_ascii=False)
  750. }
  751. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  752. # 通知
  753. linkage_action = {
  754. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  755. }
  756. mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, plan_uuid, event_desc_map[self.event_type_], last_info, linkage_action, "events")
  757. LOGINFO(f"cb_toileting_frequency succeed")
  758. else:
  759. LOGDBG("cb_toileting_frequency, empty result")
  760. except json.JSONDecodeError as e:
  761. tb_info = traceback.extract_tb(e.__traceback__)
  762. for frame in tb_info:
  763. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  764. except Exception as e:
  765. tb_info = traceback.extract_tb(e.__traceback__)
  766. for frame in tb_info:
  767. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  768. return
  769. # 夜间如厕频次统计回调
  770. def cb_night_toileting_frequency(self, result, userdata):
  771. try:
  772. if result:
  773. count = 0
  774. event_list = []
  775. for row in result:
  776. dev_id: str = row.get("dev_id")
  777. event_uuid: str = row.get("uuid")
  778. plan_uuid: str = row.get("plan_uuid")
  779. event_type: int = row.get("event_type")
  780. info: dict = json.loads(row["info"]) if row.get("info") else {}
  781. is_handle: str = row.get("is_handle")
  782. create_time: str = row.get("create_time")
  783. update_time: str = row.get("update_time")
  784. is_deleted: str = row.get("is_deleted")
  785. remark: str = row.get("remark")
  786. event_list.append(info)
  787. this_event_uuid = str(uuid.uuid4())
  788. last_info = {
  789. "start_time" : userdata["start_dt"],
  790. "end_time" : userdata["end_dt"],
  791. "count" : len(event_list),
  792. "event_list" : event_list
  793. }
  794. # 入库
  795. event_uuid = str(uuid.uuid4())
  796. params = {
  797. "dev_id": dev_id,
  798. "uuid": this_event_uuid,
  799. "plan_uuid": self.plan_uuid_,
  800. "event_type": self.event_type_,
  801. "info": json.dumps(last_info, ensure_ascii=False),
  802. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  803. "is_handle": 0,
  804. "create_time": get_bj_time_s(),
  805. "is_deleted": 0,
  806. "tenant_id": self.tenant_id_,
  807. "remark": json.dumps({}, ensure_ascii=False)
  808. }
  809. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  810. # 通知
  811. linkage_action = {
  812. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  813. }
  814. mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, plan_uuid, event_desc_map[self.event_type_], last_info, linkage_action, "events")
  815. LOGINFO(f"cb_night_toileting_frequency succeed")
  816. else:
  817. LOGDBG("cb_night_toileting_frequency, empty result")
  818. except json.JSONDecodeError as e:
  819. tb_info = traceback.extract_tb(e.__traceback__)
  820. for frame in tb_info:
  821. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  822. except Exception as e:
  823. tb_info = traceback.extract_tb(e.__traceback__)
  824. for frame in tb_info:
  825. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  826. return
  827. # 如厕频次异常回调
  828. def cb_toileting_frequency_abnormal(self, result, userdata):
  829. try:
  830. if result:
  831. count = 0
  832. event_list = []
  833. for row in result:
  834. dev_id: str = row.get("dev_id")
  835. event_uuid: str = row.get("uuid")
  836. plan_uuid: str = row.get("plan_uuid")
  837. event_type: int = row.get("event_type")
  838. info: dict = json.loads(row["info"]) if row.get("info") else {}
  839. is_handle: str = row.get("is_handle")
  840. create_time: str = row.get("create_time")
  841. update_time: str = row.get("update_time")
  842. is_deleted: str = row.get("is_deleted")
  843. remark: str = row.get("remark")
  844. event_list.append(info)
  845. if len(event_list) < self.event_attr_.threshold_count_:
  846. return
  847. this_event_uuid = str(uuid.uuid4())
  848. last_info = {
  849. "start_time" : userdata["start_dt"],
  850. "end_time" : userdata["end_dt"],
  851. "count" : len(event_list),
  852. "event_list" : event_list
  853. }
  854. # 入库
  855. event_uuid = str(uuid.uuid4())
  856. params = {
  857. "dev_id": dev_id,
  858. "uuid": this_event_uuid,
  859. "plan_uuid": self.plan_uuid_,
  860. "event_type": self.event_type_,
  861. "info": json.dumps(last_info, ensure_ascii=False),
  862. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  863. "is_handle": 0,
  864. "create_time": get_bj_time_s(),
  865. "is_deleted": 0,
  866. "tenant_id": self.tenant_id_,
  867. "remark": json.dumps({}, ensure_ascii=False)
  868. }
  869. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  870. # 通知
  871. linkage_action = {
  872. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  873. }
  874. mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events")
  875. LOGINFO(f"cb_toileting_frequency_abnormal succeed")
  876. else:
  877. LOGDBG("cb_toileting_frequency_abnormal, empty result")
  878. except json.JSONDecodeError as e:
  879. tb_info = traceback.extract_tb(e.__traceback__)
  880. for frame in tb_info:
  881. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  882. except Exception as e:
  883. tb_info = traceback.extract_tb(e.__traceback__)
  884. for frame in tb_info:
  885. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  886. return
  887. # 起夜异常回调
  888. def cb_night_toileting_frequency_abnormal(self, result, userdata):
  889. try:
  890. if result:
  891. count = 0
  892. event_list = []
  893. for row in result:
  894. dev_id: str = row.get("dev_id")
  895. event_uuid: str = row.get("uuid")
  896. plan_uuid: str = row.get("plan_uuid")
  897. event_type: int = row.get("event_type")
  898. info: dict = json.loads(row["info"]) if row.get("info") else {}
  899. is_handle: str = row.get("is_handle")
  900. create_time: str = row.get("create_time")
  901. update_time: str = row.get("update_time")
  902. is_deleted: str = row.get("is_deleted")
  903. remark: str = row.get("remark")
  904. event_list.append(info)
  905. if len(event_list) < self.event_attr_.threshold_count_:
  906. return
  907. this_event_uuid = str(uuid.uuid4())
  908. last_info = {
  909. "start_time" : userdata["start_dt"],
  910. "end_time" : userdata["end_dt"],
  911. "count" : len(event_list),
  912. "event_list" : event_list
  913. }
  914. # 入库
  915. event_uuid = str(uuid.uuid4())
  916. params = {
  917. "dev_id": dev_id,
  918. "uuid": this_event_uuid,
  919. "plan_uuid": self.plan_uuid_,
  920. "event_type": self.event_type_,
  921. "info": json.dumps(last_info, ensure_ascii=False),
  922. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  923. "is_handle": 0,
  924. "create_time": get_bj_time_s(),
  925. "is_deleted": 0,
  926. "tenant_id": self.tenant_id_,
  927. "remark": json.dumps({}, ensure_ascii=False)
  928. }
  929. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  930. # 通知
  931. linkage_action = {
  932. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  933. }
  934. mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], last_info, linkage_action, "events")
  935. LOGINFO(f"cb_night_toileting_frequency_abnormal succeed")
  936. else:
  937. LOGDBG("cb_night_toileting_frequency_abnormal, empty result")
  938. except json.JSONDecodeError as e:
  939. tb_info = traceback.extract_tb(e.__traceback__)
  940. for frame in tb_info:
  941. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  942. except Exception as e:
  943. tb_info = traceback.extract_tb(e.__traceback__)
  944. for frame in tb_info:
  945. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  946. return
  947. # 卫生间频次统计回调
  948. def cb_bathroom_stay_frequency(self, result, userdata):
  949. try:
  950. if result:
  951. count = 0
  952. event_list = []
  953. for row in result:
  954. dev_id: str = row.get("dev_id")
  955. event_uuid: str = row.get("uuid")
  956. plan_uuid: str = row.get("plan_uuid")
  957. event_type: int = row.get("event_type")
  958. info: dict = json.loads(row["info"]) if row.get("info") else {}
  959. is_handle: str = row.get("is_handle")
  960. create_time: str = row.get("create_time")
  961. update_time: str = row.get("update_time")
  962. is_deleted: str = row.get("is_deleted")
  963. remark: str = row.get("remark")
  964. event_list.append(info)
  965. this_event_uuid = str(uuid.uuid4())
  966. last_info = {
  967. "start_time" : userdata["start_dt"],
  968. "end_time" : userdata["end_dt"],
  969. "count" : len(event_list),
  970. "event_list" : event_list
  971. }
  972. # 入库
  973. event_uuid = str(uuid.uuid4())
  974. params = {
  975. "dev_id": dev_id,
  976. "uuid": this_event_uuid,
  977. "plan_uuid": self.plan_uuid_,
  978. "event_type": self.event_type_,
  979. "info": json.dumps(last_info, ensure_ascii=False),
  980. "linkage_push_wechat_service": self.linkage_action_.wechat_service_,
  981. "is_handle": 0,
  982. "create_time": get_bj_time_s(),
  983. "is_deleted": 0,
  984. "tenant_id": self.tenant_id_,
  985. "remark": json.dumps({}, ensure_ascii=False)
  986. }
  987. db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
  988. # 通知
  989. linkage_action = {
  990. "linkage_push_wechat_service": self.linkage_action_.wechat_service_
  991. }
  992. mqtt_send.alarm_event(dev_id, self.dev_name_, this_event_uuid, plan_uuid, event_desc_map[self.event_type_], last_info, linkage_action, "events")
  993. LOGINFO(f"cb_bathroom_stay_frequency+- succeed")
  994. else:
  995. LOGDBG("cb_bathroom_stay_frequency, empty result")
  996. except json.JSONDecodeError as e:
  997. tb_info = traceback.extract_tb(e.__traceback__)
  998. for frame in tb_info:
  999. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  1000. except Exception as e:
  1001. tb_info = traceback.extract_tb(e.__traceback__)
  1002. for frame in tb_info:
  1003. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  1004. return