import threading from threading import Lock import queue import json import traceback from collections import deque from typing import Optional, List import time from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR from common.sys_comm import ( get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s, utc_to_bj_s) # 跟踪区域类 class TrackingRegion: def __init__(self, x1=0, y1=0, z1=0, x2=0, y2=0, z2=0): # 线程锁 self.lock_ = threading.Lock() self.start_x: int = x1 self.start_y: int = y1 self.start_z: int = z1 self.stop_x: int = x2 self.stop_y: int = y2 self.stop_z: int = z2 def __repr__(self): return (f"TrackingRegion(start=({self.start_x}, {self.start_y}, {self.start_z}), " f"stop=({self.stop_x}, {self.stop_y}, {self.stop_z}))") # 安装参数类 class InstallParam: def __init__(self, mount_plain="", height=0, tracking_region=None): # 线程锁 self.lock_ = threading.Lock() self.mount_plain: str = mount_plain self.isCeiling: int = 1 self.height = height self.tracking_region = tracking_region if tracking_region else TrackingRegion() self.north_angle_: float = 0 def __repr__(self): return (f"InstallParam(mount_plain='{self.mount_plain}', isCeiling={self.isCeiling}, " f"height={self.height}, north_angle={self.north_angle_}, " f"tracking_region={repr(self.tracking_region)})") # 网络参数类 class Network: def __init__(self, ssid="", password="", ip=""): # 线程锁 self.lock_ = threading.Lock() self.ssid = ssid self.password = password self.ip = ip def set_ssid(self, ssid:str): with self.lock_: self.ssid = ssid def set_password(self, password:str): with self.lock_: self.password = password def set_ip(self, ip:str): with self.lock_: self.ip = ip def __repr__(self): return f"Network(ssid='{self.ssid}', password='{self.password}' , ip='{self.ip}')" class Device(): def __init__(self, dev_id: str = "", dev_name: str = "", online: int = 0, dev_type: str = "", network: Network = None, install_param: InstallParam = None): # 线程锁 self.lock_ = Lock() # 基本属性 self.dev_id_ = dev_id self.dev_name_ = dev_name self.online_ = online self.dev_type_ = dev_type self.network_ = network if network else Network() self.install_param_ = install_param if install_param else InstallParam() self.keepalive_: int = get_utc_time_s() # 实时数据队列 self.rtd_len_: int = 600 self.rtd_que_: deque = deque(maxlen=self.rtd_len_) """ { "timestamp": 1727323744093, "pose": 2, "target_point": [ [ 0.15537149991307939, -0.17245136840002878, 0.5702038151877267, 1 ], [ 0.15537149991307939, -0.17245136840002878, 0.5702038151877267, 2 ] ] } """ # 插入新的rtd单元 def put_rtd_unit(self, rtd_unit: object): """ 插入新的点云数据,超过最大长度自动丢弃旧数据 """ with self.lock_: self.rtd_que_.append(rtd_unit) # 获取最新的rtd_que_ def get_last_rtd_que(self) -> list: """ 获取当前最新的点云数据(全部缓存) """ with self.lock_: return list(self.rtd_que_) # 获取rtd单元 def get_rtd_unit(self, index: int): with self.lock_: if not self.rtd_que_: return None try: return self.rtd_que_[index] except IndexError: return None # 获取最新的rtd单元 def get_last_rtd_unit(self) -> object: with self.lock_: if self.rtd_que_: return self.rtd_que_[-1] return None # 获取 rtd_que_ 的副本 def get_rtd_que_copy(self) -> list: with self.lock_: return list(self.rtd_que_) def update_keepalive(self): with self.lock_: self.keepalive_ = get_utc_time_s() def update_keepalive(self, now:int): with self.lock_: self.keepalive_ = now def get_keepalive(self): with self.lock_: return self.keepalive_ class DeviceManager(): def __init__(self): self.g_dev_map_lock = threading.Lock() self.g_dev_map = {} # self.HEARTBEAT_TIMEOUT = 3600 * 24 # 超长保活时间 self.running_ = False self.thread_ = None def push_dev_map(self, dev_id:str, dev_instance:Device): with self.g_dev_map_lock: self.g_dev_map[dev_id] = dev_instance def pop_dev_map(self, dev_id: str): with self.g_dev_map_lock: return self.g_dev_map.pop(dev_id, None) def find_dev_map(self, dev_id: str): with self.g_dev_map_lock: return self.g_dev_map.get(dev_id, None) def delete_dev_map(self, dev_id: str): with self.g_dev_map_lock: if dev_id in self.g_dev_map: del self.g_dev_map[dev_id] return True return False def list_all_dev(self,) -> List["Device"]: with self.g_dev_map_lock: return list(self.g_dev_map.values()) # ------------------- 心跳检测相关 ------------------- def _heartbeat_monitor(self): """后台线程:定时检查设备是否失活""" while self.running_: time.sleep(30) now = int(time.time()) with self.g_dev_map_lock: for dev_id, device in list(self.g_dev_map.items()): last_keepalive = device.get_keepalive() if now - last_keepalive > self.HEARTBEAT_TIMEOUT: # device.online_ = 0 # 标记掉线 del self.g_dev_map[dev_id] # 直接销毁实例 LOGINFO(f"[WARN] Device {dev_id} heartbeat timeout destroy") def start(self): if not self.running_: self.running_ = True self.thread_ = threading.Thread( target=self._heartbeat_monitor, daemon=True, name="DevMgrThread") self.thread_.start() LOGINFO("[INFO] DeviceManager heartbeat task start") def stop(self): if self.running_: self.running_ = False if self.thread_: self.thread_.join() LOGINFO("[INFO] DeviceManager heartbeat task stop") # 回调函数,处理查询结果:查询所有的设备信息 def cb_handle_query_all_dev_info(self, result, userdata): try: if result: for row in result: dev_id = row["client_id"] x1 = row["start_x"] y1 = row["start_y"] z1 = row["start_z"] x2 = row["stop_x"] y2 = row["stop_y"] z2 = row["stop_z"] mount_plain = row["mount_plain"] height = row.get("height", 0.0) height = round(height, 3) if isinstance(height, (int, float)) else 0.0 install_param = InstallParam(mount_plain, height, TrackingRegion(x1, y1, z1, x2, y2, z2)) dev_instance = Device( dev_id=row["client_id"], dev_name=row["dev_name"], online=row["online"], dev_type=row["dev_type"] ) dev_instance.install_param_ = install_param # 更新设备信息 self.push_dev_map(dev_id, dev_instance) LOGDBG(f"cb_handle_query_all_dev_info succeed") else: LOGDBG("cb_handle_query_all_dev_info, invalid result") except json.JSONDecodeError as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}") except Exception as e: tb_info = traceback.extract_tb(e.__traceback__) for frame in tb_info: LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}") g_dev_mgr: DeviceManager = None def init_dev_mng(): global g_dev_mgr g_dev_mgr = DeviceManager()