from datetime import datetime, timedelta
import json

import common.sys_comm as sys_comm
from common.sys_comm import (
    LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
    get_utc_time_ms, get_utc_time_s,
    get_bj_time_ms, get_bj_time_s,
    utc_to_bj_ms, bj_to_utc_ms,
    utc_to_bj_s, bj_to_utc_s
)
from core.event_type import EventType


def get_query_time_range(param: dict, now: datetime = None):
    """
    Build a query interval from param["start_time"] / param["end_time"].

    Both values are "HH:MM" strings. The interval is anchored on the day
    before ``now``:

    - If start_time < end_time, the interval is
      [yesterday.start_time, yesterday.end_time].
    - Otherwise (including start_time == end_time), the interval is
      [yesterday.end_time, today.start_time].

    Args:
        param: dict holding "start_time" and "end_time" as "HH:MM" strings.
        now: reference datetime; defaults to datetime.now() when falsy.

    Returns:
        A (start, end) tuple of strings formatted "YYYY-MM-DD HH:MM:SS".

    Raises:
        KeyError: if either key is missing from param.
        ValueError: if either value does not match "%H:%M".
    """
    now = now or datetime.now()

    # Parse the clock times.
    start_time = datetime.strptime(param["start_time"], "%H:%M").time()
    end_time = datetime.strptime(param["end_time"], "%H:%M").time()

    today = now.date()
    yesterday = today - timedelta(days=1)

    if start_time < end_time:
        # Both endpoints fall on the same day (yesterday).
        start_dt = datetime.combine(yesterday, start_time)
        end_dt = datetime.combine(yesterday, end_time)
    else:
        # Interval spans midnight.
        # NOTE(review): for an overnight window such as 22:00 -> 06:00 this
        # yields [yesterday 06:00, today 22:00]; confirm this is intended
        # rather than [yesterday.start_time, today.end_time].
        start_dt = datetime.combine(yesterday, end_time)
        end_dt = datetime.combine(today, start_time)

    # Return both endpoints as formatted strings.
    fmt = "%Y-%m-%d %H:%M:%S"
    return start_dt.strftime(fmt), end_dt.strftime(fmt)


def normalize_param_time(param: dict, now: datetime = None):
    """
    Expand param["start_time"] / param["end_time"] ("HH:MM") into full
    date-time strings on yesterday's date.

    Args:
        param: dict holding "start_time" and "end_time" as "HH:MM" strings.
        now: reference datetime; defaults to datetime.now() when falsy.

    Returns:
        A dict with keys "start_time" and "end_time", each formatted
        "YYYY-MM-DD HH:MM:SS". Both values use yesterday's date, regardless
        of which clock time is earlier.

    Raises:
        KeyError: if either key is missing from param.
        ValueError: if either value does not match "%H:%M".
    """
    now = now or datetime.now()
    yesterday = (now - timedelta(days=1)).date()

    def to_datetime_str(hhmm: str) -> str:
        # Attach yesterday's date to an "HH:MM" clock time.
        t = datetime.strptime(hhmm, "%H:%M").time()
        dt = datetime.combine(yesterday, t)
        return dt.strftime("%Y-%m-%d %H:%M:%S")

    return {
        "start_time": to_datetime_str(param["start_time"]),
        "end_time": to_datetime_str(param["end_time"]),
    }


def is_point_in_rect(x, y, rect):
    """
    Tell whether the point (x, y) lies inside the rectangle ``rect``.

    Args:
        x: horizontal coordinate of the point.
        y: vertical coordinate of the point.
        rect: [left, top, w, h].

    Returns:
        True when the point is inside the rectangle, edges included.
        False when it is outside, or when rect is empty/None or does not
        have exactly four elements.
    """
    if not rect or len(rect) != 4:
        return False

    left, top, w, h = rect
    right = left + w
    bottom = top + h

    return left <= x <= right and top <= y <= bottom


def region_to_rect(region_str):
    """
    Safely parse a ``region`` field into a rectangle.

    The input is expected to be a JSON string encoding a list of four
    numbers.

    Args:
        region_str: JSON text, or None / empty string.

    Returns:
        The parsed four-element list of int/float values when the input is
        valid; [] for None, an empty string, an empty JSON list, or any
        invalid input. Failures are logged and never raised.
    """
    if not region_str:
        return []  # None or empty string

    try:
        # Attempt the JSON parse.
        region = json.loads(region_str)

        # An empty list is allowed and is not worth a warning.
        if region == []:
            return []

        # Must be a list.
        if not isinstance(region, list):
            LOGWARN(f"region_to_rect: not a list, got {region_str}")
            return []

        # Must have exactly four elements.
        if len(region) != 4:
            LOGWARN(f"region_to_rect: invalid length {len(region)}, data={region_str}")
            return []

        # Every element must be numeric. bool is a subclass of int, so JSON
        # true/false would slip through a plain isinstance check; reject it
        # explicitly so [true, false, 1, 2] is not taken for a rectangle.
        for i, v in enumerate(region):
            if isinstance(v, bool) or not isinstance(v, (int, float)):
                LOGWARN(f"region_to_rect: invalid type at index {i}: {v}")
                return []

        return region  # valid result

    except json.JSONDecodeError:
        LOGWARN(f"region_to_rect: JSON decode failed for region={region_str}")
        return []
    except Exception as e:
        # Best-effort parser: anything else (e.g. a non-string input handed
        # to json.loads) is logged and reported as "no region".
        LOGERR(f"region_to_rect: unexpected error: {e}, data={region_str}")
        return []


def check_plan_rect_valid(event_type: int, rect: list) -> bool:
    """
    Check whether ``rect`` is acceptable for the given event type.

    Event types listed in ``no_rect_event_types`` do not depend on a
    detection region and always pass. Every other event type needs a
    non-empty rect with exactly four elements.

    Args:
        event_type: event type value, compared against EventType.*.value.
        rect: detection rectangle; may be empty or None.

    Returns:
        True when rect is acceptable; False when the caller should skip.
    """
    no_rect_event_types = {
        EventType.CLEAN_EXPIRE_EVENTS.value,
        EventType.TOILETING_FREQUENCY.value,
        EventType.NIGHT_TOILETING_FREQUENCY.value,
        EventType.TOILETING_FREQUENCY_ABNORMAL.value,
        EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value,
        EventType.BATHROOM_STAY_FREQUENCY.value,
    }

    # Types that do not rely on a detection region pass straight through.
    if event_type in no_rect_event_types:
        return True

    # Every other event type needs a valid rect.
    if not rect or len(rect) != 4:
        return False

    return True