123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462 |
- from typing import List, Tuple, Optional
- import time
- from datetime import datetime, date
- import threading
- from threading import Thread, Lock, Event
- from enum import Enum
- import queue
- import json
- import ast
- import traceback
- from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
- from core.alarm_event import AlarmEvent
- from core.alarm_plan import AlarmPlan
- from core.time_plan import TimePlan
- from core.event_type import EventType
- from core.linkage_action import LinkageAction
- import core.alarm_plan_helper as helper
- import core.g_LAS as g_las
- from db.db_process import db_req_que
- from db.db_process import DBRequest_Async
- import db.db_sqls as sqls
class AlarmPlanManager:
    """Thread-safe registry and scheduler for alarm plans.

    Two maps are kept, both keyed by plan uuid and guarded by a single lock:

    * ``alarm_plan_map_``      -- plans polled every ``interval`` seconds by
      ``_scheduler``.
    * ``alarm_plan_cron_map_`` -- plans that carry a ``cron_`` dict and are
      run at most once per day by ``_cron_scheduler``.
    """

    def __init__(self,
                 alarm_plan_map: dict = None,
                 alarm_plan_cron_map: dict = None):
        """Create a manager, optionally seeded with existing plan maps."""
        self.lock_ = Lock()
        self.alarm_plan_map_ = alarm_plan_map or {}            # <uuid, AlarmPlan>
        self.alarm_plan_cron_map_ = alarm_plan_cron_map or {}  # <uuid, AlarmPlan>
        self.running = False
        self.thread = None
        # Initialised here so the attribute exists before start_scheduler() runs.
        self.cron_thread = None

    # --------------------------------------
    # Polled plan map
    # --------------------------------------
    def push_plan(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
        """Insert or replace a polled plan."""
        with self.lock_:
            self.alarm_plan_map_[plan_uuid] = alarm_plan

    def pop_plan(self, plan_uuid: str) -> Optional[AlarmPlan]:
        """Remove and return a polled plan, or None if it is not registered."""
        with self.lock_:
            return self.alarm_plan_map_.pop(plan_uuid, None)

    def find_plan(self, plan_uuid: str) -> Optional[AlarmPlan]:
        """Return a polled plan without removing it, or None."""
        with self.lock_:
            return self.alarm_plan_map_.get(plan_uuid, None)

    def delete_plan(self, plan_uuid: str) -> bool:
        """Delete a polled plan; return True if it existed."""
        with self.lock_:
            if plan_uuid in self.alarm_plan_map_:
                del self.alarm_plan_map_[plan_uuid]
                return True
            return False

    def list_all_plan(self) -> list:
        """Return a snapshot list of all polled plans."""
        with self.lock_:
            return list(self.alarm_plan_map_.values())

    # --------------------------------------
    # Cron plan map
    # --------------------------------------
    def push_cron(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
        """Insert or replace a cron plan."""
        with self.lock_:
            self.alarm_plan_cron_map_[plan_uuid] = alarm_plan

    def pop_cron(self, plan_uuid: str) -> Optional[AlarmPlan]:
        """Remove and return a cron plan, or None if it is not registered."""
        with self.lock_:
            return self.alarm_plan_cron_map_.pop(plan_uuid, None)

    def find_cron(self, plan_uuid: str) -> Optional[AlarmPlan]:
        """Return a cron plan without removing it, or None."""
        with self.lock_:
            return self.alarm_plan_cron_map_.get(plan_uuid, None)

    def delete_cron(self, plan_uuid: str) -> bool:
        """Delete a cron plan; return True if it existed."""
        with self.lock_:
            if plan_uuid in self.alarm_plan_cron_map_:
                del self.alarm_plan_cron_map_[plan_uuid]
                return True
            return False

    def list_all_cron(self) -> list:
        """Return a snapshot list of all cron plans."""
        with self.lock_:
            return list(self.alarm_plan_cron_map_.values())

    def remove_one_alarm_plan(self, plan_uuid: str) -> bool:
        """Remove ``plan_uuid`` from both maps; return True if anything was removed."""
        removed = False
        with self.lock_:
            for plan_map in (self.alarm_plan_map_, self.alarm_plan_cron_map_):
                if plan_uuid not in plan_map:
                    continue
                plan = plan_map.pop(plan_uuid)
                removed = True
                # Log the plan's name (falling back to the stored object itself)
                # instead of always dumping the whole plan object.
                plan_name = getattr(plan, "name_", plan)
                LOGINFO(f"remove alarm_plan: {plan_uuid}, {plan_name}")
        return removed

    # --------------------------------------
    # Schedulers
    # --------------------------------------
    def start_scheduler(self, interval=5):
        """Start the polling and cron scheduler threads (no-op if already running).

        Args:
            interval: seconds slept between two polling passes of ``_scheduler``.
        """
        if self.running:
            return
        self.running = True

        # Polling scheduler for regular plans.
        self.thread = threading.Thread(
            target=self._scheduler,
            args=(interval,),
            daemon=True,
            name="APSchedulerThread")
        self.thread.start()

        # Once-a-day scheduler for cron plans.
        self.cron_thread = threading.Thread(
            target=self._cron_scheduler,
            daemon=True,
            name="CronSchedulerThread")
        self.cron_thread.start()

    def stop_scheduler(self):
        """Ask both scheduler loops to exit after their current sleep; does not join."""
        self.running = False

    def _scheduler(self, interval):
        """Polling loop: refresh every plan's status and execute the active ones."""
        while self.running:
            for plan in self.list_all_plan():
                try:
                    plan.update_status()
                    if plan.status_ == 1:   # only active plans are executed
                        plan.execute()
                except Exception as e:
                    LOGERR(f"[Scheduler] plan {plan.plan_uuid_} error: {e}")
            time.sleep(interval)

    def _cron_scheduler(self):
        """Cron loop: run each enabled cron plan at most once per day.

        A plan becomes due once the local time reaches the ``hour``/``minute``
        in its ``cron_`` dict (missing or None values count as 0).
        """
        # Local alias: at module level `datetime` is the class and `time` is
        # the time module, so the datetime *module* is imported under `dt`.
        import datetime as dt

        last_run_map = {}       # {plan_uuid: date} day each plan last ran
        last_reset_date = None  # day the map was last cleared

        while self.running:
            now = dt.datetime.now()
            today = now.date()

            # Clear the "already ran" markers on the first pass of each new day.
            if last_reset_date != today:
                last_run_map.clear()
                last_reset_date = today
                LOGINFO(f"[CronScheduler] reset last_run_map at {today}")

            for plan in self.list_all_cron():
                try:
                    cron = plan.cron_
                    if not cron or not plan.enable_:
                        continue

                    hour = cron.get("hour", None) or 0
                    minute = cron.get("minute", 0) or 0
                    run_time = dt.datetime.combine(today, dt.time(hour, minute))

                    if now < run_time:
                        continue    # not due yet
                    if last_run_map.get(plan.plan_uuid_) == today:
                        continue    # already ran today

                    plan.update_status()
                    if plan.status_ == 1:   # only active plans are executed
                        plan.execute()
                        # NOTE(review): the plan is marked as run only after it
                        # actually executed, so an inactive plan is re-checked
                        # on later passes -- confirm this matches the intent.
                        last_run_map[plan.plan_uuid_] = today
                        LOGINFO(f"[CronScheduler] executed cron plan {plan.plan_uuid_} at {now}")
                except Exception as e:
                    LOGERR(f"[CronScheduler] plan {plan.plan_uuid_} cron error: {e}")

            time.sleep(5)   # check every 5 seconds

    # --------------------------------------
    # Plan creation
    # --------------------------------------
    def create_clean_expire_events_task(self, expire_range: int = 90):
        """Register the built-in daily (01:00) task that cleans expired events.

        Args:
            expire_range: stored on the plan's ``event_attr_.expire_range_``;
                presumably the retention period in days -- TODO confirm.
        """
        linkage_action = LinkageAction()
        cron = {
            "hour": 1,
            "minute": 0
        }
        time_plan = TimePlan(
            time_range=[{"start_time": "00:00", "end_time": "23:59"}],
            start_date='2025-09-01',
            stop_date='2099-12-31',
            weekdays=[1, 2, 3, 4, 5, 6, 7],
            month_days=[]
        )

        plan_uuid = 'clean_expire_events_task'
        plan_name = '清理过期事件'
        event_type = EventType.CLEAN_EXPIRE_EVENTS.value
        alarm_plan = AlarmPlan(
            plan_uuid=plan_uuid,
            name=plan_name,
            dev_id='LAS',
            dev_name='告警联动服务',
            enable=1,
            time_plan=time_plan,
            rect=[],
            event_type=event_type,
            threshold_time=300,
            merge_time=30,
            param={},
            cron=cron,
            linkage_action=linkage_action,
            tenant_id=0
        )
        alarm_plan.event_attr_.expire_range_ = expire_range

        self.push_cron(plan_uuid, alarm_plan)
        LOGINFO(f"create task: {plan_uuid}, {plan_name}")

    def query_one_alarm_plan(self, plan_uuid: str):
        """Queue an async DB query for one plan; the reply goes to ``cb_query_one_alarm_plan``."""
        params = {
            "plan_uuid": plan_uuid
        }
        db_req_que.put(DBRequest_Async(sql=sqls.sql_query_one_alarm_plan, params=params,
                                       callback=self.cb_query_one_alarm_plan, userdata=plan_uuid))

    def update_one_alarm_plan(self, alarm_plan: AlarmPlan):
        """Register, replace or (when disabled) remove a single plan."""
        plan_uuid = alarm_plan.plan_uuid_
        plan_name = alarm_plan.name_

        if not alarm_plan.enable_:
            LOGINFO(f"disable alarm_plan: {plan_uuid}, {plan_name}")
            self.remove_one_alarm_plan(plan_uuid)
            return

        # A plan lives in exactly one map. Drop any stale entry from the other
        # map so a plan that switched between polled and cron scheduling is
        # not run by both schedulers.
        if alarm_plan.cron_:
            self.pop_plan(plan_uuid)
            self.push_cron(plan_uuid, alarm_plan)
        else:
            self.pop_cron(plan_uuid)
            self.push_plan(plan_uuid, alarm_plan)
        LOGINFO(f"update alarm_plan: {plan_uuid}, {plan_name}")

    @staticmethod
    def _cron_for_event_type(event_type):
        """Return the daily 07:00 cron dict for frequency-type events, else None."""
        daily_event_types = (
            EventType.TOILETING_FREQUENCY.value,
            EventType.NIGHT_TOILETING_FREQUENCY.value,
            EventType.TOILETING_FREQUENCY_ABNORMAL.value,
            EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value,
            EventType.BATHROOM_STAY_FREQUENCY.value,
        )
        if event_type in daily_event_types:
            return {
                "hour": 7,
                "minute": 0
            }
        return None

    @staticmethod
    def _plan_from_row(row):
        """Build an AlarmPlan from one DB row; return None if the row must be skipped."""
        plan_uuid: str = row["plan_uuid"]
        # The event type has to be known before the rect can be validated.
        event_type: int = row["event_val"]

        rect = helper.region_to_rect(row["region"])
        if not helper.check_plan_rect_valid(event_type, rect):
            LOGWARN(f"skip plan {plan_uuid}: invalid rect={rect} for event_type={event_type}")
            return None

        # An empty/NULL param column is treated as "no parameters".
        param: dict = json.loads(row["param"]) if row.get("param") else {}
        month_days = ast.literal_eval(row["month_days"]) if row["month_days"] else None
        weekdays = ast.literal_eval(row["weekdays"]) if row["weekdays"] else None

        time_plan = TimePlan(
            time_range=json.loads(row["time_range"]),
            start_date=row["start_date"],
            stop_date=row["stop_date"],
            weekdays=weekdays,
            month_days=month_days
        )
        linkage_action = LinkageAction(
            wechat_service=row.get("linkage_push_wechat_service") or 0)

        alarm_plan = AlarmPlan(
            plan_uuid=plan_uuid,
            name=row["plan_name"],
            dev_id=row["dev_id"],
            dev_name=row["dev_name"],
            enable=bool(row["enable"]),
            time_plan=time_plan,
            rect=rect,
            event_type=event_type,
            threshold_time=row["threshold_time"],
            merge_time=row["merge_time"],
            param=param,
            cron=AlarmPlanManager._cron_for_event_type(event_type),
            linkage_action=linkage_action,
            tenant_id=row.get("tenant_id") or 0
        )
        if alarm_plan.event_attr_ is None:
            LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
            return None
        return alarm_plan

    @staticmethod
    def _log_exception(e):
        """Log one line per traceback frame; JSON errors also include the bad document."""
        if isinstance(e, json.JSONDecodeError):
            detail = f"error:{e}, {e.doc}"
        else:
            detail = f"error: {e}"
        for frame in traceback.extract_tb(e.__traceback__):
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), {detail}")

    def cb_query_one_alarm_plan(self, result, userdata):
        """DB callback for ``query_one_alarm_plan``.

        Args:
            result: iterable of dict-like rows; an empty/None result means the
                plan no longer exists and it is removed from the manager.
            userdata: the plan uuid that was queried.

        Exceptions are logged and never propagate to the caller.
        """
        try:
            plan_uuid = userdata
            if not result:
                LOGDBG("cb_query_one_alarm_plan, invalid result")
                self.remove_one_alarm_plan(plan_uuid)
                return      # nothing to iterate (result may be None)

            for row in result:
                alarm_plan = self._plan_from_row(row)
                if alarm_plan is not None:
                    self.update_one_alarm_plan(alarm_plan)
        except Exception as e:
            self._log_exception(e)
def init_alarm_plan_mgr():
    """Create a fresh AlarmPlanManager and publish it as ``g_las.g_alarm_plan_mgr``."""
    manager = AlarmPlanManager()
    g_las.g_alarm_plan_mgr = manager
def start_alarm_plan_mgr():
    """Start the global alarm plan manager's scheduler with ``interval=1`` and log it."""
    manager = g_las.g_alarm_plan_mgr
    manager.start_scheduler(interval=1)
    LOGINFO("start g_alarm_plan_mgr succeed")
- # --------------------------------------
- # 数据库响应的回调函数
- # --------------------------------------
- # 将region字典转换为xy平面的rect [left, top, width, height]
def region_to_rect(region: dict) -> list:
    """Convert a region dict into an xy-plane rect ``[left, top, width, height]``.

    The region supplies ``x_cm_start``/``x_cm_stop`` and
    ``y_cm_start``/``y_cm_stop``; start and stop may come in either order.
    """
    xs = (region["x_cm_start"], region["x_cm_stop"])
    ys = (region["y_cm_start"], region["y_cm_stop"])
    left, top = min(xs), min(ys)
    return [left, top, max(xs) - left, max(ys) - top]
- # 回调函数,处理查询结果:查询所有的告警计划信息
def _log_callback_exception(e):
    """Log one line per traceback frame; JSON errors also include the bad document."""
    if isinstance(e, json.JSONDecodeError):
        detail = f"error:{e}, {e.doc}"
    else:
        detail = f"error: {e}"
    for frame in traceback.extract_tb(e.__traceback__):
        LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), {detail}")


def _build_alarm_plan_from_row(row):
    """Build an AlarmPlan from one DB row; return None if its event type is invalid."""
    plan_uuid: str = row["plan_uuid"]
    event_type: int = row["event_val"]

    # NOTE(review): unlike the single-plan callback, no
    # helper.check_plan_rect_valid() check is applied here -- confirm whether
    # plans loaded at startup should be validated the same way.
    rect = helper.region_to_rect(row["region"])

    # An empty/NULL param column is treated as "no parameters".
    param: dict = json.loads(row["param"]) if row.get("param") else {}
    month_days = ast.literal_eval(row["month_days"]) if row["month_days"] else None
    weekdays = ast.literal_eval(row["weekdays"]) if row["weekdays"] else None

    # Frequency-type events are evaluated once a day at 07:00 instead of polled.
    daily_event_types = (
        EventType.TOILETING_FREQUENCY.value,
        EventType.NIGHT_TOILETING_FREQUENCY.value,
        EventType.TOILETING_FREQUENCY_ABNORMAL.value,
        EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value,
        EventType.BATHROOM_STAY_FREQUENCY.value,
    )
    cron = None
    if event_type in daily_event_types:
        cron = {
            "hour": 7,
            "minute": 0
        }

    time_plan = TimePlan(
        time_range=json.loads(row["time_range"]),
        start_date=row["start_date"],
        stop_date=row["stop_date"],
        weekdays=weekdays,
        month_days=month_days
    )
    linkage_action = LinkageAction(
        wechat_service=row.get("linkage_push_wechat_service") or 0)

    alarm_plan = AlarmPlan(
        plan_uuid=plan_uuid,
        name=row["plan_name"],
        dev_id=row["dev_id"],
        dev_name=row["dev_name"],
        enable=bool(row["enable"]),
        time_plan=time_plan,
        rect=rect,
        event_type=event_type,
        threshold_time=row["threshold_time"],
        merge_time=row["merge_time"],
        param=param,
        cron=cron,
        linkage_action=linkage_action,
        # `or 0` also covers a NULL column, which .get(key, 0) would pass on as None.
        tenant_id=row.get("tenant_id") or 0
    )
    if alarm_plan.event_attr_ is None:
        LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
        return None
    return alarm_plan


def cb_handle_query_all_alarm_plan_info(result, userdata):
    """DB callback: load every alarm plan row into the global plan manager.

    Args:
        result: iterable of dict-like rows; empty/None is logged and ignored.
        userdata: unused.

    Each row is handled independently, so one malformed row is logged and
    skipped instead of aborting the load of all remaining plans. Exceptions
    never propagate to the caller.
    """
    if not result:
        LOGDBG("cb_handle_query_all_alarm_plan_info, invalid result")
        return      # nothing to iterate (result may be None)

    failed = 0
    for row in result:
        try:
            alarm_plan = _build_alarm_plan_from_row(row)
            if alarm_plan is None:
                continue

            # Cron plans and polled plans live in separate maps.
            if alarm_plan.cron_:
                g_las.g_alarm_plan_mgr.push_cron(alarm_plan.plan_uuid_, alarm_plan)
            else:
                g_las.g_alarm_plan_mgr.push_plan(alarm_plan.plan_uuid_, alarm_plan)
        except Exception as e:
            failed += 1
            _log_callback_exception(e)

    if failed:
        LOGWARN(f"cb_handle_query_all_alarm_plan_info, {failed} row(s) skipped due to errors")
    else:
        LOGDBG(f"cb_handle_query_all_alarm_plan_info succeed")
|