Ver código fonte

优化mqtt:1. 非阻塞发送,支持并发;2. 有序发送;3. 断线期间不丢消息不乱序

nifangxu 1 mês atrás
pai
commit
09c15dff2f
3 arquivos alterados com 174 adições e 22 exclusões
  1. 10 8
      LAS.py
  2. 162 12
      mqtt/mqtt_process.py
  3. 2 2
      mqtt/mqtt_send.py

+ 10 - 8
LAS.py

@@ -14,8 +14,10 @@ from common.sys_comm import (
     get_utc_time_ms
 )
 
-from mqtt import mqtt_process
-from mqtt.mqtt_process import MQTTClientThread, MQTTConsumerThread, mqtt_client, mqtt_consumer
+from mqtt import mqtt_process as mp
+from mqtt.mqtt_process import (
+    MQTTClientThread, RobustMQTTClient, MQTTConsumerThread, g_mqtt_client, g_mqtt_consumer
+)
 
 import db.db_process as db_process
 from db.db_process import db_req_que
@@ -105,11 +107,11 @@ def sys_init():
                 sys_comm.g_sys_conf["server_ip"]   = str(config["windows"]["server_ip"])
                 sys_comm.g_sys_conf["ssh_host"]    = str(config["windows"]["ssh_host"])
                 sys_comm.g_sys_conf["ssh_port"]    = int(config["windows"]["ssh_port"])
-                mqtt_process.MQTT_BROKER = sys_comm.g_sys_conf["server_ip"]
+                mp.MQTT_BROKER = sys_comm.g_sys_conf["server_ip"]
             # linux 服务器
             elif sys_comm.g_sys_conf["platform"] == 1:
                 sys_comm.g_sys_conf["host_ip"]     = str(config["linux"]["host_ip"])
-                mqtt_process.MQTT_BROKER = sys_comm.g_sys_conf["host_ip"]
+                mp.MQTT_BROKER = sys_comm.g_sys_conf["host_ip"]
 
         sys_comm.g_sys_conf["sp_id"] = int(get_utc_time_ms())
 
@@ -242,10 +244,10 @@ def main():
     db_thread.start()
 
     # MQTT 消息线程
-    mqtt_client = MQTTClientThread()
-    mqtt_client.start()
-    mqtt_consumer = MQTTConsumerThread()
-    mqtt_consumer.start()
+    mp.g_mqtt_client = RobustMQTTClient()
+    mp.g_mqtt_client.start()
+    mp.g_mqtt_consumer = MQTTConsumerThread()
+    mp.g_mqtt_consumer.start()
 
     LOGDBG(f" ================ LAS start success ...")
 

+ 162 - 12
mqtt/mqtt_process.py

@@ -17,8 +17,8 @@ from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
 from mqtt.mqtt_topics import TOPICS, Topic_Pattern
 
 import mqtt.mqtt_recv as mqtt_recv
-import mqtt.mqtt_send as mqtt_send
-from mqtt.mqtt_send import mqtt_send_que # 发送队列
+
+mqtt_send_que = queue.Queue()   # 发送队列
 # 格式如下
 '''
 {
@@ -81,7 +81,7 @@ class MQTTConsumerThread(threading.Thread):
 
 
 
-def on_message(self, client, userdata, msg):
+def on_message(client, userdata, msg):
     if not shutting_down:
         mqtt_queue.put((client, userdata, msg))  # 放入队列,由消费者线程处理
 
@@ -91,17 +91,18 @@ def on_message(self, client, userdata, msg):
 # ================================
 class MQTTClientThread(threading.Thread):
     def __init__(self,):
-        threading.Thread.__init__(self, name= "MQTTClientThread")
+        super().__init__(name= "MQTTClientThread")
         self.client:mqtt.Client = mqtt.Client()
         self.publish_status = {}
         self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
         self.client.on_connect = self.on_connect
-        self.client.on_message = on_message
         self.client.on_disconnect = self.on_disconnect
         self.client.on_publish = self.on_publish
+        self.client.on_message = on_message
 
         self.running = True
         self.reconnect_delay    = 5     # 重连间隔 秒
+        self.executor = ThreadPoolExecutor(max_workers=4)   # 异步发送线程池
 
 
 # ================================
@@ -132,7 +133,25 @@ class MQTTClientThread(threading.Thread):
 
     def on_publish(self, client, userdata, mid):
         self.publish_status[mid] = "success"
+        LOGINFO(f"Message {mid} published successfully")
 
+    # 异步发送消息
+    def async_publish(self, mqtt_msg: dict):
+        try:
+            topic = mqtt_msg["topic"]
+            msg = mqtt_msg["msg"]
+            qos = mqtt_msg.get("qos", 0)
+            info = self.client.publish(topic, msg, qos=qos)
+            if info.rc == 0:
+                self.publish_status[info.mid] = "pending"
+                if qos == 0:
+                    self.publish_status[info.mid] = "success"
+            else:
+                LOGERR(f"MQTT publish failed (rc={info.rc}), topic: {topic}")
+                mqtt_send_que.put(mqtt_msg)  # 重试
+        except Exception as e:
+            LOGERR(f"MQTT send error: {e}, topic: {topic}")
+            mqtt_send_que.put(mqtt_msg)  # 重试
 
     def send_msg_to_mqtt(self):
         while self.running and not shutting_down:
@@ -164,19 +183,150 @@ class MQTTClientThread(threading.Thread):
             LOGINFO("Connecting to MQTT broker...")
             self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
             self.client.loop_start()
-            self.send_msg_to_mqtt()
+
+            while self.running and not shutting_down:
+                try:
+                    mqtt_msg = mqtt_send_que.get(timeout=0.1)
+                    # 使用线程池异步发送
+                    self.executor.submit(self.async_publish, mqtt_msg)
+                except Empty:
+                    continue
+
+
+        except Exception as e:
+            LOGERR(f"MQTT thread encountered an error: {e}")
+        finally:
+            self.running = False
+            self.executor.shutdown(wait=True)
+            self.client.loop_stop()
+            self.client.disconnect()
+            shutting_down = True
+            LOGINFO("MQTT thread exited")
+
+
+# 可靠顺序发送、支持断线重连、安全退出
+class RobustMQTTClient(threading.Thread):
+    def __init__(self, max_workers=4):
+        super().__init__(name="RobustMQTTClient")
+        self.client: mqtt.Client = mqtt.Client()
+        self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
+        self.client.on_connect = self.on_connect
+        self.client.on_disconnect = self.on_disconnect
+        self.client.on_publish = self.on_publish
+        self.client.on_message = on_message
+
+        self.running = True
+        self.reconnect_delay = 5  # 重连间隔
+        self.publish_status = {}  # mid -> 状态
+        self.send_queue = Queue()  # 内部可靠顺序队列
+        self.executor = ThreadPoolExecutor(max_workers=max_workers)
+        self.connected = threading.Event()  # 用于标记 MQTT 是否已连接
+
+    # =========================
+    # MQTT 回调
+    # =========================
+    def on_connect(self, client, userdata, flags, rc):
+        if rc == 0:
+            LOGINFO("MQTT connected successfully!")
+            client.subscribe(TOPICS.dev_tracker_targets)
+            client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
+        else:
+            LOGERR(f"MQTT failed to connect, rc={rc}")
+
+    def on_disconnect(self, client, userdata, rc):
+        LOGERR(f"MQTT disconnected (rc={rc})")
+        self.connected.clear()
+        if shutting_down:
+            return
+        LOGERR(f"MQTT disconnected (rc={rc})")
+        while not shutting_down and self.running:
+            try:
+                LOGINFO("Trying to reconnect to MQTT broker...")
+                client.reconnect()
+                LOGINFO("MQTT reconnected successfully!")
+                self.connected.set()
+                break
+            except Exception as e:
+                LOGERR(f"Reconnect failed: {e}, retry in {self.reconnect_delay}s")
+                time.sleep(self.reconnect_delay)
+
+    def on_publish(self, client, userdata, mid):
+        self.publish_status[mid] = "success"
+        LOGINFO(f"Message {mid} published successfully")
+
+
+    # =========================
+    # 安全发送单条消息
+    # =========================
+    def _send_message(self, mqtt_msg: dict):
+        while self.running and not shutting_down:
+            try:
+                # 等待连接成功再发送
+                if not self.connected.wait(timeout=1):
+                    continue
+
+                topic = mqtt_msg["topic"]
+                payload = mqtt_msg["msg"]
+                qos = mqtt_msg.get("qos", 0)
+
+                info = self.client.publish(topic, payload, qos=qos)
+                if info.rc != 0:
+                    LOGERR(f"MQTT publish failed (rc={info.rc}), topic={topic}, retrying")
+                    time.sleep(1)
+                    continue
+
+                self.publish_status[info.mid] = "pending"
+                while self.publish_status.get(info.mid) != "success" and not shutting_down:
+                    time.sleep(0.01)
+                break
+
+            except Exception as e:
+                LOGERR(f"MQTT send error: {e}, topic={mqtt_msg.get('topic')}, retrying")
+                time.sleep(1)
+
+
+    # =========================
+    # 外部接口:发送消息
+    # =========================
+    def send_msg(self, mqtt_msg: dict):
+        self.send_queue.put(mqtt_msg)  # 放入内部可靠队列
+
+    # =========================
+    # 队列处理循环
+    # =========================
+    def _process_queue(self):
+        while self.running and not shutting_down:
+            try:
+                mqtt_msg = self.send_queue.get(timeout=0.1)
+                # 提交线程池异步发送
+                self.executor.submit(self._send_message, mqtt_msg)
+            except Empty:
+                continue
+
+    # =========================
+    # 主线程
+    # =========================
+    def run(self):
+        global shutting_down
+        try:
+            LOGINFO("Connecting to MQTT broker...")
+            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
+            self.client.loop_start()
+            self._process_queue()
 
         except Exception as e:
             LOGERR(f"MQTT thread encountered an error: {e}")
         finally:
             self.running = False
+            self.executor.shutdown(wait=True)
             self.client.loop_stop()
             self.client.disconnect()
             shutting_down = True
+            LOGINFO("MQTT client exited")
 
 
-mqtt_client: MQTTClientThread = None
-mqtt_consumer: MQTTConsumerThread = None
+g_mqtt_client: RobustMQTTClient = None
+g_mqtt_consumer: MQTTConsumerThread = None
 
 # ================================
 # 退出信号处理
@@ -185,10 +335,10 @@ def signal_handler(sig, frame):
     global shutting_down
     LOGINFO("Exiting... shutting down MQTT and thread pool")
     shutting_down = True
-    mqtt_client.running = False
-    mqtt_client.client.loop_stop()
-    mqtt_client.client.disconnect()
-    mqtt_consumer.running = False
+    g_mqtt_client.running = False
+    g_mqtt_client.client.loop_stop()
+    g_mqtt_client.client.disconnect()
+    g_mqtt_consumer.running = False
     executor.shutdown(wait=True)
     sys.exit(0)
 

+ 2 - 2
mqtt/mqtt_send.py

@@ -7,12 +7,12 @@ import json
 from enum import Enum
 import numpy
 
-mqtt_send_que = queue.Queue()   # 发送队列
 
 import common.sys_comm as sys_comm
 from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
 from common.sys_comm import POSE_CLASS_E, POSE_E
 from mqtt.mqtt_topics import TOPICS
+import mqtt.mqtt_process as mp
 
 # 消息类型
 class MSG_TYPE(Enum):
@@ -45,7 +45,7 @@ def send_msg(topic:str, format_json:dict, qos:int=0):
             "msg":      content,
             "qos":      qos
         }
-        mqtt_send_que.put(mqtt_msg)
+        mp.g_mqtt_client.send_msg(mqtt_msg)
 
     except Exception as e:
         LOGERR(f"send_msg error: {e}")