"""Build outgoing messages and hand them to the MQTT client for sending."""

import queue
import json
from enum import Enum

import numpy

import common.sys_comm as sys_comm
from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
from common.sys_comm import POSE_CLASS_E, POSE_E
from mqtt.mqtt_topics import TOPICS
import mqtt.mqtt_process as mp


# Message types
class MSG_TYPE(Enum):
    MSG_DEV_STATUS = 0        # device status change
    MSG_DEV_RAW_POINTS = 1    # real-time point cloud
    MSG_REALTIME_TARGET = 2   # real-time target position
    MSG_EVENT_FALL = 3        # fall event
    MSG_EVENT_EXIST = 4       # presence event


def get_target_point(point_cloud: list):
    """Return the column-wise mean of *point_cloud* as a plain Python list.

    The mean is taken along axis 0, so for a list of equally sized rows the
    result has one entry per column.
    NOTE(review): presumably each row is one point's coordinates - confirm
    against the caller.
    """
    return numpy.mean(point_cloud, axis=0).tolist()


# Message construction START

# Prepare a message and send it through MQTT
def send_msg(topic: str, format_json: dict, qos: int = 0) -> None:
    """Serialize *format_json* to JSON and pass it to the MQTT client.

    The second '/'-separated component of *topic* is inspected; unless it is
    "dev", the current ``sp_id`` from the system configuration is written
    into *format_json* (the caller's dict is modified in place) before
    serialization.

    Args:
        topic: MQTT topic; must contain at least one '/'.
        format_json: JSON-serializable payload.
        qos: QoS value forwarded with the message.

    Any exception raised while building or sending the message is logged via
    LOGERR and not propagated.
    """
    try:
        parts = topic.split('/')
        model = parts[1]
        if model != "dev":
            # The shared configuration is guarded by its mutex.
            with sys_comm.g_sys_conf_mtx:
                format_json["sp_id"] = sys_comm.g_sys_conf["sp_id"]

        content: str = json.dumps(format_json)
        mqtt_msg = {
            "topic": topic,
            "msg": content,
            "qos": qos,
        }
        mp.g_mqtt_client.send_msg(mqtt_msg)
    except Exception as e:
        LOGERR(f"send_msg error: {e}")


# Alarm event
def alarm_event(
        dev_id: str,
        dev_name: str,
        uuid: str,
        plan_uuid: str,
        event_type: str,
        info: dict,
        linkage_action: dict,
        table: str) -> None:
    """Send an alarm event on ``TOPICS.las_alarm_event`` with QoS 2.

    Every argument is copied unchanged into the payload under a key of the
    same name.
    """
    format_json = {
        "dev_id": dev_id,
        "dev_name": dev_name,
        "uuid": uuid,
        "plan_uuid": plan_uuid,
        "event_type": event_type,
        "info": info,
        "linkage_action": linkage_action,
        "table": table,
    }
    send_msg(TOPICS.las_alarm_event, format_json, 2)


# Test
def las_test_resp() -> None:
    """Send selected system configuration values on ``TOPICS.las_test_resp``.

    The values are read from ``sys_comm.g_sys_conf`` while holding
    ``sys_comm.g_sys_conf_mtx`` so the response is one consistent snapshot,
    matching how ``send_msg`` accesses the same configuration.

    Raises:
        KeyError: if one of the expected configuration keys is missing.
    """
    conf_keys = (
        "module_name",
        "platform",
        "host_ip",
        "max_log_files",
        "max_log_size",
        "log_lvl",
        "sp_id",
    )
    # Release the mutex before calling send_msg: send_msg may take the same
    # mutex itself, which would block forever if the lock is not reentrant.
    with sys_comm.g_sys_conf_mtx:
        format_json = {key: sys_comm.g_sys_conf[key] for key in conf_keys}

    send_msg(TOPICS.las_test_resp, format_json, 2)

# Message construction END