alarm_plan_helper.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. from datetime import datetime, timedelta
  2. import json
  3. import common.sys_comm as sys_comm
  4. from common.sys_comm import (
  5. LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
  6. get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
  7. utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
  8. )
  9. from core.event_type import EventType
  10. def get_query_time_range(param: dict, now: datetime = None):
  11. """
  12. 根据 param 的 start_time / end_time 生成查询区间,返回格式化后的字符串
  13. 规则:
  14. - 如果 start_time < end_time,则区间为 [昨天.start_time, 昨天.end_time]
  15. - 否则,区间为 [昨天.end_time, 今天.start_time]
  16. """
  17. now = now or datetime.now()
  18. # 解析时间
  19. start_time = datetime.strptime(param["start_time"], "%H:%M").time()
  20. end_time = datetime.strptime(param["end_time"], "%H:%M").time()
  21. today = now.date()
  22. yesterday = today - timedelta(days=1)
  23. if start_time < end_time:
  24. # 区间在同一天 (昨天)
  25. start_dt = datetime.combine(yesterday, start_time)
  26. end_dt = datetime.combine(yesterday, end_time)
  27. else:
  28. # 跨天
  29. start_dt = datetime.combine(yesterday, end_time)
  30. end_dt = datetime.combine(today, start_time)
  31. # 返回字符串
  32. return start_dt.strftime("%Y-%m-%d %H:%M:%S"), end_dt.strftime("%Y-%m-%d %H:%M:%S")
  33. def normalize_param_time(param: dict, now: datetime = None):
  34. """
  35. 把 param['start_time'], param['end_time'] (HH:MM) 转换成昨天的完整日期时间字符串
  36. 格式:YYYY-MM-DD HH:MM:SS
  37. """
  38. now = now or datetime.now()
  39. yesterday = (now - timedelta(days=1)).date()
  40. def to_datetime_str(hhmm: str) -> str:
  41. t = datetime.strptime(hhmm, "%H:%M").time()
  42. dt = datetime.combine(yesterday, t)
  43. return dt.strftime("%Y-%m-%d %H:%M:%S")
  44. return {
  45. "start_time": to_datetime_str(param["start_time"]),
  46. "end_time": to_datetime_str(param["end_time"]),
  47. }
  48. def is_point_in_rect(x, y, rect):
  49. """
  50. 判断点 (x, y) 是否在 rect 定义的矩形内
  51. rect 格式: [left, top, w, h]
  52. """
  53. if not rect or len(rect) != 4:
  54. return False
  55. left, top, w, h = rect
  56. right = left + w
  57. bottom = top + h
  58. return left <= x <= right and top <= y <= bottom
  59. def region_to_rect(region_str):
  60. """
  61. 安全解析 region 字段:
  62. - 期望输入为 JSON 字符串
  63. - 合法返回:list[float] (长度为4) 或 []
  64. - 非法返回:[]
  65. """
  66. if not region_str:
  67. return [] # None 或空串
  68. try:
  69. # 尝试 JSON 解析
  70. region = json.loads(region_str)
  71. # 允许空列表 []
  72. if region == []:
  73. return []
  74. # 必须是列表
  75. if not isinstance(region, list):
  76. LOGWARN(f"region_to_rect: not a list, got {region_str}")
  77. return []
  78. # 检查长度
  79. if len(region) != 4:
  80. LOGWARN(f"region_to_rect: invalid length {len(region)}, data={region_str}")
  81. return []
  82. # 检查数值类型
  83. for i, v in enumerate(region):
  84. if not isinstance(v, (int, float)):
  85. LOGWARN(f"region_to_rect: invalid type at index {i}: {v}")
  86. return []
  87. return region # ✅ 合法结果
  88. except json.JSONDecodeError:
  89. LOGWARN(f"region_to_rect: JSON decode failed for region={region_str}")
  90. return []
  91. except Exception as e:
  92. LOGERR(f"region_to_rect: unexpected error: {e}, data={region_str}")
  93. return []
  94. def check_plan_rect_valid(event_type: int, rect: list) -> bool:
  95. """
  96. 检查指定事件类型是否要求有有效的检测区域。
  97. 返回 True 表示 rect 合法;False 表示应跳过。
  98. """
  99. no_rect_event_types = {
  100. EventType.CLEAN_EXPIRE_EVENTS.value,
  101. EventType.TOILETING_FREQUENCY.value,
  102. EventType.NIGHT_TOILETING_FREQUENCY.value,
  103. EventType.TOILETING_FREQUENCY_ABNORMAL.value,
  104. EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value,
  105. EventType.BATHROOM_STAY_FREQUENCY.value,
  106. }
  107. # 对不依赖检测区域的类型,直接放行
  108. if event_type in no_rect_event_types:
  109. return True
  110. # 其他事件需要有合法的 rect
  111. if not rect or len(rect) != 4:
  112. return False
  113. return True