mqtt_process.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. '''
  2. 处理mqtt消息相关
  3. '''
  4. import paho.mqtt.client as mqtt
  5. import time
  6. import threading
  7. import re
  8. import queue
  9. import sys
  10. from queue import Queue, Empty
  11. from concurrent.futures import ThreadPoolExecutor
  12. import signal
  13. import common.sys_comm as sys_comm
  14. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  15. from mqtt.mqtt_topics import TOPICS, Topic_Pattern
  16. import mqtt.mqtt_recv as mqtt_recv
  17. import mqtt.mqtt_send as mqtt_send
  18. from mqtt.mqtt_send import mqtt_send_que # 发送队列
  19. # 格式如下
  20. '''
  21. {
  22. "topic": "topic/xxxx",
  23. "msg": "msg to be send"
  24. }
  25. '''
  26. # ================================
  27. # MQTT 配置
  28. # ================================
  29. MQTT_BROKER = "119.45.12.173" # MQTT BROKER 地址
  30. MQTT_PORT = 1883 # MQTT 端口
  31. MQTT_USERNAME = "lnradar" # MQTT 用户名
  32. MQTT_PASSWD = "lnradar" # MQTT 密码
  33. # ================================
  34. # 全局对象
  35. # ================================
  36. executor = ThreadPoolExecutor(max_workers=8)
  37. mqtt_queue = Queue() # 消息队列
  38. shutting_down = False
  39. import atexit
  40. atexit.register(lambda: setattr(sys.modules[__name__], 'shutting_down', True))
  41. # ================================
  42. # 辅助函数
  43. # ================================
  44. def check_topic(pattern:str, topic:str) -> bool:
  45. return bool(re.match(pattern, topic))
  46. # ================================
  47. # 消费者线程
  48. # ================================
  49. class MQTTConsumerThread(threading.Thread):
  50. def __init__(self):
  51. super().__init__()
  52. self.running = True
  53. def run(self):
  54. global shutting_down
  55. while self.running:
  56. try:
  57. msg_tuple = mqtt_queue.get(timeout=0.1) # (client, userdata, msg)
  58. except Empty:
  59. if shutting_down:
  60. break
  61. continue
  62. client, userdata, msg = msg_tuple
  63. if shutting_down:
  64. break
  65. try:
  66. executor.submit(mqtt_recv.process_message, client, userdata, msg)
  67. except RuntimeError:
  68. # 线程池已关闭,忽略
  69. break
  70. # ================================
  71. # MQTT 回调
  72. # ================================
  73. def on_connect(client, userdata, flags, rc):
  74. if rc == 0:
  75. LOGINFO("MQTT Connected successfully!")
  76. client.subscribe(TOPICS.dev_dsp_data)
  77. client.subscribe(TOPICS.mps_all)
  78. client.subscribe(TOPICS.das_all)
  79. else:
  80. LOGERR(f"MQTT failed to connect, return code {rc}")
  81. def on_message(client, userdata, msg):
  82. if not shutting_down:
  83. mqtt_queue.put((client, userdata, msg)) # 放入队列,由消费者线程处理
  84. # ================================
  85. # MQTT 线程类
  86. # ================================
  87. class MQTTClientThread(threading.Thread):
  88. def __init__(self,):
  89. threading.Thread.__init__(self)
  90. self.client:mqtt.Client = mqtt.Client()
  91. self.publish_status = {}
  92. self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
  93. self.client.on_connect = on_connect
  94. self.client.on_message = on_message
  95. self.client.on_publish = self.on_publish
  96. self.running = True
  97. def on_publish(self, client, userdata, mid):
  98. self.publish_status[mid] = "success"
  99. def send_msg_to_mqtt(self):
  100. while True:
  101. time.sleep(0.01)
  102. try:
  103. mqtt_msg: dict = mqtt_send_que.get(timeout=0.1)
  104. except Exception:
  105. if shutting_down:
  106. break
  107. continue
  108. try:
  109. topic = mqtt_msg["topic"]
  110. msg = mqtt_msg["msg"]
  111. qos = mqtt_msg.get("qos", 0)
  112. info = self.client.publish(topic, msg, qos=qos)
  113. if info.rc == 0:
  114. self.publish_status[info.mid] = "pending"
  115. if qos == 0:
  116. self.publish_status[info.mid] = "success"
  117. else:
  118. LOGDBG(f"send failed: {info.rc}, topic: {topic}, qos: {qos}\nmsg:{msg}")
  119. except Exception as e:
  120. LOGERR(f"send error: {e}, topic: {topic}, qos: {qos}\nmsg:{msg}")
  121. def run(self):
  122. global shutting_down
  123. try:
  124. self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
  125. LOGINFO("Connecting to MQTT broker...")
  126. self.client.loop_start()
  127. self.send_msg_to_mqtt()
  128. except Exception as e:
  129. LOGERR(f"MQTT thread encountered an error: {e}")
  130. finally:
  131. self.running = False
  132. self.client.loop_stop()
  133. self.client.disconnect()
  134. shutting_down = True
  135. mqtt_client: MQTTClientThread = None
  136. mqtt_consumer: MQTTConsumerThread = None
  137. # ================================
  138. # 退出信号处理
  139. # ================================
  140. def signal_handler(sig, frame):
  141. global shutting_down
  142. LOGINFO("Exiting... shutting down MQTT and thread pool")
  143. shutting_down = True
  144. mqtt_client.running = False
  145. mqtt_client.client.loop_stop()
  146. mqtt_client.client.disconnect()
  147. mqtt_consumer.running = False
  148. executor.shutdown(wait=True)
  149. sys.exit(0)
  150. signal.signal(signal.SIGINT, signal_handler)
  151. signal.signal(signal.SIGTERM, signal_handler)