# dev_mng.py
import json
import queue
import threading
import time
import traceback
from collections import deque
from threading import Lock
from typing import Optional, List

from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
from common.sys_comm import (
    get_utc_time_ms, get_utc_time_s,
    get_bj_time_ms, get_bj_time_s,
    utc_to_bj_s)
from common.sys_comm import g_sys_conf, g_sys_conf_mtx
  15. # 跟踪区域类
  16. class TrackingRegion:
  17. def __init__(self, x1=0, y1=0, z1=0, x2=0, y2=0, z2=0):
  18. # 线程锁
  19. self.lock_ = threading.Lock()
  20. self.start_x: int = x1
  21. self.start_y: int = y1
  22. self.start_z: int = z1
  23. self.stop_x: int = x2
  24. self.stop_y: int = y2
  25. self.stop_z: int = z2
  26. def __repr__(self):
  27. return (f"TrackingRegion(start=({self.start_x}, {self.start_y}, {self.start_z}), "
  28. f"stop=({self.stop_x}, {self.stop_y}, {self.stop_z}))")
  29. # 安装参数类
  30. class InstallParam:
  31. def __init__(self, mount_plain="", height=0, tracking_region=None):
  32. # 线程锁
  33. self.lock_ = threading.Lock()
  34. self.mount_plain: str = mount_plain
  35. self.isCeiling: int = 1
  36. self.height = height
  37. self.tracking_region = tracking_region if tracking_region else TrackingRegion()
  38. self.north_angle_: float = 0
  39. def __repr__(self):
  40. return (f"InstallParam(mount_plain='{self.mount_plain}', isCeiling={self.isCeiling}, "
  41. f"height={self.height}, north_angle={self.north_angle_}, "
  42. f"tracking_region={repr(self.tracking_region)})")
  43. # 网络参数类
  44. class Network:
  45. def __init__(self, ssid="", password="", ip=""):
  46. # 线程锁
  47. self.lock_ = threading.Lock()
  48. self.ssid = ssid
  49. self.password = password
  50. self.ip = ip
  51. def set_ssid(self, ssid:str):
  52. with self.lock_:
  53. self.ssid = ssid
  54. def set_password(self, password:str):
  55. with self.lock_:
  56. self.password = password
  57. def set_ip(self, ip:str):
  58. with self.lock_:
  59. self.ip = ip
  60. def __repr__(self):
  61. return f"Network(ssid='{self.ssid}', password='{self.password}' , ip='{self.ip}')"
  62. class Device():
  63. def __init__(self,
  64. dev_id: str = "",
  65. dev_name: str = "",
  66. online: int = 0,
  67. dev_type: str = "",
  68. network: Network = None,
  69. install_param: InstallParam = None):
  70. # 线程锁
  71. self.lock_ = Lock()
  72. # 基本属性
  73. self.dev_id_ = dev_id
  74. self.dev_name_ = dev_name
  75. self.online_ = online
  76. self.dev_type_ = dev_type
  77. self.network_ = network if network else Network()
  78. self.install_param_ = install_param if install_param else InstallParam()
  79. self.keepalive_: int = get_utc_time_s()
  80. # 实时数据队列
  81. self.rtd_len_: int = 100
  82. self.rtd_que_: deque = deque(maxlen=self.rtd_len_)
  83. """
  84. {
  85. "timestamp": 1727323744093,
  86. "pose": 2,
  87. "target_point": [
  88. [
  89. 0.15537149991307939,
  90. -0.17245136840002878,
  91. 0.5702038151877267,
  92. 1
  93. ],
  94. [
  95. 0.15537149991307939,
  96. -0.17245136840002878,
  97. 0.5702038151877267,
  98. 2
  99. ]
  100. ]
  101. }
  102. """
  103. # 插入新的rtd单元
  104. def put_rtd_unit(self, rtd_unit: object):
  105. """
  106. 插入新的点云数据,超过最大长度自动丢弃旧数据
  107. """
  108. with self.lock_:
  109. self.rtd_que_.append(rtd_unit)
  110. # 获取最新的rtd_que_
  111. def get_last_rtd_que(self) -> list:
  112. """
  113. 获取当前最新的点云数据(全部缓存)
  114. """
  115. with self.lock_:
  116. return list(self.rtd_que_)
  117. # 获取rtd单元
  118. def get_rtd_unit(self, index: int):
  119. with self.lock_:
  120. if not self.rtd_que_:
  121. return None
  122. try:
  123. return self.rtd_que_[index]
  124. except IndexError:
  125. return None
  126. # 获取最新的rtd单元
  127. def get_last_rtd_unit(self) -> object:
  128. with self.lock_:
  129. if self.rtd_que_:
  130. return self.rtd_que_[-1]
  131. return None
  132. # 获取 rtd_que_ 的副本
  133. def get_rtd_que_copy(self) -> list:
  134. with self.lock_:
  135. return list(self.rtd_que_)
  136. def update_keepalive(self):
  137. with self.lock_:
  138. self.keepalive_ = get_utc_time_s()
  139. def update_keepalive(self, now:int):
  140. with self.lock_:
  141. self.keepalive_ = now
  142. def get_keepalive(self):
  143. with self.lock_:
  144. return self.keepalive_
  145. class DeviceManager():
  146. def __init__(self):
  147. self.g_dev_map_lock = threading.Lock()
  148. self.g_dev_map = {} # <dev_id: str, device: Device>
  149. self.HEARTBEAT_TIMEOUT = 3600 * 24 # 超长保活时间
  150. self.running_ = False
  151. self.thread_ = None
  152. def push_dev_map(self, dev_id:str, dev_instance:Device):
  153. with self.g_dev_map_lock:
  154. self.g_dev_map[dev_id] = dev_instance
  155. def pop_dev_map(self, dev_id: str):
  156. with self.g_dev_map_lock:
  157. return self.g_dev_map.pop(dev_id, None)
  158. def find_dev_map(self, dev_id: str):
  159. with self.g_dev_map_lock:
  160. return self.g_dev_map.get(dev_id, None)
  161. def delete_dev_map(self, dev_id: str):
  162. with self.g_dev_map_lock:
  163. if dev_id in self.g_dev_map:
  164. del self.g_dev_map[dev_id]
  165. return True
  166. return False
  167. def list_all_dev(self,) -> List["Device"]:
  168. with self.g_dev_map_lock:
  169. return list(self.g_dev_map.values())
  170. # ------------------- 心跳检测相关 -------------------
  171. def _heartbeat_monitor(self):
  172. """后台线程:定时检查设备是否失活"""
  173. while self.running_:
  174. time.sleep(30)
  175. now = int(time.time())
  176. with self.g_dev_map_lock:
  177. for dev_id, device in list(self.g_dev_map.items()):
  178. last_keepalive = device.get_keepalive()
  179. if now - last_keepalive > self.HEARTBEAT_TIMEOUT:
  180. # device.online_ = 0 # 标记掉线
  181. del self.g_dev_map[dev_id] # 直接销毁实例
  182. LOGINFO(f"[WARN] Device {dev_id} heartbeat timeout destroy")
  183. def start(self):
  184. if not self.running_:
  185. self.running_ = True
  186. self.thread_ = threading.Thread(target=self._heartbeat_monitor, daemon=True)
  187. self.thread_.start()
  188. LOGINFO("[INFO] DeviceManager heartbeat task start")
  189. def stop(self):
  190. if self.running_:
  191. self.running_ = False
  192. if self.thread_:
  193. self.thread_.join()
  194. LOGINFO("[INFO] DeviceManager heartbeat task stop")
  195. # 回调函数,处理查询结果:查询所有的设备信息
  196. def cb_handle_query_all_dev_info(self, result, userdata):
  197. try:
  198. if result:
  199. for row in result:
  200. dev_id = row["client_id"]
  201. dev_instance = Device(
  202. dev_id=row["client_id"],
  203. dev_name=row["dev_name"],
  204. online=row["online"],
  205. dev_type=row["dev_type"]
  206. )
  207. # 更新设备信息
  208. self.push_dev_map(dev_id, dev_instance)
  209. LOGDBG(f"cb_handle_query_all_dev_info succeed")
  210. else:
  211. LOGDBG("cb_handle_query_all_dev_info, invalid result")
  212. except json.JSONDecodeError as e:
  213. tb_info = traceback.extract_tb(e.__traceback__)
  214. for frame in tb_info:
  215. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  216. except Exception as e:
  217. tb_info = traceback.extract_tb(e.__traceback__)
  218. for frame in tb_info:
  219. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  220. g_dev_mgr: DeviceManager = None
  221. def init_dev_mng():
  222. global g_dev_mgr
  223. g_dev_mgr = DeviceManager()