Sfoglia il codice sorgente

1. 数据库封装同步和异步接口
2. 支持清理过期事件

nifangxu 1 mese fa
parent
commit
0bb9a50fb6
10 ha cambiato i file con 336 aggiunte e 100 eliminazioni
  1. 81 42
      LAS.py
  2. 50 16
      core/alarm_plan.py
  3. 2 0
      core/alarm_plan_dispatcher.py
  4. 48 4
      core/alarm_plan_manager.py
  5. 9 1
      core/event_type.py
  6. 127 30
      db/db_process.py
  7. 15 1
      db/db_sqls.py
  8. 1 1
      device/dev_mng.py
  9. 1 2
      mqtt/mqtt_recv.py
  10. 2 3
      mqtt/mqtt_send.py

+ 81 - 42
LAS.py

@@ -9,16 +9,18 @@ import sys
 import json
 
 import common.sys_comm as sys_comm
-from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
-from common.sys_comm import get_utc_time_ms
-from common.sys_comm import g_sys_conf, g_sys_conf_mtx, g_log_conf, g_log_conf_mtx
+from common.sys_comm import (
+    LOGDBG, LOGINFO, LOGWARN, LOGERR,
+    get_utc_time_ms
+)
 
 from mqtt import mqtt_process
 from mqtt.mqtt_process import MQTTClientThread, MQTTConsumerThread, mqtt_client, mqtt_consumer
 
 import db.db_process as db_process
 from db.db_process import db_req_que
-from db.db_process import DBRequest
+from db.db_process import DBRequest_Async
+from db.db_process import (db_execute_sync, db_execute_async)
 import db.db_sqls as sqls
 
 import device.dev_mng as g_Dev
@@ -81,47 +83,47 @@ def sys_init():
 
 
         # 初始化日志配置
-        with g_log_conf_mtx:
-            g_log_conf["module_name"]     = str(config["conf"]["module_name"])
-            g_log_conf["log_lvl"]         = int(config["conf"]["log_lvl"])
-            g_log_conf["max_log_size"]    = int(config["conf"]["max_log_size"]) * 1024 * 1024
-            g_log_conf["max_log_files"]   = int(config["conf"]["max_log_files"])
+        with sys_comm.g_log_conf_mtx:
+            sys_comm.g_log_conf["module_name"]     = str(config["conf"]["module_name"])
+            sys_comm.g_log_conf["log_lvl"]         = int(config["conf"]["log_lvl"])
+            sys_comm.g_log_conf["max_log_size"]    = int(config["conf"]["max_log_size"]) * 1024 * 1024
+            sys_comm.g_log_conf["max_log_files"]   = int(config["conf"]["max_log_files"])
         LOGDBG("log init succeed")
 
         # 初始化系统配置
-        with g_sys_conf_mtx:
-            g_sys_conf["module_name"]     = str(config["conf"]["module_name"])
-            g_sys_conf["platform"]        = int(config["conf"]["platform"])
-            g_sys_conf["db_host"]         = str(config["conf"]["db_host"])
-            g_sys_conf["log_lvl"]         = int(config["conf"]["log_lvl"])
-            g_sys_conf["max_log_size"]    = int(config["conf"]["max_log_size"]) * 1024 * 1024
-            g_sys_conf["max_log_files"]   = int(config["conf"]["max_log_files"])
+        with sys_comm.g_sys_conf_mtx:
+            sys_comm.g_sys_conf["module_name"]     = str(config["conf"]["module_name"])
+            sys_comm.g_sys_conf["platform"]        = int(config["conf"]["platform"])
+            sys_comm.g_sys_conf["db_host"]         = str(config["conf"]["db_host"])
+            sys_comm.g_sys_conf["log_lvl"]         = int(config["conf"]["log_lvl"])
+            sys_comm.g_sys_conf["max_log_size"]    = int(config["conf"]["max_log_size"]) * 1024 * 1024
+            sys_comm.g_sys_conf["max_log_files"]   = int(config["conf"]["max_log_files"])
 
             # windows 本地
-            if g_sys_conf["platform"] == 0:
-                g_sys_conf["host_ip"]     = str(config["windows"]["host_ip"])
-                g_sys_conf["server_ip"]   = str(config["windows"]["server_ip"])
-                g_sys_conf["ssh_host"]    = str(config["windows"]["ssh_host"])
-                g_sys_conf["ssh_port"]    = int(config["windows"]["ssh_port"])
-                mqtt_process.MQTT_BROKER = g_sys_conf["server_ip"]
+            if sys_comm.g_sys_conf["platform"] == 0:
+                sys_comm.g_sys_conf["host_ip"]     = str(config["windows"]["host_ip"])
+                sys_comm.g_sys_conf["server_ip"]   = str(config["windows"]["server_ip"])
+                sys_comm.g_sys_conf["ssh_host"]    = str(config["windows"]["ssh_host"])
+                sys_comm.g_sys_conf["ssh_port"]    = int(config["windows"]["ssh_port"])
+                mqtt_process.MQTT_BROKER = sys_comm.g_sys_conf["server_ip"]
             # linux 服务器
-            elif g_sys_conf["platform"] == 1:
-                g_sys_conf["host_ip"]     = str(config["linux"]["host_ip"])
-                mqtt_process.MQTT_BROKER = g_sys_conf["host_ip"]
+            elif sys_comm.g_sys_conf["platform"] == 1:
+                sys_comm.g_sys_conf["host_ip"]     = str(config["linux"]["host_ip"])
+                mqtt_process.MQTT_BROKER = sys_comm.g_sys_conf["host_ip"]
 
-        g_sys_conf["sp_id"] = int(get_utc_time_ms())
+        sys_comm.g_sys_conf["sp_id"] = int(get_utc_time_ms())
 
         # 报警配置
-        g_sys_conf["alarm_conf"] = sys_comm.alarm_conf
+        sys_comm.g_sys_conf["alarm_conf"] = sys_comm.alarm_conf
 
         # 启动成功,打印系统信息
-        module_name     = g_sys_conf["module_name"]
-        platform        = g_sys_conf["platform"]
-        host_ip         = g_sys_conf["host_ip"]
-        max_log_files   = g_sys_conf["max_log_files"]
-        max_log_size    = g_sys_conf["max_log_size"]
-        log_lvl         = g_sys_conf["log_lvl"]
-        sp_id           = g_sys_conf["sp_id"]
+        module_name     = sys_comm.g_sys_conf["module_name"]
+        platform        = sys_comm.g_sys_conf["platform"]
+        host_ip         = sys_comm.g_sys_conf["host_ip"]
+        max_log_files   = sys_comm.g_sys_conf["max_log_files"]
+        max_log_size    = sys_comm.g_sys_conf["max_log_size"]
+        log_lvl         = sys_comm.g_sys_conf["log_lvl"]
+        sp_id           = sys_comm.g_sys_conf["sp_id"]
 
         print(f" ================ system init succeed !")
         print(f" ================ module         : {module_name}")
@@ -157,27 +159,65 @@ def sys_init():
         return -1
 
 
-# 轮循函数,定期执行一些任务
+# 轮循执行一些任务
 def run():
     # 轮循处理任务
     while True:
         time.sleep(1)
 
 
+def query_events_expired_range():
+    event_save_range = 90
+    try:
+        result = db_execute_sync(sql=sqls.sql_event_save_range, timeout=15)
+        if result and len(result) == 1:
+            row = result[0]
+            param_value = row.get("param_value")
+            if param_value is not None:   # 只要不是 None,就用数据库的值
+                event_save_range = param_value
+        else:
+            LOGWARN("found invalid event_save_range, use default 90")
+
+        return event_save_range
+
+    except Exception as e:
+        LOGERR(f"query event_save_range failed: {e}, use default 90")
+        return -1
+
+
+# 创建事件过期计划
+def create_events_expired_plan():
+    event_save_range = query_events_expired_range()
+    if not event_save_range:
+        return -1
+    g_las.g_alarm_plan_mgr.create_clean_expired_events_task(event_save_range)
+    return 0
+
+
 # 主线程
 def main_process():
     if not g_las.g_alarm_plan_mgr:
+        LOGERR(f"error: g_alarm_plan_mgr not init")
+        return -1
+    if not g_las.g_alarm_plan_disp:
+        LOGERR(f"error: g_alarm_plan_disp not init")
         return -1
 
     # 查询所有设备信息
-    db_req_que.put(DBRequest(sql=sqls.sql_query_all_dev_info,
+    db_req_que.put(DBRequest_Async(sql=sqls.sql_query_all_dev_info,
                              callback=g_Dev.g_dev_mgr.cb_handle_query_all_dev_info))
 
     # 查询所有告警计划
-    db_req_que.put(DBRequest(sql=sqls.sql_query_all_alarm_plan,
+    db_req_que.put(DBRequest_Async(sql=sqls.sql_query_all_alarm_plan,
                              callback=cb_handle_query_all_alarm_plan_info))
 
-    # 轮循函数
+    # 创建任务:创建事件过期计划
+    iRet = create_events_expired_plan()
+    if iRet:
+        LOGERR(f"create_events_expired_plan failed, process termination")
+        return iRet
+
+    # 轮循任务
     run()
 
 
@@ -206,10 +246,9 @@ def main():
     mqtt_consumer = MQTTConsumerThread()
     mqtt_consumer.start()
 
-
-
     # 主线程
-    main_process()
-
+    if not main_process():
+        LOGERR(f"main_process error, process termination")
+        sys.exit(-1)
 
 main()

+ 50 - 16
core/alarm_plan.py

@@ -20,7 +20,11 @@ import core.alarm_plan_helper as helper
 from core.linkage_action import LinkageAction
 
 from db.db_process import (
-    db_req_que, DBRequest
+    db_req_que, DBRequest_Async
+)
+
+from db.db_process import (
+    db_execute_sync, db_execute_async
 )
 import db.db_sqls as sqls
 import mqtt.mqtt_send as mqtt_send
@@ -156,6 +160,20 @@ class EventAttr_TargetAbsence(EventAttr_Base):
         self.enter_ts_  = -1
         self.absence_time_ = -1
 
+# 事件属性 清理过期事件(无用)
+class EventAttr_CleanExpiredEvents(EventAttr_Base):
+    def __init__(self, event_type):
+        self.event_type_ = event_type
+        self.leave_ts_: int  = -1    # 离开时间(ms)
+        self.enter_ts_: int  = -1    # 进入时间(ms)
+        self.absence_time_: int = -1    # 消失时长(ms)
+        self.time_threshold_: int = 300    # 触发消失时间阈值(ms)
+        return
+
+    def reset(self):
+        self.leave_ts_  = -1
+        self.enter_ts_  = -1
+        self.absen
 
 # 事件属性表
 event_attr_map = {
@@ -168,6 +186,8 @@ event_attr_map = {
     EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value: EventAttr_NightToiletingFrequencyAbnormal,
     EventType.BATHROOM_STAY_FREQUENCY.value         : EventAttr_BathroomStayFrequency,
     EventType.TARGET_ABSENCE.value                  : EventAttr_TargetAbsence,
+
+    EventType.CLEAN_EXPIRED_EVENTS.value            : EventAttr_CleanExpiredEvents,
 }
 
 class Cron:
@@ -361,7 +381,7 @@ class AlarmPlan:
                 "tenant_id": self.tenant_id_,
                 "remark": json.dumps({}, ensure_ascii=False) 
             }
-            db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+            db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
             # 通知
             linkage_action = {
@@ -439,7 +459,7 @@ class AlarmPlan:
                 "tenant_id": self.tenant_id_,
                 "remark": json.dumps({}, ensure_ascii=False) 
             }
-            db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+            db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
             # 通知
             linkage_action = {
@@ -517,7 +537,7 @@ class AlarmPlan:
                 "tenant_id": self.tenant_id_,
                 "remark": json.dumps({}, ensure_ascii=False) 
             }
-            db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+            db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
             # 通知
             linkage_action = {
@@ -558,7 +578,7 @@ class AlarmPlan:
                 "start_dt"  : start_dt,
                 "end_dt"  : end_dt
             }
-            db_req_que.put(DBRequest(
+            db_req_que.put(DBRequest_Async(
                 sql=sqls.sql_query_events_by_datetime,
                 params=params,
                 callback=self.cb_toileting_frequency,
@@ -594,7 +614,7 @@ class AlarmPlan:
                 "start_dt"  : start_dt,
                 "end_dt"  : end_dt
             }
-            db_req_que.put(DBRequest(
+            db_req_que.put(DBRequest_Async(
                 sql=sqls.sql_query_events_by_datetime,
                 params=params,
                 callback=self.cb_night_toileting_frequency,
@@ -630,7 +650,7 @@ class AlarmPlan:
                 "start_dt"  : start_dt,
                 "end_dt"  : end_dt
             }
-            db_req_que.put(DBRequest(
+            db_req_que.put(DBRequest_Async(
                 sql=sqls.sql_query_events_by_datetime,
                 params=params,
                 callback=self.cb_toileting_frequency_abnormal,
@@ -666,7 +686,7 @@ class AlarmPlan:
                 "start_dt"  : start_dt,
                 "end_dt"  : end_dt
             }
-            db_req_que.put(DBRequest(
+            db_req_que.put(DBRequest_Async(
                 sql=sqls.sql_query_events_by_datetime,
                 params=params,
                 callback=self.cb_night_toileting_frequency_abnormal,
@@ -701,7 +721,7 @@ class AlarmPlan:
                 "start_dt"  : start_dt,
                 "end_dt"  : end_dt
             }
-            db_req_que.put(DBRequest(
+            db_req_que.put(DBRequest_Async(
                 sql=sqls.sql_query_events_by_datetime,
                 params=params,
                 callback=self.cb_bathroom_stay_frequency,
@@ -776,7 +796,7 @@ class AlarmPlan:
                 "tenant_id": self.tenant_id_,
                 "remark": json.dumps({}, ensure_ascii=False) 
             }
-            db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+            db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
             # 通知
             linkage_action = {
@@ -798,7 +818,22 @@ class AlarmPlan:
 
 
 
+    # 清理过期事件
+    def handle_clear_expired_events(self):
+        try:
+            db_execute_async(sqls.sql_delete_expired_events)
 
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+# ========== 一些回调 ==========
 
     # 如厕频次统计回调
     def cb_toileting_frequency(self, result, userdata):
@@ -844,7 +879,7 @@ class AlarmPlan:
                     "tenant_id": self.tenant_id_,
                     "remark": json.dumps({}, ensure_ascii=False) 
                 }
-                db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+                db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
                 # 通知
                 linkage_action = {
@@ -911,7 +946,7 @@ class AlarmPlan:
                     "tenant_id": self.tenant_id_,
                     "remark": json.dumps({}, ensure_ascii=False) 
                 }
-                db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+                db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
                 # 通知
                 linkage_action = {
@@ -981,7 +1016,7 @@ class AlarmPlan:
                     "tenant_id": self.tenant_id_,
                     "remark": json.dumps({}, ensure_ascii=False) 
                 }
-                db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+                db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
                 # 通知
                 linkage_action = {
@@ -1051,7 +1086,7 @@ class AlarmPlan:
                     "tenant_id": self.tenant_id_,
                     "remark": json.dumps({}, ensure_ascii=False) 
                 }
-                db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+                db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
                 # 通知
                 linkage_action = {
@@ -1074,7 +1109,6 @@ class AlarmPlan:
             return
 
 
-
     # 卫生间频次统计回调
     def cb_bathroom_stay_frequency(self, result, userdata):
         try:
@@ -1119,7 +1153,7 @@ class AlarmPlan:
                     "tenant_id": self.tenant_id_,
                     "remark": json.dumps({}, ensure_ascii=False) 
                 }
-                db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+                db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
                 # 通知
                 linkage_action = {

+ 2 - 0
core/alarm_plan_dispatcher.py

@@ -63,5 +63,7 @@ def start_alarm_plan_dispatcher():
         EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value : AlarmPlan.handle_night_toileting_frequency_abnormal,
         EventType.BATHROOM_STAY_FREQUENCY.value         : AlarmPlan.handle_bathroom_stay_frequency,
         EventType.TARGET_ABSENCE.value                  : AlarmPlan.handle_target_absence,
+
+        EventType.CLEAN_EXPIRED_EVENTS.value            : AlarmPlan.handle_clear_expired_events
     }
     g_las.g_alarm_plan_disp.start(handles)

+ 48 - 4
core/alarm_plan_manager.py

@@ -18,7 +18,7 @@ from core.linkage_action import LinkageAction
 import core.g_LAS as g_las
 
 from db.db_process import db_req_que
-from db.db_process import DBRequest
+from db.db_process import DBRequest_Async
 import db.db_sqls as sqls
 
 class AlarmPlanManager:
@@ -165,16 +165,60 @@ class AlarmPlanManager:
             time.sleep(5)  # 每 30 秒检查一次
 
 
+
+# --------------------------------------
+# 创建计划相关
+# --------------------------------------
+    # 清理过期事件任务
+    def create_clean_expired_events_task(self, save_range:int = 90):
+        linkage_action  = LinkageAction()
+        cron = {
+            "hour": 1,
+            "minute": 0
+        }
+
+        time_plan = TimePlan(
+            time_range  = [{"start_time": "00:00","end_time": "23:59"}],
+            start_date  = '2025-09-01',
+            stop_date   = '2099-12-31',
+            weekdays    = [1,2,3,4,5,6,7],
+            month_days  = []
+        )
+
+        plan_uuid   = 'clean_expired_events_task'
+        plan_name   = '清理过期事件'
+        event_type  = EventType.CLEAN_EXPIRED_EVENTS.value
+        alarm_plan = AlarmPlan(
+            plan_uuid   = plan_uuid,
+            name        = plan_name,
+            dev_id      = 'LAS',
+            dev_name    = '告警联动服务',
+            enable      = 1,
+            time_plan   = time_plan,
+            rect        = [],
+            event_type  = event_type,
+            threshold_time  = 300,
+            merge_time  = 30,
+            param       = {},
+            cron        = cron,
+            linkage_action=linkage_action,
+            tenant_id   = 0
+        )
+        # 塞入告警计划
+        self.push_cron(plan_uuid, alarm_plan)
+        LOGINFO(f"create task:clean_expired_events_task, {plan_uuid}, {plan_name}")
+
+
     def query_one_alarm_plan(self, plan_uuid: str):
         # 查询单个告警计划
         params = {
             "plan_uuid": plan_uuid
         }
-        db_req_que.put(DBRequest(sql=sqls.sql_query_one_alarm_plan, params=params,
+        db_req_que.put(DBRequest_Async(sql=sqls.sql_query_one_alarm_plan, params=params,
                                  callback=self.cb_query_one_alarm_plan))
 
 
-# 查询单条告警计划
+    # 查询单个告警计划回调
     def cb_query_one_alarm_plan(self, result, userdata):
         try:
             if not result:
@@ -195,7 +239,7 @@ class AlarmPlanManager:
                 event_type      = event_val
                 event_str: str  = row["event_str"]
                 event_desc: str = row["event_desc"]
-                tenant_id: int  = row.get("tenant_id", 0)
+                tenant_id: int  = row.get("tenant_id") or 0
 
                 start_date  = row["start_date"]
                 stop_date   = row["stop_date"]

+ 9 - 1
core/event_type.py

@@ -2,6 +2,7 @@ from enum import Enum
 
 # 事件类型
 class EventType(Enum):
+    # 设备事件
     STAY_DETECTION                  = 1 # 停留事件
     RETENTION_DETECTION             = 2 # 滞留事件
     TOILETING_DETECTION             = 3 # 如厕事件
@@ -12,6 +13,10 @@ class EventType(Enum):
     BATHROOM_STAY_FREQUENCY         = 8 # 卫生间频次统计
     TARGET_ABSENCE                  = 9 # 异常消失
 
+    # 平台事件(任务)
+    CLEAN_EXPIRED_EVENTS            = 9001  # 清理过期事件
+
+
 event_desc_map = {
     EventType.STAY_DETECTION.value              : "stay_detection",
     EventType.RETENTION_DETECTION.value         : "retention_detection",
@@ -21,5 +26,8 @@ event_desc_map = {
     EventType.TOILETING_FREQUENCY_ABNORMAL.value: "toileting_frequency_abnormal",
     EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value  : "night_toileting_frequency_abnormal",
     EventType.BATHROOM_STAY_FREQUENCY.value     : "bathroom_stay_frequency",
-    EventType.TARGET_ABSENCE.value              : "target_absence"
+    EventType.TARGET_ABSENCE.value              : "target_absence",
+
+    # 平台事件(任务)
+    EventType.CLEAN_EXPIRED_EVENTS.value        : "clear_expired_events"
 }

+ 127 - 30
db/db_process.py

@@ -13,10 +13,9 @@ from concurrent.futures import ThreadPoolExecutor
 import json
 import shutil
 
-import common.sys_comm
+import common.sys_comm as sys_comm
 from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
 from common.sys_comm import get_bj_time_ms
-from common.sys_comm import g_sys_conf, g_sys_conf_mtx
 
 
 # ssh配置
@@ -37,16 +36,18 @@ db_config = {
     "database": "lnxx_dev"
 }
 
-
+# ===================== 全局对象 =====================
 # 请求队列
 db_req_que = queue.Queue()
 # 记录 SSH 隧道和数据库连接
 ssh_server = None
 # 连接池对象
 db_pool = None
+# 数据库线程是否运行标记
+db_worker_running = False 
 
 # 数据库请求类
-class DBRequest:
+class DBRequest_Async:
     def __init__(self, sql:str, params=None, callback=None, userdata=None):
         self.sql = sql
         self.params = params if params else ()
@@ -54,21 +55,46 @@ class DBRequest:
         self.userdata = userdata
 
 
+class DBRequest_Sync(DBRequest_Async):
+    def __init__(self, sql:str, params=None, callback=None, userdata=None):
+        super().__init__(sql, params, callback, userdata)
+        self._done_event = threading.Event()
+        self._result = None
+        self._exception = None
+
+    def wait(self, timeout=None):
+        """阻塞等待执行完成"""
+        finished = self._done_event.wait(timeout)
+        if not finished:
+            raise TimeoutError("DBRequest_Sync timed out")
+        if self._exception:
+            raise self._exception
+        return self._result
+
+    def set_result(self, result):
+        self._result = result
+        self._done_event.set()
+
+    def set_exception(self, e):
+        self._exception = e
+        self._done_event.set()
+
+
 # ========== 初始化配置 ==========
 def db_pro_init():
     global ssh_conf, db_config
-    with g_sys_conf_mtx:
+    with sys_comm.g_sys_conf_mtx:
         ssh_conf = {
-            "ssh_host": g_sys_conf["ssh_host"],
-            "ssh_port": g_sys_conf["ssh_port"],
-            "ssh_user": g_sys_conf["ssh_username"],
-            "ssh_pwd": g_sys_conf["ssh_password"],
+            "ssh_host": sys_comm.g_sys_conf["ssh_host"],
+            "ssh_port": sys_comm.g_sys_conf["ssh_port"],
+            "ssh_user": sys_comm.g_sys_conf["ssh_username"],
+            "ssh_pwd": sys_comm.g_sys_conf["ssh_password"],
         }
 
         db_config = {
-            "host": g_sys_conf["db_host"],
-            "user": g_sys_conf["db_username"],
-            "password": g_sys_conf["db_password"],
+            "host": sys_comm.g_sys_conf["db_host"],
+            "user": sys_comm.g_sys_conf["db_username"],
+            "password": sys_comm.g_sys_conf["db_password"],
             "database": "lnxx_dev"
         }
 
@@ -90,7 +116,7 @@ def initialize_ssh_connection():
 # ========== 初始化连接池 ==========
 def initialize_connection_pool():
     global db_pool, ssh_server
-    if g_sys_conf["platform"] == 0:
+    if sys_comm.g_sys_conf["platform"] == 0:
         initialize_ssh_connection()
         port = ssh_server.local_bind_port
         host = "127.0.0.1"
@@ -116,7 +142,7 @@ def initialize_connection_pool():
 
 
 # ========== 执行数据库请求 ==========
-def handle_db_request(db_request: DBRequest):
+def handle_db_request(db_request):
     conn = None
     try:
         conn = db_pool.connection()
@@ -129,52 +155,123 @@ def handle_db_request(db_request: DBRequest):
                 result = {"lastrowid": cursor.lastrowid}
             else:
                 result = {"rowcount": cursor.rowcount}
+
+            # 执行回调
             if db_request.callback:
-                db_request.callback(result, db_request.userdata)
+                try:
+                    db_request.callback(result, db_request.userdata)
+                except Exception as e:
+                    LOGERR(f"[DB ERROR] 回调执行失败: {e}")
+
+            if isinstance(db_request, DBRequest_Sync):
+                db_request.set_result(result)
+
         conn.commit()
+
     except Exception as e:
         LOGERR(f"[DB ERROR] SQL执行失败: {e}")
+        if isinstance(db_request, DBRequest_Sync):
+            db_request.set_exception(e)
     finally:
         if conn:
             conn.close()
-        db_req_que.task_done()
 
 
+
+# ========== 封装接口 ==========
+# 同步执行
+def db_execute_sync(sql: str, params=None, callback=None, userdata=None, timeout=5):
+    """
+    如果传了 callback,会先执行 callback,再返回结果。
+    如果不传 callback,直接返回查询结果。
+    若timeout传入None会无限期等待(不建议)
+    """
+    if not db_worker_running:
+        LOGERR("DB worker is not running, cannot execute sync request")
+        return -1
+    req = DBRequest_Sync(sql=sql, params=params, callback=callback, userdata=userdata)
+    db_req_que.put(req)
+    return req.wait(timeout=timeout)
+
+# 异步执行
+def db_execute_async(sql: str, params=None, callback=None, userdata=None):
+    """
+    callback: 可选,数据库操作完成后调用
+    userdata: 可选,回调附带的用户数据
+    """
+    if not db_worker_running:
+        LOGERR("DB worker is not running, cannot execute async request")
+        return None
+    req = DBRequest_Async(sql=sql, params=params, callback=callback, userdata=userdata)
+    db_req_que.put(req)
+    return req
+
 # ========== 主数据库线程 ==========
 def db_process():
+    global db_worker_running
+    db_worker_running = True
+
     db_pro_init()
     initialize_connection_pool()
 
-    executor = ThreadPoolExecutor(max_workers=8)  # 限制线程并发数
-    while True:
-        try:
-            db_request: DBRequest = db_req_que.get()
+    # 单线程执行器
+    sync_executor = ThreadPoolExecutor(max_workers=1)
+    # 多线程执行器
+    async_executor = ThreadPoolExecutor(max_workers=8)  # 限制线程并发数
+
+    try:
+        while True:
+            db_request = db_req_que.get()
             if db_request is None:
                 break
-            executor.submit(handle_db_request, db_request)
-        except Exception as e:
-            LOGERR(f"[DB Thread Error] {e}")
-        time.sleep(0.01)
 
+            try:
+                if isinstance(db_request, DBRequest_Sync):
+                    # 同步操作
+                    handle_db_request(db_request)
+                    # sync_executor.submit(handle_db_request, db_request)
+                else:
+                    # 异步操作
+                    async_executor.submit(handle_db_request, db_request)
+
+            except Exception as e:
+                LOGERR(f"[DB Thread Error] {e}")
+            finally:
+                db_req_que.task_done()
+
+    finally:
+        # 收到退出信号后,关闭执行器
+        sync_executor.shutdown(wait=True)
+        async_executor.shutdown(wait=True)
+        db_worker_running = False
+        LOGERR("DB process exit gracefully")
 
+# 创建数据库线程
 def create_db_process():
-    # 启动数据库线程
-    return threading.Thread(target=db_process, daemon=True)
+    global db_thread
+    db_thread = threading.Thread(target=db_process, daemon=True)
+    return db_thread
 
+# 停止数据库线程
+def stop_db_process():
+    if db_worker_running:
+        db_req_que.put(None)
+        db_thread.join()
+        LOGINFO("DB worker stopped")
+
+# ========== 示例 ==========
 
 # 处理数据库返回的结果
-def handle_device_data(results, userdata):
+def cb_handle_device_data(results, userdata):
     LOGDBG("Received results: {results}")
 
-
 # 示例请求生成器
 def request_generator():
     while True:
         sql_query = "SELECT * FROM dev_info"  # 示例查询
-        db_req_que.put(DBRequest(sql=sql_query, callback=handle_device_data))
+        db_req_que.put(DBRequest_Async(sql=sql_query, callback=cb_handle_device_data))
         time.sleep(1)  # 每秒生成一个请求
 
-
 def test_main():
     # 启动数据库线程
     db_thread = threading.Thread(target=db_process, daemon=True)

+ 15 - 1
db/db_sqls.py

@@ -133,7 +133,6 @@ WHERE
 """
 
 
-
 # 插入events
 sql_insert_events = """
 INSERT INTO events (
@@ -163,6 +162,7 @@ VALUES (
 );
 """
 
+
 # 查询events
 sql_query_events_by_datetime = """
 SELECT *
@@ -175,4 +175,18 @@ WHERE
 """
 
 
+# 查询告警记录保存周期
+sql_event_save_range = """
+SELECT * FROM lnxx_dev.tbl_parameter
+WHERE
+	parameter_id = 1963779012011212801
+"""
 
+
+# 删除 events 表中过期数据,只保留最近 {save_days} 天
+sql_delete_expired_events = """
+DELETE FROM lnxx_dev.events
+WHERE
+    create_time < DATE_SUB(NOW(), INTERVAL %(save_days)s DAY)
+    AND is_deleted = 0;
+"""

+ 1 - 1
device/dev_mng.py

@@ -12,7 +12,7 @@ from common.sys_comm import (
     get_utc_time_ms, get_utc_time_s,
     get_bj_time_ms, get_bj_time_s,
     utc_to_bj_s)
-from common.sys_comm import g_sys_conf, g_sys_conf_mtx
+
 
 
 # 跟踪区域类

+ 1 - 2
mqtt/mqtt_recv.py

@@ -12,11 +12,10 @@ import common.sys_comm as sys_comm
 from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
 from common.sys_comm import get_tracker_targets, get_utc_time_ms, get_utc_time_s
 from common.sys_comm import POSE_E, DEV_EC
-from common.sys_comm import g_sys_conf, g_sys_conf_mtx
 
 import db.db_process as db_process
 from db.db_process import db_req_que
-from db.db_process import DBRequest
+from db.db_process import DBRequest_Async
 
 from mqtt.mqtt_topics import Topic_Pattern
 import mqtt.mqtt_send as mqtt_send

+ 2 - 3
mqtt/mqtt_send.py

@@ -12,7 +12,6 @@ mqtt_send_que = queue.Queue()   # 发送队列
 import common.sys_comm as sys_comm
 from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
 from common.sys_comm import MODEL_E, POSE_CLASS_E, POSE_E
-from common.sys_comm import g_sys_conf, g_sys_conf_mtx
 from mqtt.mqtt_topics import TOPICS
 
 # 消息类型
@@ -37,8 +36,8 @@ def send_msg(topic:str, format_json:dict, qos:int=0):
         model = parts[1]
 
         if model != "dev":
-            with g_sys_conf_mtx:
-                format_json["sp_id"] = g_sys_conf["sp_id"]
+            with sys_comm.g_sys_conf_mtx:
+                format_json["sp_id"] = sys_comm.g_sys_conf["sp_id"]
         content:str = json.dumps(format_json)
 
         mqtt_msg = {