123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- import threading
- from threading import Lock
- import queue
- import json
- import traceback
- from collections import deque
- from typing import Optional, List
- import time
- from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
- from common.sys_comm import (
- get_utc_time_ms, get_utc_time_s,
- get_bj_time_ms, get_bj_time_s,
- utc_to_bj_s)
- # 跟踪区域类
- class TrackingRegion:
- def __init__(self, x1=0, y1=0, z1=0, x2=0, y2=0, z2=0):
- # 线程锁
- self.lock_ = threading.Lock()
- self.start_x: int = x1
- self.start_y: int = y1
- self.start_z: int = z1
- self.stop_x: int = x2
- self.stop_y: int = y2
- self.stop_z: int = z2
- def __repr__(self):
- return (f"TrackingRegion(start=({self.start_x}, {self.start_y}, {self.start_z}), "
- f"stop=({self.stop_x}, {self.stop_y}, {self.stop_z}))")
- # 安装参数类
- class InstallParam:
- def __init__(self, mount_plain="", height=0, tracking_region=None):
- # 线程锁
- self.lock_ = threading.Lock()
- self.mount_plain: str = mount_plain
- self.isCeiling: int = 1
- self.height = height
- self.tracking_region = tracking_region if tracking_region else TrackingRegion()
- self.north_angle_: float = 0
- def __repr__(self):
- return (f"InstallParam(mount_plain='{self.mount_plain}', isCeiling={self.isCeiling}, "
- f"height={self.height}, north_angle={self.north_angle_}, "
- f"tracking_region={repr(self.tracking_region)})")
- # 网络参数类
- class Network:
- def __init__(self, ssid="", password="", ip=""):
- # 线程锁
- self.lock_ = threading.Lock()
- self.ssid = ssid
- self.password = password
- self.ip = ip
- def set_ssid(self, ssid:str):
- with self.lock_:
- self.ssid = ssid
- def set_password(self, password:str):
- with self.lock_:
- self.password = password
- def set_ip(self, ip:str):
- with self.lock_:
- self.ip = ip
- def __repr__(self):
- return f"Network(ssid='{self.ssid}', password='{self.password}' , ip='{self.ip}')"
- class Device():
- def __init__(self,
- dev_id: str = "",
- dev_name: str = "",
- online: int = 0,
- dev_type: str = "",
- network: Network = None,
- install_param: InstallParam = None):
- # 线程锁
- self.lock_ = Lock()
- # 基本属性
- self.dev_id_ = dev_id
- self.dev_name_ = dev_name
- self.online_ = online
- self.dev_type_ = dev_type
- self.network_ = network if network else Network()
- self.install_param_ = install_param if install_param else InstallParam()
- self.keepalive_: int = get_utc_time_s()
- # 实时数据队列
- self.rtd_len_: int = 600
- self.rtd_que_: deque = deque(maxlen=self.rtd_len_)
- """
- {
- "timestamp": 1727323744093,
- "pose": 2,
- "target_point": [
- [
- 0.15537149991307939,
- -0.17245136840002878,
- 0.5702038151877267,
- 1
- ],
- [
- 0.15537149991307939,
- -0.17245136840002878,
- 0.5702038151877267,
- 2
- ]
- ]
- }
- """
- # 插入新的rtd单元
- def put_rtd_unit(self, rtd_unit: object):
- """
- 插入新的点云数据,超过最大长度自动丢弃旧数据
- """
- with self.lock_:
- self.rtd_que_.append(rtd_unit)
- # 获取最新的rtd_que_
- def get_last_rtd_que(self) -> list:
- """
- 获取当前最新的点云数据(全部缓存)
- """
- with self.lock_:
- return list(self.rtd_que_)
- # 获取rtd单元
- def get_rtd_unit(self, index: int):
- with self.lock_:
- if not self.rtd_que_:
- return None
- try:
- return self.rtd_que_[index]
- except IndexError:
- return None
- # 获取最新的rtd单元
- def get_last_rtd_unit(self) -> object:
- with self.lock_:
- if self.rtd_que_:
- return self.rtd_que_[-1]
- return None
- # 获取 rtd_que_ 的副本
- def get_rtd_que_copy(self) -> list:
- with self.lock_:
- return list(self.rtd_que_)
- def update_keepalive(self):
- with self.lock_:
- self.keepalive_ = get_utc_time_s()
- def update_keepalive(self, now:int):
- with self.lock_:
- self.keepalive_ = now
- def get_keepalive(self):
- with self.lock_:
- return self.keepalive_
- class DeviceManager():
- def __init__(self):
- self.g_dev_map_lock = threading.Lock()
- self.g_dev_map = {} # <dev_id: str, device: Device>
- self.HEARTBEAT_TIMEOUT = 3600 * 24 # 超长保活时间
- self.running_ = False
- self.thread_ = None
- def push_dev_map(self, dev_id:str, dev_instance:Device):
- with self.g_dev_map_lock:
- self.g_dev_map[dev_id] = dev_instance
- def pop_dev_map(self, dev_id: str):
- with self.g_dev_map_lock:
- return self.g_dev_map.pop(dev_id, None)
- def find_dev_map(self, dev_id: str):
- with self.g_dev_map_lock:
- return self.g_dev_map.get(dev_id, None)
- def delete_dev_map(self, dev_id: str):
- with self.g_dev_map_lock:
- if dev_id in self.g_dev_map:
- del self.g_dev_map[dev_id]
- return True
- return False
- def list_all_dev(self,) -> List["Device"]:
- with self.g_dev_map_lock:
- return list(self.g_dev_map.values())
- # ------------------- 心跳检测相关 -------------------
- def _heartbeat_monitor(self):
- """后台线程:定时检查设备是否失活"""
- while self.running_:
- time.sleep(30)
- now = int(time.time())
- with self.g_dev_map_lock:
- for dev_id, device in list(self.g_dev_map.items()):
- last_keepalive = device.get_keepalive()
- if now - last_keepalive > self.HEARTBEAT_TIMEOUT:
- # device.online_ = 0 # 标记掉线
- del self.g_dev_map[dev_id] # 直接销毁实例
- LOGINFO(f"[WARN] Device {dev_id} heartbeat timeout destroy")
- def start(self):
- if not self.running_:
- self.running_ = True
- self.thread_ = threading.Thread(
- target=self._heartbeat_monitor,
- daemon=True,
- name="DevMgrThread")
- self.thread_.start()
- LOGINFO("[INFO] DeviceManager heartbeat task start")
- def stop(self):
- if self.running_:
- self.running_ = False
- if self.thread_:
- self.thread_.join()
- LOGINFO("[INFO] DeviceManager heartbeat task stop")
- # 回调函数,处理查询结果:查询所有的设备信息
- def cb_handle_query_all_dev_info(self, result, userdata):
- try:
- if result:
- for row in result:
- dev_id = row["client_id"]
- x1 = row["start_x"]
- y1 = row["start_y"]
- z1 = row["start_z"]
- x2 = row["stop_x"]
- y2 = row["stop_y"]
- z2 = row["stop_z"]
- mount_plain = row["mount_plain"]
- height = row.get("height", 0.0)
- height = round(height, 3) if isinstance(height, (int, float)) else 0.0
- install_param = InstallParam(mount_plain, height, TrackingRegion(x1, y1, z1, x2, y2, z2))
- dev_instance = Device(
- dev_id=row["client_id"],
- dev_name=row["dev_name"],
- online=row["online"],
- dev_type=row["dev_type"]
- )
- dev_instance.install_param_ = install_param
- # 更新设备信息
- self.push_dev_map(dev_id, dev_instance)
-
- LOGDBG(f"cb_handle_query_all_dev_info succeed")
- else:
- LOGDBG("cb_handle_query_all_dev_info, invalid result")
-
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- g_dev_mgr: DeviceManager = None
- def init_dev_mng():
- global g_dev_mgr
- g_dev_mgr = DeviceManager()
|