ソースを参照

支持对定时计划调度

nifangxu 1 ヶ月 前
コミット
cf8f345776
3 ファイル変更129 行追加52 行削除
  1. 0 0
      core/alarm_checker.py
  2. 39 32
      core/alarm_plan.py
  3. 90 20
      core/alarm_plan_manager.py

+ 0 - 0
core/alarm_checker.py


+ 39 - 32
core/alarm_plan.py

@@ -77,15 +77,13 @@ class EventAttr_ToiletingDetection(EventAttr_Base):
 # 事件属性 如厕频次统计
 class EventAttr_ToiletingFrequency(EventAttr_Base):
     def __init__(self):
-        self.enter_ts_: int  = -1    # 进入时间(ms)
-        self.leave_ts_: int  = -1    # 离开时间(ms)
-        self.stay_time_: int = -1    # 停留时长(ms)
+        self.count_: int        = 0
+        self.event_list: list   = []
         return
 
     def reset(self):
-        self.enter_ts_  = -1
-        self.leave_ts_  = -1
-        self.stay_time_ = -1
+        self.count_  = 0
+        self.event_list  = []
 
 # 事件属性 夜间如厕频次统计
 class EventAttr_NightToiletingFrequency(EventAttr_Base):
@@ -153,6 +151,26 @@ class EventAttr_TargetAbsence(EventAttr_Base):
         self.stay_time_ = -1
 
 
+# 事件属性表
+event_attr_map = {
+    EventType.STAY_DETECTION.value                  : EventAttr_StayDetection,
+    EventType.RETENTION_DETECTION.value             : EventAttr_RetentionDetection,
+    EventType.TOILETING_DETECTION.value             : EventAttr_ToiletingDetection,
+    EventType.TOILETING_FREQUENCY.value             : EventAttr_ToiletingFrequency,
+    EventType.NIGHT_TOILETING_FREQUENCY.value       : EventAttr_NightToiletingFrequency,
+    EventType.TOILETING_FREQUENCY_ABNORMAL.value    : EventAttr_ToiletingFrequencyAbnormal,
+    EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value: EventAttr_NightToiletingFrequencyAbnormal,
+    EventType.BATHROOM_STAY_FREQUENCY.value         : EventAttr_BathroomStayFrequency,
+    EventType.TARGET_ABSENCE.value                  : EventAttr_TargetAbsence,
+}
+
+class Cron:
+    def __init__(self, h, m, s):
+        self.h_ = None
+        self.m_ = None
+        self.s_ = None
+
+
 class AlarmPlan:
     def __init__(self,
                  plan_uuid: str,
@@ -164,7 +182,8 @@ class AlarmPlan:
                  event_type: int,
                  threshold_time: int,
                  merge_time: int,
-                 param: dict
+                 param: dict,
+                 cron: Optional[dict] = None
                  ):
         self.lock_          = Lock()
         self.plan_uuid_     = plan_uuid     # 计划id
@@ -178,19 +197,6 @@ class AlarmPlan:
         self.status_ = 0     # 0未激活,1激活,-1过期
         self.status_update_ts_ = -1   # 状态更新时间,初始值为-1
 
-        # 事件属性表
-        self.event_attr_map = {
-            EventType.STAY_DETECTION.value                  : EventAttr_StayDetection,
-            EventType.RETENTION_DETECTION.value             : EventAttr_RetentionDetection,
-            EventType.TOILETING_DETECTION.value             : EventAttr_ToiletingDetection,
-            EventType.TOILETING_FREQUENCY.value             : EventAttr_ToiletingFrequency,
-            EventType.NIGHT_TOILETING_FREQUENCY.value       : EventAttr_NightToiletingFrequency,
-            EventType.TOILETING_FREQUENCY_ABNORMAL.value    : EventAttr_ToiletingFrequencyAbnormal,
-            EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value: EventAttr_NightToiletingFrequencyAbnormal,
-            EventType.BATHROOM_STAY_FREQUENCY.value         : EventAttr_BathroomStayFrequency,
-            EventType.TARGET_ABSENCE.value                  : EventAttr_TargetAbsence,
-        }
-
         # 事件触发参数
         self.rect_          = rect        # 检测区域  [left, top, width, height]
         self.threshold_time_    = threshold_time    # 触发时间阈值
@@ -200,11 +206,23 @@ class AlarmPlan:
         if self.event_attr_ is None:
             raise ValueError(f"Invalid event_type: {event_type}")
 
+        self.cron_ = cron   # {“hour": 6, "minute": 0}
 
     def execute(self):
         if self.status_ != 1:
             return
-        g_las.g_event_dispatcher.dispatch(self.event_type_, self)
+        if not self.cron_:
+            g_las.g_event_dispatcher.dispatch(self.event_type_, self)
+
+
+    # 初始化事件属性
+    def init_event_attr(self):
+        event_cls = event_attr_map.get(self.event_type_)
+        if event_cls is None:
+            return None
+
+        event_attr    = event_cls()
+        return event_attr
 
 
     # 更新激活状态
@@ -258,17 +276,6 @@ class AlarmPlan:
         return None
 
 
-
-    # 初始化事件属性
-    def init_event_attr(self):
-        event_cls = self.event_attr_map.get(self.event_type_)
-        if event_cls is None:
-            return None
-
-        event_attr    = event_cls()
-        return event_attr
-
-
     # 停留事件
     def handle_stay_detection(self):
         try:

+ 90 - 20
core/alarm_plan_manager.py

@@ -20,46 +20,63 @@ import core.g_LAS as g_las
 
 
 class AlarmPlanManager:
-    def __init__(self, alarm_plan_map: dict = None):
+    def __init__(self,
+                 alarm_plan_map: dict = None,
+                 alarm_plan_cron_map: dict = None):
         self.lock_ = Lock()
 
         self.alarm_plan_map_ = alarm_plan_map or {}   # <uuid, AlarmPlan>
+        self.alarm_plan_cron_map_ = alarm_plan_cron_map or {}   # <uuid, AlarmPlan>
 
         self.running = False
         self.thread = None
 
     def push(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
-        """插入/更新一个 AlarmPlan"""
         with self.lock_:
             self.alarm_plan_map_[plan_uuid] = alarm_plan
 
-
     def pop(self, plan_uuid: str) -> Optional[AlarmPlan]:
-        """弹出并返回 AlarmPlan,如果不存在返回 None"""
         with self.lock_:
             return self.alarm_plan_map_.pop(plan_uuid, None)
 
-
     def find(self, plan_uuid: str) -> Optional[AlarmPlan]:
-        """查找 AlarmPlan,如果不存在返回 None"""
         with self.lock_:
             return self.alarm_plan_map_.get(plan_uuid, None)
 
-
     def delete(self, plan_uuid: str) -> bool:
-        """删除 AlarmPlan,成功返回 True,不存在返回 False"""
         with self.lock_:
             if plan_uuid in self.alarm_plan_map_:
                 del self.alarm_plan_map_[plan_uuid]
                 return True
             return False
 
-
-    def list_all(self) -> list:
-        """返回所有 AlarmPlan(浅拷贝列表)"""
+    def list_all_plan(self) -> list:
         with self.lock_:
             return list(self.alarm_plan_map_.values())
 
+    def push_cron(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
+        with self.lock_:
+            self.alarm_plan_cron_map_[plan_uuid] = alarm_plan
+
+    def pop_cron(self, plan_uuid: str) -> Optional[AlarmPlan]:
+        with self.lock_:
+            return self.alarm_plan_cron_map_.pop(plan_uuid, None)
+
+    def find_cron(self, plan_uuid: str) -> Optional[AlarmPlan]:
+        with self.lock_:
+            return self.alarm_plan_cron_map_.get(plan_uuid, None)
+
+    def delete_cron(self, plan_uuid: str) -> bool:
+        with self.lock_:
+            if plan_uuid in self.alarm_plan_cron_map_:
+                del self.alarm_plan_cron_map_[plan_uuid]
+                return True
+            return False
+
+    def list_all_cron(self) -> list:
+        with self.lock_:
+            return list(self.alarm_plan_cron_map_.values())
+
 
     def start_scheduler(self, interval=5):
         if self.running:
@@ -68,12 +85,17 @@ class AlarmPlanManager:
         self.thread = threading.Thread(target=self._scheduler, args=(interval,), daemon=True)
         self.thread.start()
 
+        # 启动 cron 定时调度线程
+        self.cron_thread = threading.Thread(target=self._cron_scheduler, daemon=True)
+        self.cron_thread.start()
+
     def stop_scheduler(self):
         self.running = False
 
+    # 调度告警计划
     def _scheduler(self, interval):
         while self.running:
-            plans = self.list_all()
+            plans = self.list_all_plan()
             plan: AlarmPlan = None
             for plan in plans:
                 try:
@@ -84,7 +106,40 @@ class AlarmPlanManager:
                     LOGERR(f"[Scheduler] plan {plan.plan_uuid_} error: {e}")
             time.sleep(interval)
 
+    # 调度定时任务
+    def _cron_scheduler(self):
+        last_run = {}  # {plan_uuid: date} 记录任务上次执行日期,避免重复跑
+        while self.running:
+            now = datetime.now()
+            plans = self.list_all_cron()
+            plan: AlarmPlan = None
+            for plan in plans:
+                try:
+                    cron = plan.cron_
+                    if not cron or not plan.enable_:
+                        continue
+
+                    hour = cron.get("hour", None)
+                    minute = cron.get("minute", 0)
+
+                    # 判断是否到达执行点
+                    if ((hour is not None and now.hour != hour) or
+                        (minute is not None and now.minute != minute)):
+                        continue
+
+                    # 防止一分钟内重复执行
+                    if last_run.get(plan.plan_uuid_) == now.date():
+                        continue
 
+                    plan.execute()
+                    last_run[plan.plan_uuid_] = now.date()
+                    LOGINFO(f"[CronScheduler] executed cron plan {plan.plan_uuid_}")
+
+                except Exception as e:
+                    LOGERR(f"[CronScheduler] plan {plan.plan_uuid_} cron error: {e}")
+            time.sleep(30)  # 每 30 秒检查一次
+
+# 分发器
 class EventDispatcher:
     def __init__(self):
         self.queues = {}  # event_type -> Queue
@@ -127,6 +182,11 @@ def init_alarm_plan_mgr():
     g_las.g_event_dispatcher = EventDispatcher()
 
 
+
+def start_alarm_plan_mgr():
+    g_las.g_alarm_plan_mgr.start_scheduler(interval=1)
+
+
 def start_event_dispatcher():
     # 注册事件处理函数
     handles = {
@@ -143,11 +203,6 @@ def start_event_dispatcher():
     g_las.g_event_dispatcher.start(handles)
 
 
-def start_alarm_plan_mgr():
-    g_las.g_alarm_plan_mgr.start_scheduler(interval=1)
-
-
-
 # 将region字典转换为xy平面的rect [left, top, width, height]
 def region_to_rect(region: dict) -> list:
     x_start, x_stop = region["x_cm_start"], region["x_cm_stop"]
@@ -195,6 +250,17 @@ def cb_handle_query_all_alarm_plan_info(result):
                 if row["weekdays"]:
                     weekdays = ast.literal_eval(row["weekdays"])
 
+                cron = None
+                if ((event_type == EventType.TOILETING_FREQUENCY.value) or
+                    (event_type == EventType.NIGHT_TOILETING_FREQUENCY.value) or
+                    (event_type == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
+                    (event_type == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value) or
+                    (event_type == EventType.BATHROOM_STAY_FREQUENCY.value)):
+                    cron = {
+                        "hour": 6,
+                        "minute": 0
+                    }
+
                 time_plan = TimePlan(
                     time_range  = time_range,
                     start_date  = start_date,
@@ -213,14 +279,18 @@ def cb_handle_query_all_alarm_plan_info(result):
                     event_type  = event_type,
                     threshold_time  = threshold_time,
                     merge_time  = merge_time,
-                    param       = param
+                    param       = param,
+                    cron        = cron
                 )
                 if alarm_plan.event_attr_ is None:
                     LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
                     continue
 
-                # 更新设备信息
-                g_las.g_alarm_plan_mgr.push(plan_uuid, alarm_plan)
+                # 塞入告警计划
+                if not alarm_plan.cron_:
+                    g_las.g_alarm_plan_mgr.push(plan_uuid, alarm_plan)
+                else:
+                    g_las.g_alarm_plan_mgr.push_cron(plan_uuid, alarm_plan)
 
             LOGDBG(f"cb_handle_query_all_alarm_plan_info succeed")
         else: