dev_mng.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. import threading
  2. from threading import Lock
  3. import queue
  4. import json
  5. import traceback
  6. from collections import deque
  7. from typing import Optional, List
  8. import time
  9. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  10. from common.sys_comm import (
  11. get_utc_time_ms, get_utc_time_s,
  12. get_bj_time_ms, get_bj_time_s,
  13. utc_to_bj_s)
  14. # 跟踪区域类
  15. class TrackingRegion:
  16. def __init__(self, x1=0, y1=0, z1=0, x2=0, y2=0, z2=0):
  17. # 线程锁
  18. self.lock_ = threading.Lock()
  19. self.start_x: int = x1
  20. self.start_y: int = y1
  21. self.start_z: int = z1
  22. self.stop_x: int = x2
  23. self.stop_y: int = y2
  24. self.stop_z: int = z2
  25. def __repr__(self):
  26. return (f"TrackingRegion(start=({self.start_x}, {self.start_y}, {self.start_z}), "
  27. f"stop=({self.stop_x}, {self.stop_y}, {self.stop_z}))")
  28. # 安装参数类
  29. class InstallParam:
  30. def __init__(self, mount_plain="", height=0, tracking_region=None):
  31. # 线程锁
  32. self.lock_ = threading.Lock()
  33. self.mount_plain: str = mount_plain
  34. self.isCeiling: int = 1
  35. self.height = height
  36. self.tracking_region = tracking_region if tracking_region else TrackingRegion()
  37. self.north_angle_: float = 0
  38. def __repr__(self):
  39. return (f"InstallParam(mount_plain='{self.mount_plain}', isCeiling={self.isCeiling}, "
  40. f"height={self.height}, north_angle={self.north_angle_}, "
  41. f"tracking_region={repr(self.tracking_region)})")
  42. # 网络参数类
  43. class Network:
  44. def __init__(self, ssid="", password="", ip=""):
  45. # 线程锁
  46. self.lock_ = threading.Lock()
  47. self.ssid = ssid
  48. self.password = password
  49. self.ip = ip
  50. def set_ssid(self, ssid:str):
  51. with self.lock_:
  52. self.ssid = ssid
  53. def set_password(self, password:str):
  54. with self.lock_:
  55. self.password = password
  56. def set_ip(self, ip:str):
  57. with self.lock_:
  58. self.ip = ip
  59. def __repr__(self):
  60. return f"Network(ssid='{self.ssid}', password='{self.password}' , ip='{self.ip}')"
  61. class Device():
  62. def __init__(self,
  63. dev_id: str = "",
  64. dev_name: str = "",
  65. online: int = 0,
  66. dev_type: str = "",
  67. network: Network = None,
  68. install_param: InstallParam = None):
  69. # 线程锁
  70. self.lock_ = Lock()
  71. # 基本属性
  72. self.dev_id_ = dev_id
  73. self.dev_name_ = dev_name
  74. self.online_ = online
  75. self.dev_type_ = dev_type
  76. self.network_ = network if network else Network()
  77. self.install_param_ = install_param if install_param else InstallParam()
  78. self.keepalive_: int = get_utc_time_s()
  79. # 实时数据队列
  80. self.rtd_len_: int = 600
  81. self.rtd_que_: deque = deque(maxlen=self.rtd_len_)
  82. """
  83. {
  84. "timestamp": 1727323744093,
  85. "pose": 2,
  86. "target_point": [
  87. [
  88. 0.15537149991307939,
  89. -0.17245136840002878,
  90. 0.5702038151877267,
  91. 1
  92. ],
  93. [
  94. 0.15537149991307939,
  95. -0.17245136840002878,
  96. 0.5702038151877267,
  97. 2
  98. ]
  99. ]
  100. }
  101. """
  102. # 插入新的rtd单元
  103. def put_rtd_unit(self, rtd_unit: object):
  104. """
  105. 插入新的点云数据,超过最大长度自动丢弃旧数据
  106. """
  107. with self.lock_:
  108. self.rtd_que_.append(rtd_unit)
  109. # 获取最新的rtd_que_
  110. def get_last_rtd_que(self) -> list:
  111. """
  112. 获取当前最新的点云数据(全部缓存)
  113. """
  114. with self.lock_:
  115. return list(self.rtd_que_)
  116. # 获取rtd单元
  117. def get_rtd_unit(self, index: int):
  118. with self.lock_:
  119. if not self.rtd_que_:
  120. return None
  121. try:
  122. return self.rtd_que_[index]
  123. except IndexError:
  124. return None
  125. # 获取最新的rtd单元
  126. def get_last_rtd_unit(self) -> object:
  127. with self.lock_:
  128. if self.rtd_que_:
  129. return self.rtd_que_[-1]
  130. return None
  131. # 获取 rtd_que_ 的副本
  132. def get_rtd_que_copy(self) -> list:
  133. with self.lock_:
  134. return list(self.rtd_que_)
  135. def update_keepalive(self):
  136. with self.lock_:
  137. self.keepalive_ = get_utc_time_s()
  138. def update_keepalive(self, now:int):
  139. with self.lock_:
  140. self.keepalive_ = now
  141. def get_keepalive(self):
  142. with self.lock_:
  143. return self.keepalive_
  144. class DeviceManager():
  145. def __init__(self):
  146. self.g_dev_map_lock = threading.Lock()
  147. self.g_dev_map = {} # <dev_id: str, device: Device>
  148. self.HEARTBEAT_TIMEOUT = 3600 * 24 # 超长保活时间
  149. self.running_ = False
  150. self.thread_ = None
  151. def push_dev_map(self, dev_id:str, dev_instance:Device):
  152. with self.g_dev_map_lock:
  153. self.g_dev_map[dev_id] = dev_instance
  154. def pop_dev_map(self, dev_id: str):
  155. with self.g_dev_map_lock:
  156. return self.g_dev_map.pop(dev_id, None)
  157. def find_dev_map(self, dev_id: str):
  158. with self.g_dev_map_lock:
  159. return self.g_dev_map.get(dev_id, None)
  160. def delete_dev_map(self, dev_id: str):
  161. with self.g_dev_map_lock:
  162. if dev_id in self.g_dev_map:
  163. del self.g_dev_map[dev_id]
  164. return True
  165. return False
  166. def list_all_dev(self,) -> List["Device"]:
  167. with self.g_dev_map_lock:
  168. return list(self.g_dev_map.values())
  169. # ------------------- 心跳检测相关 -------------------
  170. def _heartbeat_monitor(self):
  171. """后台线程:定时检查设备是否失活"""
  172. while self.running_:
  173. time.sleep(30)
  174. now = int(time.time())
  175. with self.g_dev_map_lock:
  176. for dev_id, device in list(self.g_dev_map.items()):
  177. last_keepalive = device.get_keepalive()
  178. if now - last_keepalive > self.HEARTBEAT_TIMEOUT:
  179. # device.online_ = 0 # 标记掉线
  180. del self.g_dev_map[dev_id] # 直接销毁实例
  181. LOGINFO(f"[WARN] Device {dev_id} heartbeat timeout destroy")
  182. def start(self):
  183. if not self.running_:
  184. self.running_ = True
  185. self.thread_ = threading.Thread(
  186. target=self._heartbeat_monitor,
  187. daemon=True,
  188. name="DevMgrThread")
  189. self.thread_.start()
  190. LOGINFO("[INFO] DeviceManager heartbeat task start")
  191. def stop(self):
  192. if self.running_:
  193. self.running_ = False
  194. if self.thread_:
  195. self.thread_.join()
  196. LOGINFO("[INFO] DeviceManager heartbeat task stop")
  197. # 回调函数,处理查询结果:查询所有的设备信息
  198. def cb_handle_query_all_dev_info(self, result, userdata):
  199. try:
  200. if result:
  201. for row in result:
  202. dev_id = row["client_id"]
  203. x1 = row["start_x"]
  204. y1 = row["start_y"]
  205. z1 = row["start_z"]
  206. x2 = row["stop_x"]
  207. y2 = row["stop_y"]
  208. z2 = row["stop_z"]
  209. mount_plain = row["mount_plain"]
  210. height = row.get("height", 0.0)
  211. height = round(height, 3) if isinstance(height, (int, float)) else 0.0
  212. install_param = InstallParam(mount_plain, height, TrackingRegion(x1, y1, z1, x2, y2, z2))
  213. dev_instance = Device(
  214. dev_id=row["client_id"],
  215. dev_name=row["dev_name"],
  216. online=row["online"],
  217. dev_type=row["dev_type"]
  218. )
  219. dev_instance.install_param_ = install_param
  220. # 更新设备信息
  221. self.push_dev_map(dev_id, dev_instance)
  222. LOGDBG(f"cb_handle_query_all_dev_info succeed")
  223. else:
  224. LOGDBG("cb_handle_query_all_dev_info, invalid result")
  225. except json.JSONDecodeError as e:
  226. tb_info = traceback.extract_tb(e.__traceback__)
  227. for frame in tb_info:
  228. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  229. except Exception as e:
  230. tb_info = traceback.extract_tb(e.__traceback__)
  231. for frame in tb_info:
  232. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  233. g_dev_mgr: DeviceManager = None
  234. def init_dev_mng():
  235. global g_dev_mgr
  236. g_dev_mgr = DeviceManager()