alarm_plan.py 17 KB


  1. from typing import List, Tuple, Optional
  2. from datetime import datetime, time, date
  3. from threading import Thread, Lock
  4. from enum import Enum
  5. import uuid
  6. import json
  7. import traceback
  8. from datetime import datetime, timezone, timedelta
  9. from common.sys_comm import (
  10. LOGDBG, LOGINFO, LOGWARN, LOGERR,
  11. get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
  12. utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
  13. )
  14. from core.time_plan import TimePlan
  15. from core.event_type import EventType, event_desc_map
  16. import core.g_LAS as g_las
  17. from device.dev_mng import (
  18. Device,
  19. dev_map_push, dev_map_pop, dev_map_find, dev_map_delete
  20. )
  21. from db.db_process import (
  22. db_req_que, DBRequest
  23. )
  24. import db.db_sqls as sqls
  25. import mqtt.mqtt_send as mqtt_send
  26. from device.dev_mng import g_dev_map, g_dev_map_lock
  27. class EventAttr_Base:
  28. def __init__(self):
  29. return
  30. # Event attributes: stay event
  31. class EventAttr_StayDetection(EventAttr_Base):
  32. def __init__(self):
  33. self.enter_ts_: int = -1 # 进入时间(s)
  34. self.leave_ts_: int = -1 # 离开时间(s)
  35. self.stay_time_: int = -1 # 停留时长(s)
  36. return
  37. def reset(self):
  38. self.enter_ts_ = -1
  39. self.leave_ts_ = -1
  40. self.stay_time_ = -1
  41. # 事件属性 滞留事件
  42. class EventAttr_RetentionDetection(EventAttr_Base):
  43. def __init__(self):
  44. self.enter_ts_: int = -1 # 进入时间(s)
  45. self.leave_ts_: int = -1 # 离开时间(s)
  46. self.stay_time_: int = -1 # 停留时长(s)
  47. return
  48. def reset(self):
  49. self.enter_ts_ = -1
  50. self.leave_ts_ = -1
  51. self.stay_time_ = -1
  52. # 事件属性 如厕事件
  53. class EventAttr_ToiletingDetection(EventAttr_Base):
  54. def __init__(self):
  55. self.enter_ts_: int = -1 # 进入时间(ms)
  56. self.leave_ts_: int = -1 # 离开时间(ms)
  57. self.stay_time_: int = -1 # 停留时长(ms)
  58. return
  59. def reset(self):
  60. self.enter_ts_ = -1
  61. self.leave_ts_ = -1
  62. self.stay_time_ = -1
  63. class AlarmPlan:
  64. def __init__(self,
  65. plan_uuid: str,
  66. name: str,
  67. dev_id: str,
  68. enable: bool,
  69. time_plan: TimePlan,
  70. rect: list,
  71. event_type: int,
  72. threshold_time: int,
  73. merge_time: int
  74. ):
  75. self.lock_ = Lock()
  76. self.plan_uuid_ = plan_uuid # 计划id
  77. self.name_ = name # 计划名称
  78. self.dev_id_ = dev_id # 设备id
  79. self.enable_ = enable # 是否启用
  80. self.time_plan_ = time_plan # 时间计划
  81. # 维护状态(根据TimePlanu判断)
  82. self.status_ = 0 # 0未激活,1激活,-1过期
  83. self.status_update_ts_ = -1 # 状态更新时间,初始值为-1
  84. # 事件属性表
  85. self.event_attr_map = {
  86. EventType.STAY_DETECTION.value: EventAttr_StayDetection,
  87. EventType.RETENTION_DETECTION.value: EventAttr_RetentionDetection,
  88. EventType.TOILETING_DETECTION.value: EventAttr_ToiletingDetection
  89. }
  90. # 事件触发参数
  91. self.rect_ = rect # 检测区域 [left, top, width, height]
  92. self.threshold_time_ = threshold_time # 触发时间阈值
  93. self.merge_time_ = merge_time # 归并时间窗口
  94. self.event_type_ = event_type # 事件类型
  95. self.event_attr_ = self.init_event_attr() # 事件属性
  96. if self.event_attr_ is None:
  97. raise ValueError(f"Invalid event_type: {event_type}")
  98. def execute(self):
  99. if self.status_ != 1:
  100. return
  101. g_las.g_event_dispatcher.dispatch(self.event_type_, self)
  102. # 更新激活状态
  103. def update_status(self, now: Optional[datetime] = None) -> None:
  104. now = now or datetime.now()
  105. old_status = self.status_
  106. if not self.enable_:
  107. self.status_ = 0
  108. else:
  109. now_fmt = now.strftime("%Y-%m-%d")
  110. # 过期
  111. if now_fmt > self.time_plan_.stop_date_:
  112. self.status_ = -1
  113. elif now_fmt < self.time_plan_.start_date_:
  114. self.status_ = 0
  115. elif self.time_plan_.is_active_now(now):
  116. self.status_ = 1
  117. else:
  118. self.status_ = 0
  119. if self.status_ != old_status:
  120. self.status_update_ts = int(now.timestamp())
  121. LOGINFO(f"[Status] plan {self.plan_uuid_} status_ changed {old_status} -> {self.status_}")
  122. def is_point_in_rect(self, x:float, y:float, rect:list) -> bool:
  123. rx, ry, rw, rh = rect
  124. x_min = min(rx, rx + rw)
  125. x_max = max(rx, rx + rw)
  126. y_min = min(ry, ry - rh)
  127. y_max = max(ry, ry - rh)
  128. bRet: bool = x_min <= x <= x_max and y_min <= y <= y_max
  129. return bRet
  130. # 查找最近 t 秒内,最后一个落在 rect 内的 target_point 的 rtd_unit
  131. def find_latest_rtd_in_region(self, device: Device, rect: list, now: int=None, t: int=1):
  132. now_s = now if now else get_utc_time_s()
  133. rtd_que_copy = device.get_rtd_que_copy()
  134. with self.lock_:
  135. for rtd_unit in reversed(rtd_que_copy): # 倒序扫描
  136. ts_s = int(rtd_unit.get("timestamp", 0))
  137. if now_s - ts_s > t:
  138. break # 已经超过 t 秒,可以直接结束
  139. # 检查点是否在区域内
  140. for pt in rtd_unit.get("target_point", []):
  141. if len(pt) >= 2:
  142. x, y = pt[0], pt[1]
  143. if self.is_point_in_rect(x, y, rect):
  144. return rtd_unit
  145. return None
  146. # 初始化事件属性
  147. def init_event_attr(self):
  148. event_cls = self.event_attr_map.get(self.event_type_)
  149. if event_cls is None:
  150. return None
  151. event_attr = event_cls()
  152. return event_attr
  153. # 停留事件
  154. def handle_stay_detection(self):
  155. try:
  156. dev_id = self.dev_id_
  157. device:Device = dev_map_find(dev_id)
  158. if not device:
  159. return
  160. now = get_utc_time_s()
  161. # 查找最新的落在检测区域的目标
  162. rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
  163. if rtd_unit:
  164. timestamp = rtd_unit["timestamp"]
  165. pose = rtd_unit["pose"]
  166. target_point = rtd_unit["target_point"]
  167. if self.event_attr_.enter_ts_ == -1:
  168. self.event_attr_.enter_ts_ = timestamp
  169. else:
  170. self.event_attr_.leave_ts_ = timestamp
  171. if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
  172. return
  173. # 归并时间内,不认为事件结束
  174. if now - self.event_attr_.leave_ts_ < self.merge_time_:
  175. return
  176. self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
  177. stay_time =self.event_attr_.stay_time_
  178. # 时间小于触发时间阈值,忽略并重置
  179. if stay_time < self.threshold_time_ :
  180. self.event_attr_.reset()
  181. LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
  182. return
  183. # 构造事件
  184. # 入库
  185. info = {
  186. "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  187. "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  188. "stay_time": stay_time
  189. }
  190. event_uuid = str(uuid.uuid4())
  191. params = {
  192. "dev_id": dev_id,
  193. "uuid": event_uuid,
  194. "plan_uuid": self.plan_uuid_,
  195. "event_type": event_desc_map[self.event_type_],
  196. "info": json.dumps(info),
  197. "is_handle": 0,
  198. "is_deleted": 0
  199. }
  200. db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
  201. # 通知
  202. mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
  203. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
  204. self.event_attr_.reset()
  205. except json.JSONDecodeError as e:
  206. tb_info = traceback.extract_tb(e.__traceback__)
  207. for frame in tb_info:
  208. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  209. except Exception as e:
  210. tb_info = traceback.extract_tb(e.__traceback__)
  211. for frame in tb_info:
  212. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  213. # 滞留事件
  214. def handle_retention_detection(self):
  215. try:
  216. dev_id = self.dev_id_
  217. device:Device = dev_map_find(dev_id)
  218. if not device:
  219. return
  220. now = get_utc_time_s()
  221. # 查找最新的落在检测区域的目标
  222. rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
  223. if rtd_unit:
  224. timestamp = rtd_unit["timestamp"]
  225. pose = rtd_unit["pose"]
  226. target_point = rtd_unit["target_point"]
  227. if self.event_attr_.enter_ts_ == -1:
  228. self.event_attr_.enter_ts_ = timestamp
  229. else:
  230. self.event_attr_.leave_ts_ = timestamp
  231. if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
  232. return
  233. # 归并时间内,不认为事件结束
  234. if now - self.event_attr_.leave_ts_ < self.merge_time_:
  235. return
  236. self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
  237. stay_time =self.event_attr_.stay_time_
  238. # 时间小于触发时间阈值,忽略并重置
  239. if stay_time < self.threshold_time_ :
  240. self.event_attr_.reset()
  241. LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
  242. return
  243. # 构造事件
  244. # 入库
  245. info = {
  246. "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  247. "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  248. "stay_time": stay_time
  249. }
  250. event_uuid = str(uuid.uuid4())
  251. params = {
  252. "dev_id": dev_id,
  253. "uuid": event_uuid,
  254. "plan_uuid": self.plan_uuid_,
  255. "event_type": event_desc_map[self.event_type_],
  256. "info": json.dumps(info),
  257. "is_handle": 0,
  258. "is_deleted": 0
  259. }
  260. db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
  261. # 通知
  262. mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
  263. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
  264. self.event_attr_.reset()
  265. except json.JSONDecodeError as e:
  266. tb_info = traceback.extract_tb(e.__traceback__)
  267. for frame in tb_info:
  268. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  269. except Exception as e:
  270. tb_info = traceback.extract_tb(e.__traceback__)
  271. for frame in tb_info:
  272. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  273. # 如厕事件
  274. def handle_toileting_detection(self):
  275. try:
  276. dev_id = self.dev_id_
  277. device:Device = dev_map_find(dev_id)
  278. if not device:
  279. return
  280. now = get_utc_time_s()
  281. # 查找最新的落在检测区域的目标
  282. rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
  283. if rtd_unit:
  284. timestamp = rtd_unit["timestamp"]
  285. pose = rtd_unit["pose"]
  286. target_point = rtd_unit["target_point"]
  287. if self.event_attr_.enter_ts_ == -1:
  288. self.event_attr_.enter_ts_ = timestamp
  289. else:
  290. self.event_attr_.leave_ts_ = timestamp
  291. if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
  292. return
  293. # 归并时间内,不认为事件结束
  294. if now - self.event_attr_.leave_ts_ < self.merge_time_:
  295. return
  296. self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
  297. stay_time =self.event_attr_.stay_time_
  298. # 时间小于触发时间阈值,忽略并重置
  299. if stay_time < self.threshold_time_ :
  300. self.event_attr_.reset()
  301. LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
  302. return
  303. # 构造事件
  304. # 入库
  305. info = {
  306. "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  307. "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  308. "stay_time": stay_time
  309. }
  310. event_uuid = str(uuid.uuid4())
  311. params = {
  312. "dev_id": dev_id,
  313. "uuid": event_uuid,
  314. "plan_uuid": self.plan_uuid_,
  315. "event_type": event_desc_map[self.event_type_],
  316. "info": json.dumps(info),
  317. "is_handle": 0,
  318. "is_deleted": 0
  319. }
  320. db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
  321. # 通知
  322. mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
  323. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
  324. self.event_attr_.reset()
  325. except json.JSONDecodeError as e:
  326. tb_info = traceback.extract_tb(e.__traceback__)
  327. for frame in tb_info:
  328. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  329. except Exception as e:
  330. tb_info = traceback.extract_tb(e.__traceback__)
  331. for frame in tb_info:
  332. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  333. # 如厕频次统计
  334. def handle_toileting_frequency(self):
  335. try:
  336. dev_id = self.dev_id_
  337. device:Device = dev_map_find(dev_id)
  338. if not device:
  339. return
  340. now = get_utc_time_s()
  341. # 查找最新的落在检测区域的目标
  342. rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
  343. if rtd_unit:
  344. timestamp = rtd_unit["timestamp"]
  345. pose = rtd_unit["pose"]
  346. target_point = rtd_unit["target_point"]
  347. if self.event_attr_.enter_ts_ == -1:
  348. self.event_attr_.enter_ts_ = timestamp
  349. else:
  350. self.event_attr_.leave_ts_ = timestamp
  351. if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
  352. return
  353. # 归并时间内,不认为事件结束
  354. if now - self.event_attr_.leave_ts_ < self.merge_time_:
  355. return
  356. self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
  357. stay_time =self.event_attr_.stay_time_
  358. # 时间小于触发时间阈值,忽略并重置
  359. if stay_time < self.threshold_time_ :
  360. self.event_attr_.reset()
  361. LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
  362. return
  363. # 构造事件
  364. # 入库
  365. info = {
  366. "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  367. "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
  368. "stay_time": stay_time
  369. }
  370. event_uuid = str(uuid.uuid4())
  371. params = {
  372. "dev_id": dev_id,
  373. "uuid": event_uuid,
  374. "plan_uuid": self.plan_uuid_,
  375. "event_type": event_desc_map[self.event_type_],
  376. "info": json.dumps(info),
  377. "is_handle": 0,
  378. "is_deleted": 0
  379. }
  380. db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
  381. # 通知
  382. mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
  383. LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
  384. self.event_attr_.reset()
  385. except json.JSONDecodeError as e:
  386. tb_info = traceback.extract_tb(e.__traceback__)
  387. for frame in tb_info:
  388. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  389. except Exception as e:
  390. tb_info = traceback.extract_tb(e.__traceback__)
  391. for frame in tb_info:
  392. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")