alarm_plan_manager.py 17 KB


  1. from typing import List, Tuple, Optional
  2. import time
  3. from datetime import datetime, date
  4. import threading
  5. from threading import Thread, Lock, Event
  6. from enum import Enum
  7. import queue
  8. import json
  9. import ast
  10. import traceback
  11. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  12. from core.alarm_event import AlarmEvent
  13. from core.alarm_plan import AlarmPlan
  14. from core.time_plan import TimePlan
  15. from core.event_type import EventType
  16. from core.linkage_action import LinkageAction
  17. import core.alarm_plan_helper as helper
  18. import core.g_LAS as g_las
  19. from db.db_process import db_req_que
  20. from db.db_process import DBRequest_Async
  21. import db.db_sqls as sqls
  22. class AlarmPlanManager:
  23. def __init__(self,
  24. alarm_plan_map: dict = None,
  25. alarm_plan_cron_map: dict = None):
  26. self.lock_ = Lock()
  27. self.alarm_plan_map_ = alarm_plan_map or {} # <uuid, AlarmPlan>
  28. self.alarm_plan_cron_map_ = alarm_plan_cron_map or {} # <uuid, AlarmPlan>
  29. self.running = False
  30. self.thread = None
  31. def push_plan(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
  32. with self.lock_:
  33. self.alarm_plan_map_[plan_uuid] = alarm_plan
  34. def pop_plan(self, plan_uuid: str) -> Optional[AlarmPlan]:
  35. with self.lock_:
  36. return self.alarm_plan_map_.pop(plan_uuid, None)
  37. def find_plan(self, plan_uuid: str) -> Optional[AlarmPlan]:
  38. with self.lock_:
  39. return self.alarm_plan_map_.get(plan_uuid, None)
  40. def delete_plan(self, plan_uuid: str) -> bool:
  41. with self.lock_:
  42. if plan_uuid in self.alarm_plan_map_:
  43. del self.alarm_plan_map_[plan_uuid]
  44. return True
  45. return False
  46. def list_all_plan(self) -> list:
  47. with self.lock_:
  48. return list(self.alarm_plan_map_.values())
  49. def push_cron(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
  50. with self.lock_:
  51. self.alarm_plan_cron_map_[plan_uuid] = alarm_plan
  52. def pop_cron(self, plan_uuid: str) -> Optional[AlarmPlan]:
  53. with self.lock_:
  54. return self.alarm_plan_cron_map_.pop(plan_uuid, None)
  55. def find_cron(self, plan_uuid: str) -> Optional[AlarmPlan]:
  56. with self.lock_:
  57. return self.alarm_plan_cron_map_.get(plan_uuid, None)
  58. def delete_cron(self, plan_uuid: str) -> bool:
  59. with self.lock_:
  60. if plan_uuid in self.alarm_plan_cron_map_:
  61. del self.alarm_plan_cron_map_[plan_uuid]
  62. return True
  63. return False
  64. def list_all_cron(self) -> list:
  65. with self.lock_:
  66. return list(self.alarm_plan_cron_map_.values())
  67. def remove_one_alarm_plan(self, plan_uuid: str) -> bool:
  68. with self.lock_:
  69. removed = False
  70. if plan_uuid in self.alarm_plan_map_:
  71. plan_name = self.alarm_plan_map_[plan_uuid]
  72. del self.alarm_plan_map_[plan_uuid]
  73. removed = True
  74. LOGINFO(f"remove alarm_plan: {plan_uuid}, {plan_name}")
  75. if plan_uuid in self.alarm_plan_cron_map_:
  76. plan_name = self.alarm_plan_cron_map_[plan_uuid]
  77. del self.alarm_plan_cron_map_[plan_uuid]
  78. removed = True
  79. LOGINFO(f"remove alarm_plan: {plan_uuid}, {plan_name}")
  80. return removed
  81. # 启动调度器
  82. def start_scheduler(self, interval=5):
  83. if self.running:
  84. return
  85. self.running = True
  86. # plan 定时调度器
  87. self.thread = threading.Thread(
  88. target=self._scheduler,
  89. args=(interval,),
  90. daemon=True,
  91. name="APSchedulerThread")
  92. self.thread.start()
  93. # cron 定时调度线程
  94. self.cron_thread = threading.Thread(
  95. target=self._cron_scheduler,
  96. daemon=True,
  97. name="CronSchedulerThread")
  98. self.cron_thread.start()
  99. def stop_scheduler(self):
  100. self.running = False
  101. # 调度告警计划
  102. def _scheduler(self, interval):
  103. while self.running:
  104. plans = self.list_all_plan()
  105. plan: AlarmPlan = None
  106. for plan in plans:
  107. try:
  108. plan.update_status() # 更新状态
  109. if plan.status_ == 1: # 激活状态才执行
  110. plan.execute()
  111. except Exception as e:
  112. LOGERR(f"[Scheduler] plan {plan.plan_uuid_} error: {e}")
  113. time.sleep(interval)
  114. # 调度定时任务
  115. def _cron_scheduler(self):
  116. import datetime
  117. last_run_map = {} # {plan_uuid: date} 记录任务上次执行日期,避免重复跑
  118. last_reset_date = None # 上一次重置的日期
  119. while self.running:
  120. now = datetime.datetime.now()
  121. today = now.date()
  122. # 每天 00:00 重置执行标记
  123. if last_reset_date != today:
  124. last_run_map.clear()
  125. last_reset_date = today
  126. LOGINFO(f"[CronScheduler] reset last_run_map at {today}")
  127. plans = self.list_all_cron()
  128. plan: AlarmPlan = None
  129. for plan in plans:
  130. try:
  131. cron = plan.cron_
  132. if not cron or not plan.enable_:
  133. continue
  134. hour = cron.get("hour", None)
  135. minute = cron.get("minute", 0)
  136. run_time = datetime.datetime.combine(today, datetime.time(hour or 0, minute or 0))
  137. # 判断是否到达执行点
  138. if now >= run_time:
  139. # 检查今天是否已经执行过
  140. if last_run_map.get(plan.plan_uuid_) == today:
  141. continue
  142. # 执行任务
  143. plan.update_status() # 更新状态
  144. if plan.status_ == 1: # 激活状态才执行
  145. plan.execute()
  146. last_run_map[plan.plan_uuid_] = today
  147. LOGINFO(f"[CronScheduler] executed cron plan {plan.plan_uuid_} at {now}")
  148. except Exception as e:
  149. LOGERR(f"[CronScheduler] plan {plan.plan_uuid_} cron error: {e}")
  150. time.sleep(5) # 每 30 秒检查一次
  151. # --------------------------------------
  152. # 创建计划相关
  153. # --------------------------------------
  154. # 清理过期事件任务
  155. def create_clean_expire_events_task(self, expire_range:int = 90):
  156. linkage_action = LinkageAction()
  157. cron = {
  158. "hour": 1,
  159. "minute": 0
  160. }
  161. time_plan = TimePlan(
  162. time_range = [{"start_time": "00:00","end_time": "23:59"}],
  163. start_date = '2025-09-01',
  164. stop_date = '2099-12-31',
  165. weekdays = [1,2,3,4,5,6,7],
  166. month_days = []
  167. )
  168. plan_uuid = 'clean_expire_events_task'
  169. plan_name = '清理过期事件'
  170. event_type = EventType.CLEAN_EXPIRE_EVENTS.value
  171. alarm_plan = AlarmPlan(
  172. plan_uuid = plan_uuid,
  173. name = plan_name,
  174. dev_id = 'LAS',
  175. dev_name = '告警联动服务',
  176. enable = 1,
  177. time_plan = time_plan,
  178. rect = [],
  179. event_type = event_type,
  180. threshold_time = 300,
  181. merge_time = 30,
  182. param = {},
  183. cron = cron,
  184. linkage_action=linkage_action,
  185. tenant_id = 0
  186. )
  187. alarm_plan.event_attr_.expire_range_ = expire_range
  188. # 塞入告警计划
  189. self.push_cron(plan_uuid, alarm_plan)
  190. LOGINFO(f"create task: {plan_uuid}, {plan_name}")
  191. def query_one_alarm_plan(self, plan_uuid: str):
  192. # 查询单个告警计划
  193. params = {
  194. "plan_uuid": plan_uuid
  195. }
  196. db_req_que.put(DBRequest_Async(sql=sqls.sql_query_one_alarm_plan, params=params,
  197. callback=self.cb_query_one_alarm_plan, userdata=plan_uuid))
  198. # 更新单条告警计划
  199. def update_one_alarm_plan(self, alarm_plan:AlarmPlan):
  200. plan_uuid = alarm_plan.plan_uuid_
  201. plan_name = alarm_plan.name_
  202. if alarm_plan.enable_:
  203. # 塞入告警计划
  204. if not alarm_plan.cron_:
  205. self.push_plan(plan_uuid, alarm_plan)
  206. else:
  207. self.push_cron(plan_uuid, alarm_plan)
  208. LOGINFO(f"update alarm_plan: {plan_uuid}, {plan_name}")
  209. else:
  210. LOGINFO(f"disable alarm_plan: {plan_uuid}, {plan_name}")
  211. self.remove_one_alarm_plan(plan_uuid)
  212. # 查询单个告警计划回调
  213. def cb_query_one_alarm_plan(self, result, userdata):
  214. try:
  215. plan_uuid = userdata
  216. if not result:
  217. LOGDBG("cb_query_one_alarm_plan, invalid result")
  218. self.remove_one_alarm_plan(plan_uuid)
  219. for row in result:
  220. plan_uuid: str = row["plan_uuid"]
  221. plan_name: str = row["plan_name"]
  222. dev_id: str = row["dev_id"]
  223. dev_name: str = row["dev_name"]
  224. enable: int = bool(row["enable"])
  225. region = row["region"]
  226. rect = helper.region_to_rect(region)
  227. if not helper.check_plan_rect_valid(event_type, rect):
  228. LOGWARN(f"skip plan {plan_uuid}: invalid rect={rect} for event_type={event_type}")
  229. continue
  230. threshold_time: int = row["threshold_time"]
  231. merge_time: int = row["merge_time"]
  232. param: dict = json.loads(row["param"])
  233. event_val: int = row["event_val"]
  234. event_type = event_val
  235. event_str: str = row["event_str"]
  236. event_desc: str = row["event_desc"]
  237. tenant_id: int = row.get("tenant_id") or 0
  238. start_date = row["start_date"]
  239. stop_date = row["stop_date"]
  240. time_range = json.loads(row["time_range"])
  241. month_days = None
  242. if row["month_days"]:
  243. month_days = ast.literal_eval(row["month_days"])
  244. weekdays = None
  245. if row["weekdays"]:
  246. weekdays = ast.literal_eval(row["weekdays"])
  247. linkage_push_wechat_service: int = row.get("linkage_push_wechat_service") or 0
  248. linkage_action = LinkageAction(wechat_service=linkage_push_wechat_service)
  249. cron = None
  250. if ((event_type == EventType.TOILETING_FREQUENCY.value) or
  251. (event_type == EventType.NIGHT_TOILETING_FREQUENCY.value) or
  252. (event_type == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
  253. (event_type == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value) or
  254. (event_type == EventType.BATHROOM_STAY_FREQUENCY.value)):
  255. cron = {
  256. "hour": 7,
  257. "minute": 0
  258. }
  259. time_plan = TimePlan(
  260. time_range = time_range,
  261. start_date = start_date,
  262. stop_date = stop_date,
  263. weekdays = weekdays,
  264. month_days = month_days
  265. )
  266. alarm_plan = AlarmPlan(
  267. plan_uuid = plan_uuid,
  268. name = plan_name,
  269. dev_id = dev_id,
  270. dev_name = dev_name,
  271. enable = enable,
  272. time_plan = time_plan,
  273. rect = rect,
  274. event_type = event_type,
  275. threshold_time = threshold_time,
  276. merge_time = merge_time,
  277. param = param,
  278. cron = cron,
  279. linkage_action=linkage_action,
  280. tenant_id = tenant_id
  281. )
  282. if alarm_plan.event_attr_ is None:
  283. LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
  284. continue
  285. # 更新告警计划
  286. self.update_one_alarm_plan(alarm_plan)
  287. except json.JSONDecodeError as e:
  288. tb_info = traceback.extract_tb(e.__traceback__)
  289. for frame in tb_info:
  290. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  291. except Exception as e:
  292. tb_info = traceback.extract_tb(e.__traceback__)
  293. for frame in tb_info:
  294. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  295. return
  296. def init_alarm_plan_mgr():
  297. g_las.g_alarm_plan_mgr = AlarmPlanManager()
  298. def start_alarm_plan_mgr():
  299. g_las.g_alarm_plan_mgr.start_scheduler(interval=1)
  300. LOGINFO(f"start g_alarm_plan_mgr succeed")
  301. # --------------------------------------
  302. # 数据库响应的回调函数
  303. # --------------------------------------
  304. # 将region字典转换为xy平面的rect [left, top, width, height]
  305. def region_to_rect(region: dict) -> list:
  306. x_start, x_stop = region["x_cm_start"], region["x_cm_stop"]
  307. y_start, y_stop = region["y_cm_start"], region["y_cm_stop"]
  308. left = min(x_start, x_stop)
  309. right = max(x_start, x_stop)
  310. top = min(y_start, y_stop)
  311. bottom = max(y_start, y_stop)
  312. width = right - left
  313. height = bottom - top
  314. return [left, top, width, height]
  315. # 回调函数,处理查询结果:查询所有的告警计划信息
  316. def cb_handle_query_all_alarm_plan_info(result, userdata):
  317. try:
  318. if not result:
  319. LOGDBG("cb_handle_query_all_alarm_plan_info, invalid result")
  320. for row in result:
  321. plan_uuid: str = row["plan_uuid"]
  322. plan_name: str = row["plan_name"]
  323. dev_id: str = row["dev_id"]
  324. dev_name: str = row["dev_name"]
  325. enable: int = bool(row["enable"])
  326. region = row["region"]
  327. rect = helper.region_to_rect(region)
  328. threshold_time: int = row["threshold_time"]
  329. merge_time: int = row["merge_time"]
  330. param: dict = json.loads(row["param"]) if row.get("param") else {}
  331. event_val: int = row["event_val"]
  332. event_type = event_val
  333. event_str: str = row["event_str"]
  334. event_desc: str = row["event_desc"]
  335. tenant_id: int = row.get("tenant_id", 0)
  336. start_date = row["start_date"]
  337. stop_date = row["stop_date"]
  338. time_range = json.loads(row["time_range"])
  339. month_days = None
  340. if row["month_days"]:
  341. month_days = ast.literal_eval(row["month_days"])
  342. weekdays = None
  343. if row["weekdays"]:
  344. weekdays = ast.literal_eval(row["weekdays"])
  345. linkage_push_wechat_service: int = row.get("linkage_push_wechat_service") or 0
  346. linkage_action = LinkageAction(wechat_service=linkage_push_wechat_service)
  347. cron = None
  348. if ((event_type == EventType.TOILETING_FREQUENCY.value) or
  349. (event_type == EventType.NIGHT_TOILETING_FREQUENCY.value) or
  350. (event_type == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
  351. (event_type == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value) or
  352. (event_type == EventType.BATHROOM_STAY_FREQUENCY.value)):
  353. cron = {
  354. "hour": 7,
  355. "minute": 0
  356. }
  357. time_plan = TimePlan(
  358. time_range = time_range,
  359. start_date = start_date,
  360. stop_date = stop_date,
  361. weekdays = weekdays,
  362. month_days = month_days
  363. )
  364. alarm_plan = AlarmPlan(
  365. plan_uuid = plan_uuid,
  366. name = plan_name,
  367. dev_id = dev_id,
  368. dev_name = dev_name,
  369. enable = enable,
  370. time_plan = time_plan,
  371. rect = rect,
  372. event_type = event_type,
  373. threshold_time = threshold_time,
  374. merge_time = merge_time,
  375. param = param,
  376. cron = cron,
  377. linkage_action=linkage_action,
  378. tenant_id = tenant_id
  379. )
  380. if alarm_plan.event_attr_ is None:
  381. LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
  382. continue
  383. # 塞入告警计划
  384. if not alarm_plan.cron_:
  385. g_las.g_alarm_plan_mgr.push_plan(plan_uuid, alarm_plan)
  386. else:
  387. g_las.g_alarm_plan_mgr.push_cron(plan_uuid, alarm_plan)
  388. LOGDBG(f"cb_handle_query_all_alarm_plan_info succeed")
  389. except json.JSONDecodeError as e:
  390. tb_info = traceback.extract_tb(e.__traceback__)
  391. for frame in tb_info:
  392. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  393. except Exception as e:
  394. tb_info = traceback.extract_tb(e.__traceback__)
  395. for frame in tb_info:
  396. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")