from typing import List, Tuple, Optional import time from datetime import datetime, date import threading from threading import Thread, Lock, Event from enum import Enum import queue import json import ast import traceback from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR from core.alarm_event import AlarmEvent from core.alarm_plan import AlarmPlan from core.time_plan import TimePlan from core.event_type import EventType import core.g_LAS as g_las from db.db_process import db_req_que from db.db_process import DBRequest import db.db_sqls as sqls class AlarmPlanManager: def __init__(self, alarm_plan_map: dict = None, alarm_plan_cron_map: dict = None): self.lock_ = Lock() self.alarm_plan_map_ = alarm_plan_map or {} # self.alarm_plan_cron_map_ = alarm_plan_cron_map or {} # self.running = False self.thread = None def push(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None: with self.lock_: self.alarm_plan_map_[plan_uuid] = alarm_plan def pop(self, plan_uuid: str) -> Optional[AlarmPlan]: with self.lock_: return self.alarm_plan_map_.pop(plan_uuid, None) def find(self, plan_uuid: str) -> Optional[AlarmPlan]: with self.lock_: return self.alarm_plan_map_.get(plan_uuid, None) def delete(self, plan_uuid: str) -> bool: with self.lock_: if plan_uuid in self.alarm_plan_map_: del self.alarm_plan_map_[plan_uuid] return True return False def list_all_plan(self) -> list: with self.lock_: return list(self.alarm_plan_map_.values()) def push_cron(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None: with self.lock_: self.alarm_plan_cron_map_[plan_uuid] = alarm_plan def pop_cron(self, plan_uuid: str) -> Optional[AlarmPlan]: with self.lock_: return self.alarm_plan_cron_map_.pop(plan_uuid, None) def find_cron(self, plan_uuid: str) -> Optional[AlarmPlan]: with self.lock_: return self.alarm_plan_cron_map_.get(plan_uuid, None) def delete_cron(self, plan_uuid: str) -> bool: with self.lock_: if plan_uuid in self.alarm_plan_cron_map_: del self.alarm_plan_cron_map_[plan_uuid] return True return False def list_all_cron(self) -> list: with self.lock_: return list(self.alarm_plan_cron_map_.values()) def remove_one_alarm_plan(self, plan_uuid: str) -> bool: with self.lock_: removed = False if plan_uuid in self.alarm_plan_map_: plan_name = self.alarm_plan_map_[plan_uuid] del self.alarm_plan_map_[plan_uuid] removed = True LOGINFO(f"create new alarm_plan: {plan_uuid}, {plan_name}") if plan_uuid in self.alarm_plan_cron_map_: plan_name = self.alarm_plan_map_[plan_uuid] del self.alarm_plan_cron_map_[plan_uuid] removed = True LOGINFO(f"create new alarm_plan: {plan_uuid}, {plan_name}") return removed def start_scheduler(self, interval=5): if self.running: return self.running = True self.thread = threading.Thread(target=self._scheduler, args=(interval,), daemon=True) self.thread.start() # 启动 cron 定时调度线程 self.cron_thread = threading.Thread(target=self._cron_scheduler, daemon=True) self.cron_thread.start() def stop_scheduler(self): self.running = False # 调度告警计划 def _scheduler(self, interval): while self.running: plans = self.list_all_plan() plan: AlarmPlan = None for plan in plans: try: plan.update_status() # 更新状态 if plan.status_ == 1: # 激活状态才执行 plan.execute() except Exception as e: LOGERR(f"[Scheduler] plan {plan.plan_uuid_} error: {e}") time.sleep(interval) # 调度定时任务 def _cron_scheduler(self): import datetime last_run_map = {} # {plan_uuid: date} 记录任务上次执行日期,避免重复跑 while self.running: now = datetime.datetime.now() today = now.date() # 每天 00:00 重置执行标记 # todo plans = self.list_all_cron() plan: AlarmPlan = None for plan in plans: try: cron = plan.cron_ if not cron or not plan.enable_: continue hour = cron.get("hour", None) minute = cron.get("minute", 0) run_time = datetime.datetime.combine(today, datetime.time(hour or 0, minute or 0)) # 判断是否到达执行点 if now >= run_time: # 检查今天是否已经执行过 if last_run_map.get(plan.plan_uuid_) == today: continue # 执行任务 plan.update_status() # 更新状态 if plan.status_ == 1: # 激活状态才执行 plan.execute() last_run_map[plan.plan_uuid_] = today LOGINFO(f"[CronScheduler] executed cron plan {plan.plan_uuid_} at {now}") except Exception as e: LOGERR(f"[CronScheduler] plan {plan.plan_uuid_} cron error: {e}") time.sleep(5) # 每 30 秒检查一次 def query_one_alarm_plan(self, plan_uuid: str): # 查询单个告警计划 params = { "plan_uuid": plan_uuid } db_req_que.put(DBRequest(sql=sqls.sql_query_one_alarm_plan, params=params, callback=self.cb_query_one_alarm_plan)) # 查询单条告警计划 def cb_query_one_alarm_plan(self, result, userdata): try: if not result: LOGDBG("cb_query_one_alarm_plan, invalid result") for row in result: plan_uuid: str = row["plan_uuid"] plan_name: str = row["plan_name"] dev_id: str = row["dev_id"] dev_name: str = row["dev_name"] enable: int = bool(row["enable"]) rect: list = json.loads(row["region"]) if row.get("region") else [] threshold_time: int = row["threshold_time"] merge_time: int = row["merge_time"] param: dict = json.loads(row["param"]) event_val: int = row["event_val"] event_type = event_val event_str: str = row["event_str"] event_desc: str = row["event_desc"] start_date = row["start_date"] stop_date = row["stop_date"] time_range = json.loads(row["time_range"]) month_days = None if row["month_days"]: month_days = ast.literal_eval(row["month_days"]) weekdays = None if row["weekdays"]: weekdays = ast.literal_eval(row["weekdays"]) cron = None if ((event_type == EventType.TOILETING_FREQUENCY.value) or (event_type == EventType.NIGHT_TOILETING_FREQUENCY.value) or (event_type == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or (event_type == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value) or (event_type == EventType.BATHROOM_STAY_FREQUENCY.value)): cron = { "hour": 6, "minute": 0 } time_plan = TimePlan( time_range = time_range, start_date = start_date, stop_date = stop_date, weekdays = weekdays, month_days = month_days ) alarm_plan = AlarmPlan( plan_uuid = plan_uuid, name = plan_name, dev_id = dev_id, dev_name = dev_name, enable = enable, time_plan = time_plan, rect = rect, event_type = event_type, threshold_time = threshold_time, merge_time = merge_time, param = param, cron = cron ) if alarm_plan.event_attr_ is None: LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}") continue # 塞入告警计划 if not alarm_plan.cron_: self.push(plan_uuid, alarm_plan) else: self.push_cron(plan_uuid, alarm_plan) LOGINFO(f"create new alarm_plan: {plan_uuid}, {plan_name}") except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") return def init_alarm_plan_mgr(): g_las.g_alarm_plan_mgr = AlarmPlanManager() def start_alarm_plan_mgr(): g_las.g_alarm_plan_mgr.start_scheduler(interval=1) # -------------------------------------- # 数据库响应的回调函数 # -------------------------------------- # 将region字典转换为xy平面的rect [left, top, width, height] def region_to_rect(region: dict) -> list: x_start, x_stop = region["x_cm_start"], region["x_cm_stop"] y_start, y_stop = region["y_cm_start"], region["y_cm_stop"] left = min(x_start, x_stop) right = max(x_start, x_stop) top = min(y_start, y_stop) bottom = max(y_start, y_stop) width = right - left height = bottom - top return [left, top, width, height] # 回调函数,处理查询结果:查询所有的告警计划信息 def cb_handle_query_all_alarm_plan_info(result, userdata): try: if result: for row in result: plan_uuid: str = row["plan_uuid"] plan_name: str = row["plan_name"] dev_id: str = row["dev_id"] dev_name: str = row["dev_name"] enable: int = bool(row["enable"]) # region = row["region"] # rect = json.loads(region_to_rect(region)) rect: list = json.loads(row["region"]) if row.get("region") else [] threshold_time: int = row["threshold_time"] merge_time: int = row["merge_time"] param: dict = json.loads(row["param"]) event_val: int = row["event_val"] event_type = event_val event_str: str = row["event_str"] event_desc: str = row["event_desc"] start_date = row["start_date"] stop_date = row["stop_date"] time_range = json.loads(row["time_range"]) month_days = None if row["month_days"]: month_days = ast.literal_eval(row["month_days"]) weekdays = None if row["weekdays"]: weekdays = ast.literal_eval(row["weekdays"]) cron = None if ((event_type == EventType.TOILETING_FREQUENCY.value) or (event_type == EventType.NIGHT_TOILETING_FREQUENCY.value) or (event_type == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or (event_type == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value) or (event_type == EventType.BATHROOM_STAY_FREQUENCY.value)): cron = { "hour": 6, "minute": 0 } time_plan = TimePlan( time_range = time_range, start_date = start_date, stop_date = stop_date, weekdays = weekdays, month_days = month_days ) alarm_plan = AlarmPlan( plan_uuid = plan_uuid, name = plan_name, dev_id = dev_id, dev_name = dev_name, enable = enable, time_plan = time_plan, rect = rect, event_type = event_type, threshold_time = threshold_time, merge_time = merge_time, param = param, cron = cron ) if alarm_plan.event_attr_ is None: LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}") continue # 塞入告警计划 if not alarm_plan.cron_: g_las.g_alarm_plan_mgr.push(plan_uuid, alarm_plan) else: g_las.g_alarm_plan_mgr.push_cron(plan_uuid, alarm_plan) LOGDBG(f"cb_handle_query_all_alarm_plan_info succeed") else: LOGDBG("cb_handle_query_all_alarm_plan_info, invalid result") except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")