Browse Source

优化线程管理

nifangxu 1 month ago
parent
commit
04fef2869f
6 changed files with 27 additions and 14 deletions
  1. 2 1
      LAS.py
  2. 5 1
      core/alarm_plan_dispatcher.py
  3. 12 3
      core/alarm_plan_manager.py
  4. 2 6
      db/db_process.py
  5. 4 1
      device/dev_mng.py
  6. 2 2
      mqtt/mqtt_process.py

+ 2 - 1
LAS.py

@@ -238,7 +238,8 @@ def main():
     start_alarm_plan_mgr()  # 告警计划管理器
 
     # 数据库线程
-    db_process.create_db_process().start()
+    db_thread = db_process.create_db_process()
+    db_thread.start()
 
     # MQTT 消息线程
     mqtt_client = MQTTClientThread()

+ 5 - 1
core/alarm_plan_dispatcher.py

@@ -23,7 +23,11 @@ class AlarmPlanDispatcher:
             q = queue.Queue()
             self.queues[event_type] = q
 
-            t = threading.Thread(target=self.worker, args=(event_type, q, handler), daemon=True)
+            t = threading.Thread(
+                target=self.worker,
+                args=(event_type, q, handler),
+                daemon=True,
+                name=f"APDispatcherThread-{event_type}")
             self.threads[event_type] = t
             t.start()
 

+ 12 - 3
core/alarm_plan_manager.py

@@ -96,15 +96,24 @@ class AlarmPlanManager:
             return removed
 
 
+    # 启动调度器
     def start_scheduler(self, interval=5):
         if self.running:
             return
         self.running = True
-        self.thread = threading.Thread(target=self._scheduler, args=(interval,), daemon=True)
+        # plan 定时调度器
+        self.thread = threading.Thread(
+            target=self._scheduler,
+            args=(interval,),
+            daemon=True,
+            name="APSchedulerThread")
         self.thread.start()
 
-        # 启动 cron 定时调度线程
-        self.cron_thread = threading.Thread(target=self._cron_scheduler, daemon=True)
+        # cron 定时调度线程
+        self.cron_thread = threading.Thread(
+            target=self._cron_scheduler,
+            daemon=True,
+            name="CronSchedulerThread")
         self.cron_thread.start()
 
     def stop_scheduler(self):

+ 2 - 6
db/db_process.py

@@ -216,10 +216,8 @@ def db_process():
     db_pro_init()
     initialize_connection_pool()
 
-    # 单线程执行器
-    sync_executor = ThreadPoolExecutor(max_workers=1)
     # 多线程执行器
-    async_executor = ThreadPoolExecutor(max_workers=8)  # 限制线程并发数
+    async_executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="AsyncDBWorker")
 
     try:
         while True:
@@ -231,7 +229,6 @@ def db_process():
                 if isinstance(db_request, DBRequest_Sync):
                     # 同步操作
                     handle_db_request(db_request)
-                    # sync_executor.submit(handle_db_request, db_request)
                 else:
                     # 异步操作
                     async_executor.submit(handle_db_request, db_request)
@@ -243,7 +240,6 @@ def db_process():
 
     finally:
         # 收到退出信号后,关闭执行器
-        sync_executor.shutdown(wait=True)
         async_executor.shutdown(wait=True)
         db_worker_running = False
         LOGERR("DB process exit gracefully")
@@ -251,7 +247,7 @@ def db_process():
 # 创建数据库线程
 def create_db_process():
     global db_thread
-    db_thread = threading.Thread(target=db_process, daemon=True)
+    db_thread = threading.Thread(target=db_process, daemon=True, name="DBWorkerThread")
     return db_thread
 
 # 停止数据库线程

+ 4 - 1
device/dev_mng.py

@@ -223,7 +223,10 @@ class DeviceManager():
     def start(self):
         if not self.running_:
             self.running_ = True
-            self.thread_ = threading.Thread(target=self._heartbeat_monitor, daemon=True)
+            self.thread_ = threading.Thread(
+                target=self._heartbeat_monitor,
+                daemon=True,
+                name="DevMgrThread")
             self.thread_.start()
             LOGINFO("[INFO] DeviceManager heartbeat task start")
 

+ 2 - 2
mqtt/mqtt_process.py

@@ -57,7 +57,7 @@ def check_topic(pattern:str, topic:str) -> bool:
 # ================================
 class MQTTConsumerThread(threading.Thread):
     def __init__(self):
-        super().__init__()
+        super().__init__(name= "MQTTConsumerThread")
         self.running = True
 
     def run(self):
@@ -103,7 +103,7 @@ def on_message(client, userdata, msg):
 # ================================
 class MQTTClientThread(threading.Thread):
     def __init__(self,):
-        threading.Thread.__init__(self)
+        threading.Thread.__init__(self, name= "MQTTClientThread")
         self.client:mqtt.Client = mqtt.Client()
         self.publish_status = {}
         self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)