|
- from typing import List, Tuple, Optional
- import time
- from datetime import datetime, date
- import threading
- from threading import Thread, Lock, Event
- from enum import Enum
- import queue
- import json
- import ast
- import traceback
- from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
- from core.alarm_event import AlarmEvent
- from core.alarm_plan import AlarmPlan
- from core.time_plan import TimePlan
- from core.event_type import EventType
- import core.g_LAS as g_las
class AlarmPlanManager:
    """Lock-protected registry of alarm plans with a background polling thread.

    Plans live in a dict keyed by plan uuid; every access to that dict goes
    through ``lock_``. ``start_scheduler`` spawns a daemon thread that
    periodically calls ``update_status()`` on each stored plan and
    ``execute()`` on those whose ``status_`` equals 1.
    """

    def __init__(self, alarm_plan_map: dict = None):
        """Create the manager.

        Args:
            alarm_plan_map: Optional initial ``{plan_uuid: AlarmPlan}`` mapping.
                The dict is adopted as-is (not copied); ``None`` starts empty.
        """
        self.lock_ = Lock()
        # Compare against None instead of relying on truthiness, so that an
        # empty dict supplied by the caller is adopted rather than replaced.
        self.alarm_plan_map_ = {} if alarm_plan_map is None else alarm_plan_map  # <uuid, AlarmPlan>
        self.running = False
        self.thread = None
        # Stop signal of the most recently started scheduler thread.
        self._stop_event = Event()

    def push(self, plan_uuid: str, alarm_plan: "AlarmPlan") -> None:
        """Insert or overwrite the AlarmPlan stored under ``plan_uuid``."""
        with self.lock_:
            self.alarm_plan_map_[plan_uuid] = alarm_plan

    def pop(self, plan_uuid: str) -> Optional["AlarmPlan"]:
        """Remove and return the AlarmPlan for ``plan_uuid``, or None if absent."""
        with self.lock_:
            return self.alarm_plan_map_.pop(plan_uuid, None)

    def find(self, plan_uuid: str) -> Optional["AlarmPlan"]:
        """Return the AlarmPlan for ``plan_uuid`` without removing it, or None."""
        with self.lock_:
            return self.alarm_plan_map_.get(plan_uuid, None)

    def delete(self, plan_uuid: str) -> bool:
        """Delete the AlarmPlan for ``plan_uuid``.

        Returns:
            True if an entry was removed, False if the uuid was not present.
        """
        with self.lock_:
            if plan_uuid in self.alarm_plan_map_:
                del self.alarm_plan_map_[plan_uuid]
                return True
            return False

    def list_all(self) -> list:
        """Return all stored AlarmPlan objects as a new list (shallow copy)."""
        with self.lock_:
            return list(self.alarm_plan_map_.values())

    def start_scheduler(self, interval=5):
        """Start the background scheduler thread; a no-op if already running.

        Args:
            interval: Seconds to wait between two polling passes.
        """
        if self.running:
            return
        self.running = True
        # Each run gets its own stop event, so a thread from an earlier run
        # that is still waiting can never be revived by a quick restart.
        stop_event = Event()
        self._stop_event = stop_event
        self.thread = threading.Thread(
            target=self._scheduler, args=(interval, stop_event), daemon=True
        )
        self.thread.start()

    def stop_scheduler(self):
        """Ask the scheduler thread to exit and wake it if it is waiting."""
        self.running = False
        self._stop_event.set()

    def _scheduler(self, interval, stop_event=None):
        """Polling loop executed by the scheduler thread.

        Args:
            interval: Seconds to wait between two polling passes.
            stop_event: Event that ends this loop when set; defaults to the
                manager's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while self.running and not stop_event.is_set():
            # Work on a snapshot so plan callbacks never run under lock_.
            for plan in self.list_all():
                try:
                    plan.update_status()  # refresh the plan's status
                    if plan.status_ == 1:  # only execute plans in the active state
                        plan.execute()
                except Exception as e:
                    # A failing plan must not take the scheduler thread down.
                    LOGERR(f"[Scheduler] plan {plan.plan_uuid_} error: {e}")
            # Interruptible wait: returns immediately once stop is requested.
            stop_event.wait(interval)
class EventDispatcher:
    """Route plans to per-event-type worker threads.

    Every event type registered through ``start`` gets its own
    ``queue.Queue`` and a daemon thread that passes each queued plan to the
    handler registered for that type.
    """

    def __init__(self):
        self.queues = {}   # event_type -> Queue
        self.threads = {}  # event_type -> worker Thread
        self.running = False

    def start(self, handlers: dict):
        """Create one queue and one worker thread per entry of ``handlers``.

        Args:
            handlers: Mapping ``{event_type: handler_func}``; each handler is
                called with a single queued plan.
        """
        self.running = True
        for kind, callback in handlers.items():
            inbox = queue.Queue()
            self.queues[kind] = inbox
            consumer = threading.Thread(
                target=self.worker,
                args=(kind, inbox, callback),
                daemon=True,
            )
            self.threads[kind] = consumer
            consumer.start()

    def stop(self):
        """Clear the running flag; workers leave their loop on the next check."""
        self.running = False

    def dispatch(self, event_type: int, plan):
        """Queue ``plan`` for the worker of ``event_type``.

        An event type without a registered queue is only logged.
        """
        inbox = self.queues.get(event_type)
        if inbox is None:
            LOGINFO(f"[Dispatcher] No queue for event_type={event_type}")
            return
        inbox.put(plan)

    def worker(self, event_type, q: queue.Queue, handler):
        """Consume plans from ``q`` and hand them to ``handler`` while running.

        Args:
            event_type: Event type served by this worker (used in log output).
            q: Queue to read plans from.
            handler: Callable invoked with each plan.
        """
        while self.running:
            try:
                # The 1-second timeout bounds how long a stop request can go unnoticed.
                handler(q.get(timeout=1))
            except queue.Empty:
                # Nothing arrived in time; loop around and re-check the flag.
                pass
            except Exception as e:
                LOGERR(f"[Dispatcher] Error in event_type={event_type}: {e}")
def init_alarm_plan_mgr():
    """Create the global alarm-plan manager and event dispatcher on ``g_las``."""
    plan_manager = AlarmPlanManager()
    g_las.g_alarm_plan_mgr = plan_manager
    dispatcher = EventDispatcher()
    g_las.g_event_dispatcher = dispatcher
def start_event_dispatcher():
    """Start the global event dispatcher with one AlarmPlan handler per event type."""
    # Register the event handler functions
    routing = (
        (EventType.STAY_DETECTION, AlarmPlan.handle_stay_detection),
        (EventType.RETENTION_DETECTION, AlarmPlan.handle_retention_detection),
        (EventType.TOILETING_DETECTION, AlarmPlan.handle_toileting_detection),
        (EventType.TOILETING_FREQUENCY, AlarmPlan.handle_toileting_frequency),
    )
    # The dispatcher is keyed by the enum members' raw values.
    g_las.g_event_dispatcher.start({kind.value: fn for kind, fn in routing})
def start_alarm_plan_mgr():
    """Start the global alarm-plan manager's scheduler with ``interval=1``."""
    manager = g_las.g_alarm_plan_mgr
    manager.start_scheduler(interval=1)
# Convert a region dict into an xy-plane rect [left, top, width, height]
def region_to_rect(region: dict) -> list:
    """Return ``[left, top, width, height]`` for the given region.

    Args:
        region: Mapping with the keys ``x_cm_start``, ``x_cm_stop``,
            ``y_cm_start`` and ``y_cm_stop``. Start and stop may be given in
            either order.

    Returns:
        The smaller x, the smaller y, then the x extent and the y extent.

    Raises:
        KeyError: If one of the four keys is missing.
    """
    axis_keys = (
        ("x_cm_start", "x_cm_stop"),
        ("y_cm_start", "y_cm_stop"),
    )
    bounds = []
    for start_key, stop_key in axis_keys:
        first = region[start_key]
        second = region[stop_key]
        bounds.append((min(first, second), max(first, second)))
    origin = [low for low, _ in bounds]
    extent = [high - low for low, high in bounds]
    return origin + extent
# Callback that handles a query result: loads all alarm plan records
def cb_handle_query_all_alarm_plan_info(result):
    """Build an AlarmPlan from every queried row and register it globally.

    For each row a ``TimePlan`` and an ``AlarmPlan`` are constructed and the
    plan is pushed into ``g_las.g_alarm_plan_mgr`` under its ``plan_uuid``.
    Plans whose ``event_attr_`` is None after construction are logged and
    skipped.

    Args:
        result: Iterable of row mappings. Each row is read by the keys
            ``plan_uuid``, ``plan_name``, ``dev_id``, ``enable``, ``region``,
            ``threshold_time``, ``merge_time``, ``event_val``, ``start_date``,
            ``stop_date``, ``time_range``, ``month_days`` and ``weekdays``.
            A falsy ``result`` is logged and ignored.

    Returns:
        None. Exceptions are logged frame by frame and never propagated; the
        first failing row ends the load, while rows pushed before it stay
        registered.
    """
    try:
        if not result:
            LOGDBG("cb_handle_query_all_alarm_plan_info, invalid result")
            return

        for row in result:
            plan_uuid: str = row["plan_uuid"]
            plan_name: str = row["plan_name"]
            dev_id: str = row["dev_id"]
            enable: bool = bool(row["enable"])
            # The region column holds JSON text; its decoded value is passed
            # through unchanged as the plan's rect.
            rect: list = json.loads(row["region"])
            threshold_time: int = row["threshold_time"]
            merge_time: int = row["merge_time"]
            event_type: int = row["event_val"]
            start_date = row["start_date"]
            stop_date = row["stop_date"]
            time_range = json.loads(row["time_range"])

            # month_days / weekdays are optional and, when present, hold a
            # Python literal. Fix: the original read the misspelled key
            # "mozhenth_days", so every row with month days raised KeyError
            # and aborted the load.
            month_days = ast.literal_eval(row["month_days"]) if row["month_days"] else None
            weekdays = ast.literal_eval(row["weekdays"]) if row["weekdays"] else None

            time_plan = TimePlan(
                time_range=time_range,
                start_date=start_date,
                stop_date=stop_date,
                weekdays=weekdays,
                month_days=month_days,
            )
            alarm_plan = AlarmPlan(
                plan_uuid=plan_uuid,
                name=plan_name,
                dev_id=dev_id,
                enable=enable,
                time_plan=time_plan,
                rect=rect,
                event_type=event_type,
                threshold_time=threshold_time,
                merge_time=merge_time,
            )

            if alarm_plan.event_attr_ is None:
                LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
                continue

            # Insert or overwrite the plan in the global manager
            g_las.g_alarm_plan_mgr.push(plan_uuid, alarm_plan)

        LOGDBG("cb_handle_query_all_alarm_plan_info succeed")

    except json.JSONDecodeError as e:
        # Also log the offending document (e.doc) for malformed JSON columns.
        tb_info = traceback.extract_tb(e.__traceback__)
        for frame in tb_info:
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
    except Exception as e:
        tb_info = traceback.extract_tb(e.__traceback__)
        for frame in tb_info:
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
|