"""MQTT message handling.

Inbound messages are pushed onto ``mqtt_queue`` by the paho ``on_message``
callback and dispatched to ``mqtt_recv.process_message`` through a shared
thread pool.  Outbound messages are published by one of two client threads:
``MQTTClientThread`` (fed from the module-level ``mqtt_send_que``) or
``RobustMQTTClient`` (fed through ``send_msg``).
"""

import atexit
import queue
import re
import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue
from typing import Optional

import paho.mqtt.client as mqtt

import common.sys_comm as sys_comm
from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
from mqtt.mqtt_topics import TOPICS, Topic_Pattern
import mqtt.mqtt_recv as mqtt_recv

# Outbound queue consumed by MQTTClientThread.  Each item is a dict:
#   {
#       "topic": "topic/xxxx",
#       "msg": "msg to be send",
#       "qos": 0,            # optional, defaults to 0
#   }
mqtt_send_que = queue.Queue()

# ================================
# MQTT configuration
# ================================
# NOTE(review): broker address and credentials are hard-coded in source;
# consider loading them from configuration or the environment.
MQTT_BROKER = "119.45.12.173"       # MQTT broker address
MQTT_PORT = 1883                    # MQTT broker port
MQTT_USERNAME = "lnradar"           # MQTT user name
MQTT_PASSWD = "lnradar"             # MQTT password

# ================================
# Global objects
# ================================
executor = ThreadPoolExecutor(max_workers=8)    # runs mqtt_recv.process_message
mqtt_queue = Queue()                            # inbound (client, userdata, msg) tuples
shutting_down = False                           # set once the process is stopping


def _mark_shutting_down():
    """Flag the module as shutting down (registered with ``atexit``)."""
    global shutting_down
    shutting_down = True


atexit.register(_mark_shutting_down)


# ================================
# Helpers
# ================================
def check_topic(pattern: str, topic: str) -> bool:
    """Return True when regex ``pattern`` matches at the start of ``topic``."""
    return bool(re.match(pattern, topic))


def _split_mqtt_msg(mqtt_msg):
    """Return ``(topic, payload, qos)`` for a send item, or None if malformed.

    A malformed item (not a dict, or missing "topic"/"msg") is logged and
    reported as None so callers can drop it instead of retrying it forever.
    """
    try:
        return mqtt_msg["topic"], mqtt_msg["msg"], mqtt_msg.get("qos", 0)
    except (KeyError, TypeError, AttributeError) as e:
        LOGERR(f"MQTT message dropped, malformed send item ({e!r}): {mqtt_msg!r}")
        return None


# ================================
# Consumer thread
# ================================
class MQTTConsumerThread(threading.Thread):
    """Drain ``mqtt_queue`` and submit each message to the shared thread pool."""

    def __init__(self):
        super().__init__(name="MQTTConsumerThread")
        self.running = True

    def run(self):
        """Loop until ``running`` is cleared or the module starts shutting down."""
        while self.running:
            try:
                client, userdata, msg = mqtt_queue.get(timeout=0.1)
            except Empty:
                if shutting_down:
                    break
                continue

            if shutting_down:
                break
            try:
                executor.submit(mqtt_recv.process_message, client, userdata, msg)
            except RuntimeError:
                # The pool has already been shut down; nothing more to do.
                break


def on_message(client, userdata, msg):
    """paho ``on_message`` callback: enqueue the message for the consumer thread."""
    if not shutting_down:
        mqtt_queue.put((client, userdata, msg))


# ================================
# MQTT client thread
# ================================
class MQTTClientThread(threading.Thread):
    """Connect to the broker and publish items taken from ``mqtt_send_que``.

    Failed publishes are put back on ``mqtt_send_que`` for another attempt.
    ``publish_status`` maps a message id (mid) to "pending" or "success".
    """

    def __init__(self,):
        super().__init__(name="MQTTClientThread")
        self.client: mqtt.Client = mqtt.Client()
        self.publish_status = {}
        self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_publish = self.on_publish
        self.client.on_message = on_message
        self.running = True
        self.reconnect_delay = 5    # seconds between reconnect attempts
        self.executor = ThreadPoolExecutor(max_workers=4)   # async publish pool

    # ================================
    # MQTT callbacks
    # ================================
    def on_connect(self, client: mqtt.Client, userdata, flags, rc):
        """Subscribe to the module's topics once the broker accepts the connection."""
        if rc == 0:
            LOGINFO("MQTT Connected successfully!")
            client.subscribe(TOPICS.dev_tracker_targets)
            client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
            client.subscribe(TOPICS.las_test, qos=2)
        else:
            LOGERR(f"MQTT failed to connect, return code {rc}")

    def on_disconnect(self, client, userdata, rc):
        """Retry ``client.reconnect()`` until it succeeds or shutdown begins."""
        if shutting_down:
            return
        LOGERR(f"MQTT disconnected (rc={rc})")
        while not shutting_down and self.running:
            try:
                LOGINFO("Trying to reconnect to MQTT broker...")
                client.reconnect()
                LOGINFO("MQTT reconnected successfully!")
                break
            except Exception as e:
                LOGERR(f"MQTT reconnect failed: {e}, retry in {self.reconnect_delay}s")
                time.sleep(self.reconnect_delay)

    def on_publish(self, client, userdata, mid):
        """Record that message ``mid`` has been published."""
        self.publish_status[mid] = "success"
        LOGINFO(f"Message {mid} published successfully")

    def async_publish(self, mqtt_msg: dict):
        """Publish one send item; requeue it on ``mqtt_send_que`` on failure.

        Malformed items are dropped (they could never succeed on retry).
        """
        fields = _split_mqtt_msg(mqtt_msg)
        if fields is None:
            return
        topic, msg, qos = fields
        try:
            info = self.client.publish(topic, msg, qos=qos)
            if info.rc == 0:
                self.publish_status[info.mid] = "pending"
                if qos == 0:
                    self.publish_status[info.mid] = "success"
            else:
                LOGERR(f"MQTT publish failed (rc={info.rc}), topic: {topic}")
                mqtt_send_que.put(mqtt_msg)     # retry
        except Exception as e:
            LOGERR(f"MQTT send error: {e}, topic: {topic}")
            mqtt_send_que.put(mqtt_msg)         # retry

    def send_msg_to_mqtt(self):
        """Synchronous send loop: publish items from ``mqtt_send_que`` in turn.

        Runs until ``running`` is cleared or shutdown begins.  Failed items
        are requeued; malformed items are dropped.
        """
        while self.running and not shutting_down:
            try:
                mqtt_msg: dict = mqtt_send_que.get(timeout=0.1)
            except Empty:
                continue

            fields = _split_mqtt_msg(mqtt_msg)
            if fields is None:
                continue
            topic, msg, qos = fields
            try:
                info = self.client.publish(topic, msg, qos=qos)
                if info.rc == 0:
                    self.publish_status[info.mid] = "pending"
                    if qos == 0:
                        self.publish_status[info.mid] = "success"
                else:
                    LOGDBG(f"send failed: {info.rc}, topic: {topic}, qos: {qos}\nmsg:{msg}")
                    mqtt_send_que.put(mqtt_msg)     # retry
            except Exception as e:
                LOGERR(f"send error: {e}, topic: {topic}, qos: {qos}\nmsg:{msg}")
                mqtt_send_que.put(mqtt_msg)         # retry

    def run(self):
        """Connect, start the paho network loop and dispatch queued sends.

        On exit (normal or error) the publish pool and the client are shut
        down and the module-wide ``shutting_down`` flag is set.
        """
        global shutting_down
        try:
            LOGINFO("Connecting to MQTT broker...")
            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
            self.client.loop_start()

            while self.running and not shutting_down:
                try:
                    mqtt_msg = mqtt_send_que.get(timeout=0.1)
                except Empty:
                    continue
                # Publish on the pool so a slow publish does not stall the queue.
                self.executor.submit(self.async_publish, mqtt_msg)

        except Exception as e:
            LOGERR(f"MQTT thread encountered an error: {e}")
        finally:
            self.running = False
            self.executor.shutdown(wait=True)
            self.client.loop_stop()
            self.client.disconnect()
            shutting_down = True
            LOGINFO("MQTT thread exited")


class RobustMQTTClient(threading.Thread):
    """MQTT client thread with confirmed publishing, reconnect and clean exit.

    Messages handed to ``send_msg`` are queued internally; each one is sent by
    a pool worker that waits for the connection and retries until the publish
    is confirmed.

    NOTE(review): with ``max_workers`` > 1 several messages are in flight at
    once, so strict ordering between messages is not guaranteed.
    """

    def __init__(self, max_workers=4):
        super().__init__(name="RobustMQTTClient")
        self.client: mqtt.Client = mqtt.Client()
        self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_publish = self.on_publish
        self.client.on_message = on_message

        self.running = True
        self.reconnect_delay = 5                # seconds between reconnect attempts
        self.publish_status = {}                # mid -> "pending" / "success"
        self.send_queue = Queue()               # internal outbound queue
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.connected = threading.Event()      # set while the broker session is up

    # =========================
    # MQTT callbacks
    # =========================
    def on_connect(self, client, userdata, flags, rc):
        """Subscribe and mark the session as connected when the broker accepts it."""
        if rc == 0:
            LOGINFO("MQTT connected successfully!")
            client.subscribe(TOPICS.dev_tracker_targets)
            client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
            client.subscribe(TOPICS.las_test, qos=2)
            # Release senders blocked in _send_message.  Without this the
            # event was only ever set after a reconnect, so nothing was sent
            # following a normal first connection.
            self.connected.set()
        else:
            LOGERR(f"MQTT failed to connect, rc={rc}")

    def on_disconnect(self, client, userdata, rc):
        """Mark the session as down and retry ``reconnect()`` until it succeeds."""
        self.connected.clear()
        if shutting_down:
            return
        LOGERR(f"MQTT disconnected (rc={rc})")
        while not shutting_down and self.running:
            try:
                LOGINFO("Trying to reconnect to MQTT broker...")
                client.reconnect()
                LOGINFO("MQTT reconnected successfully!")
                # ``connected`` is set by on_connect once the broker has
                # acknowledged the new session, not here.
                break
            except Exception as e:
                LOGERR(f"Reconnect failed: {e}, retry in {self.reconnect_delay}s")
                time.sleep(self.reconnect_delay)

    def on_publish(self, client, userdata, mid):
        """Record that message ``mid`` has been published."""
        self.publish_status[mid] = "success"
        LOGINFO(f"Message {mid} published successfully")

    # =========================
    # Send a single message reliably
    # =========================
    def _send_message(self, mqtt_msg: dict):
        """Publish ``mqtt_msg`` and wait for confirmation, retrying on failure.

        Returns when the publish is confirmed, when the item is malformed
        (dropped), or when the client is stopping.
        """
        fields = _split_mqtt_msg(mqtt_msg)
        if fields is None:
            return
        topic, payload, qos = fields

        while self.running and not shutting_down:
            try:
                # Wait for an established session before publishing.
                if not self.connected.wait(timeout=1):
                    continue

                info = self.client.publish(topic, payload, qos=qos)
                if info.rc != 0:
                    LOGERR(f"MQTT publish failed (rc={info.rc}), topic={topic}, retrying")
                    time.sleep(1)
                    continue

                self.publish_status[info.mid] = "pending"
                # Poll the message info rather than publish_status: the
                # on_publish callback may run before "pending" is stored, and
                # the later "pending" write would then hide the confirmation.
                # Checking self.running lets run()'s executor.shutdown() finish.
                while (not info.is_published()
                       and self.running and not shutting_down):
                    time.sleep(0.01)
                if info.is_published():
                    self.publish_status[info.mid] = "success"
                break

            except Exception as e:
                LOGERR(f"MQTT send error: {e}, topic={topic}, retrying")
                time.sleep(1)

    # =========================
    # Public interface: send a message
    # =========================
    def send_msg(self, mqtt_msg: dict):
        """Queue ``mqtt_msg`` ({"topic", "msg", optional "qos"}) for sending."""
        self.send_queue.put(mqtt_msg)

    # =========================
    # Queue processing loop
    # =========================
    def _process_queue(self):
        """Hand queued messages to the pool until the client is stopping."""
        while self.running and not shutting_down:
            try:
                mqtt_msg = self.send_queue.get(timeout=0.1)
            except Empty:
                continue
            self.executor.submit(self._send_message, mqtt_msg)

    # =========================
    # Thread body
    # =========================
    def run(self):
        """Connect, start the paho network loop and process the send queue.

        On exit (normal or error) the pool and the client are shut down and
        the module-wide ``shutting_down`` flag is set.
        """
        global shutting_down
        try:
            LOGINFO("Connecting to MQTT broker...")
            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
            self.client.loop_start()
            self._process_queue()
        except Exception as e:
            LOGERR(f"MQTT thread encountered an error: {e}")
        finally:
            self.running = False
            self.executor.shutdown(wait=True)
            self.client.loop_stop()
            self.client.disconnect()
            shutting_down = True
            LOGINFO("MQTT client exited")


# Assigned by the application once the threads are created; None until then.
g_mqtt_client: Optional[RobustMQTTClient] = None
g_mqtt_consumer: Optional[MQTTConsumerThread] = None


# ================================
# Exit signal handling
# ================================
def signal_handler(sig, frame):
    """Handle SIGINT/SIGTERM: stop the MQTT threads and the pool, then exit."""
    global shutting_down
    LOGINFO("Exiting... shutting down MQTT and thread pool")
    shutting_down = True
    # The globals stay None until the application assigns them, so a signal
    # arriving early must not fail on attribute access.
    if g_mqtt_client is not None:
        g_mqtt_client.running = False
        g_mqtt_client.client.loop_stop()
        g_mqtt_client.client.disconnect()
    if g_mqtt_consumer is not None:
        g_mqtt_consumer.running = False
    executor.shutdown(wait=True)
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)