''' 处理mqtt接收的消息 ''' import paho.mqtt.client as mqtt import queue import json import traceback import re import common.sys_comm as sys_comm from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR from common.sys_comm import get_tracker_targets, get_utc_time_ms, get_utc_time_s from common.sys_comm import POSE_E, DEV_EC import db.db_process as db_process from db.db_process import db_req_que from db.db_process import DBRequest_Async from mqtt.mqtt_topics import Topic_Pattern import mqtt.mqtt_send as mqtt_send import device.dev_mng as g_Dev from device.dev_mng import ( Device, ) from core.alarm_plan import AlarmPlan import core.g_LAS as g_las # 数据队列 raw_points_que_forpost = queue.Queue() # 实时姿态队列 ''' { "dev_id": "xxxxxx", "raw_points": [...] } ''' # 检查topic def check_topic(topic:str, pattern:str) -> bool: topic = topic.strip() return bool(re.match(pattern, topic)) """ { "timestamp": 1727323744093, "pose": 2, "target_point": [ [ 0.15537149991307939, -0.17245136840002878, 0.5702038151877267, 1 ], [ 0.15537149991307939, -0.17245136840002878, 0.5702038151877267, 2 ] ] } """ # 处理来自设备的实时消息: /dev/{device_id}/dsp_data def deal_dsp_data(msg:mqtt.MQTTMessage): try: parts = msg.topic.split('/') dev_id = parts[2] device: Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not g_Dev.g_dev_mgr.find_dev_map(dev_id): device = Device(dev_id) g_Dev.g_dev_mgr.push_dev_map(dev_id, device) payload = json.loads(msg.payload.decode('utf-8')) # 处理 target if ("tracker_targets" in payload): tracker_targets = payload["tracker_targets"] timestamp = get_utc_time_s() pose = POSE_E.POSE_4.value rtd_unit = { "timestamp": timestamp, "pose": pose, "target_point": tracker_targets } device.put_rtd_unit(rtd_unit) device.update_keepalive(timestamp) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 处理来自设备的实时数据: /dev/{device_id}/tracker_targets def deal_tracker_targets(msg:mqtt.MQTTMessage): try: parts = msg.topic.split('/') dev_id = parts[2] device: Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not g_Dev.g_dev_mgr.find_dev_map(dev_id): device = Device(dev_id) g_Dev.g_dev_mgr.push_dev_map(dev_id, device) payload = json.loads(msg.payload.decode('utf-8')) # 处理 target if ("tracker_targets" in payload): timestamp = get_utc_time_s() tracker_targets = payload["tracker_targets"] breath_rpm: float = 0.000 if (("breath_rpm" in payload) and ("breath_rpm" in payload["health"])): breath_rpm = payload["health"]["breath_rpm"] pose = POSE_E.POSE_4.value rtd_unit = { "timestamp" : timestamp, "target_point" : tracker_targets, "breath_rpm" : breath_rpm, "pose" : pose } device.put_rtd_unit(rtd_unit) device.update_keepalive(timestamp) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 实时位置姿态 def deal_realtime_pos(msg:mqtt.MQTTMessage): try: payload = json.loads(msg.payload.decode('utf-8')) dev_id = payload.get("dev_id") device: Device = g_Dev.g_dev_mgr.find_dev_map(dev_id) if not g_Dev.g_dev_mgr.find_dev_map(dev_id): return timestamp = payload.get("timestamp") pose = payload.get("pose") target_point = payload.get("target_point", []) rtd_unit = { "timestamp": timestamp, "pose": pose, "target_point": target_point } device.put_rtd_unit(rtd_unit) return except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 告警计划变更 def deal_alarm_plan_update(msg:mqtt.MQTTMessage): try: payload = json.loads(msg.payload.decode('utf-8')) plan_uuid = payload.get("plan_uuid") operation = payload.get("operation") if operation == "update": LOGINFO(f"prepare to query plan: {plan_uuid}") g_las.g_alarm_plan_mgr.query_one_alarm_plan(plan_uuid) return elif operation == "delete": LOGINFO(f"prepare to query plan: {plan_uuid}") g_las.g_alarm_plan_mgr.remove_one_alarm_plan(plan_uuid) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 告警计划变更 def deal_las_test(msg:mqtt.MQTTMessage): try: mqtt_send.las_test_resp() except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 获取调试信息 def deal_get_debug_info(msg:mqtt.MQTTMessage): try: plans_info: list = [] plan: AlarmPlan = None plans = g_las.g_alarm_plan_mgr.list_all_plan() for plan in plans: item = { "plan_uuid" : plan.plan_uuid_, "name" : plan.name_, "dev_id" : plan.dev_id_, "dev_name" : plan.dev_name_, "enable" : plan.enable_, "event_type" : plan.event_type_, "threshold_time" : plan.threshold_time_, "rect" : plan.rect_, "rect" : plan.rect_, "param" : plan.param_ } plans_info.append(item) LOGINFO(f"send debug info ...") mqtt_send.las_debug_info(plans_info) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 设备消息分发处理:/dev/# def deal_dev_msg(client:mqtt.Client, userdaata, msg:mqtt.MQTTMessage): try: topic = msg.topic parts = msg.topic.split('/') dev_id = parts[2] # 设备信息(注册) if (check_topic(topic,Topic_Pattern.dev_login)): return deal_dev_login(msg) # 设备心跳保活 elif (check_topic(topic,Topic_Pattern.dev_keepalive)): return deal_dev_keepalive(msg) # 设备实时数据 elif (check_topic(topic,Topic_Pattern.dev_dsp_data)): return deal_dsp_data(msg) # 设备实时数据 elif (check_topic(topic,Topic_Pattern.dev_tracker_targets)): deal_tracker_targets(msg) # 点云数据 elif (check_topic(topic,Topic_Pattern.dev_cloudpoint)): return deal_cloudpoint(msg) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 设备代理分发处理:/das/# def deal_das_msg(client:mqtt.Client, userdaata, msg:mqtt.MQTTMessage): try: topic = msg.topic parts = msg.topic.split('/') dev_id = parts[2] # if dev_id != "00FFAABBDDN": # return # 实时位置姿态 if (check_topic(topic,Topic_Pattern.das_realtime_pos)): deal_realtime_pos(msg) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # /las/# def deal_las_msg(client:mqtt.Client, userdaata, msg:mqtt.MQTTMessage): try: topic = msg.topic parts = msg.topic.split('/') # 告警计划变更 if (check_topic(topic,Topic_Pattern.las_alarm_plan_update)): deal_alarm_plan_update(msg) # 测试 elif (check_topic(topic,Topic_Pattern.las_test)): deal_las_test(msg) # 获取调试信息 elif (check_topic(topic,Topic_Pattern.las_get_debug_info)): deal_get_debug_info(msg) except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") # 处理接收的消息入口 def process_message(client:mqtt.Client, userdata, msg:mqtt.MQTTMessage): topic = msg.topic qos = msg.qos if (check_topic(topic,Topic_Pattern.dev_all)): # 设备 deal_dev_msg(client, userdata, msg) elif (check_topic(topic,Topic_Pattern.mps_all)): # 小程序服务 return deal_mps_msg(client, userdata, msg) elif (check_topic(topic,Topic_Pattern.opc_all)): # 运维客户端 return deal_opc_msg(client, userdata, msg) elif (check_topic(topic,Topic_Pattern.das_all)): # 设备代理 return deal_das_msg(client, userdata, msg) elif (check_topic(topic,Topic_Pattern.las_all)): # las相关 deal_las_msg(client, userdata, msg) else: # LOGDBG(f"recv invalid topic: {msg.topic}") return