Bläddra i källkod

支持设备保活

nifangxu 1 månad sedan
förälder
incheckning
651fd81286
3 ändrade filer med 44 tillägg och 5 borttagningar
  1. 5 5
      LAS.py
  2. 38 0
      device/dev_mng.py
  3. 1 0
      mqtt/mqtt_recv.py

+ 5 - 5
LAS.py

@@ -195,12 +195,15 @@ def main():
 
     # 初始化 dev_mng
     init_dev_mng()
+    g_Dev.g_dev_mgr.start()
 
     # 初始化LAS
     init_alarm_plan_mgr()
     init_alarm_plan_disp()
+    start_alarm_plan_dispatcher()   # 事件分发器
+    start_alarm_plan_mgr()  # 告警计划管理器
 
-    # 数据库处理线程
+    # 数据库线程
     db_process.create_db_process().start()
 
     # MQTT 消息线程
@@ -209,10 +212,7 @@ def main():
     mqtt_consumer = MQTTConsumerThread()
     mqtt_consumer.start()
 
-    # 事件分发器
-    start_alarm_plan_dispatcher()
-    # 告警计划管理器
-    start_alarm_plan_mgr()
+
 
     # 主线程
     main_process()

+ 38 - 0
device/dev_mng.py

@@ -5,6 +5,7 @@ import json
 import traceback
 from collections import deque
 from typing import Optional, List
+import time
 
 from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
 from common.sys_comm import (
@@ -176,6 +177,11 @@ class DeviceManager():
         self.g_dev_map_lock = threading.Lock()
         self.g_dev_map = {}  # <dev_id: str, device: Device>
 
+        self.HEARTBEAT_TIMEOUT  = 3600 * 24 # 超长保活时间
+
+        self.running_ = False
+        self.thread_ = None
+
     def push_dev_map(self, dev_id:str, dev_instance:Device):
         with self.g_dev_map_lock:
             self.g_dev_map[dev_id] = dev_instance
@@ -199,6 +205,38 @@ class DeviceManager():
         with self.g_dev_map_lock:
             return list(self.g_dev_map.values())
 
+
+    # ------------------- 心跳检测相关 -------------------
+    def _heartbeat_monitor(self):
+        """后台线程:定时检查设备是否失活"""
+        while self.running_:
+            time.sleep(30)
+            now = int(time.time())
+            with self.g_dev_map_lock:
+                for dev_id, device in list(self.g_dev_map.items()):
+                    last_keepalive = device.get_keepalive()
+                    if now - last_keepalive > self.HEARTBEAT_TIMEOUT:
+                        
+                        # device.online_ = 0    # 标记掉线
+                        
+                        del self.g_dev_map[dev_id]  # 直接销毁实例
+                        LOGINFO(f"[WARN] Device {dev_id} heartbeat timeout destroy")
+
+    def start(self):
+        if not self.running_:
+            self.running_ = True
+            self.thread_ = threading.Thread(target=self._heartbeat_monitor, daemon=True)
+            self.thread_.start()
+            LOGINFO("[INFO] DeviceManager heartbeat task start")
+
+    def stop(self):
+        if self.running_:
+            self.running_ = False
+            if self.thread_:
+                self.thread_.join()
+            LOGINFO("[INFO] DeviceManager heartbeat task stop")
+
+
     # 回调函数,处理查询结果:查询所有的设备信息
     def cb_handle_query_all_dev_info(self, result, userdata):
         try:

+ 1 - 0
mqtt/mqtt_recv.py

@@ -89,6 +89,7 @@ def deal_dsp_data(msg:mqtt.MQTTMessage):
                 "target_point": tracker_targets
             }
             device.put_rtd_unit(rtd_unit)
+            device.update_keepalive(timestamp)
 
     except json.JSONDecodeError as e:
         tb_info = traceback.extract_tb(e.__traceback__)