alarm_plan_manager.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. from typing import List, Tuple, Optional
  2. import time
  3. from datetime import datetime, date
  4. import threading
  5. from threading import Thread, Lock, Event
  6. from enum import Enum
  7. import queue
  8. import json
  9. import ast
  10. import traceback
  11. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  12. from core.alarm_event import AlarmEvent
  13. from core.alarm_plan import AlarmPlan
  14. from core.time_plan import TimePlan
  15. from core.event_type import EventType
  16. import core.g_LAS as g_las
  17. class AlarmPlanManager:
  18. def __init__(self,
  19. alarm_plan_map: dict = None,
  20. alarm_plan_cron_map: dict = None):
  21. self.lock_ = Lock()
  22. self.alarm_plan_map_ = alarm_plan_map or {} # <uuid, AlarmPlan>
  23. self.alarm_plan_cron_map_ = alarm_plan_cron_map or {} # <uuid, AlarmPlan>
  24. self.running = False
  25. self.thread = None
  26. def push(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
  27. with self.lock_:
  28. self.alarm_plan_map_[plan_uuid] = alarm_plan
  29. def pop(self, plan_uuid: str) -> Optional[AlarmPlan]:
  30. with self.lock_:
  31. return self.alarm_plan_map_.pop(plan_uuid, None)
  32. def find(self, plan_uuid: str) -> Optional[AlarmPlan]:
  33. with self.lock_:
  34. return self.alarm_plan_map_.get(plan_uuid, None)
  35. def delete(self, plan_uuid: str) -> bool:
  36. with self.lock_:
  37. if plan_uuid in self.alarm_plan_map_:
  38. del self.alarm_plan_map_[plan_uuid]
  39. return True
  40. return False
  41. def list_all_plan(self) -> list:
  42. with self.lock_:
  43. return list(self.alarm_plan_map_.values())
  44. def push_cron(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
  45. with self.lock_:
  46. self.alarm_plan_cron_map_[plan_uuid] = alarm_plan
  47. def pop_cron(self, plan_uuid: str) -> Optional[AlarmPlan]:
  48. with self.lock_:
  49. return self.alarm_plan_cron_map_.pop(plan_uuid, None)
  50. def find_cron(self, plan_uuid: str) -> Optional[AlarmPlan]:
  51. with self.lock_:
  52. return self.alarm_plan_cron_map_.get(plan_uuid, None)
  53. def delete_cron(self, plan_uuid: str) -> bool:
  54. with self.lock_:
  55. if plan_uuid in self.alarm_plan_cron_map_:
  56. del self.alarm_plan_cron_map_[plan_uuid]
  57. return True
  58. return False
  59. def list_all_cron(self) -> list:
  60. with self.lock_:
  61. return list(self.alarm_plan_cron_map_.values())
  62. def start_scheduler(self, interval=5):
  63. if self.running:
  64. return
  65. self.running = True
  66. self.thread = threading.Thread(target=self._scheduler, args=(interval,), daemon=True)
  67. self.thread.start()
  68. # 启动 cron 定时调度线程
  69. self.cron_thread = threading.Thread(target=self._cron_scheduler, daemon=True)
  70. self.cron_thread.start()
  71. def stop_scheduler(self):
  72. self.running = False
  73. # 调度告警计划
  74. def _scheduler(self, interval):
  75. while self.running:
  76. plans = self.list_all_plan()
  77. plan: AlarmPlan = None
  78. for plan in plans:
  79. try:
  80. plan.update_status() # 更新状态
  81. if plan.status_ == 1: # 激活状态才执行
  82. plan.execute()
  83. except Exception as e:
  84. LOGERR(f"[Scheduler] plan {plan.plan_uuid_} error: {e}")
  85. time.sleep(interval)
  86. # 调度定时任务
  87. def _cron_scheduler(self):
  88. import datetime
  89. last_run_map = {} # {plan_uuid: date} 记录任务上次执行日期,避免重复跑
  90. while self.running:
  91. now = datetime.datetime.now()
  92. today = now.date()
  93. # 每天 00:00 重置执行标记
  94. # todo
  95. plans = self.list_all_cron()
  96. plan: AlarmPlan = None
  97. for plan in plans:
  98. try:
  99. cron = plan.cron_
  100. if not cron or not plan.enable_:
  101. continue
  102. hour = cron.get("hour", None)
  103. minute = cron.get("minute", 0)
  104. run_time = datetime.datetime.combine(today, datetime.time(hour or 0, minute or 0))
  105. # 判断是否到达执行点
  106. if now >= run_time:
  107. # 检查今天是否已经执行过
  108. if last_run_map.get(plan.plan_uuid_) == today:
  109. continue
  110. # 执行任务
  111. plan.update_status() # 更新状态
  112. if plan.status_ == 1: # 激活状态才执行
  113. plan.execute()
  114. last_run_map[plan.plan_uuid_] = today
  115. LOGINFO(f"[CronScheduler] executed cron plan {plan.plan_uuid_} at {now}")
  116. except Exception as e:
  117. LOGERR(f"[CronScheduler] plan {plan.plan_uuid_} cron error: {e}")
  118. time.sleep(5) # 每 30 秒检查一次
  119. # 分发器
  120. class EventDispatcher:
  121. def __init__(self):
  122. self.queues = {} # event_type -> Queue
  123. self.threads = {}
  124. self.running = False
  125. def start(self, handlers: dict):
  126. """handlers: {event_type: handler_func}"""
  127. self.running = True
  128. for event_type, handler in handlers.items():
  129. q = queue.Queue()
  130. self.queues[event_type] = q
  131. t = threading.Thread(target=self.worker, args=(event_type, q, handler), daemon=True)
  132. self.threads[event_type] = t
  133. t.start()
  134. def stop(self):
  135. self.running = False
  136. def dispatch(self, event_type: int, plan):
  137. if event_type in self.queues:
  138. self.queues[event_type].put(plan)
  139. else:
  140. LOGINFO(f"[Dispatcher] No queue for event_type={event_type}")
  141. def worker(self, event_type, q: queue.Queue, handler):
  142. while self.running:
  143. try:
  144. plan = q.get(timeout=1)
  145. handler(plan)
  146. except queue.Empty:
  147. continue
  148. except Exception as e:
  149. LOGERR(f"[Dispatcher] Error in event_type={event_type}: {e}")
  150. def init_alarm_plan_mgr():
  151. g_las.g_alarm_plan_mgr = AlarmPlanManager()
  152. g_las.g_event_dispatcher = EventDispatcher()
  153. def start_alarm_plan_mgr():
  154. g_las.g_alarm_plan_mgr.start_scheduler(interval=1)
  155. def start_event_dispatcher():
  156. # 注册事件处理函数
  157. handles = {
  158. EventType.STAY_DETECTION.value : AlarmPlan.handle_stay_detection,
  159. EventType.RETENTION_DETECTION.value : AlarmPlan.handle_retention_detection,
  160. EventType.TOILETING_DETECTION.value : AlarmPlan.handle_toileting_detection,
  161. EventType.TOILETING_FREQUENCY.value : AlarmPlan.handle_toileting_frequency,
  162. EventType.NIGHT_TOILETING_FREQUENCY.value : AlarmPlan.handle_night_toileting_frequency,
  163. EventType.TOILETING_FREQUENCY_ABNORMAL.value : AlarmPlan.handle_toileting_frequency_abnormal,
  164. EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value : AlarmPlan.handle_night_toileting_frequency_abnormal,
  165. EventType.BATHROOM_STAY_FREQUENCY.value : AlarmPlan.handle_bathroom_stay_frequency,
  166. EventType.TARGET_ABSENCE.value : AlarmPlan.handle_target_absence,
  167. }
  168. g_las.g_event_dispatcher.start(handles)
  169. # 将region字典转换为xy平面的rect [left, top, width, height]
  170. def region_to_rect(region: dict) -> list:
  171. x_start, x_stop = region["x_cm_start"], region["x_cm_stop"]
  172. y_start, y_stop = region["y_cm_start"], region["y_cm_stop"]
  173. left = min(x_start, x_stop)
  174. right = max(x_start, x_stop)
  175. top = min(y_start, y_stop)
  176. bottom = max(y_start, y_stop)
  177. width = right - left
  178. height = bottom - top
  179. return [left, top, width, height]
  180. # 回调函数,处理查询结果:查询所有的告警计划信息
  181. def cb_handle_query_all_alarm_plan_info(result):
  182. try:
  183. if result:
  184. for row in result:
  185. plan_uuid: str = row["plan_uuid"]
  186. plan_name: str = row["plan_name"]
  187. dev_id: str = row["dev_id"]
  188. dev_name: str = row["dev_name"]
  189. enable: int = bool(row["enable"])
  190. # region = row["region"]
  191. # rect = json.loads(region_to_rect(region))
  192. rect: list = json.loads(row["region"]) if row.get("region") else []
  193. threshold_time: int = row["threshold_time"]
  194. merge_time: int = row["merge_time"]
  195. param: dict = json.loads(row["param"])
  196. event_val: int = row["event_val"]
  197. event_type = event_val
  198. event_str: str = row["event_str"]
  199. event_desc: str = row["event_desc"]
  200. start_date = row["start_date"]
  201. stop_date = row["stop_date"]
  202. time_range = json.loads(row["time_range"])
  203. month_days = None
  204. if row["month_days"]:
  205. month_days = ast.literal_eval(row["month_days"])
  206. weekdays = None
  207. if row["weekdays"]:
  208. weekdays = ast.literal_eval(row["weekdays"])
  209. cron = None
  210. if ((event_type == EventType.TOILETING_FREQUENCY.value) or
  211. (event_type == EventType.NIGHT_TOILETING_FREQUENCY.value) or
  212. (event_type == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
  213. (event_type == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value) or
  214. (event_type == EventType.BATHROOM_STAY_FREQUENCY.value)):
  215. cron = {
  216. "hour": 1,
  217. "minute": 0
  218. }
  219. time_plan = TimePlan(
  220. time_range = time_range,
  221. start_date = start_date,
  222. stop_date = stop_date,
  223. weekdays = weekdays,
  224. month_days = month_days
  225. )
  226. alarm_plan = AlarmPlan(
  227. plan_uuid = plan_uuid,
  228. name = plan_name,
  229. dev_id = dev_id,
  230. enable = enable,
  231. time_plan = time_plan,
  232. rect = rect,
  233. event_type = event_type,
  234. threshold_time = threshold_time,
  235. merge_time = merge_time,
  236. param = param,
  237. cron = cron
  238. )
  239. if alarm_plan.event_attr_ is None:
  240. LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
  241. continue
  242. # 塞入告警计划
  243. if not alarm_plan.cron_:
  244. g_las.g_alarm_plan_mgr.push(plan_uuid, alarm_plan)
  245. else:
  246. g_las.g_alarm_plan_mgr.push_cron(plan_uuid, alarm_plan)
  247. LOGDBG(f"cb_handle_query_all_alarm_plan_info succeed")
  248. else:
  249. LOGDBG("cb_handle_query_all_alarm_plan_info, invalid result")
  250. except json.JSONDecodeError as e:
  251. tb_info = traceback.extract_tb(e.__traceback__)
  252. for frame in tb_info:
  253. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  254. except Exception as e:
  255. tb_info = traceback.extract_tb(e.__traceback__)
  256. for frame in tb_info:
  257. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")