nifangxu 1 mēnesi atpakaļ
vecāks
revīzija
1c0b178741
4 mainītis faili ar 59 papildinājumiem un 8 dzēšanām
  1. 4 3
      mqtt/mqtt_process.py
  2. 48 0
      mqtt/mqtt_recv.py
  3. 1 1
      mqtt/mqtt_send.py
  4. 6 4
      mqtt/mqtt_topics.py

+ 4 - 3
mqtt/mqtt_process.py

@@ -84,12 +84,13 @@ class MQTTConsumerThread(threading.Thread):
 # ================================
 # MQTT 回调
 # ================================
-def on_connect(client, userdata, flags, rc):
+def on_connect(client:mqtt.Client, userdata, flags, rc):
     if rc == 0:
         LOGINFO("MQTT Connected successfully!")
         client.subscribe(TOPICS.dev_dsp_data)
-        client.subscribe(TOPICS.mps_all)
-        client.subscribe(TOPICS.das_all)
+        # client.subscribe(TOPICS.das_all)
+        # client.subscribe(TOPICS.mps_all)
+        client.subscribe(TOPICS.las_alarm_plan_update, qos=2)
     else:
         LOGERR(f"MQTT failed to connect, return code {rc}")
 

+ 48 - 0
mqtt/mqtt_recv.py

@@ -132,6 +132,32 @@ def deal_realtime_pos(msg:mqtt.MQTTMessage):
             LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
 
 
+
+
+# 告警计划更新
+def deal_alarm_plan_update(msg:mqtt.MQTTMessage):
+    try:
+        payload = json.loads(msg.payload.decode('utf-8'))
+        plan_uuid   = payload.get("plan_uuid")
+        operation   = payload.get("operation")
+
+        # todo
+        if operation == "update":
+            return
+        elif operation == "delete":
+            return
+
+
+    except json.JSONDecodeError as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+    except Exception as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
 # 设备消息分发处理:/dev/#
 def deal_dev_msg(client:mqtt.Client, userdaata, msg:mqtt.MQTTMessage):
     try:
@@ -196,6 +222,26 @@ def deal_das_msg(client:mqtt.Client, userdaata, msg:mqtt.MQTTMessage):
             LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
 
 
+# /las/#
+def deal_las_msg(client:mqtt.Client, userdaata, msg:mqtt.MQTTMessage):
+    try:
+        topic = msg.topic
+        parts = msg.topic.split('/')
+
+        # 实时位置姿态
+        if (check_topic(topic,Topic_Pattern.las_alarm_plan_update)):
+            deal_alarm_plan_update(msg)
+
+    except json.JSONDecodeError as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+    except Exception as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
 # 处理接收的消息入口
 def process_message(client:mqtt.Client, userdata, msg:mqtt.MQTTMessage):
     topic = msg.topic
@@ -211,6 +257,8 @@ def process_message(client:mqtt.Client, userdata, msg:mqtt.MQTTMessage):
         deal_opc_msg(client, userdata, msg)
     elif (check_topic(topic,Topic_Pattern.das_all)):     # 设备接入
         deal_das_msg(client, userdata, msg)
+    elif (check_topic(topic,Topic_Pattern.las_all)):     # 只接受告警配置相关
+        deal_las_msg(client, userdata, msg)
     else:
         # LOGDBG(f"recv invalid topic: {msg.topic}")
         return

+ 1 - 1
mqtt/mqtt_send.py

@@ -69,7 +69,7 @@ def alarm_event(
     format_json["event_type"] = event_type
     format_json["info"] = info
     format_json["table"] = table
-    send_msg(TOPICS.las_alarm_event, format_json)
+    send_msg(TOPICS.las_alarm_event, format_json, 2)
 
 
 

+ 6 - 4
mqtt/mqtt_topics.py

@@ -46,8 +46,9 @@ class TOPICS(str):
     opc_set_alarm_param = "/opc/set_alarm_param"            # 设置告警参数
 
     # 告警联动服务
-    las_all             = "/las/#" 
-    las_alarm_event     = "/las/alarm_event"                # 上报告警事件
+    las_all                 = "/las/#"
+    las_alarm_plan_update   = "/las/alarm_plan_update"      # 告警计划变更
+    las_alarm_event         = "/las/alarm_event"            # 上报告警事件
 
 
 # topic匹配规则
@@ -103,5 +104,6 @@ class Topic_Pattern(str):
     opc_set_alarm_param = r"^/opc/set_alarm_param$"             # 设置告警参数
 
     # 告警联动服务
-    las_all             = r"^/las/.*$"
-    las_alarm_event = r"^/las/alarm_eventr$"                # 上报告警事件
+    las_all                 = r"^/las/.*$"
+    las_alarm_plan_update   = r"^/las/alarm_plan_update$"       # 告警计划变更
+    las_alarm_event         = r"^/las/alarm_eventr$"            # 上报告警事件