"""Shared configuration, time conversion, file logging and enum definitions."""

import os
import re
import threading
from datetime import datetime, timedelta, timezone
from enum import Enum

import numpy

# Alarm thresholds. The nested "toilet" entry carries its own values for the
# same three keys.
# NOTE(review): units are not stated in this file - presumably seconds; confirm.
alarm_conf = {
    "retention_time": 60,
    "retention_keep_time": 30,
    "retention_alarm_time": 180,
    "toilet": {
        "retention_time": 60,
        "retention_keep_time": 30,
        "retention_alarm_time": 900
    }
}

# System configuration (guard access with g_sys_conf_mtx).
# NOTE(review): database and SSH credentials are stored in plain text below;
# consider loading them from the environment or a secrets store.
g_sys_conf_mtx = threading.Lock()
g_sys_conf = {
    "module_name": "LAS",
    "host_ip": "10.206.0.8",
    "platform": 0,  # platform: 0 = local Windows, 1 = cloud server
    "sp_id": "",  # service program id

    # database parameters
    "db_host": "localhost",
    "db_username": "root",
    "db_password": "Hfln@147888",

    # ssh
    "ssh_host": "119.45.12.173",
    "ssh_port": 22,
    "ssh_username": "root",
    "ssh_password": "Hfln@147888",
}

# Logging configuration (guard access with g_log_conf_mtx).
g_log_conf_mtx = threading.Lock()
g_log_conf = {
    "module_name": "LAS",
    "log_path": "./log/",
    "log_lvl": 0,
    "max_log_size": 50 * 1024 * 1024,
    "max_log_files": 10,
}

# Lock serialising rotation of, and writes to, the log file.
g_log_file_mtx = threading.Lock()


# Error codes
class EC(Enum):
    EC_SUCCESS = 0  # success
    EC_FAILED = -1  # generic failure


# Time zones
UTC_TZ = timezone.utc
BJ_TZ = timezone(timedelta(hours=8))

# Unix epoch as an aware datetime; used for exact integer millisecond maths.
_EPOCH_UTC = datetime(1970, 1, 1, tzinfo=UTC_TZ)


def get_utc_time_ms():
    """Return the current Unix timestamp in milliseconds."""
    return int(datetime.now(UTC_TZ).timestamp() * 1000)


def get_utc_time_s():
    """Return the current Unix timestamp in whole seconds."""
    return int(datetime.now(UTC_TZ).timestamp())


def get_bj_time_ms():
    """Return the current Beijing time as "YYYY-MM-DD HH:MM:SS.mmm".

    The clock is read in UTC+8 explicitly so the result does not depend on
    the host's local time zone.
    """
    return datetime.now(BJ_TZ).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]


def get_bj_time_s():
    """Return the current Beijing time as "YYYY-MM-DD HH:MM:SS".

    The clock is read in UTC+8 explicitly so the result does not depend on
    the host's local time zone.
    """
    return datetime.now(BJ_TZ).strftime("%Y-%m-%d %H:%M:%S")


def utc_to_bj_ms(utc_ms: int) -> str:
    """Convert a Unix timestamp in milliseconds to a Beijing time string.

    Returns:
        The time formatted as "YYYY-MM-DD HH:MM:SS.mmm".
    """
    dt_bj = datetime.fromtimestamp(utc_ms / 1000, tz=BJ_TZ)
    # %f yields microseconds; drop the last three digits to keep milliseconds.
    return dt_bj.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]


def bj_to_utc_ms(bj_str: str) -> int:
    """Convert a Beijing time string to a Unix timestamp in milliseconds.

    Args:
        bj_str: Time formatted as "YYYY-MM-DD HH:MM:SS.sss".

    Raises:
        ValueError: If bj_str does not match the expected format.
    """
    dt_bj = datetime.strptime(bj_str, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=BJ_TZ)
    # Integer timedelta division is exact; multiplying the float timestamp by
    # 1000 and truncating can lose one millisecond to rounding error.
    return (dt_bj - _EPOCH_UTC) // timedelta(milliseconds=1)


def utc_to_bj_s(utc_s: int) -> str:
    """Convert a Unix timestamp in seconds to a Beijing time string.

    Returns:
        The time formatted as "YYYY-MM-DD HH:MM:SS".
    """
    dt_bj = datetime.fromtimestamp(utc_s, tz=BJ_TZ)
    return dt_bj.strftime("%Y-%m-%d %H:%M:%S")


def bj_to_utc_s(bj_str: str) -> int:
    """Convert a Beijing time string to a Unix timestamp in seconds.

    Args:
        bj_str: Time formatted as "YYYY-MM-DD HH:MM:SS".

    Raises:
        ValueError: If bj_str does not match the expected format.
    """
    dt_bj = datetime.strptime(bj_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=BJ_TZ)
    return int(dt_bj.timestamp())


def manage_log_files(max_files, log_path: str):
    """Delete the oldest ".log" files in log_path until at most max_files remain.

    Files are ordered by os.path.getctime; every file in the directory whose
    name ends with ".log" is counted.
    """
    log_files = sorted(
        (f for f in os.listdir(log_path) if f.endswith(".log")),
        key=lambda x: os.path.getctime(os.path.join(log_path, x))
    )
    excess = len(log_files) - max_files
    if excess <= 0:
        return
    for oldest_file in log_files[:excess]:
        os.remove(os.path.join(log_path, oldest_file))


def LOG(text='', title=''):
    """Append one line "[time][title]:text" to "<module_name>.log".

    When the current log file has reached max_log_size it is first renamed to
    "<module_name>_<timestamp>.log" and old log files are pruned down to
    max_log_files.
    """
    # Snapshot the configuration under its lock, then release it before I/O.
    with g_log_conf_mtx:
        module_name = g_log_conf["module_name"]
        log_path = g_log_conf["log_path"]
        max_log_size = g_log_conf["max_log_size"]
        max_files = g_log_conf["max_log_files"]

    formatted_now = get_bj_time_ms()
    log_file = os.path.join(log_path, f"{module_name}.log")

    with g_log_file_mtx:
        # Make sure the target directory exists before touching the file.
        if log_path:
            os.makedirs(log_path, exist_ok=True)

        # Rotate the log file once it has reached the size limit.
        if os.path.exists(log_file) and os.path.getsize(log_file) >= max_log_size:
            timestamp = datetime.now(BJ_TZ).strftime("%Y-%m-%d_%H-%M-%S")
            # Join path components so a log_path without a trailing separator
            # still places the rotated file inside the log directory.
            new_log_file = os.path.join(log_path, f"{module_name}_{timestamp}.log")
            os.rename(log_file, new_log_file)
            # Limit the number of log files kept on disk.
            manage_log_files(max_files, log_path)

        # Write as UTF-8 so the text does not depend on the platform's
        # default encoding.
        with open(log_file, 'a', encoding="utf-8") as f:
            f.write('[%s][%s]:%s\n' % (formatted_now, title, text))


def LOGDBG(text=''):
    """Write a DEBUG line (level 0) when the configured log_lvl is <= 0."""
    if g_log_conf["log_lvl"] <= 0:
        LOG(text=text, title="DEBUG")


def LOGINFO(text=''):
    """Write an INFO line (level 1) when the configured log_lvl is <= 1."""
    if g_log_conf["log_lvl"] <= 1:
        LOG(text=text, title="INFO")


def LOGWARN(text=''):
    """Write a WARN line (level 2) when the configured log_lvl is <= 2."""
    if g_log_conf["log_lvl"] <= 2:
        LOG(text=text, title="WARN")


def LOGERR(text=''):
    """Write an ERROR line (level 3) when the configured log_lvl is <= 3."""
    if g_log_conf["log_lvl"] <= 3:
        LOG(text=text, title="ERROR")


# Pose classification scheme
class POSE_CLASS_E(Enum):
    POSE_CLASS_3 = 3  # 3 classes
    POSE_CLASS_4 = 4  # 4 classes
    POSE_CLASS_5 = 5  # 5 classes


e_pose_class = POSE_CLASS_E.POSE_CLASS_4


# Pose
class POSE_E(Enum):
    POSE_INVALID = -1  # invalid value
    POSE_0 = 0  # lying (fall)
    POSE_1 = 1  # sitting on a chair
    POSE_2 = 2  # sitting on the floor
    POSE_3 = 3  # squatting
    POSE_4 = 4  # standing
    POSE_5 = 5  # sitting
    POSE_6 = 6  # lying on a sofa
    POSE_7 = 7  # lying elsewhere


# Real-time pose (guard access with pose_mutex).
realtime_pose: int = POSE_E.POSE_0.value
pose_mutex = threading.Lock()


def get_tracker_targets(point_cloud: list):
    """Return a one-element list holding the mean of point_cloud along axis 0."""
    target_point = numpy.mean(point_cloud, axis=0).tolist()
    return [target_point]


def get_tracker_targets_mult(point_cloud: list):
    """Return tracker targets for a point cloud (multi-point variant).

    Currently identical to get_tracker_targets: a one-element list holding
    the mean of point_cloud along axis 0.
    """
    return get_tracker_targets(point_cloud)


# Device access response codes
class DEV_EC(int):
    succeed = 0  # success
    unauthorized = 401  # unauthorized
    forbidden = 403  # forbidden, session expired
    conflict = 409  # conflict, duplicate submission


def check_topic(pattern: str, topic: str) -> bool:
    """Return True when the regular expression pattern matches at the start of topic."""
    return bool(re.match(pattern, topic))