sys_comm.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. from datetime import datetime, timezone, timedelta
  2. import os
  3. from enum import Enum
  4. import numpy
  5. import threading
  6. import re
  7. # 错误码定义
  8. class EC(Enum):
  9. EC_SUCCESS = 0 # 成功
  10. EC_FAILED = -1 # 一般错误
  11. # 日志配置
  12. g_log_conf_mtx = threading.Lock()
  13. g_log_conf = {
  14. "module_name": "LAS",
  15. "log_path": "./log/",
  16. "log_lvl": 0,
  17. "max_log_size": 50 * 1024 * 1024,
  18. "max_log_files": 10,
  19. }
  20. # 日志文件锁
  21. g_log_file_mtx = threading.Lock()
  22. # 获取当前utc时间(毫秒)
  23. def get_utc_time_ms():
  24. now = datetime.now()
  25. utc_time = now.astimezone(timezone.utc)
  26. utc_timestamp_ms = int(utc_time.timestamp() * 1000)
  27. return utc_timestamp_ms
  28. # 获取当前utc时间(秒)
  29. def get_utc_time_s():
  30. now = datetime.now()
  31. utc_time = now.astimezone(timezone.utc)
  32. utc_timestamp_s = int(utc_time.timestamp())
  33. return utc_timestamp_s
  34. # 获取当前北京时间(毫秒)
  35. def get_bj_time_ms():
  36. now = datetime.now()
  37. return now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
  38. # 获取当前北京时间(秒)
  39. def get_bj_time_s():
  40. now = datetime.now()
  41. return now.strftime("%Y-%m-%d %H:%M:%S")
  42. # utc时间转北京时间(秒)
  43. def utc_to_bj_s(utc_ts:int) -> str:
  44. utc_seconds = utc_ts
  45. utc_time = datetime.fromtimestamp(utc_seconds, tz=timezone.utc)
  46. bj_time = utc_time.astimezone(timezone(timedelta(hours=8)))
  47. return bj_time.strftime("%Y-%m-%d %H:%M:%S")
  48. # 时区
  49. UTC_TZ = timezone.utc
  50. BJ_TZ = timezone(timedelta(hours=8))
  51. # 毫秒 UTC → 北京时间
  52. def utc_to_bj_ms(utc_ms: int) -> str:
  53. dt_utc = datetime.fromtimestamp(utc_ms / 1000, tz=UTC_TZ)
  54. dt_bj = dt_utc.astimezone(BJ_TZ)
  55. return dt_bj.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 保留毫秒
  56. # 毫秒 北京时间 → UTC
  57. def bj_to_utc_ms(bj_str: str) -> int:
  58. """
  59. bj_str 格式: "YYYY-MM-DD HH:MM:SS.sss"
  60. """
  61. dt_bj = datetime.strptime(bj_str, "%Y-%m-%d %H:%M:%S.%f")
  62. dt_bj = dt_bj.replace(tzinfo=BJ_TZ)
  63. dt_utc = dt_bj.astimezone(UTC_TZ)
  64. return int(dt_utc.timestamp() * 1000)
  65. # 秒 UTC → 北京时间
  66. def utc_to_bj_s(utc_s: int) -> str:
  67. dt_utc = datetime.fromtimestamp(utc_s, tz=UTC_TZ)
  68. dt_bj = dt_utc.astimezone(BJ_TZ)
  69. return dt_bj.strftime("%Y-%m-%d %H:%M:%S")
  70. # 秒 北京时间 → UTC
  71. def bj_to_utc_s(bj_str: str) -> int:
  72. """
  73. bj_str 格式: "YYYY-MM-DD HH:MM:SS"
  74. """
  75. dt_bj = datetime.strptime(bj_str, "%Y-%m-%d %H:%M:%S")
  76. dt_bj = dt_bj.replace(tzinfo=BJ_TZ)
  77. dt_utc = dt_bj.astimezone(UTC_TZ)
  78. return int(dt_utc.timestamp())
  79. # 控制日志文件数量
  80. def manage_log_files(max_files, log_path:str):
  81. log_files = sorted(
  82. (f for f in os.listdir(log_path) if f.endswith(".log")),
  83. key=lambda x: os.path.getctime(os.path.join(log_path, x))
  84. )
  85. while len(log_files) > max_files:
  86. oldest_file = log_files.pop(0)
  87. os.remove(os.path.join(log_path, oldest_file))
  88. # 打印日志
  89. def LOG(text='', title=''):
  90. with g_log_conf_mtx:
  91. module_name = g_log_conf["module_name"]
  92. log_path = g_log_conf["log_path"]
  93. max_log_size = g_log_conf["max_log_size"]
  94. max_files = g_log_conf["max_log_files"]
  95. formatted_now = get_bj_time_ms()
  96. file = f"{module_name}.log"
  97. with g_log_file_mtx:
  98. log_file = os.path.join(log_path, file)
  99. # 检查日志文件大小并处理文件重命名
  100. if os.path.exists(log_file) and os.path.getsize(log_file) >= max_log_size:
  101. timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
  102. new_log_file = f"{log_path}{module_name}_{timestamp}.log"
  103. os.rename(log_file, new_log_file)
  104. # 控制日志文件数量
  105. manage_log_files(max_files, log_path)
  106. # 写入日志内容
  107. with open(log_file, 'a') as f:
  108. f.write('[%s][%s]:%s\n' % (formatted_now, title, text))
  109. # DEBUG日志, 等级0
  110. def LOGDBG(text=''):
  111. if g_log_conf["log_lvl"] <= 0:
  112. LOG(text=text, title="DEBUG")
  113. # INFO日志,等级1
  114. def LOGINFO(text=''):
  115. if g_log_conf["log_lvl"] <= 1:
  116. LOG(text=text, title="INFO")
  117. # WARN日志,等级2
  118. def LOGWARN(text=''):
  119. if g_log_conf["log_lvl"] <= 2:
  120. LOG(text=text, title="WARN")
  121. # ERROR日志,等级3
  122. def LOGERR(text=''):
  123. if g_log_conf["log_lvl"] <= 3:
  124. LOG(text=text, title="ERROR")
  125. # 姿态分类
  126. class POSE_CLASS_E(Enum):
  127. POSE_CLASS_3 = 3 # 3类
  128. POSE_CLASS_4 = 4 # 4类
  129. POSE_CLASS_5 = 5 # 5类
  130. e_pose_class = POSE_CLASS_E.POSE_CLASS_4
  131. # 姿态
  132. class POSE_E(Enum):
  133. POSE_INVALID = -1 # 无效值
  134. POSE_0 = 0 # 躺(跌倒)
  135. POSE_1 = 1 # 坐在椅子上
  136. POSE_2 = 2 # 坐在地上
  137. POSE_3 = 3 # 蹲
  138. POSE_4 = 4 # 站
  139. POSE_5 = 5 # 坐
  140. POSE_6 = 6 # 躺在沙发上
  141. POSE_7 = 7 # 躺在其他
  142. # 实时姿态
  143. realtime_pose:int = POSE_E.POSE_0.value
  144. pose_mutex = threading.Lock()
  145. def get_tracker_targets(point_cloud:list):
  146. target_point = numpy.mean(point_cloud, axis=0).tolist()
  147. tracker_targets = []
  148. tracker_targets.append(target_point)
  149. return tracker_targets
  150. # 获取目标target(多个点)
  151. def get_tracker_targets_mult(point_cloud:list):
  152. target_point = numpy.mean(point_cloud, axis=0).tolist()
  153. tracker_targets = []
  154. tracker_targets.append(target_point)
  155. return tracker_targets
  156. # 设备接入响应错误码
  157. class DEV_EC(int):
  158. succeed = 0 # 成功
  159. unauthorized = 401 # 未授权
  160. forbidden = 403 # 禁止访问,会话已过期
  161. conflict = 409 # 冲突,重复提交
  162. # 检查topic
  163. def check_topic(pattern:str, topic:str) -> bool:
  164. return bool(re.match(pattern, topic))