mqtt_process.py 12 KB


  1. '''
  2. 处理mqtt消息相关
  3. '''
  4. import paho.mqtt.client as mqtt
  5. import time
  6. import threading
  7. import re
  8. import queue
  9. import sys
  10. from queue import Queue, Empty
  11. from concurrent.futures import ThreadPoolExecutor
  12. import signal
  13. import common.sys_comm as sys_comm
  14. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  15. from mqtt.mqtt_topics import TOPICS, Topic_Pattern
  16. import mqtt.mqtt_recv as mqtt_recv
  17. mqtt_send_que = queue.Queue() # 发送队列
  18. # 格式如下
  19. '''
  20. {
  21. "topic": "topic/xxxx",
  22. "msg": "msg to be send"
  23. }
  24. '''
  25. # ================================
  26. # MQTT 配置
  27. # ================================
  28. MQTT_BROKER = "119.45.12.173" # MQTT BROKER 地址
  29. MQTT_PORT = 1883 # MQTT 端口
  30. MQTT_USERNAME = "lnradar" # MQTT 用户名
  31. MQTT_PASSWD = "lnradar" # MQTT 密码
  32. # ================================
  33. # 全局对象
  34. # ================================
  35. executor = ThreadPoolExecutor(max_workers=8)
  36. mqtt_queue = Queue() # 消息队列
  37. shutting_down = False
  38. import atexit
  39. atexit.register(lambda: setattr(sys.modules[__name__], 'shutting_down', True))
  40. # ================================
  41. # 辅助函数
  42. # ================================
  43. def check_topic(pattern:str, topic:str) -> bool:
  44. return bool(re.match(pattern, topic))
  45. # ================================
  46. # 消费者线程
  47. # ================================
  48. class MQTTConsumerThread(threading.Thread):
  49. def __init__(self):
  50. super().__init__(name= "MQTTConsumerThread")
  51. self.running = True
  52. def run(self):
  53. global shutting_down
  54. while self.running:
  55. try:
  56. msg_tuple = mqtt_queue.get(timeout=0.1) # (client, userdata, msg)
  57. except Empty:
  58. if shutting_down:
  59. break
  60. continue
  61. client, userdata, msg = msg_tuple
  62. if shutting_down:
  63. break
  64. try:
  65. executor.submit(mqtt_recv.process_message, client, userdata, msg)
  66. except RuntimeError:
  67. # 线程池已关闭,忽略
  68. break
  69. def on_message(client, userdata, msg):
  70. if not shutting_down:
  71. mqtt_queue.put((client, userdata, msg)) # 放入队列,由消费者线程处理
  72. # ================================
  73. # MQTT 线程类
  74. # ================================
  75. class MQTTClientThread(threading.Thread):
  76. def __init__(self,):
  77. super().__init__(name= "MQTTClientThread")
  78. self.client:mqtt.Client = mqtt.Client()
  79. self.publish_status = {}
  80. self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
  81. self.client.on_connect = self.on_connect
  82. self.client.on_disconnect = self.on_disconnect
  83. self.client.on_publish = self.on_publish
  84. self.client.on_message = on_message
  85. self.running = True
  86. self.reconnect_delay = 5 # 重连间隔 秒
  87. self.executor = ThreadPoolExecutor(max_workers=4) # 异步发送线程池
  88. # ================================
  89. # MQTT 回调
  90. # ================================
  91. def on_connect(self, client:mqtt.Client, userdata, flags, rc):
  92. if rc == 0:
  93. LOGINFO("MQTT Connected successfully!")
  94. client.subscribe(TOPICS.dev_tracker_targets)
  95. client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
  96. client.subscribe(TOPICS.las_test, qos=2)
  97. else:
  98. LOGERR(f"MQTT failed to connect, return code {rc}")
  99. def on_disconnect(self, client, userdata, rc):
  100. if shutting_down:
  101. return
  102. LOGERR(f"MQTT disconnected (rc={rc})")
  103. # 循环尝试重连
  104. while not shutting_down and self.running:
  105. try:
  106. LOGINFO("Trying to reconnect to MQTT broker...")
  107. client.reconnect()
  108. LOGINFO("MQTT reconnected successfully!")
  109. break
  110. except Exception as e:
  111. LOGERR(f"MQTT reconnect failed: {e}, retry in {self.reconnect_delay}s")
  112. time.sleep(self.reconnect_delay)
  113. def on_publish(self, client, userdata, mid):
  114. self.publish_status[mid] = "success"
  115. LOGINFO(f"Message {mid} published successfully")
  116. # 异步发送消息
  117. def async_publish(self, mqtt_msg: dict):
  118. try:
  119. topic = mqtt_msg["topic"]
  120. msg = mqtt_msg["msg"]
  121. qos = mqtt_msg.get("qos", 0)
  122. info = self.client.publish(topic, msg, qos=qos)
  123. if info.rc == 0:
  124. self.publish_status[info.mid] = "pending"
  125. if qos == 0:
  126. self.publish_status[info.mid] = "success"
  127. else:
  128. LOGERR(f"MQTT publish failed (rc={info.rc}), topic: {topic}")
  129. mqtt_send_que.put(mqtt_msg) # 重试
  130. except Exception as e:
  131. LOGERR(f"MQTT send error: {e}, topic: {topic}")
  132. mqtt_send_que.put(mqtt_msg) # 重试
  133. def send_msg_to_mqtt(self):
  134. while self.running and not shutting_down:
  135. try:
  136. mqtt_msg: dict = mqtt_send_que.get(timeout=0.1)
  137. except Empty:
  138. continue
  139. try:
  140. topic = mqtt_msg["topic"]
  141. msg = mqtt_msg["msg"]
  142. qos = mqtt_msg.get("qos", 0)
  143. info = self.client.publish(topic, msg, qos=qos)
  144. if info.rc == 0:
  145. self.publish_status[info.mid] = "pending"
  146. if qos == 0:
  147. self.publish_status[info.mid] = "success"
  148. else:
  149. LOGDBG(f"send failed: {info.rc}, topic: {topic}, qos: {qos}\nmsg:{msg}")
  150. mqtt_send_que.put(mqtt_msg) # 重试
  151. except Exception as e:
  152. LOGERR(f"send error: {e}, topic: {topic}, qos: {qos}\nmsg:{msg}")
  153. mqtt_send_que.put(mqtt_msg) # 重试
  154. def run(self):
  155. global shutting_down
  156. try:
  157. LOGINFO("Connecting to MQTT broker...")
  158. self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
  159. self.client.loop_start()
  160. while self.running and not shutting_down:
  161. try:
  162. mqtt_msg = mqtt_send_que.get(timeout=0.1)
  163. # 使用线程池异步发送
  164. self.executor.submit(self.async_publish, mqtt_msg)
  165. except Empty:
  166. continue
  167. except Exception as e:
  168. LOGERR(f"MQTT thread encountered an error: {e}")
  169. finally:
  170. self.running = False
  171. self.executor.shutdown(wait=True)
  172. self.client.loop_stop()
  173. self.client.disconnect()
  174. shutting_down = True
  175. LOGINFO("MQTT thread exited")
  176. # 可靠顺序发送、支持断线重连、安全退出
  177. class RobustMQTTClient(threading.Thread):
  178. def __init__(self, max_workers=4):
  179. super().__init__(name="RobustMQTTClient")
  180. self.client: mqtt.Client = mqtt.Client()
  181. self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
  182. self.client.on_connect = self.on_connect
  183. self.client.on_disconnect = self.on_disconnect
  184. self.client.on_publish = self.on_publish
  185. self.client.on_message = on_message
  186. self.running = True
  187. self.reconnect_delay = 5 # 重连间隔
  188. self.publish_status = {} # mid -> 状态
  189. self.send_queue = Queue() # 内部可靠顺序队列
  190. self.executor = ThreadPoolExecutor(max_workers=max_workers)
  191. self.connected = threading.Event() # 用于标记 MQTT 是否已连接
  192. # =========================
  193. # MQTT 回调
  194. # =========================
  195. def on_connect(self, client, userdata, flags, rc):
  196. if rc == 0:
  197. LOGINFO("MQTT connected successfully!")
  198. client.subscribe(TOPICS.dev_tracker_targets)
  199. client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
  200. client.subscribe(TOPICS.las_test, qos=2)
  201. else:
  202. LOGERR(f"MQTT failed to connect, rc={rc}")
  203. def on_disconnect(self, client, userdata, rc):
  204. LOGERR(f"MQTT disconnected (rc={rc})")
  205. self.connected.clear()
  206. if shutting_down:
  207. return
  208. LOGERR(f"MQTT disconnected (rc={rc})")
  209. while not shutting_down and self.running:
  210. try:
  211. LOGINFO("Trying to reconnect to MQTT broker...")
  212. client.reconnect()
  213. LOGINFO("MQTT reconnected successfully!")
  214. self.connected.set()
  215. break
  216. except Exception as e:
  217. LOGERR(f"Reconnect failed: {e}, retry in {self.reconnect_delay}s")
  218. time.sleep(self.reconnect_delay)
  219. def on_publish(self, client, userdata, mid):
  220. self.publish_status[mid] = "success"
  221. LOGINFO(f"Message {mid} published successfully")
  222. # =========================
  223. # 安全发送单条消息
  224. # =========================
  225. def _send_message(self, mqtt_msg: dict):
  226. while self.running and not shutting_down:
  227. try:
  228. # 等待连接成功再发送
  229. if not self.connected.wait(timeout=1):
  230. continue
  231. topic = mqtt_msg["topic"]
  232. payload = mqtt_msg["msg"]
  233. qos = mqtt_msg.get("qos", 0)
  234. info = self.client.publish(topic, payload, qos=qos)
  235. if info.rc != 0:
  236. LOGERR(f"MQTT publish failed (rc={info.rc}), topic={topic}, retrying")
  237. time.sleep(1)
  238. continue
  239. self.publish_status[info.mid] = "pending"
  240. while self.publish_status.get(info.mid) != "success" and not shutting_down:
  241. time.sleep(0.01)
  242. break
  243. except Exception as e:
  244. LOGERR(f"MQTT send error: {e}, topic={mqtt_msg.get('topic')}, retrying")
  245. time.sleep(1)
  246. # =========================
  247. # 外部接口:发送消息
  248. # =========================
  249. def send_msg(self, mqtt_msg: dict):
  250. self.send_queue.put(mqtt_msg) # 放入内部可靠队列
  251. # =========================
  252. # 队列处理循环
  253. # =========================
  254. def _process_queue(self):
  255. while self.running and not shutting_down:
  256. try:
  257. mqtt_msg = self.send_queue.get(timeout=0.1)
  258. # 提交线程池异步发送
  259. self.executor.submit(self._send_message, mqtt_msg)
  260. except Empty:
  261. continue
  262. # =========================
  263. # 主线程
  264. # =========================
  265. def run(self):
  266. global shutting_down
  267. try:
  268. LOGINFO("Connecting to MQTT broker...")
  269. self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
  270. self.client.loop_start()
  271. self._process_queue()
  272. except Exception as e:
  273. LOGERR(f"MQTT thread encountered an error: {e}")
  274. finally:
  275. self.running = False
  276. self.executor.shutdown(wait=True)
  277. self.client.loop_stop()
  278. self.client.disconnect()
  279. shutting_down = True
  280. LOGINFO("MQTT client exited")
  281. g_mqtt_client: RobustMQTTClient = None
  282. g_mqtt_consumer: MQTTConsumerThread = None
  283. # ================================
  284. # 退出信号处理
  285. # ================================
  286. def signal_handler(sig, frame):
  287. global shutting_down
  288. LOGINFO("Exiting... shutting down MQTT and thread pool")
  289. shutting_down = True
  290. g_mqtt_client.running = False
  291. g_mqtt_client.client.loop_stop()
  292. g_mqtt_client.client.disconnect()
  293. g_mqtt_consumer.running = False
  294. executor.shutdown(wait=True)
  295. sys.exit(0)
  296. signal.signal(signal.SIGINT, signal_handler)
  297. signal.signal(signal.SIGTERM, signal_handler)