mqtt_process.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. '''
  2. 处理mqtt消息相关
  3. '''
  4. import paho.mqtt.client as mqtt
  5. import time
  6. import threading
  7. import re
  8. import queue
  9. import sys
  10. from queue import Queue, Empty
  11. from concurrent.futures import ThreadPoolExecutor
  12. import signal
  13. import common.sys_comm as sys_comm
  14. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  15. from mqtt.mqtt_topics import TOPICS, Topic_Pattern
  16. import mqtt.mqtt_recv as mqtt_recv
  17. mqtt_send_que = queue.Queue() # 发送队列
  18. # 格式如下
  19. '''
  20. {
  21. "topic": "topic/xxxx",
  22. "msg": "msg to be send"
  23. }
  24. '''
  25. # ================================
  26. # MQTT 配置
  27. # ================================
  28. MQTT_BROKER = "119.45.12.173" # MQTT BROKER 地址
  29. MQTT_PORT = 1883 # MQTT 端口
  30. MQTT_USERNAME = "lnradar" # MQTT 用户名
  31. MQTT_PASSWD = "lnradar" # MQTT 密码
  32. # ================================
  33. # 全局对象
  34. # ================================
  35. executor = ThreadPoolExecutor(max_workers=8)
  36. mqtt_queue = Queue() # 消息队列
  37. shutting_down = False
  38. import atexit
  39. atexit.register(lambda: setattr(sys.modules[__name__], 'shutting_down', True))
  40. # ================================
  41. # 辅助函数
  42. # ================================
  43. def check_topic(pattern:str, topic:str) -> bool:
  44. return bool(re.match(pattern, topic))
  45. # ================================
  46. # 消费者线程
  47. # ================================
  48. class MQTTConsumerThread(threading.Thread):
  49. def __init__(self):
  50. super().__init__(name= "MQTTConsumerThread")
  51. self.running = True
  52. def run(self):
  53. global shutting_down
  54. while self.running:
  55. try:
  56. msg_tuple = mqtt_queue.get(timeout=0.1) # (client, userdata, msg)
  57. except Empty:
  58. if shutting_down:
  59. break
  60. continue
  61. client, userdata, msg = msg_tuple
  62. if shutting_down:
  63. break
  64. try:
  65. executor.submit(mqtt_recv.process_message, client, userdata, msg)
  66. except RuntimeError:
  67. # 线程池已关闭,忽略
  68. break
  69. def on_message(client, userdata, msg):
  70. if not shutting_down:
  71. mqtt_queue.put((client, userdata, msg)) # 放入队列,由消费者线程处理
  72. # ================================
  73. # MQTT 线程类
  74. # ================================
  75. class MQTTClientThread(threading.Thread):
  76. def __init__(self,):
  77. super().__init__(name= "MQTTClientThread")
  78. self.client:mqtt.Client = mqtt.Client()
  79. self.publish_status = {}
  80. self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
  81. self.client.on_connect = self.on_connect
  82. self.client.on_disconnect = self.on_disconnect
  83. self.client.on_publish = self.on_publish
  84. self.client.on_message = on_message
  85. self.running = True
  86. self.reconnect_delay = 5 # 重连间隔 秒
  87. self.executor = ThreadPoolExecutor(max_workers=4) # 异步发送线程池
  88. # ================================
  89. # MQTT 回调
  90. # ================================
  91. def on_connect(self, client:mqtt.Client, userdata, flags, rc):
  92. if rc == 0:
  93. LOGINFO("MQTT Connected successfully!")
  94. client.subscribe(TOPICS.dev_tracker_targets)
  95. client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
  96. client.subscribe(TOPICS.las_test, qos=2)
  97. else:
  98. LOGERR(f"MQTT failed to connect, return code {rc}")
  99. def on_disconnect(self, client, userdata, rc):
  100. if shutting_down:
  101. return
  102. LOGERR(f"MQTT disconnected (rc={rc})")
  103. # 循环尝试重连
  104. while not shutting_down and self.running:
  105. try:
  106. LOGINFO("Trying to reconnect to MQTT broker...")
  107. client.reconnect()
  108. LOGINFO("MQTT reconnected successfully!")
  109. break
  110. except Exception as e:
  111. LOGERR(f"MQTT reconnect failed: {e}, retry in {self.reconnect_delay}s")
  112. time.sleep(self.reconnect_delay)
  113. def on_publish(self, client, userdata, mid):
  114. self.publish_status[mid] = "success"
  115. LOGINFO(f"Message {mid} published successfully")
  116. def send_msg(self, mqtt_msg: dict):
  117. mqtt_send_que.put(mqtt_msg) # 放入内部可靠队列
  118. # 异步发送消息
  119. def async_publish(self, mqtt_msg: dict):
  120. try:
  121. topic = mqtt_msg["topic"]
  122. msg = mqtt_msg["msg"]
  123. qos = mqtt_msg.get("qos", 0)
  124. info = self.client.publish(topic, msg, qos=qos)
  125. if info.rc == 0:
  126. self.publish_status[info.mid] = "pending"
  127. if qos == 0:
  128. self.publish_status[info.mid] = "success"
  129. else:
  130. LOGERR(f"MQTT publish failed (rc={info.rc}), topic: {topic}")
  131. mqtt_send_que.put(mqtt_msg) # 重试
  132. except Exception as e:
  133. LOGERR(f"MQTT send error: {e}, topic: {topic}")
  134. mqtt_send_que.put(mqtt_msg) # 重试
  135. def send_msg_to_mqtt(self):
  136. while self.running and not shutting_down:
  137. try:
  138. mqtt_msg: dict = mqtt_send_que.get(timeout=0.1)
  139. except Empty:
  140. continue
  141. try:
  142. topic = mqtt_msg["topic"]
  143. msg = mqtt_msg["msg"]
  144. qos = mqtt_msg.get("qos", 0)
  145. info = self.client.publish(topic, msg, qos=qos)
  146. if info.rc == 0:
  147. self.publish_status[info.mid] = "pending"
  148. if qos == 0:
  149. self.publish_status[info.mid] = "success"
  150. else:
  151. LOGDBG(f"send failed: {info.rc}, topic: {topic}, qos: {qos}\nmsg:{msg}")
  152. mqtt_send_que.put(mqtt_msg) # 重试
  153. except Exception as e:
  154. LOGERR(f"send error: {e}, topic: {topic}, qos: {qos}\nmsg:{msg}")
  155. mqtt_send_que.put(mqtt_msg) # 重试
  156. def run(self):
  157. global shutting_down
  158. try:
  159. LOGINFO("Connecting to MQTT broker...")
  160. self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
  161. self.client.loop_start()
  162. while self.running and not shutting_down:
  163. try:
  164. mqtt_msg = mqtt_send_que.get(timeout=0.1)
  165. # 使用线程池异步发送
  166. self.executor.submit(self.async_publish, mqtt_msg)
  167. except Empty:
  168. continue
  169. except Exception as e:
  170. LOGERR(f"MQTT thread encountered an error: {e}")
  171. finally:
  172. self.running = False
  173. self.executor.shutdown(wait=True)
  174. self.client.loop_stop()
  175. self.client.disconnect()
  176. shutting_down = True
  177. LOGINFO("MQTT thread exited")
  178. # 可靠顺序发送、支持断线重连、安全退出
  179. class RobustMQTTClient(threading.Thread):
  180. def __init__(self, max_workers=4):
  181. super().__init__(name="RobustMQTTClient")
  182. client_id: str = str(sys_comm.g_sys_conf["sp_id"])
  183. self.client: mqtt.Client = mqtt.Client(client_id= client_id)
  184. self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
  185. self.client.on_connect = self.on_connect
  186. self.client.on_disconnect = self.on_disconnect
  187. self.client.on_publish = self.on_publish
  188. self.client.on_message = on_message
  189. self.running = True
  190. self.reconnect_delay = 5 # 重连间隔
  191. self.publish_status = {} # mid -> 状态
  192. self.send_queue = Queue() # 内部可靠顺序队列
  193. self.executor = ThreadPoolExecutor(max_workers=max_workers)
  194. self.connected = threading.Event() # 用于标记 MQTT 是否已连接
  195. # =========================
  196. # MQTT 回调
  197. # =========================
  198. def on_connect(self, client, userdata, flags, rc):
  199. if rc == 0:
  200. LOGINFO("MQTT connected successfully!")
  201. client.subscribe(TOPICS.dev_tracker_targets)
  202. client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
  203. client.subscribe(TOPICS.las_test, qos=2)
  204. client.subscribe(TOPICS.las_get_debug_info, qos=0)
  205. self.connected.set()
  206. else:
  207. LOGERR(f"MQTT failed to connect, rc={rc}")
  208. self.connected.clear()
  209. def on_disconnect(self, client, userdata, rc):
  210. LOGERR(f"MQTT disconnected (rc={rc})")
  211. self.connected.clear()
  212. if shutting_down:
  213. return
  214. LOGERR(f"MQTT disconnected (rc={rc})")
  215. while not shutting_down and self.running:
  216. try:
  217. LOGINFO("Trying to reconnect to MQTT broker...")
  218. client.reconnect()
  219. LOGINFO("MQTT reconnected successfully!")
  220. self.connected.set()
  221. break
  222. except Exception as e:
  223. LOGERR(f"Reconnect failed: {e}, retry in {self.reconnect_delay}s")
  224. time.sleep(self.reconnect_delay)
  225. def on_publish(self, client, userdata, mid):
  226. self.publish_status[mid] = "success"
  227. LOGINFO(f"Message {mid} published successfully")
  228. # =========================
  229. # 安全发送单条消息
  230. # =========================
  231. def _send_message(self, mqtt_msg: dict):
  232. while self.running and not shutting_down:
  233. try:
  234. # 等待连接成功再发送
  235. if not self.connected.wait(timeout=1):
  236. continue
  237. topic = mqtt_msg["topic"]
  238. payload = mqtt_msg["msg"]
  239. qos = mqtt_msg.get("qos", 0)
  240. info = self.client.publish(topic, payload, qos=qos)
  241. if info.rc != 0:
  242. LOGERR(f"MQTT publish failed (rc={info.rc}), topic={topic}, retrying")
  243. time.sleep(1)
  244. continue
  245. self.publish_status[info.mid] = "pending"
  246. while self.publish_status.get(info.mid) != "success" and not shutting_down:
  247. time.sleep(0.01)
  248. break
  249. except Exception as e:
  250. LOGERR(f"MQTT send error: {e}, topic={mqtt_msg.get('topic')}, retrying")
  251. time.sleep(1)
  252. # =========================
  253. # 外部接口:发送消息
  254. # =========================
  255. def send_msg(self, mqtt_msg: dict):
  256. self.send_queue.put(mqtt_msg) # 放入内部可靠队列
  257. # =========================
  258. # 队列处理循环
  259. # =========================
  260. def _process_queue(self):
  261. while self.running and not shutting_down:
  262. try:
  263. mqtt_msg = self.send_queue.get(timeout=0.1)
  264. # 提交线程池异步发送
  265. self.executor.submit(self._send_message, mqtt_msg)
  266. except Empty:
  267. continue
  268. # =========================
  269. # 主线程
  270. # =========================
  271. def run(self):
  272. global shutting_down
  273. try:
  274. LOGINFO("Connecting to MQTT broker...")
  275. self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
  276. self.client.loop_start()
  277. self._process_queue()
  278. except Exception as e:
  279. LOGERR(f"MQTT thread encountered an error: {e}")
  280. finally:
  281. self.running = False
  282. self.executor.shutdown(wait=True)
  283. self.client.loop_stop()
  284. self.client.disconnect()
  285. shutting_down = True
  286. LOGINFO("MQTT client exited")
  287. g_mqtt_client: RobustMQTTClient = None
  288. g_mqtt_consumer: MQTTConsumerThread = None
  289. # ================================
  290. # 退出信号处理
  291. # ================================
  292. def signal_handler(sig, frame):
  293. global shutting_down
  294. LOGINFO("Exiting... shutting down MQTT and thread pool")
  295. shutting_down = True
  296. g_mqtt_client.running = False
  297. g_mqtt_client.client.loop_stop()
  298. g_mqtt_client.client.disconnect()
  299. g_mqtt_consumer.running = False
  300. executor.shutdown(wait=True)
  301. sys.exit(0)
  302. signal.signal(signal.SIGINT, signal_handler)
  303. signal.signal(signal.SIGTERM, signal_handler)