|  | @@ -17,8 +17,8 @@ from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
 | 
	
		
			
				|  |  |  from mqtt.mqtt_topics import TOPICS, Topic_Pattern
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import mqtt.mqtt_recv as mqtt_recv
 | 
	
		
			
				|  |  | -import mqtt.mqtt_send as mqtt_send
 | 
	
		
			
				|  |  | -from mqtt.mqtt_send import mqtt_send_que # 发送队列
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +mqtt_send_que = queue.Queue()   # 发送队列
 | 
	
		
			
				|  |  |  # 格式如下
 | 
	
		
			
				|  |  |  '''
 | 
	
		
			
				|  |  |  {
 | 
	
	
		
			
				|  | @@ -81,50 +81,85 @@ class MQTTConsumerThread(threading.Thread):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -# ================================
 | 
	
		
			
				|  |  | -# MQTT 回调
 | 
	
		
			
				|  |  | -# ================================
 | 
	
		
			
				|  |  | -def on_connect(client:mqtt.Client, userdata, flags, rc):
 | 
	
		
			
				|  |  | -    if rc == 0:
 | 
	
		
			
				|  |  | -        LOGINFO("MQTT Connected successfully!")
 | 
	
		
			
				|  |  | -        client.subscribe(TOPICS.dev_tracker_targets)
 | 
	
		
			
				|  |  | -        # client.subscribe(TOPICS.das_all)
 | 
	
		
			
				|  |  | -        # client.subscribe(TOPICS.mps_all)
 | 
	
		
			
				|  |  | -        client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
 | 
	
		
			
				|  |  | -    else:
 | 
	
		
			
				|  |  | -        LOGERR(f"MQTT failed to connect, return code {rc}")
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  def on_message(client, userdata, msg):
 | 
	
		
			
				|  |  |      if not shutting_down:
 | 
	
		
			
				|  |  |          mqtt_queue.put((client, userdata, msg))  # 放入队列,由消费者线程处理
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  # ================================
 | 
	
		
			
				|  |  |  # MQTT 线程类
 | 
	
		
			
				|  |  |  # ================================
 | 
	
		
			
				|  |  |  class MQTTClientThread(threading.Thread):
 | 
	
		
			
				|  |  |      def __init__(self,):
 | 
	
		
			
				|  |  | -        threading.Thread.__init__(self, name= "MQTTClientThread")
 | 
	
		
			
				|  |  | +        super().__init__(name= "MQTTClientThread")
 | 
	
		
			
				|  |  |          self.client:mqtt.Client = mqtt.Client()
 | 
	
		
			
				|  |  |          self.publish_status = {}
 | 
	
		
			
				|  |  |          self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
 | 
	
		
			
				|  |  | -        self.client.on_connect = on_connect
 | 
	
		
			
				|  |  | -        self.client.on_message = on_message
 | 
	
		
			
				|  |  | +        self.client.on_connect = self.on_connect
 | 
	
		
			
				|  |  | +        self.client.on_disconnect = self.on_disconnect
 | 
	
		
			
				|  |  |          self.client.on_publish = self.on_publish
 | 
	
		
			
				|  |  | +        self.client.on_message = on_message
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          self.running = True
 | 
	
		
			
				|  |  | +        self.reconnect_delay    = 5     # 重连间隔 秒
 | 
	
		
			
				|  |  | +        self.executor = ThreadPoolExecutor(max_workers=4)   # 异步发送线程池
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +# ================================
 | 
	
		
			
				|  |  | +# MQTT 回调
 | 
	
		
			
				|  |  | +# ================================
 | 
	
		
			
				|  |  | +    def on_connect(self, client:mqtt.Client, userdata, flags, rc):
 | 
	
		
			
				|  |  | +        if rc == 0:
 | 
	
		
			
				|  |  | +            LOGINFO("MQTT Connected successfully!")
 | 
	
		
			
				|  |  | +            client.subscribe(TOPICS.dev_tracker_targets)
 | 
	
		
			
				|  |  | +            client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +            LOGERR(f"MQTT failed to connect, return code {rc}")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def on_disconnect(self, client, userdata, rc):
 | 
	
		
			
				|  |  | +        if shutting_down:
 | 
	
		
			
				|  |  | +            return
 | 
	
		
			
				|  |  | +        LOGERR(f"MQTT disconnected (rc={rc})")
 | 
	
		
			
				|  |  | +        # 循环尝试重连
 | 
	
		
			
				|  |  | +        while not shutting_down and self.running:
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                LOGINFO("Trying to reconnect to MQTT broker...")
 | 
	
		
			
				|  |  | +                client.reconnect()
 | 
	
		
			
				|  |  | +                LOGINFO("MQTT reconnected successfully!")
 | 
	
		
			
				|  |  | +                break
 | 
	
		
			
				|  |  | +            except Exception as e:
 | 
	
		
			
				|  |  | +                LOGERR(f"MQTT reconnect failed: {e}, retry in {self.reconnect_delay}s")
 | 
	
		
			
				|  |  | +                time.sleep(self.reconnect_delay)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def on_publish(self, client, userdata, mid):
 | 
	
		
			
				|  |  |          self.publish_status[mid] = "success"
 | 
	
		
			
				|  |  | +        LOGINFO(f"Message {mid} published successfully")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # 异步发送消息
 | 
	
		
			
				|  |  | +    def async_publish(self, mqtt_msg: dict):
 | 
	
		
			
				|  |  | +        try:
 | 
	
		
			
				|  |  | +            topic = mqtt_msg["topic"]
 | 
	
		
			
				|  |  | +            msg = mqtt_msg["msg"]
 | 
	
		
			
				|  |  | +            qos = mqtt_msg.get("qos", 0)
 | 
	
		
			
				|  |  | +            info = self.client.publish(topic, msg, qos=qos)
 | 
	
		
			
				|  |  | +            if info.rc == 0:
 | 
	
		
			
				|  |  | +                self.publish_status[info.mid] = "pending"
 | 
	
		
			
				|  |  | +                if qos == 0:
 | 
	
		
			
				|  |  | +                    self.publish_status[info.mid] = "success"
 | 
	
		
			
				|  |  | +            else:
 | 
	
		
			
				|  |  | +                LOGERR(f"MQTT publish failed (rc={info.rc}), topic: {topic}")
 | 
	
		
			
				|  |  | +                mqtt_send_que.put(mqtt_msg)  # 重试
 | 
	
		
			
				|  |  | +        except Exception as e:
 | 
	
		
			
				|  |  | +            LOGERR(f"MQTT send error: {e}, topic: {topic}")
 | 
	
		
			
				|  |  | +            mqtt_send_que.put(mqtt_msg)  # 重试
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def send_msg_to_mqtt(self):
 | 
	
		
			
				|  |  | -        while True:
 | 
	
		
			
				|  |  | -            time.sleep(0.01)
 | 
	
		
			
				|  |  | +        while self.running and not shutting_down:
 | 
	
		
			
				|  |  |              try:
 | 
	
		
			
				|  |  |                  mqtt_msg: dict = mqtt_send_que.get(timeout=0.1)
 | 
	
		
			
				|  |  | -            except Exception:
 | 
	
		
			
				|  |  | -                if shutting_down:
 | 
	
		
			
				|  |  | -                    break
 | 
	
		
			
				|  |  | +            except Empty:
 | 
	
		
			
				|  |  |                  continue
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |              try:
 | 
	
		
			
				|  |  |                  topic = mqtt_msg["topic"]
 | 
	
		
			
				|  |  |                  msg = mqtt_msg["msg"]
 | 
	
	
		
			
				|  | @@ -136,29 +171,162 @@ class MQTTClientThread(threading.Thread):
 | 
	
		
			
				|  |  |                          self.publish_status[info.mid] = "success"
 | 
	
		
			
				|  |  |                  else:
 | 
	
		
			
				|  |  |                      LOGDBG(f"send failed: {info.rc}, topic: {topic}, qos: {qos}\nmsg:{msg}")
 | 
	
		
			
				|  |  | +                    mqtt_send_que.put(mqtt_msg) # 重试
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              except Exception as e:
 | 
	
		
			
				|  |  |                  LOGERR(f"send error: {e}, topic: {topic}, qos: {qos}\nmsg:{msg}")
 | 
	
		
			
				|  |  | +                mqtt_send_que.put(mqtt_msg) # 重试
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def run(self):
 | 
	
		
			
				|  |  |          global shutting_down
 | 
	
		
			
				|  |  |          try:
 | 
	
		
			
				|  |  | +            LOGINFO("Connecting to MQTT broker...")
 | 
	
		
			
				|  |  |              self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
 | 
	
		
			
				|  |  | +            self.client.loop_start()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            while self.running and not shutting_down:
 | 
	
		
			
				|  |  | +                try:
 | 
	
		
			
				|  |  | +                    mqtt_msg = mqtt_send_que.get(timeout=0.1)
 | 
	
		
			
				|  |  | +                    # 使用线程池异步发送
 | 
	
		
			
				|  |  | +                    self.executor.submit(self.async_publish, mqtt_msg)
 | 
	
		
			
				|  |  | +                except Empty:
 | 
	
		
			
				|  |  | +                    continue
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        except Exception as e:
 | 
	
		
			
				|  |  | +            LOGERR(f"MQTT thread encountered an error: {e}")
 | 
	
		
			
				|  |  | +        finally:
 | 
	
		
			
				|  |  | +            self.running = False
 | 
	
		
			
				|  |  | +            self.executor.shutdown(wait=True)
 | 
	
		
			
				|  |  | +            self.client.loop_stop()
 | 
	
		
			
				|  |  | +            self.client.disconnect()
 | 
	
		
			
				|  |  | +            shutting_down = True
 | 
	
		
			
				|  |  | +            LOGINFO("MQTT thread exited")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +# 可靠顺序发送、支持断线重连、安全退出
 | 
	
		
			
				|  |  | +class RobustMQTTClient(threading.Thread):
 | 
	
		
			
				|  |  | +    def __init__(self, max_workers=4):
 | 
	
		
			
				|  |  | +        super().__init__(name="RobustMQTTClient")
 | 
	
		
			
				|  |  | +        self.client: mqtt.Client = mqtt.Client()
 | 
	
		
			
				|  |  | +        self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
 | 
	
		
			
				|  |  | +        self.client.on_connect = self.on_connect
 | 
	
		
			
				|  |  | +        self.client.on_disconnect = self.on_disconnect
 | 
	
		
			
				|  |  | +        self.client.on_publish = self.on_publish
 | 
	
		
			
				|  |  | +        self.client.on_message = on_message
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        self.running = True
 | 
	
		
			
				|  |  | +        self.reconnect_delay = 5  # 重连间隔
 | 
	
		
			
				|  |  | +        self.publish_status = {}  # mid -> 状态
 | 
	
		
			
				|  |  | +        self.send_queue = Queue()  # 内部可靠顺序队列
 | 
	
		
			
				|  |  | +        self.executor = ThreadPoolExecutor(max_workers=max_workers)
 | 
	
		
			
				|  |  | +        self.connected = threading.Event()  # 用于标记 MQTT 是否已连接
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # =========================
 | 
	
		
			
				|  |  | +    # MQTT 回调
 | 
	
		
			
				|  |  | +    # =========================
 | 
	
		
			
				|  |  | +    def on_connect(self, client, userdata, flags, rc):
 | 
	
		
			
				|  |  | +        if rc == 0:
 | 
	
		
			
				|  |  | +            LOGINFO("MQTT connected successfully!")
 | 
	
		
			
				|  |  | +            client.subscribe(TOPICS.dev_tracker_targets)
 | 
	
		
			
				|  |  | +            client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +            LOGERR(f"MQTT failed to connect, rc={rc}")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def on_disconnect(self, client, userdata, rc):
 | 
	
		
			
				|  |  | +        LOGERR(f"MQTT disconnected (rc={rc})")
 | 
	
		
			
				|  |  | +        self.connected.clear()
 | 
	
		
			
				|  |  | +        if shutting_down:
 | 
	
		
			
				|  |  | +            return
 | 
	
		
			
				|  |  | +        LOGERR(f"MQTT disconnected (rc={rc})")
 | 
	
		
			
				|  |  | +        while not shutting_down and self.running:
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                LOGINFO("Trying to reconnect to MQTT broker...")
 | 
	
		
			
				|  |  | +                client.reconnect()
 | 
	
		
			
				|  |  | +                LOGINFO("MQTT reconnected successfully!")
 | 
	
		
			
				|  |  | +                self.connected.set()
 | 
	
		
			
				|  |  | +                break
 | 
	
		
			
				|  |  | +            except Exception as e:
 | 
	
		
			
				|  |  | +                LOGERR(f"Reconnect failed: {e}, retry in {self.reconnect_delay}s")
 | 
	
		
			
				|  |  | +                time.sleep(self.reconnect_delay)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def on_publish(self, client, userdata, mid):
 | 
	
		
			
				|  |  | +        self.publish_status[mid] = "success"
 | 
	
		
			
				|  |  | +        LOGINFO(f"Message {mid} published successfully")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # =========================
 | 
	
		
			
				|  |  | +    # 安全发送单条消息
 | 
	
		
			
				|  |  | +    # =========================
 | 
	
		
			
				|  |  | +    def _send_message(self, mqtt_msg: dict):
 | 
	
		
			
				|  |  | +        while self.running and not shutting_down:
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                # 等待连接成功再发送
 | 
	
		
			
				|  |  | +                if not self.connected.wait(timeout=1):
 | 
	
		
			
				|  |  | +                    continue
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                topic = mqtt_msg["topic"]
 | 
	
		
			
				|  |  | +                payload = mqtt_msg["msg"]
 | 
	
		
			
				|  |  | +                qos = mqtt_msg.get("qos", 0)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                info = self.client.publish(topic, payload, qos=qos)
 | 
	
		
			
				|  |  | +                if info.rc != 0:
 | 
	
		
			
				|  |  | +                    LOGERR(f"MQTT publish failed (rc={info.rc}), topic={topic}, retrying")
 | 
	
		
			
				|  |  | +                    time.sleep(1)
 | 
	
		
			
				|  |  | +                    continue
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                self.publish_status[info.mid] = "pending"
 | 
	
		
			
				|  |  | +                while self.publish_status.get(info.mid) != "success" and not shutting_down:
 | 
	
		
			
				|  |  | +                    time.sleep(0.01)
 | 
	
		
			
				|  |  | +                break
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            except Exception as e:
 | 
	
		
			
				|  |  | +                LOGERR(f"MQTT send error: {e}, topic={mqtt_msg.get('topic')}, retrying")
 | 
	
		
			
				|  |  | +                time.sleep(1)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # =========================
 | 
	
		
			
				|  |  | +    # 外部接口:发送消息
 | 
	
		
			
				|  |  | +    # =========================
 | 
	
		
			
				|  |  | +    def send_msg(self, mqtt_msg: dict):
 | 
	
		
			
				|  |  | +        self.send_queue.put(mqtt_msg)  # 放入内部可靠队列
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # =========================
 | 
	
		
			
				|  |  | +    # 队列处理循环
 | 
	
		
			
				|  |  | +    # =========================
 | 
	
		
			
				|  |  | +    def _process_queue(self):
 | 
	
		
			
				|  |  | +        while self.running and not shutting_down:
 | 
	
		
			
				|  |  | +            try:
 | 
	
		
			
				|  |  | +                mqtt_msg = self.send_queue.get(timeout=0.1)
 | 
	
		
			
				|  |  | +                # 提交线程池异步发送
 | 
	
		
			
				|  |  | +                self.executor.submit(self._send_message, mqtt_msg)
 | 
	
		
			
				|  |  | +            except Empty:
 | 
	
		
			
				|  |  | +                continue
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # =========================
 | 
	
		
			
				|  |  | +    # 主线程
 | 
	
		
			
				|  |  | +    # =========================
 | 
	
		
			
				|  |  | +    def run(self):
 | 
	
		
			
				|  |  | +        global shutting_down
 | 
	
		
			
				|  |  | +        try:
 | 
	
		
			
				|  |  |              LOGINFO("Connecting to MQTT broker...")
 | 
	
		
			
				|  |  | +            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
 | 
	
		
			
				|  |  |              self.client.loop_start()
 | 
	
		
			
				|  |  | -            self.send_msg_to_mqtt()
 | 
	
		
			
				|  |  | +            self._process_queue()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          except Exception as e:
 | 
	
		
			
				|  |  |              LOGERR(f"MQTT thread encountered an error: {e}")
 | 
	
		
			
				|  |  |          finally:
 | 
	
		
			
				|  |  |              self.running = False
 | 
	
		
			
				|  |  | +            self.executor.shutdown(wait=True)
 | 
	
		
			
				|  |  |              self.client.loop_stop()
 | 
	
		
			
				|  |  |              self.client.disconnect()
 | 
	
		
			
				|  |  |              shutting_down = True
 | 
	
		
			
				|  |  | +            LOGINFO("MQTT client exited")
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -mqtt_client: MQTTClientThread = None
 | 
	
		
			
				|  |  | -mqtt_consumer: MQTTConsumerThread = None
 | 
	
		
			
				|  |  | +g_mqtt_client: RobustMQTTClient = None
 | 
	
		
			
				|  |  | +g_mqtt_consumer: MQTTConsumerThread = None
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  # ================================
 | 
	
		
			
				|  |  |  # 退出信号处理
 | 
	
	
		
			
				|  | @@ -167,10 +335,10 @@ def signal_handler(sig, frame):
 | 
	
		
			
				|  |  |      global shutting_down
 | 
	
		
			
				|  |  |      LOGINFO("Exiting... shutting down MQTT and thread pool")
 | 
	
		
			
				|  |  |      shutting_down = True
 | 
	
		
			
				|  |  | -    mqtt_client.running = False
 | 
	
		
			
				|  |  | -    mqtt_client.client.loop_stop()
 | 
	
		
			
				|  |  | -    mqtt_client.client.disconnect()
 | 
	
		
			
				|  |  | -    mqtt_consumer.running = False
 | 
	
		
			
				|  |  | +    g_mqtt_client.running = False
 | 
	
		
			
				|  |  | +    g_mqtt_client.client.loop_stop()
 | 
	
		
			
				|  |  | +    g_mqtt_client.client.disconnect()
 | 
	
		
			
				|  |  | +    g_mqtt_consumer.running = False
 | 
	
		
			
				|  |  |      executor.shutdown(wait=True)
 | 
	
		
			
				|  |  |      sys.exit(0)
 | 
	
		
			
				|  |  |  
 |