alarm_plan_manager.py 7.6 KB


  1. from typing import List, Tuple, Optional
  2. import time
  3. from datetime import datetime, date
  4. import threading
  5. from threading import Thread, Lock, Event
  6. from enum import Enum
  7. import queue
  8. import json
  9. import ast
  10. import traceback
  11. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  12. from core.alarm_event import AlarmEvent
  13. from core.alarm_plan import AlarmPlan
  14. from core.time_plan import TimePlan
  15. from core.event_type import EventType
  16. import core.g_LAS as g_las
  17. class AlarmPlanManager:
  18. def __init__(self, alarm_plan_map: dict = None):
  19. self.lock_ = Lock()
  20. self.alarm_plan_map_ = alarm_plan_map or {} # <uuid, AlarmPlan>
  21. self.running = False
  22. self.thread = None
  23. def push(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
  24. """插入/更新一个 AlarmPlan"""
  25. with self.lock_:
  26. self.alarm_plan_map_[plan_uuid] = alarm_plan
  27. def pop(self, plan_uuid: str) -> Optional[AlarmPlan]:
  28. """弹出并返回 AlarmPlan,如果不存在返回 None"""
  29. with self.lock_:
  30. return self.alarm_plan_map_.pop(plan_uuid, None)
  31. def find(self, plan_uuid: str) -> Optional[AlarmPlan]:
  32. """查找 AlarmPlan,如果不存在返回 None"""
  33. with self.lock_:
  34. return self.alarm_plan_map_.get(plan_uuid, None)
  35. def delete(self, plan_uuid: str) -> bool:
  36. """删除 AlarmPlan,成功返回 True,不存在返回 False"""
  37. with self.lock_:
  38. if plan_uuid in self.alarm_plan_map_:
  39. del self.alarm_plan_map_[plan_uuid]
  40. return True
  41. return False
  42. def list_all(self) -> list:
  43. """返回所有 AlarmPlan(浅拷贝列表)"""
  44. with self.lock_:
  45. return list(self.alarm_plan_map_.values())
  46. def start_scheduler(self, interval=5):
  47. if self.running:
  48. return
  49. self.running = True
  50. self.thread = threading.Thread(target=self._scheduler, args=(interval,), daemon=True)
  51. self.thread.start()
  52. def stop_scheduler(self):
  53. self.running = False
  54. def _scheduler(self, interval):
  55. while self.running:
  56. plans = self.list_all()
  57. plan: AlarmPlan = None
  58. for plan in plans:
  59. try:
  60. plan.update_status() # 更新状态
  61. if plan.status_ == 1: # 激活状态才执行
  62. plan.execute()
  63. except Exception as e:
  64. LOGERR(f"[Scheduler] plan {plan.plan_uuid_} error: {e}")
  65. time.sleep(interval)
  66. class EventDispatcher:
  67. def __init__(self):
  68. self.queues = {} # event_type -> Queue
  69. self.threads = {}
  70. self.running = False
  71. def start(self, handlers: dict):
  72. """handlers: {event_type: handler_func}"""
  73. self.running = True
  74. for event_type, handler in handlers.items():
  75. q = queue.Queue()
  76. self.queues[event_type] = q
  77. t = threading.Thread(target=self.worker, args=(event_type, q, handler), daemon=True)
  78. self.threads[event_type] = t
  79. t.start()
  80. def stop(self):
  81. self.running = False
  82. def dispatch(self, event_type: int, plan):
  83. if event_type in self.queues:
  84. self.queues[event_type].put(plan)
  85. else:
  86. LOGINFO(f"[Dispatcher] No queue for event_type={event_type}")
  87. def worker(self, event_type, q: queue.Queue, handler):
  88. while self.running:
  89. try:
  90. plan = q.get(timeout=1)
  91. handler(plan)
  92. except queue.Empty:
  93. continue
  94. except Exception as e:
  95. LOGERR(f"[Dispatcher] Error in event_type={event_type}: {e}")
  96. def init_alarm_plan_mgr():
  97. g_las.g_alarm_plan_mgr = AlarmPlanManager()
  98. g_las.g_event_dispatcher = EventDispatcher()
  99. def start_event_dispatcher():
  100. handles = {
  101. EventType.STAY_DETECTION.value : AlarmPlan.handle_stay_detection,
  102. EventType.RETENTION_DETECTION.value : AlarmPlan.handle_retention_detection,
  103. EventType.TOILETING_DETECTION.value : AlarmPlan.handle_toileting_detection,
  104. EventType.TOILETING_FREQUENCY.value : AlarmPlan.handle_toileting_frequency,
  105. }
  106. g_las.g_event_dispatcher.start(handles)
  107. def start_alarm_plan_mgr():
  108. g_las.g_alarm_plan_mgr.start_scheduler(interval=1)
  109. # 将region字典转换为xy平面的rect [left, top, width, height]
  110. def region_to_rect(region: dict) -> list:
  111. x_start, x_stop = region["x_cm_start"], region["x_cm_stop"]
  112. y_start, y_stop = region["y_cm_start"], region["y_cm_stop"]
  113. left = min(x_start, x_stop)
  114. right = max(x_start, x_stop)
  115. top = min(y_start, y_stop)
  116. bottom = max(y_start, y_stop)
  117. width = right - left
  118. height = bottom - top
  119. return [left, top, width, height]
  120. # 回调函数,处理查询结果:查询所有的告警计划信息
  121. def cb_handle_query_all_alarm_plan_info(result):
  122. try:
  123. if result:
  124. for row in result:
  125. plan_uuid: str = row["plan_uuid"]
  126. plan_name: str = row["plan_name"]
  127. dev_id: str = row["dev_id"]
  128. enable: int = bool(row["enable"])
  129. # region = row["region"]
  130. # rect = json.loads(region_to_rect(region))
  131. rect: list = json.loads(row["region"])
  132. threshold_time: int = row["threshold_time"]
  133. merge_time: int = row["merge_time"]
  134. event_val: int = row["event_val"]
  135. event_type = event_val
  136. event_str: str = row["event_str"]
  137. event_desc: str = row["event_desc"]
  138. start_date = row["start_date"]
  139. stop_date = row["stop_date"]
  140. time_range = json.loads(row["time_range"])
  141. month_days = None
  142. if row["month_days"]:
  143. month_days = ast.literal_eval(row["mozhenth_days"])
  144. weekdays = None
  145. if row["weekdays"]:
  146. weekdays = ast.literal_eval(row["weekdays"])
  147. time_plan = TimePlan(
  148. time_range = time_range,
  149. start_date = start_date,
  150. stop_date = stop_date,
  151. weekdays = weekdays,
  152. month_days = month_days
  153. )
  154. alarm_plan = AlarmPlan(
  155. plan_uuid = plan_uuid,
  156. name = plan_name,
  157. dev_id = dev_id,
  158. enable = enable,
  159. time_plan = time_plan,
  160. rect = rect,
  161. event_type = event_type,
  162. threshold_time = threshold_time,
  163. merge_time = merge_time
  164. )
  165. if alarm_plan.event_attr_ is None:
  166. LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
  167. continue
  168. # 更新设备信息
  169. g_las.g_alarm_plan_mgr.push(plan_uuid, alarm_plan)
  170. LOGDBG(f"cb_handle_query_all_alarm_plan_info succeed")
  171. else:
  172. LOGDBG("cb_handle_query_all_alarm_plan_info, invalid result")
  173. except json.JSONDecodeError as e:
  174. tb_info = traceback.extract_tb(e.__traceback__)
  175. for frame in tb_info:
  176. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  177. except Exception as e:
  178. tb_info = traceback.extract_tb(e.__traceback__)
  179. for frame in tb_info:
  180. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")