Forráskód Böngészése

修复bug:清理事件sql执行失败

nifangxu 1 hónapja
szülő
commit
51ddfc53d7
8 módosított fájl, 43 hozzáadás és 34 törlés
  1. 6 6
      LAS.py
  2. 12 14
      core/alarm_plan.py
  3. 1 1
      core/alarm_plan_dispatcher.py
  4. 5 4
      core/alarm_plan_manager.py
  5. 2 2
      core/event_type.py
  6. 8 3
      core/time_plan.py
  7. 6 3
      db/db_process.py
  8. 3 1
      db/db_sqls.py

+ 6 - 6
LAS.py

@@ -166,7 +166,7 @@ def run():
         time.sleep(1)
 
 
-def query_events_expired_range():
+def query_events_expire_range():
     event_save_range = 90
     try:
         result = db_execute_sync(sql=sqls.sql_event_save_range, timeout=15)
@@ -186,11 +186,11 @@ def query_events_expired_range():
 
 
 # 创建事件过期计划
-def create_events_expired_plan():
-    event_save_range = query_events_expired_range()
+def create_events_expire_plan():
+    event_save_range = query_events_expire_range()
     if not event_save_range:
         return -1
-    g_las.g_alarm_plan_mgr.create_clean_expired_events_task(event_save_range)
+    g_las.g_alarm_plan_mgr.create_clean_expire_events_task(event_save_range)
     return 0
 
 
@@ -212,9 +212,9 @@ def main_process():
                              callback=cb_handle_query_all_alarm_plan_info))
 
     # 创建任务:创建事件过期计划
-    iRet = create_events_expired_plan()
+    iRet = create_events_expire_plan()
     if iRet:
-        LOGERR(f"create_events_expired_plan failed, process termination")
+        LOGERR(f"create_events_expire_plan failed, process termination")
         return iRet
 
     # 轮循任务

+ 12 - 14
core/alarm_plan.py

@@ -7,6 +7,7 @@ import json
 import traceback
 from datetime import datetime, timezone, timedelta
 
+import common.sys_comm as sys_comm
 from common.sys_comm import (
     LOGDBG, LOGINFO, LOGWARN, LOGERR,
     get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
@@ -161,19 +162,11 @@ class EventAttr_TargetAbsence(EventAttr_Base):
         self.absence_time_ = -1
 
 # 事件属性 清理过期事件(无用)
-class EventAttr_CleanExpiredEvents(EventAttr_Base):
+class EventAttr_CleanExpireEvents(EventAttr_Base):
     def __init__(self, event_type):
         self.event_type_ = event_type
-        self.leave_ts_: int  = -1    # 离开时间(ms)
-        self.enter_ts_: int  = -1    # 进入时间(ms)
-        self.absence_time_: int = -1    # 消失时长(ms)
-        self.time_threshold_: int = 300    # 触发消失时间阈值(ms)
-        return
+        self.expire_range_ = 90
 
-    def reset(self):
-        self.leave_ts_  = -1
-        self.enter_ts_  = -1
-        self.absen
 
 # 事件属性表
 event_attr_map = {
@@ -187,7 +180,7 @@ event_attr_map = {
     EventType.BATHROOM_STAY_FREQUENCY.value         : EventAttr_BathroomStayFrequency,
     EventType.TARGET_ABSENCE.value                  : EventAttr_TargetAbsence,
 
-    EventType.CLEAN_EXPIRED_EVENTS.value            : EventAttr_CleanExpiredEvents,
+    EventType.CLEAN_EXPIRE_EVENTS.value             : EventAttr_CleanExpireEvents,
 }
 
 class Cron:
@@ -258,8 +251,10 @@ class AlarmPlan:
             if ((self.event_type_ == EventType.TOILETING_FREQUENCY_ABNORMAL.value) or
                 (self.event_type_ == EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value)):
                 event_attr.threshold_count_ = int(self.param_.get("count", 0))
-            if ((self.event_type_ == EventType.TARGET_ABSENCE.value)):
+            elif ((self.event_type_ == EventType.TARGET_ABSENCE.value)):
                 event_attr.time_threshold_ = int(self.param_.get("time_threshold", 0))
+            elif ((self.event_type_ == EventType.CLEAN_EXPIRE_EVENTS.value)):
+                event_attr.expire_range_ = 90
 
             return event_attr
 
@@ -819,9 +814,12 @@ class AlarmPlan:
 
 
     # 清理过期事件
-    def handle_clear_expired_events(self):
+    def handle_clear_expire_events(self):
         try:
-            db_execute_async(sqls.sql_delete_expired_events)
+            params = {
+                "save_days": self.event_attr_.expire_range_
+            }
+            db_execute_async(sqls.sql_delete_expire_events, params=params)
 
         except json.JSONDecodeError as e:
             tb_info = traceback.extract_tb(e.__traceback__)

+ 1 - 1
core/alarm_plan_dispatcher.py

@@ -64,6 +64,6 @@ def start_alarm_plan_dispatcher():
         EventType.BATHROOM_STAY_FREQUENCY.value         : AlarmPlan.handle_bathroom_stay_frequency,
         EventType.TARGET_ABSENCE.value                  : AlarmPlan.handle_target_absence,
 
-        EventType.CLEAN_EXPIRED_EVENTS.value            : AlarmPlan.handle_clear_expired_events
+        EventType.CLEAN_EXPIRE_EVENTS.value             : AlarmPlan.handle_clear_expire_events
     }
     g_las.g_alarm_plan_disp.start(handles)

+ 5 - 4
core/alarm_plan_manager.py

@@ -170,7 +170,7 @@ class AlarmPlanManager:
 # 创建计划相关
 # --------------------------------------
     # 清理过期事件任务
-    def create_clean_expired_events_task(self, save_range:int = 90):
+    def create_clean_expire_events_task(self, expire_range:int = 90):
         linkage_action  = LinkageAction()
         cron = {
             "hour": 1,
@@ -185,9 +185,9 @@ class AlarmPlanManager:
             month_days  = []
         )
 
-        plan_uuid   = 'clean_expired_events_task'
+        plan_uuid   = 'clean_expire_events_task'
         plan_name   = '清理过期事件'
-        event_type  = EventType.CLEAN_EXPIRED_EVENTS.value
+        event_type  = EventType.CLEAN_EXPIRE_EVENTS.value
         alarm_plan = AlarmPlan(
             plan_uuid   = plan_uuid,
             name        = plan_name,
@@ -204,9 +204,10 @@ class AlarmPlanManager:
             linkage_action=linkage_action,
             tenant_id   = 0
         )
+        alarm_plan.event_attr_.expire_range_ = expire_range
         # 塞入告警计划
         self.push_cron(plan_uuid, alarm_plan)
-        LOGINFO(f"create task:clean_expired_events_task, {plan_uuid}, {plan_name}")
+        LOGINFO(f"create task: {plan_uuid}, {plan_name}")
 
 
     def query_one_alarm_plan(self, plan_uuid: str):

+ 2 - 2
core/event_type.py

@@ -14,7 +14,7 @@ class EventType(Enum):
     TARGET_ABSENCE                  = 9 # 异常消失
 
     # 平台事件(任务)
-    CLEAN_EXPIRED_EVENTS            = 9001  # 清理过期事件
+    CLEAN_EXPIRE_EVENTS             = 9001  # 清理过期事件
 
 
 event_desc_map = {
@@ -29,5 +29,5 @@ event_desc_map = {
     EventType.TARGET_ABSENCE.value              : "target_absence",
 
     # 平台事件(任务)
-    EventType.CLEAN_EXPIRED_EVENTS.value        : "clear_expired_events"
+    EventType.CLEAN_EXPIRE_EVENTS.value         : "clear_expire_events"
 }

+ 8 - 3
core/time_plan.py

@@ -37,11 +37,16 @@ class TimePlan:
             return False
 
         # 周日期匹配
-        if self.weekdays_ is not None and now.weekday() not in self.weekdays_:
-            return False
+        match_weekdays: bool = False
+        if self.weekdays_ is not None and now.weekday() in self.weekdays_:
+            match_weekdays = True
 
         # 月日期匹配
-        if self.month_days_ is not None and now.day not in self.month_days_:
+        match_monthdays: bool = False
+        if self.month_days_ is not None and now.day in self.month_days_:
+            match_monthdays = True
+
+        if not (match_weekdays or match_monthdays):
             return False
 
         # 时间匹配

+ 6 - 3
db/db_process.py

@@ -161,15 +161,17 @@ def handle_db_request(db_request):
                 try:
                     db_request.callback(result, db_request.userdata)
                 except Exception as e:
-                    LOGERR(f"[DB ERROR] 回调执行失败: {e}")
+                    LOGERR(f"[DB ERROR] 回调执行失败: {e}, sql: {db_request.sql}")
 
             if isinstance(db_request, DBRequest_Sync):
                 db_request.set_result(result)
 
+            # LOGINFO(f"[DB SUCCESS] SQL executed successfully: {db_request.sql}")
+
         conn.commit()
 
     except Exception as e:
-        LOGERR(f"[DB ERROR] SQL执行失败: {e}")
+        LOGERR(f"[DB ERROR] SQL执行失败: {e}, sql: {db_request.sql}")
         if isinstance(db_request, DBRequest_Sync):
             db_request.set_exception(e)
     finally:
@@ -235,7 +237,7 @@ def db_process():
                     async_executor.submit(handle_db_request, db_request)
 
             except Exception as e:
-                LOGERR(f"[DB Thread Error] {e}")
+                LOGERR(f"[DB Thread Error] {e}, sql: {db_request.sql}")
             finally:
                 db_req_que.task_done()
 
@@ -259,6 +261,7 @@ def stop_db_process():
         db_thread.join()
         LOGINFO("DB worker stopped")
 
+
 # ========== 示例 ==========
 
 # 处理数据库返回的结果

+ 3 - 1
db/db_sqls.py

@@ -184,9 +184,11 @@ WHERE
 
 
 # 删除 events 表中过期数据,只保留最近 {save_days} 天
-sql_delete_expired_events = """
+sql_delete_expire_events = """
 DELETE FROM lnxx_dev.events
 WHERE
     create_time < DATE_SUB(NOW(), INTERVAL %(save_days)s DAY)
     AND is_deleted = 0;
 """
+
+