ソースを参照

修复bug:RobustMQTTClient发送消息失败

nifangxu 1 ヶ月 前
コミット
d7b2038ca0
4 ファイル変更44 行追加12 行削除
  1. 1 2
      LAS.py
  2. 36 9
      core/alarm_plan.py
  3. 6 1
      mqtt/mqtt_process.py
  4. 1 0
      mqtt/mqtt_send.py

+ 1 - 2
LAS.py

@@ -45,7 +45,6 @@ from core.alarm_plan_dispatcher import (
 import core.g_LAS as g_las
 
 
-
 # 系统初始化
 def sys_init():
     try:
@@ -113,7 +112,7 @@ def sys_init():
                 sys_comm.g_sys_conf["host_ip"]     = str(config["linux"]["host_ip"])
                 mp.MQTT_BROKER = sys_comm.g_sys_conf["host_ip"]
 
-        sys_comm.g_sys_conf["sp_id"] = int(get_utc_time_ms())
+        sys_comm.g_sys_conf["sp_id"] = sys_comm.g_sys_conf["module_name"] + "_" + str(get_utc_time_ms())
 
         # 报警配置
         sys_comm.g_sys_conf["alarm_conf"] = sys_comm.alarm_conf

+ 36 - 9
core/alarm_plan.py

@@ -434,6 +434,9 @@ class AlarmPlan:
                 "stay_time": stay_time
             }
             event_uuid = str(uuid.uuid4())
+            remark = {
+                "sp_id": sys_comm.g_sys_conf["sp_id"]
+            }
             params = {
                 "dev_id": dev_id,
                 "uuid": event_uuid,
@@ -445,7 +448,7 @@ class AlarmPlan:
                 "create_time": get_bj_time_s(),
                 "is_deleted": 0,
                 "tenant_id": self.tenant_id_,
-                "remark": json.dumps({}, ensure_ascii=False) 
+                "remark": json.dumps(remark) 
             }
             db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -493,6 +496,9 @@ class AlarmPlan:
                 "stay_time": stay_time
             }
             event_uuid = str(uuid.uuid4())
+            remark = {
+                "sp_id": sys_comm.g_sys_conf["sp_id"]
+            }
             params = {
                 "dev_id": dev_id,
                 "uuid": event_uuid,
@@ -504,7 +510,7 @@ class AlarmPlan:
                 "create_time": get_bj_time_s(),
                 "is_deleted": 0,
                 "tenant_id": self.tenant_id_,
-                "remark": json.dumps({}, ensure_ascii=False) 
+                "remark": json.dumps(remark) 
             }
             db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -552,6 +558,9 @@ class AlarmPlan:
                 "stay_time": stay_time
             }
             event_uuid = str(uuid.uuid4())
+            remark = {
+                "sp_id": sys_comm.g_sys_conf["sp_id"]
+            }
             params = {
                 "dev_id": dev_id,
                 "uuid": event_uuid,
@@ -563,7 +572,7 @@ class AlarmPlan:
                 "create_time": get_bj_time_s(),
                 "is_deleted": 0,
                 "tenant_id": self.tenant_id_,
-                "remark": json.dumps({}, ensure_ascii=False) 
+                "remark": json.dumps(remark) 
             }
             db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -786,6 +795,9 @@ class AlarmPlan:
                 "absence_time": absence_time
             }
             event_uuid = str(uuid.uuid4())
+            remark = {
+                "sp_id": sys_comm.g_sys_conf["sp_id"]
+            }
             params = {
                 "dev_id": dev_id,
                 "uuid": event_uuid,
@@ -797,7 +809,7 @@ class AlarmPlan:
                 "create_time": get_bj_time_s(),
                 "is_deleted": 0,
                 "tenant_id": self.tenant_id_,
-                "remark": json.dumps({}, ensure_ascii=False) 
+                "remark": json.dumps(remark) 
             }
             db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -871,6 +883,9 @@ class AlarmPlan:
 
                 # 入库
                 event_uuid = str(uuid.uuid4())
+                remark = {
+                    "sp_id": sys_comm.g_sys_conf["sp_id"]
+                }
                 params = {
                     "dev_id": dev_id,
                     "uuid": this_event_uuid,
@@ -882,7 +897,7 @@ class AlarmPlan:
                     "create_time": get_bj_time_s(),
                     "is_deleted": 0,
                     "tenant_id": self.tenant_id_,
-                    "remark": json.dumps({}, ensure_ascii=False) 
+                    "remark": json.dumps(remark) 
                 }
                 db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -938,6 +953,9 @@ class AlarmPlan:
 
                 # 入库
                 event_uuid = str(uuid.uuid4())
+                remark = {
+                    "sp_id": sys_comm.g_sys_conf["sp_id"]
+                }
                 params = {
                     "dev_id": dev_id,
                     "uuid": this_event_uuid,
@@ -949,7 +967,7 @@ class AlarmPlan:
                     "create_time": get_bj_time_s(),
                     "is_deleted": 0,
                     "tenant_id": self.tenant_id_,
-                    "remark": json.dumps({}, ensure_ascii=False) 
+                    "remark": json.dumps(remark) 
                 }
                 db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -1008,6 +1026,9 @@ class AlarmPlan:
 
                 # 入库
                 event_uuid = str(uuid.uuid4())
+                remark = {
+                    "sp_id": sys_comm.g_sys_conf["sp_id"]
+                }
                 params = {
                     "dev_id": dev_id,
                     "uuid": this_event_uuid,
@@ -1019,7 +1040,7 @@ class AlarmPlan:
                     "create_time": get_bj_time_s(),
                     "is_deleted": 0,
                     "tenant_id": self.tenant_id_,
-                    "remark": json.dumps({}, ensure_ascii=False) 
+                    "remark": json.dumps(remark) 
                 }
                 db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -1078,6 +1099,9 @@ class AlarmPlan:
 
                 # 入库
                 event_uuid = str(uuid.uuid4())
+                remark = {
+                    "sp_id": sys_comm.g_sys_conf["sp_id"]
+                }
                 params = {
                     "dev_id": dev_id,
                     "uuid": this_event_uuid,
@@ -1089,7 +1113,7 @@ class AlarmPlan:
                     "create_time": get_bj_time_s(),
                     "is_deleted": 0,
                     "tenant_id": self.tenant_id_,
-                    "remark": json.dumps({}, ensure_ascii=False) 
+                    "remark": json.dumps(remark) 
                 }
                 db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 
@@ -1145,6 +1169,9 @@ class AlarmPlan:
 
                 # 入库
                 event_uuid = str(uuid.uuid4())
+                remark = {
+                    "sp_id": sys_comm.g_sys_conf["sp_id"]
+                }
                 params = {
                     "dev_id": dev_id,
                     "uuid": this_event_uuid,
@@ -1156,7 +1183,7 @@ class AlarmPlan:
                     "create_time": get_bj_time_s(),
                     "is_deleted": 0,
                     "tenant_id": self.tenant_id_,
-                    "remark": json.dumps({}, ensure_ascii=False) 
+                    "remark": json.dumps(remark) 
                 }
                 db_req_que.put(DBRequest_Async(sql=sqls.sql_insert_events, params=params, callback=None))
 

+ 6 - 1
mqtt/mqtt_process.py

@@ -136,6 +136,9 @@ class MQTTClientThread(threading.Thread):
         self.publish_status[mid] = "success"
         LOGINFO(f"Message {mid} published successfully")
 
+    def send_msg(self, mqtt_msg: dict):
+        mqtt_send_que.put(mqtt_msg)  # 放入内部可靠队列
+
     # 异步发送消息
     def async_publish(self, mqtt_msg: dict):
         try:
@@ -209,7 +212,7 @@ class MQTTClientThread(threading.Thread):
 class RobustMQTTClient(threading.Thread):
     def __init__(self, max_workers=4):
         super().__init__(name="RobustMQTTClient")
-        client_id: str = "LAS_" + str(sys_comm.g_sys_conf["sp_id"])
+        client_id: str = str(sys_comm.g_sys_conf["sp_id"])
         self.client: mqtt.Client = mqtt.Client(client_id= client_id)
         self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
         self.client.on_connect = self.on_connect
@@ -233,8 +236,10 @@ class RobustMQTTClient(threading.Thread):
             client.subscribe(TOPICS.dev_tracker_targets)
             client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
             client.subscribe(TOPICS.las_test, qos=2)
+            self.connected.set()
         else:
             LOGERR(f"MQTT failed to connect, rc={rc}")
+            self.connected.clear()
 
     def on_disconnect(self, client, userdata, rc):
         LOGERR(f"MQTT disconnected (rc={rc})")

+ 1 - 0
mqtt/mqtt_send.py

@@ -46,6 +46,7 @@ def send_msg(topic:str, format_json:dict, qos:int=0):
             "qos":      qos
         }
         mp.g_mqtt_client.send_msg(mqtt_msg)
+        LOGINFO(f"send_msg, {topic}")
 
     except Exception as e:
         LOGERR(f"send_msg error: {e}")