123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
- from typing import List, Tuple, Optional
- import time
- from datetime import datetime, date
- import threading
- from threading import Thread, Lock, Event
- from enum import Enum
- import queue
- import json
- import ast
- import traceback
- from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
- from core.alarm_event import AlarmEvent
- from core.alarm_plan import AlarmPlan
- from core.time_plan import TimePlan
- from core.event_type import EventType
- import core.g_LAS as g_las
- class AlarmPlanManager:
- def __init__(self,
- alarm_plan_map: dict = None,
- alarm_plan_cron_map: dict = None):
- self.lock_ = Lock()
- self.alarm_plan_map_ = alarm_plan_map or {} # <uuid, AlarmPlan>
- self.alarm_plan_cron_map_ = alarm_plan_cron_map or {} # <uuid, AlarmPlan>
- self.running = False
- self.thread = None
- def push(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
- with self.lock_:
- self.alarm_plan_map_[plan_uuid] = alarm_plan
- def pop(self, plan_uuid: str) -> Optional[AlarmPlan]:
- with self.lock_:
- return self.alarm_plan_map_.pop(plan_uuid, None)
- def find(self, plan_uuid: str) -> Optional[AlarmPlan]:
- with self.lock_:
- return self.alarm_plan_map_.get(plan_uuid, None)
- def delete(self, plan_uuid: str) -> bool:
- with self.lock_:
- if plan_uuid in self.alarm_plan_map_:
- del self.alarm_plan_map_[plan_uuid]
- return True
- return False
- def list_all_plan(self) -> list:
- with self.lock_:
- return list(self.alarm_plan_map_.values())
- def push_cron(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
- with self.lock_:
- self.alarm_plan_cron_map_[plan_uuid] = alarm_plan
- def pop_cron(self, plan_uuid: str) -> Optional[AlarmPlan]:
- with self.lock_:
- return self.alarm_plan_cron_map_.pop(plan_uuid, None)
- def find_cron(self, plan_uuid: str) -> Optional[AlarmPlan]:
- with self.lock_:
- return self.alarm_plan_cron_map_.get(plan_uuid, None)
- def delete_cron(self, plan_uuid: str) -> bool:
- with self.lock_:
- if plan_uuid in self.alarm_plan_cron_map_:
- del self.alarm_plan_cron_map_[plan_uuid]
- return True
- return False
- def list_all_cron(self) -> list:
- with self.lock_:
- return list(self.alarm_plan_cron_map_.values())
- def start_scheduler(self, interval=5):
- if self.running:
- return
- self.running = True
- self.thread = threading.Thread(target=self._scheduler, args=(interval,), daemon=True)
- self.thread.start()
- # 启动 cron 定时调度线程
- self.cron_thread = threading.Thread(target=self._cron_scheduler, daemon=True)
- self.cron_thread.start()
- def stop_scheduler(self):
- self.running = False
- # 调度告警计划
- def _scheduler(self, interval):
- while self.running:
- plans = self.list_all_plan()
- plan: AlarmPlan = None
- for plan in plans:
- try:
- plan.update_status() # 更新状态
- if plan.status_ == 1: # 激活状态才执行
- plan.execute()
- except Exception as e:
- LOGERR(f"[Scheduler] plan {plan.plan_uuid_} error: {e}")
- time.sleep(interval)
- # 调度定时任务
- def _cron_scheduler(self):
- import datetime
- last_run_map = {} # {plan_uuid: date} 记录任务上次执行日期,避免重复跑
- while self.running:
- now = datetime.datetime.now()
- today = now.date()
- # 每天 00:00 重置执行标记
- # todo
- plans = self.list_all_cron()
- plan: AlarmPlan = None
- for plan in plans:
- try:
- cron = plan.cron_
- if not cron or not plan.enable_:
- continue
- hour = cron.get("hour", None)
- minute = cron.get("minute", 0)
- run_time = datetime.datetime.combine(today, datetime.time(hour or 0, minute or 0))
- # 判断是否到达执行点
- if now >= run_time:
- # 检查今天是否已经执行过
- if last_run_map.get(plan.plan_uuid_) == today:
- continue
- # 执行任务
- plan.update_status() # 更新状态
- if plan.status_ == 1: # 激活状态才执行
- plan.execute()
- last_run_map[plan.plan_uuid_] = today
- LOGINFO(f"[CronScheduler] executed cron plan {plan.plan_uuid_} at {now}")
- except Exception as e:
- LOGERR(f"[CronScheduler] plan {plan.plan_uuid_} cron error: {e}")
- time.sleep(5) # 每 30 秒检查一次
- # 分发器
- class EventDispatcher:
- def __init__(self):
- self.queues = {} # event_type -> Queue
- self.threads = {}
- self.running = False
- def start(self, handlers: dict):
- """handlers: {event_type: handler_func}"""
- self.running = True
- for event_type, handler in handlers.items():
- q = queue.Queue()
- self.queues[event_type] = q
- t = threading.Thread(target=self.worker, args=(event_type, q, handler), daemon=True)
- self.threads[event_type] = t
- t.start()
- def stop(self):
- self.running = False
- def dispatch(self, event_type: int, plan):
- if event_type in self.queues:
- self.queues[event_type].put(plan)
- else:
- LOGINFO(f"[Dispatcher] No queue for event_type={event_type}")
- def worker(self, event_type, q: queue.Queue, handler):
- while self.running:
- try:
- plan = q.get(timeout=1)
- handler(plan)
- except queue.Empty:
- continue
- except Exception as e:
- LOGERR(f"[Dispatcher] Error in event_type={event_type}: {e}")
- def init_alarm_plan_mgr():
- g_las.g_alarm_plan_mgr = AlarmPlanManager()
- g_las.g_event_dispatcher = EventDispatcher()
- def start_alarm_plan_mgr():
- g_las.g_alarm_plan_mgr.start_scheduler(interval=1)
- def start_event_dispatcher():
- # 注册事件处理函数
- handles = {
- EventType.STAY_DETECTION.value : AlarmPlan.handle_stay_detection,
- EventType.RETENTION_DETECTION.value : AlarmPlan.handle_retention_detection,
- EventType.TOILETING_DETECTION.value : AlarmPlan.handle_toileting_detection,
- EventType.TOILETING_FREQUENCY.value : AlarmPlan.handle_toileting_frequency,
- EventType.NIGHT_TOILETING_FREQUENCY.value : AlarmPlan.handle_night_toileting_frequency,
- EventType.TOILETING_FREQUENCY_ABNORMAL.value : AlarmPlan.handle_toileting_frequency_abnormal,
- EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value : AlarmPlan.handle_night_toileting_frequency_abnormal,
- EventType.BATHROOM_STAY_FREQUENCY.value : AlarmPlan.handle_bathroom_stay_frequency,
- EventType.TARGET_ABSENCE.value : AlarmPlan.handle_target_absence,
- }
- g_las.g_event_dispatcher.start(handles)
- # 将region字典转换为xy平面的rect [left, top, width, height]
- def region_to_rect(region: dict) -> list:
- x_start, x_stop = region["x_cm_start"], region["x_cm_stop"]
- y_start, y_stop = region["y_cm_start"], region["y_cm_stop"]
- left = min(x_start, x_stop)
- right = max(x_start, x_stop)
- top = min(y_start, y_stop)
- bottom = max(y_start, y_stop)
- width = right - left
- height = bottom - top
- return [left, top, width, height]
- # 回调函数,处理查询结果:查询所有的告警计划信息
- def cb_handle_query_all_alarm_plan_info(result):
- try:
- if result:
- for row in result:
- plan_uuid: str = row["plan_uuid"]
- plan_name: str = row["plan_name"]
- dev_id: str = row["dev_id"]
- enable: int = bool(row["enable"])
- # region = row["region"]
- # rect = json.loads(region_to_rect(region))
- rect: list = json.loads(row["region"]) if row.get("region") else []
- threshold_time: int = row["threshold_time"]
- merge_time: int = row["merge_time"]
- param: dict = json.loads(row["param"])
- event_val: int = row["event_val"]
- event_type = event_val
- event_str: str = row["event_str"]
- event_desc: str = row["event_desc"]
- start_date = row["start_date"]
- stop_date = row["stop_date"]
- time_range = json.loads(row["time_range"])
- month_days = None
- if row["month_days"]:
- month_days = ast.literal_eval(row["month_days"])
- weekdays = None
- if row["weekdays"]:
- weekdays = ast.literal_eval(row["weekdays"])
- cron = None
- if ((event_type == EventType.TOILETING_FREQUENCY.value) or
- (event_type == EventType.NIGHT_TOILETING_FREQUENCY.value) or
- (event_type == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
- (event_type == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value) or
- (event_type == EventType.BATHROOM_STAY_FREQUENCY.value)):
- cron = {
- "hour": 1,
- "minute": 0
- }
- time_plan = TimePlan(
- time_range = time_range,
- start_date = start_date,
- stop_date = stop_date,
- weekdays = weekdays,
- month_days = month_days
- )
- alarm_plan = AlarmPlan(
- plan_uuid = plan_uuid,
- name = plan_name,
- dev_id = dev_id,
- enable = enable,
- time_plan = time_plan,
- rect = rect,
- event_type = event_type,
- threshold_time = threshold_time,
- merge_time = merge_time,
- param = param,
- cron = cron
- )
- if alarm_plan.event_attr_ is None:
- LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
- continue
- # 塞入告警计划
- if not alarm_plan.cron_:
- g_las.g_alarm_plan_mgr.push(plan_uuid, alarm_plan)
- else:
- g_las.g_alarm_plan_mgr.push_cron(plan_uuid, alarm_plan)
- LOGDBG(f"cb_handle_query_all_alarm_plan_info succeed")
- else:
- LOGDBG("cb_handle_query_all_alarm_plan_info, invalid result")
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
|