mqtt_process.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
'''
Handles MQTT message processing.
'''
  4. import paho.mqtt.client as mqtt
  5. import time
  6. import threading
  7. import re
  8. import queue
  9. import sys
  10. from queue import Queue, Empty
  11. from concurrent.futures import ThreadPoolExecutor
  12. import signal
  13. import g_config
  14. import common.sys_comm as sys_comm
  15. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  16. from mqtt.mqtt_topics import TOPICS, Topic_Pattern
  17. import mqtt.mqtt_recv as mqtt_recv
  18. mqtt_send_que = queue.Queue() # 发送队列
  19. # 格式如下
  20. '''
  21. {
  22. "topic": "topic/xxxx",
  23. "msg": "msg to be send"
  24. }
  25. '''
  26. # ================================
  27. # 全局对象
  28. # ================================
  29. executor = ThreadPoolExecutor(max_workers=8)
  30. mqtt_queue = Queue() # 消息队列
  31. shutting_down = False
  32. import atexit
  33. atexit.register(lambda: setattr(sys.modules[__name__], 'shutting_down', True))
# ================================
# Helper functions
# ================================
  37. def check_topic(pattern:str, topic:str) -> bool:
  38. return bool(re.match(pattern, topic))
# ================================
# Consumer thread
# ================================
  42. class MQTTConsumerThread(threading.Thread):
  43. def __init__(self):
  44. super().__init__(name= "MQTTConsumerThread")
  45. self.running = True
  46. def run(self):
  47. global shutting_down
  48. while self.running:
  49. try:
  50. msg_tuple = mqtt_queue.get(timeout=0.1) # (client, userdata, msg)
  51. except Empty:
  52. if shutting_down:
  53. break
  54. continue
  55. client, userdata, msg = msg_tuple
  56. if shutting_down:
  57. break
  58. try:
  59. executor.submit(mqtt_recv.process_message, client, userdata, msg)
  60. except RuntimeError:
  61. # 线程池已关闭,忽略
  62. break
  63. def on_message(client, userdata, msg):
  64. if not shutting_down:
  65. mqtt_queue.put((client, userdata, msg)) # 放入队列,由消费者线程处理
# ================================
# MQTT thread class
# ================================
  69. class MQTTClientThread(threading.Thread):
  70. def __init__(self,):
  71. super().__init__(name= "MQTTClientThread")
  72. with g_config.g_sys_conf_mtx:
  73. self.mqtt_ = g_config.g_sys_conf["mqtt"]
  74. self.client:mqtt.Client = mqtt.Client()
  75. self.publish_status = {}
  76. self.client.username_pw_set(self.mqtt_["username"], self.mqtt_["password"])
  77. self.client.on_connect = self.on_connect
  78. self.client.on_disconnect = self.on_disconnect
  79. self.client.on_publish = self.on_publish
  80. self.client.on_message = on_message
  81. self.running = True
  82. self.reconnect_delay = 5 # 重连间隔 秒
  83. self.executor = ThreadPoolExecutor(max_workers=4) # 异步发送线程池
  84. # ================================
  85. # MQTT 回调
  86. # ================================
  87. def on_connect(self, client:mqtt.Client, userdata, flags, rc):
  88. if rc == 0:
  89. LOGINFO("MQTT Connected successfully!")
  90. client.subscribe(TOPICS.dev_tracker_targets)
  91. client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
  92. client.subscribe(TOPICS.las_test, qos=2)
  93. else:
  94. LOGERR(f"MQTT failed to connect, return code {rc}")
  95. def on_disconnect(self, client, userdata, rc):
  96. if shutting_down:
  97. return
  98. LOGERR(f"MQTT disconnected (rc={rc})")
  99. # 循环尝试重连
  100. while not shutting_down and self.running:
  101. try:
  102. LOGINFO("Trying to reconnect to MQTT broker...")
  103. client.reconnect()
  104. LOGINFO("MQTT reconnected successfully!")
  105. break
  106. except Exception as e:
  107. LOGERR(f"MQTT reconnect failed: {e}, retry in {self.reconnect_delay}s")
  108. time.sleep(self.reconnect_delay)
  109. def on_publish(self, client, userdata, mid):
  110. self.publish_status[mid] = "success"
  111. LOGINFO(f"Message {mid} published successfully")
  112. def send_msg(self, mqtt_msg: dict):
  113. mqtt_send_que.put(mqtt_msg) # 放入内部可靠队列
  114. # 异步发送消息
  115. def async_publish(self, mqtt_msg: dict):
  116. try:
  117. topic = mqtt_msg["topic"]
  118. msg = mqtt_msg["msg"]
  119. qos = mqtt_msg.get("qos", 0)
  120. info = self.client.publish(topic, msg, qos=qos)
  121. if info.rc == 0:
  122. self.publish_status[info.mid] = "pending"
  123. if qos == 0:
  124. self.publish_status[info.mid] = "success"
  125. else:
  126. LOGERR(f"MQTT publish failed (rc={info.rc}), topic: {topic}")
  127. mqtt_send_que.put(mqtt_msg) # 重试
  128. except Exception as e:
  129. LOGERR(f"MQTT send error: {e}, topic: {topic}")
  130. mqtt_send_que.put(mqtt_msg) # 重试
  131. def send_msg_to_mqtt(self):
  132. while self.running and not shutting_down:
  133. try:
  134. mqtt_msg: dict = mqtt_send_que.get(timeout=0.1)
  135. except Empty:
  136. continue
  137. try:
  138. topic = mqtt_msg["topic"]
  139. msg = mqtt_msg["msg"]
  140. qos = mqtt_msg.get("qos", 0)
  141. info = self.client.publish(topic, msg, qos=qos)
  142. if info.rc == 0:
  143. self.publish_status[info.mid] = "pending"
  144. if qos == 0:
  145. self.publish_status[info.mid] = "success"
  146. else:
  147. LOGDBG(f"send failed: {info.rc}, topic: {topic}, qos: {qos}\nmsg:{msg}")
  148. mqtt_send_que.put(mqtt_msg) # 重试
  149. except Exception as e:
  150. LOGERR(f"send error: {e}, topic: {topic}, qos: {qos}\nmsg:{msg}")
  151. mqtt_send_que.put(mqtt_msg) # 重试
  152. def run(self):
  153. global shutting_down
  154. try:
  155. LOGINFO("Connecting to MQTT broker...")
  156. self.client.connect(self.mqtt_["broker"], self.mqtt_["port"], 60)
  157. self.client.loop_start()
  158. while self.running and not shutting_down:
  159. try:
  160. mqtt_msg = mqtt_send_que.get(timeout=0.1)
  161. # 使用线程池异步发送
  162. self.executor.submit(self.async_publish, mqtt_msg)
  163. except Empty:
  164. continue
  165. except Exception as e:
  166. LOGERR(f"MQTT thread encountered an error: {e}")
  167. finally:
  168. self.running = False
  169. self.executor.shutdown(wait=True)
  170. self.client.loop_stop()
  171. self.client.disconnect()
  172. shutting_down = True
  173. LOGINFO("MQTT thread exited")
# Reliable queued sending (ordered only when max_workers=1), reconnect on disconnect, safe shutdown
  175. class RobustMQTTClient(threading.Thread):
  176. def __init__(self, max_workers=4):
  177. super().__init__(name="RobustMQTTClient")
  178. with g_config.g_sys_conf_mtx:
  179. self.mqtt_ = g_config.g_sys_conf["mqtt"]
  180. client_id: str = str(g_config.g_sys_conf["sp_id"])
  181. self.client: mqtt.Client = mqtt.Client(client_id= client_id)
  182. self.client.username_pw_set(self.mqtt_["username"], self.mqtt_["password"])
  183. self.client.on_connect = self.on_connect
  184. self.client.on_disconnect = self.on_disconnect
  185. self.client.on_publish = self.on_publish
  186. self.client.on_message = on_message
  187. self.running = True
  188. self.reconnect_delay = 5 # 重连间隔
  189. self.publish_status = {} # mid -> 状态
  190. self.send_queue = Queue() # 内部可靠顺序队列
  191. self.executor = ThreadPoolExecutor(max_workers=max_workers)
  192. self.connected = threading.Event() # 用于标记 MQTT 是否已连接
  193. # =========================
  194. # MQTT 回调
  195. # =========================
  196. def on_connect(self, client, userdata, flags, rc):
  197. if rc == 0:
  198. LOGINFO("MQTT connected successfully!")
  199. client.subscribe(TOPICS.dev_tracker_targets)
  200. client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
  201. client.subscribe(TOPICS.las_test, qos=2)
  202. client.subscribe(TOPICS.las_get_debug_info, qos=0)
  203. self.connected.set()
  204. else:
  205. LOGERR(f"MQTT failed to connect, rc={rc}")
  206. self.connected.clear()
  207. def on_disconnect(self, client, userdata, rc):
  208. LOGERR(f"MQTT disconnected (rc={rc})")
  209. self.connected.clear()
  210. if shutting_down:
  211. return
  212. LOGERR(f"MQTT disconnected (rc={rc})")
  213. while not shutting_down and self.running:
  214. try:
  215. LOGINFO("Trying to reconnect to MQTT broker...")
  216. client.reconnect()
  217. LOGINFO("MQTT reconnected successfully!")
  218. self.connected.set()
  219. break
  220. except Exception as e:
  221. LOGERR(f"Reconnect failed: {e}, retry in {self.reconnect_delay}s")
  222. time.sleep(self.reconnect_delay)
  223. def on_publish(self, client, userdata, mid):
  224. self.publish_status[mid] = "success"
  225. LOGINFO(f"Message {mid} published successfully")
  226. # =========================
  227. # 安全发送单条消息
  228. # =========================
  229. def _send_message(self, mqtt_msg: dict):
  230. while self.running and not shutting_down:
  231. try:
  232. # 等待连接成功再发送
  233. if not self.connected.wait(timeout=1):
  234. continue
  235. topic = mqtt_msg["topic"]
  236. payload = mqtt_msg["msg"]
  237. qos = mqtt_msg.get("qos", 0)
  238. info = self.client.publish(topic, payload, qos=qos)
  239. if info.rc != 0:
  240. LOGERR(f"MQTT publish failed (rc={info.rc}), topic={topic}, retrying")
  241. time.sleep(1)
  242. continue
  243. self.publish_status[info.mid] = "pending"
  244. while self.publish_status.get(info.mid) != "success" and not shutting_down:
  245. time.sleep(0.01)
  246. break
  247. except Exception as e:
  248. LOGERR(f"MQTT send error: {e}, topic={mqtt_msg.get('topic')}, retrying")
  249. time.sleep(1)
  250. # =========================
  251. # 外部接口:发送消息
  252. # =========================
  253. def send_msg(self, mqtt_msg: dict):
  254. self.send_queue.put(mqtt_msg) # 放入内部可靠队列
  255. # =========================
  256. # 队列处理循环
  257. # =========================
  258. def _process_queue(self):
  259. while self.running and not shutting_down:
  260. try:
  261. mqtt_msg = self.send_queue.get(timeout=0.1)
  262. # 提交线程池异步发送
  263. self.executor.submit(self._send_message, mqtt_msg)
  264. except Empty:
  265. continue
  266. # =========================
  267. # 主线程
  268. # =========================
  269. def run(self):
  270. global shutting_down
  271. try:
  272. LOGINFO("Connecting to MQTT broker...")
  273. self.client.connect(self.mqtt_["broker"], self.mqtt_["port"], 60)
  274. self.client.loop_start()
  275. self._process_queue()
  276. except Exception as e:
  277. LOGERR(f"MQTT thread encountered an error: {e}")
  278. finally:
  279. self.running = False
  280. self.executor.shutdown(wait=True)
  281. self.client.loop_stop()
  282. self.client.disconnect()
  283. shutting_down = True
  284. LOGINFO("MQTT client exited")
  285. g_mqtt_client: RobustMQTTClient = None
  286. g_mqtt_consumer: MQTTConsumerThread = None
# ================================
# Exit signal handling
# ================================
  290. def signal_handler(sig, frame):
  291. global shutting_down
  292. LOGINFO("Exiting... shutting down MQTT and thread pool")
  293. shutting_down = True
  294. g_mqtt_client.running = False
  295. g_mqtt_client.client.loop_stop()
  296. g_mqtt_client.client.disconnect()
  297. g_mqtt_consumer.running = False
  298. executor.shutdown(wait=True)
  299. sys.exit(0)
  300. signal.signal(signal.SIGINT, signal_handler)
  301. signal.signal(signal.SIGTERM, signal_handler)