mqtt_process.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. '''
  2. 处理mqtt消息相关
  3. '''
  4. import paho.mqtt.client as mqtt
  5. import time
  6. import threading
  7. import re
  8. import queue
  9. import sys
  10. from queue import Queue, Empty
  11. from concurrent.futures import ThreadPoolExecutor
  12. import signal
  13. import common.sys_comm as sys_comm
  14. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  15. from mqtt.mqtt_topics import TOPICS, Topic_Pattern
  16. import mqtt.mqtt_recv as mqtt_recv
  17. import mqtt.mqtt_send as mqtt_send
  18. from mqtt.mqtt_send import mqtt_send_que # 发送队列
  19. # 格式如下
  20. '''
  21. {
  22. "topic": "topic/xxxx",
  23. "msg": "msg to be send"
  24. }
  25. '''
  26. # ================================
  27. # MQTT 配置
  28. # ================================
  29. MQTT_BROKER = "119.45.12.173" # MQTT BROKER 地址
  30. MQTT_PORT = 1883 # MQTT 端口
  31. MQTT_USERNAME = "lnradar" # MQTT 用户名
  32. MQTT_PASSWD = "lnradar" # MQTT 密码
  33. # ================================
  34. # 全局对象
  35. # ================================
  36. executor = ThreadPoolExecutor(max_workers=8)
  37. mqtt_queue = Queue() # 消息队列
  38. shutting_down = False
  39. import atexit
  40. atexit.register(lambda: setattr(sys.modules[__name__], 'shutting_down', True))
  41. # ================================
  42. # 辅助函数
  43. # ================================
  44. def check_topic(pattern:str, topic:str) -> bool:
  45. return bool(re.match(pattern, topic))
  46. # ================================
  47. # 消费者线程
  48. # ================================
  49. class MQTTConsumerThread(threading.Thread):
  50. def __init__(self):
  51. super().__init__(name= "MQTTConsumerThread")
  52. self.running = True
  53. def run(self):
  54. global shutting_down
  55. while self.running:
  56. try:
  57. msg_tuple = mqtt_queue.get(timeout=0.1) # (client, userdata, msg)
  58. except Empty:
  59. if shutting_down:
  60. break
  61. continue
  62. client, userdata, msg = msg_tuple
  63. if shutting_down:
  64. break
  65. try:
  66. executor.submit(mqtt_recv.process_message, client, userdata, msg)
  67. except RuntimeError:
  68. # 线程池已关闭,忽略
  69. break
  70. def on_message(self, client, userdata, msg):
  71. if not shutting_down:
  72. mqtt_queue.put((client, userdata, msg)) # 放入队列,由消费者线程处理
  73. # ================================
  74. # MQTT 线程类
  75. # ================================
  76. class MQTTClientThread(threading.Thread):
  77. def __init__(self,):
  78. threading.Thread.__init__(self, name= "MQTTClientThread")
  79. self.client:mqtt.Client = mqtt.Client()
  80. self.publish_status = {}
  81. self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
  82. self.client.on_connect = self.on_connect
  83. self.client.on_message = on_message
  84. self.client.on_disconnect = self.on_disconnect
  85. self.client.on_publish = self.on_publish
  86. self.running = True
  87. self.reconnect_delay = 5 # 重连间隔 秒
  88. # ================================
  89. # MQTT 回调
  90. # ================================
  91. def on_connect(self, client:mqtt.Client, userdata, flags, rc):
  92. if rc == 0:
  93. LOGINFO("MQTT Connected successfully!")
  94. client.subscribe(TOPICS.dev_tracker_targets)
  95. client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
  96. else:
  97. LOGERR(f"MQTT failed to connect, return code {rc}")
  98. def on_disconnect(self, client, userdata, rc):
  99. if shutting_down:
  100. return
  101. LOGERR(f"MQTT disconnected (rc={rc})")
  102. # 循环尝试重连
  103. while not shutting_down and self.running:
  104. try:
  105. LOGINFO("Trying to reconnect to MQTT broker...")
  106. client.reconnect()
  107. LOGINFO("MQTT reconnected successfully!")
  108. break
  109. except Exception as e:
  110. LOGERR(f"MQTT reconnect failed: {e}, retry in {self.reconnect_delay}s")
  111. time.sleep(self.reconnect_delay)
  112. def on_publish(self, client, userdata, mid):
  113. self.publish_status[mid] = "success"
  114. def send_msg_to_mqtt(self):
  115. while self.running and not shutting_down:
  116. try:
  117. mqtt_msg: dict = mqtt_send_que.get(timeout=0.1)
  118. except Empty:
  119. continue
  120. try:
  121. topic = mqtt_msg["topic"]
  122. msg = mqtt_msg["msg"]
  123. qos = mqtt_msg.get("qos", 0)
  124. info = self.client.publish(topic, msg, qos=qos)
  125. if info.rc == 0:
  126. self.publish_status[info.mid] = "pending"
  127. if qos == 0:
  128. self.publish_status[info.mid] = "success"
  129. else:
  130. LOGDBG(f"send failed: {info.rc}, topic: {topic}, qos: {qos}\nmsg:{msg}")
  131. mqtt_send_que.put(mqtt_msg) # 重试
  132. except Exception as e:
  133. LOGERR(f"send error: {e}, topic: {topic}, qos: {qos}\nmsg:{msg}")
  134. mqtt_send_que.put(mqtt_msg) # 重试
  135. def run(self):
  136. global shutting_down
  137. try:
  138. LOGINFO("Connecting to MQTT broker...")
  139. self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
  140. self.client.loop_start()
  141. self.send_msg_to_mqtt()
  142. except Exception as e:
  143. LOGERR(f"MQTT thread encountered an error: {e}")
  144. finally:
  145. self.running = False
  146. self.client.loop_stop()
  147. self.client.disconnect()
  148. shutting_down = True
# Module-level handles to the two worker threads. Both start as None and are
# read by signal_handler below.
# NOTE(review): nothing in the visible part of this file assigns them;
# presumably the application start-up code does -- confirm.
mqtt_client: MQTTClientThread = None
mqtt_consumer: MQTTConsumerThread = None
  151. # ================================
  152. # 退出信号处理
  153. # ================================
  154. def signal_handler(sig, frame):
  155. global shutting_down
  156. LOGINFO("Exiting... shutting down MQTT and thread pool")
  157. shutting_down = True
  158. mqtt_client.running = False
  159. mqtt_client.client.loop_stop()
  160. mqtt_client.client.disconnect()
  161. mqtt_consumer.running = False
  162. executor.shutdown(wait=True)
  163. sys.exit(0)
  164. signal.signal(signal.SIGINT, signal_handler)
  165. signal.signal(signal.SIGTERM, signal_handler)