mqtt_send.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. '''
  2. 处理mqtt发送的消息
  3. '''
  4. import queue
  5. import json
  6. from enum import Enum
  7. import numpy
  8. mqtt_send_que = queue.Queue() # 发送队列
  9. import common.sys_comm as sys_comm
  10. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  11. from common.sys_comm import MODEL_E, POSE_CLASS_E, POSE_E
  12. from common.sys_comm import g_sys_conf, g_sys_conf_mtx
  13. from mqtt.mqtt_topics import TOPICS
  14. # 消息类型
  15. class MSG_TYPE(Enum):
  16. MSG_DEV_STATUS = 0 # 设备状态变更
  17. MSG_DEV_RAW_POINTS = 1 # 实时点云
  18. MSG_REALTIME_TARGET = 2 # 实时目标位置
  19. MSG_EVENT_FALL = 3 # 跌倒事件
  20. MSG_EVENT_EXIST = 4 # 存在事件
  21. def get_target_point(point_cloud:list):
  22. return numpy.mean(point_cloud, axis=0).tolist()
  23. # 创建消息相关 START
  24. # 准备将消息通过 MQTT 发送
  25. def send_msg(topic:str, format_json:dict, qos:int=0):
  26. try:
  27. parts = topic.split('/')
  28. model = parts[1]
  29. if model != "dev":
  30. with g_sys_conf_mtx:
  31. format_json["sp_id"] = g_sys_conf["sp_id"]
  32. content:str = json.dumps(format_json)
  33. mqtt_msg = {
  34. "topic": topic,
  35. "msg": content,
  36. "qos": qos
  37. }
  38. mqtt_send_que.put(mqtt_msg)
  39. except Exception as e:
  40. LOGERR(f"send_msg error: {e}")
  41. # 告警事件
  42. def alarm_event(
  43. dev_id: str,
  44. uuid: str,
  45. plan_uuid: str,
  46. event_type: str,
  47. info: dict,
  48. table: str):
  49. format_json = dict()
  50. format_json["dev_id"] = dev_id
  51. format_json["uuid"] = uuid
  52. format_json["plan_uuid"] =plan_uuid
  53. format_json["event_type"] = event_type
  54. format_json["info"] = info
  55. format_json["table"] = table
  56. send_msg(TOPICS.las_alarm_event, format_json)
  57. # 创建消息相关 END