"""MQTT message handling.

Connects to the broker, hands inbound messages to a worker pool through a
consumer thread, and publishes outbound messages taken from the send queue.
"""

import atexit
import queue
import re
import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue, Empty
from typing import Optional

import paho.mqtt.client as mqtt

import common.sys_comm as sys_comm
from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
from mqtt.mqtt_topics import TOPICS, Topic_Pattern
import mqtt.mqtt_recv as mqtt_recv
import mqtt.mqtt_send as mqtt_send
from mqtt.mqtt_send import mqtt_send_que

# Send queue (mqtt_send_que).
# Each entry is a dict of the following form:
#   {
#       "topic": "topic/xxxx",
#       "msg": "msg to be send"
#   }
# An optional "qos" key is read by the sender and defaults to 0.

# ================================
# MQTT configuration
# ================================
# NOTE(review): broker address and credentials are hard-coded here; consider
# loading them from configuration or the environment.
MQTT_BROKER = "119.45.12.173"   # MQTT broker address
MQTT_PORT = 1883                # MQTT port
MQTT_USERNAME = "lnradar"       # MQTT user name
MQTT_PASSWD = "lnradar"         # MQTT password

# ================================
# Global objects
# ================================
executor = ThreadPoolExecutor(max_workers=8)
mqtt_queue = Queue()    # inbound message queue
shutting_down = False

# Raise the shutdown flag at interpreter exit so the polling loops stop.
atexit.register(lambda: setattr(sys.modules[__name__], 'shutting_down', True))


# ================================
# Helpers
# ================================
def check_topic(pattern: str, topic: str) -> bool:
    """Return True when regex ``pattern`` matches at the start of ``topic``."""
    return bool(re.match(pattern, topic))


# ================================
# Consumer thread
# ================================
class MQTTConsumerThread(threading.Thread):
    """Drain ``mqtt_queue`` and submit each message to the thread pool."""

    def __init__(self):
        super().__init__()
        self.running = True

    def run(self):
        """Poll the inbound queue until stopped or shutdown is flagged."""
        while self.running:
            try:
                # Short timeout so the stop flags are re-checked regularly.
                msg_tuple = mqtt_queue.get(timeout=0.1)  # (client, userdata, msg)
            except Empty:
                if shutting_down:
                    break
                continue

            client, userdata, msg = msg_tuple
            if shutting_down:
                break
            try:
                executor.submit(mqtt_recv.process_message, client, userdata, msg)
            except RuntimeError:
                # The pool has already been shut down; stop consuming.
                break


# ================================
# MQTT callbacks
# ================================
def on_connect(client: mqtt.Client, userdata, flags, rc):
    """Connect callback: subscribe to the topics on success, log on failure."""
    if rc == 0:
        LOGINFO("MQTT Connected successfully!")
        client.subscribe(TOPICS.dev_dsp_data)
        # client.subscribe(TOPICS.das_all)
        # client.subscribe(TOPICS.mps_all)
        client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
    else:
        LOGERR(f"MQTT failed to connect, return code {rc}")


def on_message(client, userdata, msg):
    """Message callback: enqueue the message for the consumer thread."""
    if not shutting_down:
        mqtt_queue.put((client, userdata, msg))


# ================================
# MQTT client thread
# ================================
class MQTTClientThread(threading.Thread):
    """Own the MQTT client, run its network loop and publish queued messages."""

    def __init__(self,):
        threading.Thread.__init__(self)
        self.client: mqtt.Client = mqtt.Client()
        # Maps publish message id -> "pending" / "success".
        self.publish_status = {}
        self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
        self.client.on_connect = on_connect
        self.client.on_message = on_message
        self.client.on_publish = self.on_publish
        self.running = True

    def on_publish(self, client, userdata, mid):
        """Publish callback: mark message id ``mid`` as delivered."""
        self.publish_status[mid] = "success"

    def send_msg_to_mqtt(self):
        """Publish entries from ``mqtt_send_que`` until the thread is stopped.

        The loop ends when ``self.running`` is cleared, or when the queue is
        empty while ``shutting_down`` is set. A failure on one message is
        logged and does not stop the loop.
        """
        # Checking self.running lets a stop request end the loop even when
        # producers keep the queue non-empty.
        while self.running:
            time.sleep(0.01)
            try:
                mqtt_msg: dict = mqtt_send_que.get(timeout=0.1)
            except Exception:
                # Presumably queue.Empty on timeout; mqtt_send_que is defined
                # elsewhere, so the broad catch is kept as-is.
                if shutting_down:
                    break
                continue

            # Pre-bind so the error log below is valid, and not stale, when
            # the entry is malformed (for example a missing "topic" key).
            topic = msg = qos = None
            try:
                topic = mqtt_msg["topic"]
                msg = mqtt_msg["msg"]
                qos = mqtt_msg.get("qos", 0)
                info = self.client.publish(topic, msg, qos=qos)
                if info.rc == 0:
                    # QoS 0 is recorded as successful straight away; other
                    # levels stay pending until on_publish reports them.
                    self.publish_status[info.mid] = "success" if qos == 0 else "pending"
                else:
                    LOGDBG(f"send failed: {info.rc}, topic: {topic}, qos: {qos}\nmsg:{msg}")
            except Exception as e:
                LOGERR(f"send error: {e}, topic: {topic}, qos: {qos}\nmsg:{msg}")

    def run(self):
        """Connect, start the network loop and run the send loop until stopped."""
        global shutting_down
        try:
            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
            LOGINFO("Connecting to MQTT broker...")
            self.client.loop_start()
            self.send_msg_to_mqtt()
        except Exception as e:
            LOGERR(f"MQTT thread encountered an error: {e}")
        finally:
            # Set the flags first so the other loops stop even if the client
            # teardown below raises.
            self.running = False
            shutting_down = True
            self.client.loop_stop()
            self.client.disconnect()


mqtt_client: Optional[MQTTClientThread] = None
mqtt_consumer: Optional[MQTTConsumerThread] = None


# ================================
# Exit signal handling
# ================================
def signal_handler(sig, frame):
    """Handle SIGINT/SIGTERM: stop MQTT, drain the pool and exit with code 0."""
    global shutting_down
    LOGINFO("Exiting... shutting down MQTT and thread pool")
    shutting_down = True
    # Both thread handles start as None, so a signal can arrive before they
    # are assigned.
    if mqtt_client is not None:
        mqtt_client.running = False
        mqtt_client.client.loop_stop()
        mqtt_client.client.disconnect()
    if mqtt_consumer is not None:
        mqtt_consumer.running = False
    executor.shutdown(wait=True)
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)