فهرست منبع

优化mqtt,1.增加断线重连回调;2.消息重发机制;3.安全退出

nifangxu 1 ماه پیش
والد
کامیت
804d097ba7
2فایلهای تغییر یافته به همراه41 افزوده شده و 21 حذف شده
  1. 2 0
      LAS.py
  2. 39 21
      mqtt/mqtt_process.py

+ 2 - 0
LAS.py

@@ -247,6 +247,8 @@ def main():
     mqtt_consumer = MQTTConsumerThread()
     mqtt_consumer.start()
 
+    LOGDBG(f" ================ LAS start success ...")
+
     # 主线程
     if not main_process():
         LOGERR(f"main_process error, process termination")

+ 39 - 21
mqtt/mqtt_process.py

@@ -81,23 +81,11 @@ class MQTTConsumerThread(threading.Thread):
 
 
 
-# ================================
-# MQTT 回调
-# ================================
-def on_connect(client:mqtt.Client, userdata, flags, rc):
-    if rc == 0:
-        LOGINFO("MQTT Connected successfully!")
-        client.subscribe(TOPICS.dev_tracker_targets)
-        # client.subscribe(TOPICS.das_all)
-        # client.subscribe(TOPICS.mps_all)
-        client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
-    else:
-        LOGERR(f"MQTT failed to connect, return code {rc}")
-
-def on_message(client, userdata, msg):
+def on_message(self, client, userdata, msg):
     if not shutting_down:
         mqtt_queue.put((client, userdata, msg))  # 放入队列,由消费者线程处理
 
+
 # ================================
 # MQTT 线程类
 # ================================
@@ -107,24 +95,52 @@ class MQTTClientThread(threading.Thread):
         self.client:mqtt.Client = mqtt.Client()
         self.publish_status = {}
         self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
-        self.client.on_connect = on_connect
+        self.client.on_connect = self.on_connect
         self.client.on_message = on_message
+        self.client.on_disconnect = self.on_disconnect
         self.client.on_publish = self.on_publish
 
         self.running = True
+        self.reconnect_delay    = 5     # 重连间隔 秒
+
+
+# ================================
+# MQTT 回调
+# ================================
+    def on_connect(self, client:mqtt.Client, userdata, flags, rc):
+        if rc == 0:
+            LOGINFO("MQTT Connected successfully!")
+            client.subscribe(TOPICS.dev_tracker_targets)
+            client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
+        else:
+            LOGERR(f"MQTT failed to connect, return code {rc}")
+
+    def on_disconnect(self, client, userdata, rc):
+        if shutting_down:
+            return
+        LOGERR(f"MQTT disconnected (rc={rc})")
+        # 循环尝试重连
+        while not shutting_down and self.running:
+            try:
+                LOGINFO("Trying to reconnect to MQTT broker...")
+                client.reconnect()
+                LOGINFO("MQTT reconnected successfully!")
+                break
+            except Exception as e:
+                LOGERR(f"MQTT reconnect failed: {e}, retry in {self.reconnect_delay}s")
+                time.sleep(self.reconnect_delay)
 
     def on_publish(self, client, userdata, mid):
         self.publish_status[mid] = "success"
 
+
     def send_msg_to_mqtt(self):
-        while True:
-            time.sleep(0.01)
+        while self.running and not shutting_down:
             try:
                 mqtt_msg: dict = mqtt_send_que.get(timeout=0.1)
-            except Exception:
-                if shutting_down:
-                    break
+            except Empty:
                 continue
+
             try:
                 topic = mqtt_msg["topic"]
                 msg = mqtt_msg["msg"]
@@ -136,15 +152,17 @@ class MQTTClientThread(threading.Thread):
                         self.publish_status[info.mid] = "success"
                 else:
                     LOGDBG(f"send failed: {info.rc}, topic: {topic}, qos: {qos}\nmsg:{msg}")
+                    mqtt_send_que.put(mqtt_msg) # 重试
 
             except Exception as e:
                 LOGERR(f"send error: {e}, topic: {topic}, qos: {qos}\nmsg:{msg}")
+                mqtt_send_que.put(mqtt_msg) # 重试
 
     def run(self):
         global shutting_down
         try:
-            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
             LOGINFO("Connecting to MQTT broker...")
+            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
             self.client.loop_start()
             self.send_msg_to_mqtt()