123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- from datetime import datetime, timedelta
- import json
- import common.sys_comm as sys_comm
- from common.sys_comm import (
- LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
- get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
- utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
- )
- from core.event_type import EventType
- def get_query_time_range(param: dict, now: datetime = None):
- """
- 根据 param 的 start_time / end_time 生成查询区间,返回格式化后的字符串
- 规则:
- - 如果 start_time < end_time,则区间为 [昨天.start_time, 昨天.end_time]
- - 否则,区间为 [昨天.end_time, 今天.start_time]
- """
- now = now or datetime.now()
- # 解析时间
- start_time = datetime.strptime(param["start_time"], "%H:%M").time()
- end_time = datetime.strptime(param["end_time"], "%H:%M").time()
- today = now.date()
- yesterday = today - timedelta(days=1)
- if start_time < end_time:
- # 区间在同一天 (昨天)
- start_dt = datetime.combine(yesterday, start_time)
- end_dt = datetime.combine(yesterday, end_time)
- else:
- # 跨天
- start_dt = datetime.combine(yesterday, end_time)
- end_dt = datetime.combine(today, start_time)
- # 返回字符串
- return start_dt.strftime("%Y-%m-%d %H:%M:%S"), end_dt.strftime("%Y-%m-%d %H:%M:%S")
- def normalize_param_time(param: dict, now: datetime = None):
- """
- 把 param['start_time'], param['end_time'] (HH:MM) 转换成昨天的完整日期时间字符串
- 格式:YYYY-MM-DD HH:MM:SS
- """
- now = now or datetime.now()
- yesterday = (now - timedelta(days=1)).date()
- def to_datetime_str(hhmm: str) -> str:
- t = datetime.strptime(hhmm, "%H:%M").time()
- dt = datetime.combine(yesterday, t)
- return dt.strftime("%Y-%m-%d %H:%M:%S")
- return {
- "start_time": to_datetime_str(param["start_time"]),
- "end_time": to_datetime_str(param["end_time"]),
- }
- def is_point_in_rect(x, y, rect):
- """
- 判断点 (x, y) 是否在 rect 定义的矩形内
- rect 格式: [left, top, w, h]
- """
- if not rect or len(rect) != 4:
- return False
- left, top, w, h = rect
- right = left + w
- bottom = top + h
- return left <= x <= right and top <= y <= bottom
- def region_to_rect(region_str):
- """
- 安全解析 region 字段:
- - 期望输入为 JSON 字符串
- - 合法返回:list[float] (长度为4) 或 []
- - 非法返回:[]
- """
- if not region_str:
- return [] # None 或空串
- try:
- # 尝试 JSON 解析
- region = json.loads(region_str)
- # 允许空列表 []
- if region == []:
- return []
- # 必须是列表
- if not isinstance(region, list):
- LOGWARN(f"region_to_rect: not a list, got {region_str}")
- return []
- # 检查长度
- if len(region) != 4:
- LOGWARN(f"region_to_rect: invalid length {len(region)}, data={region_str}")
- return []
- # 检查数值类型
- for i, v in enumerate(region):
- if not isinstance(v, (int, float)):
- LOGWARN(f"region_to_rect: invalid type at index {i}: {v}")
- return []
- return region # ✅ 合法结果
- except json.JSONDecodeError:
- LOGWARN(f"region_to_rect: JSON decode failed for region={region_str}")
- return []
- except Exception as e:
- LOGERR(f"region_to_rect: unexpected error: {e}, data={region_str}")
- return []
- def check_plan_rect_valid(event_type: int, rect: list) -> bool:
- """
- 检查指定事件类型是否要求有有效的检测区域。
- 返回 True 表示 rect 合法;False 表示应跳过。
- """
- required_rect_event_types = {
- EventType.STAY_DETECTION.value, # 停留事件
- EventType.RETENTION_DETECTION.value, # 滞留事件
- EventType.TOILETING_DETECTION.value, # 如厕事件
- EventType.TARGET_ABSENCE.value, # 异常消失
- EventType.SLEEP_MONITORING.value, # 睡眠检测
- }
- # 对不依赖检测区域的类型,直接放行
- if event_type not in required_rect_event_types:
- return True
- # 其他事件需要有合法的 rect
- if not rect or len(rect) != 4:
- return False
- return True
|