import threading from threading import Lock import queue import json import traceback from collections import deque from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR from common.sys_comm import get_utc_time_ms, utc_to_bj_s, get_bj_time_ms, get_bj_time_s from common.sys_comm import g_sys_conf, g_sys_conf_mtx g_dev_map_lock = threading.Lock() g_dev_map = {} # # 跟踪区域类 class TrackingRegion: def __init__(self, x1=0, y1=0, z1=0, x2=0, y2=0, z2=0): # 线程锁 self.lock_ = threading.Lock() self.start_x: int = x1 self.start_y: int = y1 self.start_z: int = z1 self.stop_x: int = x2 self.stop_y: int = y2 self.stop_z: int = z2 def __repr__(self): return (f"TrackingRegion(start=({self.start_x}, {self.start_y}, {self.start_z}), " f"stop=({self.stop_x}, {self.stop_y}, {self.stop_z}))") # 安装参数类 class InstallParam: def __init__(self, mount_plain="", height=0, tracking_region=None): # 线程锁 self.lock_ = threading.Lock() self.mount_plain: str = mount_plain self.isCeiling: int = 1 self.height = height self.tracking_region = tracking_region if tracking_region else TrackingRegion() self.north_angle_: float = 0 def __repr__(self): return (f"InstallParam(mount_plain='{self.mount_plain}', isCeiling={self.isCeiling}, " f"height={self.height}, north_angle={self.north_angle_}, " f"tracking_region={repr(self.tracking_region)})") # 网络参数类 class Network: def __init__(self, ssid="", password="", ip=""): # 线程锁 self.lock_ = threading.Lock() self.ssid = ssid self.password = password self.ip = ip def set_ssid(self, ssid:str): with self.lock_: self.ssid = ssid def set_password(self, password:str): with self.lock_: self.password = password def set_ip(self, ip:str): with self.lock_: self.ip = ip def __repr__(self): return f"Network(ssid='{self.ssid}', password='{self.password}' , ip='{self.ip}')" class Device(): def __init__(self, dev_id: str = "", dev_name: str = "", online: int = 0, dev_type: str = "", network: Network = None, install_param: InstallParam = None): # 线程锁 self.lock_ = Lock() # 基本属性 self.dev_id_ = dev_id self.dev_name_ = dev_name self.online_ = online self.dev_type_ = dev_type self.network_ = network if network else Network() self.install_param_ = install_param if install_param else InstallParam() # 实时数据队列 self.rtd_len_: int = 100 self.rtd_que_: deque = deque(maxlen=self.rtd_len_) """ { "timestamp": 1727323744093, "pose": 2, "target_point": [ [ 0.15537149991307939, -0.17245136840002878, 0.5702038151877267, 1 ], [ 0.15537149991307939, -0.17245136840002878, 0.5702038151877267, 2 ] ] } """ # 插入新的rtd单元 def put_rtd_unit(self, rtd_unit: object): """ 插入新的点云数据,超过最大长度自动丢弃旧数据 """ with self.lock_: self.rtd_que_.append(rtd_unit) # 获取最新的rtd_que_ def get_last_rtd_que(self) -> list: """ 获取当前最新的点云数据(全部缓存) """ with self.lock_: return list(self.rtd_que_) # 获取rtd单元 def get_rtd_unit(self, index: int): with self.lock_: if not self.rtd_que_: return None try: return self.rtd_que_[index] except IndexError: return None # 获取最新的rtd单元 def get_last_rtd_unit(self) -> object: with self.lock_: if self.rtd_que_: return self.rtd_que_[-1] return None # 获取 rtd_que_ 的副本 def get_rtd_que_copy(self) -> list: with self.lock_: return list(self.rtd_que_) def update_dev_info(dev_id:str, dev_instance:Device): with g_dev_map_lock: if dev_id in g_dev_map: g_dev_map[dev_id] = None LOGDBG(f"update dev: {dev_id}") else: LOGDBG(f"new dev: {dev_id}") # todo 更新设备保活时间(伪) # dev_instance.set_keepalive(get_utc_time_ms()) g_dev_map[dev_id] = dev_instance # 回调函数,处理查询结果:查询所有的设备信息 def cb_handle_query_all_dev_info(result): try: if result: for row in result: dev_id = row["client_id"] dev_instance = Device( dev_id=row["client_id"], dev_name=row["dev_name"], online=row["online"], dev_type=row["dev_type"] ) # 更新设备信息 update_dev_info(dev_id, dev_instance) LOGDBG(f"cb_handle_query_all_dev_info succeed") else: LOGDBG("cb_handle_query_all_dev_info, invalid result") except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") def dev_map_push(dev_id: str, device: Device) -> None: with g_dev_map_lock: g_dev_map[dev_id] = device def dev_map_pop(dev_id: str) -> Device: with g_dev_map_lock: return g_dev_map.pop(dev_id, None) def dev_map_find(dev_id: str) -> Device: with g_dev_map_lock: return g_dev_map.get(dev_id, None) def dev_map_delete(dev_id: str) -> bool: with g_dev_map_lock: if dev_id in g_dev_map: del g_dev_map[dev_id] return True return False