123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- from datetime import datetime, timezone, timedelta
- import os
- from enum import Enum
- import numpy
- import threading
- import re
- # 错误码定义
- class EC(Enum):
- EC_SUCCESS = 0 # 成功
- EC_FAILED = -1 # 一般错误
- alarm_conf = {
- "retention_time": 60,
- "retention_keep_time": 30,
- "retention_alarm_time": 180,
- "toilet": {
- "retention_time": 60,
- "retention_keep_time": 30,
- "retention_alarm_time": 900
- }
- }
- # 系统配置
- g_sys_conf_mtx = threading.Lock()
- g_sys_conf = {
- "module_name": "LAS",
- "host_ip": "10.206.0.8",
- "platform": 0, # 平台,0:windows本地,1:云服务器
- "sp_id": "", # 服务程序id
- # 数据库相关参数
- "db_host": "localhost",
- "db_username": "root",
- "db_password": "Hfln@147888",
- # ssh
- "ssh_host": "119.45.12.173",
- "ssh_port": 22,
- "ssh_username": "root",
- "ssh_password": "Hfln@147888",
- }
- # 日志配置
- g_log_conf_mtx = threading.Lock()
- g_log_conf = {
- "module_name": "LAS",
- "log_path": "./log/",
- "log_lvl": 0,
- "max_log_size": 50 * 1024 * 1024,
- "max_log_files": 10,
- }
- # 日志文件锁
- g_log_file_mtx = threading.Lock()
- # 获取当前utc时间(毫秒)
- def get_utc_time_ms():
- now = datetime.now()
- utc_time = now.astimezone(timezone.utc)
- utc_timestamp_ms = int(utc_time.timestamp() * 1000)
- return utc_timestamp_ms
- # 获取当前utc时间(秒)
- def get_utc_time_s():
- now = datetime.now()
- utc_time = now.astimezone(timezone.utc)
- utc_timestamp_s = int(utc_time.timestamp())
- return utc_timestamp_s
- # 获取当前北京时间(毫秒)
- def get_bj_time_ms():
- now = datetime.now()
- return now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
- # 获取当前北京时间(秒)
- def get_bj_time_s():
- now = datetime.now()
- return now.strftime("%Y-%m-%d %H:%M:%S")
- # utc时间转北京时间(秒)
- def utc_to_bj_s(utc_ts:int) -> str:
- utc_seconds = utc_ts
- utc_time = datetime.fromtimestamp(utc_seconds, tz=timezone.utc)
- bj_time = utc_time.astimezone(timezone(timedelta(hours=8)))
- return bj_time.strftime("%Y-%m-%d %H:%M:%S")
- # 时区
- UTC_TZ = timezone.utc
- BJ_TZ = timezone(timedelta(hours=8))
- # 毫秒 UTC → 北京时间
- def utc_to_bj_ms(utc_ms: int) -> str:
- dt_utc = datetime.fromtimestamp(utc_ms / 1000, tz=UTC_TZ)
- dt_bj = dt_utc.astimezone(BJ_TZ)
- return dt_bj.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 保留毫秒
- # 毫秒 北京时间 → UTC
- def bj_to_utc_ms(bj_str: str) -> int:
- """
- bj_str 格式: "YYYY-MM-DD HH:MM:SS.sss"
- """
- dt_bj = datetime.strptime(bj_str, "%Y-%m-%d %H:%M:%S.%f")
- dt_bj = dt_bj.replace(tzinfo=BJ_TZ)
- dt_utc = dt_bj.astimezone(UTC_TZ)
- return int(dt_utc.timestamp() * 1000)
- # 秒 UTC → 北京时间
- def utc_to_bj_s(utc_s: int) -> str:
- dt_utc = datetime.fromtimestamp(utc_s, tz=UTC_TZ)
- dt_bj = dt_utc.astimezone(BJ_TZ)
- return dt_bj.strftime("%Y-%m-%d %H:%M:%S")
- # 秒 北京时间 → UTC
- def bj_to_utc_s(bj_str: str) -> int:
- """
- bj_str 格式: "YYYY-MM-DD HH:MM:SS"
- """
- dt_bj = datetime.strptime(bj_str, "%Y-%m-%d %H:%M:%S")
- dt_bj = dt_bj.replace(tzinfo=BJ_TZ)
- dt_utc = dt_bj.astimezone(UTC_TZ)
- return int(dt_utc.timestamp())
- # 控制日志文件数量
- def manage_log_files(max_files, log_path:str):
- log_files = sorted(
- (f for f in os.listdir(log_path) if f.endswith(".log")),
- key=lambda x: os.path.getctime(os.path.join(log_path, x))
- )
-
- while len(log_files) > max_files:
- oldest_file = log_files.pop(0)
- os.remove(os.path.join(log_path, oldest_file))
- # 打印日志
- def LOG(text='', title=''):
- with g_log_conf_mtx:
- module_name = g_log_conf["module_name"]
- log_path = g_log_conf["log_path"]
- max_log_size = g_log_conf["max_log_size"]
- max_files = g_log_conf["max_log_files"]
- formatted_now = get_bj_time_ms()
- file = f"{module_name}.log"
- with g_log_file_mtx:
- log_file = os.path.join(log_path, file)
- # 检查日志文件大小并处理文件重命名
- if os.path.exists(log_file) and os.path.getsize(log_file) >= max_log_size:
- timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
- new_log_file = f"{log_path}{module_name}_{timestamp}.log"
- os.rename(log_file, new_log_file)
- # 控制日志文件数量
- manage_log_files(max_files, log_path)
- # 写入日志内容
- with open(log_file, 'a') as f:
- f.write('[%s][%s]:%s\n' % (formatted_now, title, text))
- # DEBUG日志, 等级0
- def LOGDBG(text=''):
- if g_log_conf["log_lvl"] <= 0:
- LOG(text=text, title="DEBUG")
- # INFO日志,等级1
- def LOGINFO(text=''):
- if g_log_conf["log_lvl"] <= 1:
- LOG(text=text, title="INFO")
- # WARN日志,等级2
- def LOGWARN(text=''):
- if g_log_conf["log_lvl"] <= 2:
- LOG(text=text, title="WARN")
- # ERROR日志,等级3
- def LOGERR(text=''):
- if g_log_conf["log_lvl"] <= 3:
- LOG(text=text, title="ERROR")
- # 姿态分类
- class POSE_CLASS_E(Enum):
- POSE_CLASS_3 = 3 # 3类
- POSE_CLASS_4 = 4 # 4类
- POSE_CLASS_5 = 5 # 5类
- e_pose_class = POSE_CLASS_E.POSE_CLASS_4
- # 姿态
- class POSE_E(Enum):
- POSE_INVALID = -1 # 无效值
- POSE_0 = 0 # 躺(跌倒)
- POSE_1 = 1 # 坐在椅子上
- POSE_2 = 2 # 坐在地上
- POSE_3 = 3 # 蹲
- POSE_4 = 4 # 站
- POSE_5 = 5 # 坐
- POSE_6 = 6 # 躺在沙发上
- POSE_7 = 7 # 躺在其他
- # 实时姿态
- realtime_pose:int = POSE_E.POSE_0.value
- pose_mutex = threading.Lock()
- def get_tracker_targets(point_cloud:list):
- target_point = numpy.mean(point_cloud, axis=0).tolist()
- tracker_targets = []
- tracker_targets.append(target_point)
- return tracker_targets
- # 获取目标target(多个点)
- def get_tracker_targets_mult(point_cloud:list):
- target_point = numpy.mean(point_cloud, axis=0).tolist()
- tracker_targets = []
- tracker_targets.append(target_point)
- return tracker_targets
- # 设备接入响应错误码
- class DEV_EC(int):
- succeed = 0 # 成功
- unauthorized = 401 # 未授权
- forbidden = 403 # 禁止访问,会话已过期
- conflict = 409 # 冲突,重复提交
- # 检查topic
- def check_topic(pattern:str, topic:str) -> bool:
- return bool(re.match(pattern, topic))
|