nifangxu 1 сар өмнө
parent
commit
93f7f7e40b

+ 0 - 0
0_告警联动服务


+ 213 - 0
LAS.py

@@ -0,0 +1,213 @@
+#!/usr/bin/env python3
+
+from threading import Thread
+import configparser
+import os
+import time
+import traceback
+import sys
+import json
+
+import common.sys_comm as sys_comm
+from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
+from common.sys_comm import get_utc_time_ms
+from common.sys_comm import g_sys_conf, g_sys_conf_mtx, g_log_conf, g_log_conf_mtx
+
+from mqtt import mqtt_process
+from mqtt.mqtt_process import MQTTClientThread, MQTTConsumerThread, mqtt_client, mqtt_consumer
+
+import db.db_process as db_process
+from db.db_process import db_req_que
+from db.db_process import DBRequest
+import db.db_sqls as sqls
+
+from device.dev_mng import (
+    cb_handle_query_all_dev_info
+)
+
+
+import core.alarm_plan_manager as ap_mgr
+from core.alarm_plan_manager import(
+    AlarmPlanManager,
+    init_alarm_plan_mgr,
+    start_event_dispatcher,
+    start_alarm_plan_mgr,
+    cb_handle_query_all_alarm_plan_info
+)
+
+import core.g_LAS as g_las
+
+
+
+# 系统初始化
+def sys_init():
+    try:
+        # 创建日志目录
+        if not os.path.exists("./log/"):
+            os.makedirs("./log/")
+        print("create log dir succeed !")
+
+        LOGDBG(f" ================ system init ...")
+        print(f" ================ system init ...")
+
+        # 读取配置文件
+        config = configparser.ConfigParser()
+        with open('./conf.ini', 'r', encoding='utf-8') as f:
+            config.read_file(f)
+            if not (config.has_option('conf', 'module_name') and
+                    config.has_option('conf', 'platform') and
+                    config.has_option('conf', 'db_host') and
+                    config.has_option('conf', 'log_lvl') and
+                    config.has_option('conf', 'max_log_files') and
+                    config.has_option('conf', 'max_log_files')):
+                LOGDBG("sys_init failed, invalid conf.ini param")
+                return 0
+
+            if not (config.has_option('conf', 'db_host')):
+                LOGDBG("sys_init failed, invalid db param")
+                return 0
+
+            if not (config.has_option('linux', 'host_ip') and 
+                    config.has_option('windows', 'host_ip') and 
+                    config.has_option('windows', 'server_ip') and 
+                    config.has_option('windows', 'ssh_host') and 
+                    config.has_option('windows', 'ssh_port')):
+                LOGDBG("sys_init failed, invalid host_ip param")
+                return 0
+        LOGDBG("read conf.ini succeed !")
+
+        
+
+        # 初始化日志配置
+        with g_log_conf_mtx:
+            g_log_conf["module_name"]     = str(config["conf"]["module_name"])
+            g_log_conf["log_lvl"]         = int(config["conf"]["log_lvl"])
+            g_log_conf["max_log_size"]    = int(config["conf"]["max_log_size"]) * 1024 * 1024
+            g_log_conf["max_log_files"]   = int(config["conf"]["max_log_files"])
+        LOGDBG("log init succeed")
+
+        # 初始化系统配置
+        with g_sys_conf_mtx:
+            g_sys_conf["module_name"]     = str(config["conf"]["module_name"])
+            g_sys_conf["platform"]        = int(config["conf"]["platform"])
+            g_sys_conf["db_host"]         = str(config["conf"]["db_host"])
+            g_sys_conf["log_lvl"]         = int(config["conf"]["log_lvl"])
+            g_sys_conf["max_log_size"]    = int(config["conf"]["max_log_size"]) * 1024 * 1024
+            g_sys_conf["max_log_files"]   = int(config["conf"]["max_log_files"])
+
+            # windows 本地
+            if g_sys_conf["platform"] == 0:
+                g_sys_conf["host_ip"]     = str(config["windows"]["host_ip"])
+                g_sys_conf["server_ip"]   = str(config["windows"]["server_ip"])
+                g_sys_conf["ssh_host"]    = str(config["windows"]["ssh_host"])
+                g_sys_conf["ssh_port"]    = int(config["windows"]["ssh_port"])
+                mqtt_process.MQTT_BROKER = g_sys_conf["server_ip"]
+            # linux 服务器
+            elif g_sys_conf["platform"] == 1:
+                g_sys_conf["host_ip"]     = str(config["linux"]["host_ip"])
+                mqtt_process.MQTT_BROKER = g_sys_conf["host_ip"]
+
+        g_sys_conf["sp_id"] = int(get_utc_time_ms())
+
+        # 报警配置
+        g_sys_conf["alarm_conf"] = sys_comm.alarm_conf
+
+
+        # 启动成功,打印系统信息
+        module_name     = g_sys_conf["module_name"]
+        platform        = g_sys_conf["platform"]
+        host_ip         = g_sys_conf["host_ip"]
+        max_log_files   = g_sys_conf["max_log_files"]
+        max_log_size    = g_sys_conf["max_log_size"]
+        log_lvl         = g_sys_conf["log_lvl"]
+        sp_id           = g_sys_conf["sp_id"]
+        LOGINFO(f" ================ system init succeed !")
+        LOGINFO(f" ================ module         : {module_name}")
+        LOGINFO(f" ================ platform       : {platform}")
+        LOGINFO(f" ================ host_ip        : {host_ip}")
+        LOGINFO(f" ================ max_log_files  : {max_log_files}")
+        LOGINFO(f" ================ max_log_size   : {max_log_size}")
+        LOGINFO(f" ================ log_lvl        : {log_lvl}")
+        LOGINFO(f" ================ sp_id          : {sp_id}")
+        LOGINFO(f" ================ version        : v B1.0")
+
+        print(f" ================ system init succeed !")
+        print(f" ================ module         : {module_name}")
+        print(f" ================ platform       : {platform}")
+        print(f" ================ host_ip        : {host_ip}")
+        print(f" ================ max_log_files  : {max_log_files}")
+        print(f" ================ max_log_size   : {max_log_size}")
+        print(f" ================ log_lvl        : {log_lvl}")
+        print(f" ================ sp_id          : {sp_id}")
+        print(f" ================ version        : v B1.0")
+
+        return 0
+
+    except json.JSONDecodeError as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        return -1
+    except Exception as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+        return -1
+
+
+
+
+
+
+# 轮循函数,定期执行一些任务
+def run():
+    # 轮循处理任务
+    while True:
+        time.sleep(1)
+
+
+# 主线程
+def main_process():
+    if not g_las.g_alarm_plan_mgr:
+        return -1
+
+    # 查询所有设备信息
+    db_req_que.put(DBRequest(sql=sqls.sql_query_all_dev_info,
+                             callback=cb_handle_query_all_dev_info))
+
+    # 查询所有告警计划
+    db_req_que.put(DBRequest(sql=sqls.sql_query_all_alarm_plan,
+                             callback=cb_handle_query_all_alarm_plan_info))
+
+    # 轮循函数
+    run()
+
+
+def main():
+
+    # 初始化
+    if (0 != sys_init()):
+        sys.exit(-1)
+
+    # 初始化LAS
+    init_alarm_plan_mgr()
+
+    # 数据库处理线程
+    db_process.create_db_process().start()
+
+    # MQTT 消息线程
+    mqtt_client = MQTTClientThread()
+    mqtt_client.start()
+    mqtt_consumer = MQTTConsumerThread()
+    mqtt_consumer.start()
+
+    # 事件分发器
+    start_event_dispatcher()
+    # 告警计划管理器
+    start_alarm_plan_mgr()
+
+    # 主线程
+    main_process()
+
+
+main()

+ 0 - 0
__init__.py


+ 238 - 0
common/sys_comm.py

@@ -0,0 +1,238 @@
+from datetime import datetime, timezone, timedelta
+import os
+from enum import Enum
+import numpy
+import threading
+import re
+
+alarm_conf = {
+    "retention_time": 60,
+    "retention_keep_time": 30,
+    "retention_alarm_time": 180,
+    "toilet": {
+        "retention_time": 60,
+        "retention_keep_time": 30,
+        "retention_alarm_time": 900
+    }
+}
+
+# 系统配置
+g_sys_conf_mtx = threading.Lock()
+g_sys_conf = {
+    "module_name": "LAS",
+    "host_ip": "10.206.0.8",
+    "platform": 0,          # 平台,0:windows本地,1:云服务器
+    "sp_id": "",     # 服务程序id
+
+    # 数据库相关参数
+    "db_host": "localhost",
+    "db_username": "root",
+    "db_password": "Hfln@1024",
+
+    # ssh
+    "ssh_host": "119.45.12.173",
+    "ssh_port": 22,
+    "ssh_username": "root",
+    "ssh_password": "Hfln@667788",
+}
+
+# 日志配置
+g_log_conf_mtx = threading.Lock()
+g_log_conf = {
+    "module_name": "LAS",
+    "log_path": "./log/",
+    "log_lvl": 0,
+    "max_log_size": 50 * 1024 * 1024,
+    "max_log_files": 10,
+}
+
+# 日志文件锁
+g_log_file_mtx = threading.Lock()
+
+# 获取当前utc时间(毫秒)
+def get_utc_time_ms():
+    now = datetime.now()
+    utc_time = now.astimezone(timezone.utc)
+    utc_timestamp_ms = int(utc_time.timestamp() * 1000)
+    return utc_timestamp_ms
+
+# 获取当前utc时间(秒)
+def get_utc_time_s():
+    now = datetime.now()
+    utc_time = now.astimezone(timezone.utc)
+    utc_timestamp_s = int(utc_time.timestamp())
+    return utc_timestamp_s
+
+# 获取当前北京时间(毫秒)
+def get_bj_time_ms():
+    now = datetime.now()
+    return now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+
+# 获取当前北京时间(秒)
+def get_bj_time_s():
+    now = datetime.now()
+    return now.strftime("%Y-%m-%d %H:%M:%S")
+
+# utc时间转北京时间(秒)
+def utc_to_bj_s(utc_ts:int) -> str:
+    utc_seconds = utc_ts
+    utc_time = datetime.fromtimestamp(utc_seconds, tz=timezone.utc)
+    bj_time = utc_time.astimezone(timezone(timedelta(hours=8)))
+
+    return bj_time.strftime("%Y-%m-%d %H:%M:%S")
+
+# 时区
+UTC_TZ = timezone.utc
+BJ_TZ  = timezone(timedelta(hours=8))
+
+
+# 毫秒 UTC → 北京时间
+def utc_to_bj_ms(utc_ms: int) -> str:
+    dt_utc = datetime.fromtimestamp(utc_ms / 1000, tz=UTC_TZ)
+    dt_bj  = dt_utc.astimezone(BJ_TZ)
+    return dt_bj.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]  # 保留毫秒
+
+# 毫秒 北京时间 → UTC
+def bj_to_utc_ms(bj_str: str) -> int:
+    """
+    bj_str 格式: "YYYY-MM-DD HH:MM:SS.sss"
+    """
+    dt_bj = datetime.strptime(bj_str, "%Y-%m-%d %H:%M:%S.%f")
+    dt_bj = dt_bj.replace(tzinfo=BJ_TZ)
+    dt_utc = dt_bj.astimezone(UTC_TZ)
+    return int(dt_utc.timestamp() * 1000)
+
+# 秒 UTC → 北京时间
+def utc_to_bj_s(utc_s: int) -> str:
+    dt_utc = datetime.fromtimestamp(utc_s, tz=UTC_TZ)
+    dt_bj  = dt_utc.astimezone(BJ_TZ)
+    return dt_bj.strftime("%Y-%m-%d %H:%M:%S")
+
+# 秒 北京时间 → UTC
+def bj_to_utc_s(bj_str: str) -> int:
+    """
+    bj_str 格式: "YYYY-MM-DD HH:MM:SS"
+    """
+    dt_bj = datetime.strptime(bj_str, "%Y-%m-%d %H:%M:%S")
+    dt_bj = dt_bj.replace(tzinfo=BJ_TZ)
+    dt_utc = dt_bj.astimezone(UTC_TZ)
+    return int(dt_utc.timestamp())
+
+
+
+# 控制日志文件数量
+def manage_log_files(max_files, log_path:str):
+    log_files = sorted(
+        (f for f in os.listdir(log_path) if f.endswith(".log")),
+        key=lambda x: os.path.getctime(os.path.join(log_path, x))
+    )
+    
+    while len(log_files) > max_files:
+        oldest_file = log_files.pop(0)
+        os.remove(os.path.join(log_path, oldest_file))
+
+# 打印日志
+def LOG(text='', title=''):
+    with g_log_conf_mtx:
+        module_name     = g_log_conf["module_name"]
+        log_path        = g_log_conf["log_path"]
+        max_log_size    = g_log_conf["max_log_size"]
+        max_files       = g_log_conf["max_log_files"]
+    formatted_now = get_bj_time_ms()
+
+    file = f"{module_name}.log"
+    with g_log_file_mtx:
+        log_file = os.path.join(log_path, file)
+
+        # 检查日志文件大小并处理文件重命名
+        if os.path.exists(log_file) and os.path.getsize(log_file) >= max_log_size:
+            timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+            new_log_file = f"{log_path}{module_name}_{timestamp}.log"
+            os.rename(log_file, new_log_file)
+
+            # 控制日志文件数量
+            manage_log_files(max_files, log_path)
+
+        # 写入日志内容
+        with open(log_file, 'a') as f:
+            f.write('[%s][%s]:%s\n' % (formatted_now, title, text))
+
+# DEBUG日志, 等级0
+def LOGDBG(text=''):
+    if g_log_conf["log_lvl"] <= 0:
+        LOG(text=text, title="DEBUG")
+
+# INFO日志,等级1
+def LOGINFO(text=''):
+    if g_log_conf["log_lvl"] <= 1:
+        LOG(text=text, title="INFO")
+
+# WARN日志,等级2
+def LOGWARN(text=''):
+    if g_log_conf["log_lvl"] <= 2:
+        LOG(text=text, title="WARN")
+
+# ERROR日志,等级3
+def LOGERR(text=''):
+    if g_log_conf["log_lvl"] <= 3:
+        LOG(text=text, title="ERROR")
+
+
+
+# 模型
+class MODEL_E(Enum):
+    MODEL_LIBO = 0  # 李博
+    MODEL_ANDA = 1  # 安大
+e_model = MODEL_E.MODEL_LIBO
+
+# 姿态分类
+class POSE_CLASS_E(Enum):
+    POSE_CLASS_3 = 3  # 3类
+    POSE_CLASS_4 = 4  # 4类
+    POSE_CLASS_5 = 5  # 5类
+e_pose_class = POSE_CLASS_E.POSE_CLASS_4
+
+# 姿态
+class POSE_E(Enum):
+    POSE_INVALID = -1 # 无效值
+    POSE_0 = 0  # 躺(跌倒)
+    POSE_1 = 1  # 坐在椅子上
+    POSE_2 = 2  # 坐在地上
+    POSE_3 = 3  # 蹲
+    POSE_4 = 4  # 站
+    POSE_5 = 5  # 坐
+    POSE_6 = 6  # 躺在沙发上
+    POSE_7 = 7  # 躺在其他
+
+# 实时姿态
+realtime_pose:int = POSE_E.POSE_0.value
+pose_mutex = threading.Lock()
+
+
+
+def get_tracker_targets(point_cloud:list):
+    target_point = numpy.mean(point_cloud, axis=0).tolist()
+    tracker_targets = []
+    tracker_targets.append(target_point)
+    return tracker_targets
+
+# 获取目标target(多个点)
+def get_tracker_targets_mult(point_cloud:list):
+    target_point = numpy.mean(point_cloud, axis=0).tolist()
+    tracker_targets = []
+    tracker_targets.append(target_point)
+    return tracker_targets
+
+
+
+# 设备接入响应错误码
+class DEV_EC(int):
+    succeed         = 0 # 成功
+    unauthorized    = 401   # 未授权
+    forbidden       = 403   # 禁止访问,会话已过期
+    conflict        = 409   # 冲突,重复提交
+
+
+# 检查topic
+def check_topic(pattern:str, topic:str) -> bool:
+    return bool(re.match(pattern, topic))

+ 41 - 0
conf.ini

@@ -0,0 +1,41 @@
+[conf]
+; 模块名称
+module_name = LAS
+
+; 平台,0:windows本地,1:云服务器
+platform = 0
+
+; 数据库
+db_host = localhost
+
+; 日志等级,从低到高(0-3, DEBUG, INFO, WARN, ERROR)
+log_lvl = 0
+; 日志文件大小(MB)
+max_log_size = 50
+; 日志文件数量
+max_log_files = 10
+
+[windows]
+; 本地ip
+host_ip = 192.168.1.17
+; 生产
+; server_ip = 119.45.12.173
+; 测试
+server_ip = 43.137.10.199
+
+; ssh
+; 生产
+; ssh_host = 119.45.12.173
+; 测试
+ssh_host = 43.137.10.199
+ssh_port = 22
+
+[linux]
+; 生产
+host_ip = 10.206.0.8
+; 测试
+; host_ip = 10.206.0.15
+
+
+
+

+ 0 - 0
core/alarm_checker.py


+ 24 - 0
core/alarm_event.py

@@ -0,0 +1,24 @@
+from typing import List, Tuple, Optional
+from datetime import datetime, time, date
+from threading import Thread, Lock
+from enum import Enum
+
+from core.event_type import EventType
+
+class AlarmEvent:
+    def __init__(self,
+                 event_id: str,
+                 dev_id: str,
+                 event_type: EventType,
+                 timestamp: datetime,
+                 region: list,
+                 target: list
+                 ):
+        self.lock_          = Lock()
+        self.event_id_      = event_id
+        self.dev_id_        = dev_id
+        self.event_type_    = event_type
+        self.timestamp_     = timestamp
+        self.region_        = region
+
+        self.target_        = target

+ 476 - 0
core/alarm_plan.py

@@ -0,0 +1,476 @@
+from typing import List, Tuple, Optional
+from datetime import datetime, time, date
+from threading import Thread, Lock
+from enum import Enum
+import uuid
+import json
+import traceback
+from datetime import datetime, timezone, timedelta
+
+from common.sys_comm import (
+    LOGDBG, LOGINFO, LOGWARN, LOGERR,
+    get_utc_time_ms, get_utc_time_s, get_bj_time_ms, get_bj_time_s,
+    utc_to_bj_ms, bj_to_utc_ms, utc_to_bj_s, bj_to_utc_s
+)
+
+from core.time_plan import TimePlan
+from core.event_type import EventType, event_desc_map
+import core.g_LAS as g_las
+from device.dev_mng import (
+    Device,
+    dev_map_push, dev_map_pop, dev_map_find, dev_map_delete
+)
+
+from db.db_process import (
+    db_req_que, DBRequest
+)
+import db.db_sqls as sqls
+import mqtt.mqtt_send as mqtt_send
+from device.dev_mng import g_dev_map, g_dev_map_lock
+
+
+class EventAttr_Base:
+    def __init__(self):
+        return
+
+# 事件属性 事件事件
+class EventAttr_StayDetection(EventAttr_Base):
+    def __init__(self):
+        self.enter_ts_: int  = -1    # 进入时间(s)
+        self.leave_ts_: int  = -1    # 离开时间(s)
+        self.stay_time_: int = -1    # 停留时长(s)
+        return
+
+    def reset(self):
+        self.enter_ts_  = -1
+        self.leave_ts_  = -1
+        self.stay_time_ = -1
+
+
+# 事件属性 滞留事件
+class EventAttr_RetentionDetection(EventAttr_Base):
+    def __init__(self):
+        self.enter_ts_: int  = -1    # 进入时间(s)
+        self.leave_ts_: int  = -1    # 离开时间(s)
+        self.stay_time_: int = -1    # 停留时长(s)
+        return
+
+    def reset(self):
+        self.enter_ts_  = -1
+        self.leave_ts_  = -1
+        self.stay_time_ = -1
+
+
+# 事件属性 如厕事件
+class EventAttr_ToiletingDetection(EventAttr_Base):
+    def __init__(self):
+        self.enter_ts_: int  = -1    # 进入时间(ms)
+        self.leave_ts_: int  = -1    # 离开时间(ms)
+        self.stay_time_: int = -1    # 停留时长(ms)
+        return
+
+    def reset(self):
+        self.enter_ts_  = -1
+        self.leave_ts_  = -1
+        self.stay_time_ = -1
+
+class AlarmPlan:
+    def __init__(self,
+                 plan_uuid: str,
+                 name: str,
+                 dev_id: str,
+                 enable: bool,
+                 time_plan: TimePlan,
+                 rect: list,
+                 event_type: int,
+                 threshold_time: int,
+                 merge_time: int
+                 ):
+        self.lock_          = Lock()
+        self.plan_uuid_     = plan_uuid     # 计划id
+        self.name_          = name          # 计划名称
+        self.dev_id_        = dev_id        # 设备id
+        self.enable_        = enable        # 是否启用
+        self.time_plan_     = time_plan     # 时间计划
+
+        # 维护状态(根据TimePlanu判断)
+        self.status_ = 0     # 0未激活,1激活,-1过期
+        self.status_update_ts_ = -1   # 状态更新时间,初始值为-1
+
+        # 事件属性表
+        self.event_attr_map = {
+            EventType.STAY_DETECTION.value: EventAttr_StayDetection,
+            EventType.RETENTION_DETECTION.value: EventAttr_RetentionDetection,
+            EventType.TOILETING_DETECTION.value: EventAttr_ToiletingDetection
+        }
+
+        # 事件触发参数
+        self.rect_          = rect        # 检测区域  [left, top, width, height]
+        self.threshold_time_    = threshold_time    # 触发时间阈值
+        self.merge_time_    = merge_time    # 归并时间窗口
+        self.event_type_    = event_type    # 事件类型
+        self.event_attr_    = self.init_event_attr()    # 事件属性
+        if self.event_attr_ is None:
+            raise ValueError(f"Invalid event_type: {event_type}")
+
+
+    def execute(self):
+        if self.status_ != 1:
+            return
+        g_las.g_event_dispatcher.dispatch(self.event_type_, self)
+
+
+    # 更新激活状态
+    def update_status(self, now: Optional[datetime] = None) -> None:
+        now = now or datetime.now()
+        old_status = self.status_
+
+        if not self.enable_:
+            self.status_ = 0
+        else:
+            now_fmt = now.strftime("%Y-%m-%d")
+            # 过期
+            if now_fmt > self.time_plan_.stop_date_:
+                self.status_ = -1
+            elif now_fmt < self.time_plan_.start_date_:
+                self.status_ = 0
+            elif self.time_plan_.is_active_now(now):
+                self.status_ = 1
+            else:
+                self.status_ = 0
+
+        if self.status_ != old_status:
+            self.status_update_ts = int(now.timestamp())
+            LOGINFO(f"[Status] plan {self.plan_uuid_} status_ changed {old_status} -> {self.status_}")
+
+
+    def is_point_in_rect(self, x:float, y:float, rect:list) -> bool:
+        rx, ry, rw, rh = rect
+        x_min = min(rx, rx + rw)
+        x_max = max(rx, rx + rw)
+        y_min = min(ry, ry - rh)
+        y_max = max(ry, ry - rh)
+        bRet: bool = x_min <= x <= x_max and y_min <= y <= y_max
+        return bRet
+
+    # 查找最近 t 秒内,最后一个落在 rect 内的 target_point 的 rtd_unit
+    def find_latest_rtd_in_region(self, device: Device, rect: list, now: int=None, t: int=1):
+        now_s = now if now else get_utc_time_s()
+        rtd_que_copy = device.get_rtd_que_copy()
+        with self.lock_:
+            for rtd_unit in reversed(rtd_que_copy):  # 倒序扫描
+                ts_s = int(rtd_unit.get("timestamp", 0))
+                if now_s - ts_s > t:
+                    break  # 已经超过 t 秒,可以直接结束
+                # 检查点是否在区域内
+                for pt in rtd_unit.get("target_point", []):
+                    if len(pt) >= 2:
+                        x, y = pt[0], pt[1]
+                        if self.is_point_in_rect(x, y, rect):
+                            return rtd_unit
+        return None
+
+
+
+    # 初始化事件属性
+    def init_event_attr(self):
+        event_cls = self.event_attr_map.get(self.event_type_)
+        if event_cls is None:
+            return None
+
+        event_attr    = event_cls()
+        return event_attr
+
+
+    # 停留事件
+    def handle_stay_detection(self):
+        try:
+            dev_id = self.dev_id_
+            device:Device = dev_map_find(dev_id)
+            if not device:
+                return
+            now = get_utc_time_s()
+
+            # 查找最新的落在检测区域的目标
+            rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
+            if rtd_unit:
+                timestamp = rtd_unit["timestamp"]
+                pose = rtd_unit["pose"]
+                target_point = rtd_unit["target_point"]
+
+                if self.event_attr_.enter_ts_ == -1:
+                    self.event_attr_.enter_ts_ = timestamp
+                else:
+                    self.event_attr_.leave_ts_ = timestamp
+
+            if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
+                return
+
+            # 归并时间内,不认为事件结束
+            if now - self.event_attr_.leave_ts_  < self.merge_time_:
+                return
+
+            self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
+            stay_time =self.event_attr_.stay_time_
+            # 时间小于触发时间阈值,忽略并重置
+            if stay_time < self.threshold_time_ :
+                self.event_attr_.reset()
+                LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
+                return
+
+            # 构造事件
+            # 入库
+            info = {
+                "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
+                "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
+                "stay_time": stay_time
+
+            }
+            event_uuid = str(uuid.uuid4())
+            params = {
+                "dev_id": dev_id,
+                "uuid": event_uuid,
+                "plan_uuid": self.plan_uuid_,
+                "event_type": event_desc_map[self.event_type_],
+                "info": json.dumps(info),
+                "is_handle": 0,
+                "is_deleted": 0
+            }
+            db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+
+            # 通知
+            mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
+            LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
+
+            self.event_attr_.reset()
+
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+    # 滞留事件
+    def handle_retention_detection(self):
+        try:
+            dev_id = self.dev_id_
+            device:Device = dev_map_find(dev_id)
+            if not device:
+                return
+            now = get_utc_time_s()
+
+            # 查找最新的落在检测区域的目标
+            rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
+            if rtd_unit:
+                timestamp = rtd_unit["timestamp"]
+                pose = rtd_unit["pose"]
+                target_point = rtd_unit["target_point"]
+
+                if self.event_attr_.enter_ts_ == -1:
+                    self.event_attr_.enter_ts_ = timestamp
+                else:
+                    self.event_attr_.leave_ts_ = timestamp
+
+            if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
+                return
+
+            # 归并时间内,不认为事件结束
+            if now - self.event_attr_.leave_ts_  < self.merge_time_:
+                return
+
+            self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
+            stay_time =self.event_attr_.stay_time_
+            # 时间小于触发时间阈值,忽略并重置
+            if stay_time < self.threshold_time_ :
+                self.event_attr_.reset()
+                LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
+                return
+
+            # 构造事件
+            # 入库
+            info = {
+                "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
+                "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
+                "stay_time": stay_time
+
+            }
+            event_uuid = str(uuid.uuid4())
+            params = {
+                "dev_id": dev_id,
+                "uuid": event_uuid,
+                "plan_uuid": self.plan_uuid_,
+                "event_type": event_desc_map[self.event_type_],
+                "info": json.dumps(info),
+                "is_handle": 0,
+                "is_deleted": 0
+            }
+            db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+
+            # 通知
+            mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
+            LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
+
+            self.event_attr_.reset()
+
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+    # 如厕事件
+    def handle_toileting_detection(self):
+        try:
+            dev_id = self.dev_id_
+            device:Device = dev_map_find(dev_id)
+            if not device:
+                return
+            now = get_utc_time_s()
+
+            # 查找最新的落在检测区域的目标
+            rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
+            if rtd_unit:
+                timestamp = rtd_unit["timestamp"]
+                pose = rtd_unit["pose"]
+                target_point = rtd_unit["target_point"]
+
+                if self.event_attr_.enter_ts_ == -1:
+                    self.event_attr_.enter_ts_ = timestamp
+                else:
+                    self.event_attr_.leave_ts_ = timestamp
+
+            if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
+                return
+
+            # 归并时间内,不认为事件结束
+            if now - self.event_attr_.leave_ts_  < self.merge_time_:
+                return
+
+            self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
+            stay_time =self.event_attr_.stay_time_
+            # 时间小于触发时间阈值,忽略并重置
+            if stay_time < self.threshold_time_ :
+                self.event_attr_.reset()
+                LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
+                return
+
+            # 构造事件
+            # 入库
+            info = {
+                "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
+                "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
+                "stay_time": stay_time
+
+            }
+            event_uuid = str(uuid.uuid4())
+            params = {
+                "dev_id": dev_id,
+                "uuid": event_uuid,
+                "plan_uuid": self.plan_uuid_,
+                "event_type": event_desc_map[self.event_type_],
+                "info": json.dumps(info),
+                "is_handle": 0,
+                "is_deleted": 0
+            }
+            db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+
+            # 通知
+            mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
+            LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
+
+            self.event_attr_.reset()
+
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+
+    # 如厕频次统计
+    def handle_toileting_frequency(self):
+        try:
+            dev_id = self.dev_id_
+            device:Device = dev_map_find(dev_id)
+            if not device:
+                return
+            now = get_utc_time_s()
+
+            # 查找最新的落在检测区域的目标
+            rtd_unit = self.find_latest_rtd_in_region(device, self.rect_, now, 3)
+            if rtd_unit:
+                timestamp = rtd_unit["timestamp"]
+                pose = rtd_unit["pose"]
+                target_point = rtd_unit["target_point"]
+
+                if self.event_attr_.enter_ts_ == -1:
+                    self.event_attr_.enter_ts_ = timestamp
+                else:
+                    self.event_attr_.leave_ts_ = timestamp
+
+            if self.event_attr_.enter_ts_ == -1 or self.event_attr_.leave_ts_ == -1:
+                return
+
+            # 归并时间内,不认为事件结束
+            if now - self.event_attr_.leave_ts_  < self.merge_time_:
+                return
+
+            self.event_attr_.stay_time_ = self.event_attr_.leave_ts_ - self.event_attr_.enter_ts_
+            stay_time =self.event_attr_.stay_time_
+            # 时间小于触发时间阈值,忽略并重置
+            if stay_time < self.threshold_time_ :
+                self.event_attr_.reset()
+                LOGINFO(f"less than threshold_time, alarm_plan: {self.plan_uuid_}, event_type: {self.event_type_}")
+                return
+
+            # 构造事件
+            # 入库
+            info = {
+                "enter_time": utc_to_bj_s(self.event_attr_.enter_ts_),
+                "leave_time": utc_to_bj_s(self.event_attr_.enter_ts_),
+                "stay_time": stay_time
+
+            }
+            event_uuid = str(uuid.uuid4())
+            params = {
+                "dev_id": dev_id,
+                "uuid": event_uuid,
+                "plan_uuid": self.plan_uuid_,
+                "event_type": event_desc_map[self.event_type_],
+                "info": json.dumps(info),
+                "is_handle": 0,
+                "is_deleted": 0
+            }
+            db_req_que.put(DBRequest(sql=sqls.sql_insert_events, params=params, callback=None))
+
+            # 通知
+            mqtt_send.alarm_event(dev_id, event_uuid, self.plan_uuid_, event_desc_map[self.event_type_], info, "events")
+            LOGDBG(f"new event: {event_desc_map[self.event_type_]}, stay_time: {stay_time}")
+
+            self.event_attr_.reset()
+
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+
+
+

+ 230 - 0
core/alarm_plan_manager.py

@@ -0,0 +1,230 @@
+from typing import List, Tuple, Optional
+import time
+from datetime import datetime, date
+import threading
+from threading import Thread, Lock, Event
+from enum import Enum
+import queue
+import json
+import ast
+import traceback
+
+from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
+
+from core.alarm_event import AlarmEvent
+from core.alarm_plan import AlarmPlan
+from core.time_plan import TimePlan
+from core.event_type import EventType
+import core.g_LAS as g_las
+
+
+
+class AlarmPlanManager:
+    def __init__(self, alarm_plan_map: dict = None):
+        self.lock_ = Lock()
+
+        self.alarm_plan_map_ = alarm_plan_map or {}   # <uuid, AlarmPlan>
+
+        self.running = False
+        self.thread = None
+
+    def push(self, plan_uuid: str, alarm_plan: AlarmPlan) -> None:
+        """插入/更新一个 AlarmPlan"""
+        with self.lock_:
+            self.alarm_plan_map_[plan_uuid] = alarm_plan
+
+
+    def pop(self, plan_uuid: str) -> Optional[AlarmPlan]:
+        """弹出并返回 AlarmPlan,如果不存在返回 None"""
+        with self.lock_:
+            return self.alarm_plan_map_.pop(plan_uuid, None)
+
+
+    def find(self, plan_uuid: str) -> Optional[AlarmPlan]:
+        """查找 AlarmPlan,如果不存在返回 None"""
+        with self.lock_:
+            return self.alarm_plan_map_.get(plan_uuid, None)
+
+
+    def delete(self, plan_uuid: str) -> bool:
+        """删除 AlarmPlan,成功返回 True,不存在返回 False"""
+        with self.lock_:
+            if plan_uuid in self.alarm_plan_map_:
+                del self.alarm_plan_map_[plan_uuid]
+                return True
+            return False
+
+
+    def list_all(self) -> list:
+        """返回所有 AlarmPlan(浅拷贝列表)"""
+        with self.lock_:
+            return list(self.alarm_plan_map_.values())
+
+
+    def start_scheduler(self, interval=5):
+        if self.running:
+            return
+        self.running = True
+        self.thread = threading.Thread(target=self._scheduler, args=(interval,), daemon=True)
+        self.thread.start()
+
+    def stop_scheduler(self):
+        self.running = False
+
+    def _scheduler(self, interval):
+        while self.running:
+            plans = self.list_all()
+            plan: AlarmPlan = None
+            for plan in plans:
+                try:
+                    plan.update_status()       # 更新状态
+                    if plan.status_ == 1:       # 激活状态才执行
+                        plan.execute()
+                except Exception as e:
+                    LOGERR(f"[Scheduler] plan {plan.plan_uuid_} error: {e}")
+            time.sleep(interval)
+
+
+class EventDispatcher:
+    def __init__(self):
+        self.queues = {}  # event_type -> Queue
+        self.threads = {}
+        self.running = False
+
+    def start(self, handlers: dict):
+        """handlers: {event_type: handler_func}"""
+        self.running = True
+        for event_type, handler in handlers.items():
+            q = queue.Queue()
+            self.queues[event_type] = q
+
+            t = threading.Thread(target=self.worker, args=(event_type, q, handler), daemon=True)
+            self.threads[event_type] = t
+            t.start()
+
+    def stop(self):
+        self.running = False
+
+    def dispatch(self, event_type: int, plan):
+        if event_type in self.queues:
+            self.queues[event_type].put(plan)
+        else:
+            LOGINFO(f"[Dispatcher] No queue for event_type={event_type}")
+
+    def worker(self, event_type, q: queue.Queue, handler):
+        while self.running:
+            try:
+                plan = q.get(timeout=1)
+                handler(plan)
+            except queue.Empty:
+                continue
+            except Exception as e:
+                LOGERR(f"[Dispatcher] Error in event_type={event_type}: {e}")
+
+
+def init_alarm_plan_mgr():
+    g_las.g_alarm_plan_mgr = AlarmPlanManager()
+    g_las.g_event_dispatcher = EventDispatcher()
+
+
+def start_event_dispatcher():
+    handles = {
+        EventType.STAY_DETECTION.value      : AlarmPlan.handle_stay_detection,
+        EventType.RETENTION_DETECTION.value : AlarmPlan.handle_retention_detection,
+        EventType.TOILETING_DETECTION.value : AlarmPlan.handle_toileting_detection,
+        EventType.TOILETING_FREQUENCY.value : AlarmPlan.handle_toileting_frequency,
+    }
+    g_las.g_event_dispatcher.start(handles)
+
+
+def start_alarm_plan_mgr():
+    g_las.g_alarm_plan_mgr.start_scheduler(interval=1)
+
+
+
+# 将region字典转换为xy平面的rect [left, top, width, height]
+def region_to_rect(region: dict) -> list:
+    x_start, x_stop = region["x_cm_start"], region["x_cm_stop"]
+    y_start, y_stop = region["y_cm_start"], region["y_cm_stop"]
+
+    left = min(x_start, x_stop)
+    right = max(x_start, x_stop)
+    top = min(y_start, y_stop)
+    bottom = max(y_start, y_stop)
+
+    width = right - left
+    height = bottom - top
+
+    return [left, top, width, height]
+
+# 回调函数,处理查询结果:查询所有的告警计划信息
+def cb_handle_query_all_alarm_plan_info(result):
+    try:
+        if result:
+            for row in result:
+                plan_uuid: str  = row["plan_uuid"]
+                plan_name: str  = row["plan_name"]
+                dev_id: str     = row["dev_id"]
+                enable: int     = bool(row["enable"])
+                # region          = row["region"]
+                # rect            = json.loads(region_to_rect(region))
+                rect: list      = json.loads(row["region"])
+                threshold_time: int = row["threshold_time"]
+                merge_time: int     = row["merge_time"]
+
+                event_val: int   = row["event_val"]
+                event_type      = event_val
+                event_str: str   = row["event_str"]
+                event_desc: str  = row["event_desc"]
+
+                start_date  = row["start_date"]
+                stop_date   = row["stop_date"]
+
+                time_range  = json.loads(row["time_range"])
+                month_days = None
+                if row["month_days"]:
+                    month_days = ast.literal_eval(row["mozhenth_days"])
+                weekdays = None
+                if row["weekdays"]:
+                    weekdays = ast.literal_eval(row["weekdays"])
+
+                time_plan = TimePlan(
+                    time_range  = time_range,
+                    start_date  = start_date,
+                    stop_date   = stop_date,
+                    weekdays    = weekdays,
+                    month_days  = month_days
+                )
+
+                alarm_plan = AlarmPlan(
+                    plan_uuid   = plan_uuid,
+                    name        = plan_name,
+                    dev_id      = dev_id,
+                    enable      = enable,
+                    time_plan   = time_plan,
+                    rect        = rect,
+                    event_type  = event_type,
+                    threshold_time  = threshold_time,
+                    merge_time  = merge_time
+                )
+                if alarm_plan.event_attr_ is None:
+                    LOGERR(f"drop plan {plan_uuid}, invalid event_type: {event_type}")
+                    continue
+
+
+                # 更新设备信息
+                g_las.g_alarm_plan_mgr.push(plan_uuid, alarm_plan)
+
+            LOGDBG(f"cb_handle_query_all_alarm_plan_info succeed")
+        else:
+            LOGDBG("cb_handle_query_all_alarm_plan_info, invalid result")
+
+    except json.JSONDecodeError as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+    except Exception as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+

+ 13 - 0
core/alarm_type.py

@@ -0,0 +1,13 @@
+from enum import Enum
+
+# 告警类型
+class MSG_TYPE(Enum):
+    MSG_DEV_STATUS          = 0     # 设备状态变更
+    MSG_DEV_RAW_POINTS      = 1     # 实时点云
+    MSG_REALTIME_TARGET     = 2     # 实时目标位置
+    MSG_EVENT_FALL          = 3     # 跌倒事件
+    MSG_EVENT_EXIST         = 4     # 存在事件
+    MSG_ALARM_EVENT         = 5     # 告警事件
+
+
+

+ 25 - 0
core/event_type.py

@@ -0,0 +1,25 @@
+from enum import Enum
+
+# 事件类型
+class EventType(Enum):
+    STAY_DETECTION                  = 1 # 停留事件
+    RETENTION_DETECTION             = 2 # 滞留事件
+    TOILETING_DETECTION             = 3 # 如厕事件
+    TOILETING_FREQUENCY             = 4 # 如厕频次统计
+    NIGHT_TOILETING_FREQUENCY       = 5 # 夜间如厕频次统计
+    TOILETING_FREQUENCY_ABNORMAL    = 6 # 如厕频次异常
+    NIGHT_TOILETING_FREQUENCY_ABNORMAL  = 7 # 起夜异常
+    BATHROOM_STAY_FREQUENCY         = 8 # 卫生间频次统计
+    TARGET_ABSENCE                  = 9 # 异常消失
+
+event_desc_map = {
+    EventType.STAY_DETECTION.value              : "stay_detection",
+    EventType.RETENTION_DETECTION.value         : "retention_detection",
+    EventType.TOILETING_DETECTION.value         : "toileting_detection",
+    EventType.TOILETING_FREQUENCY.value         : "toileting_frequency",
+    EventType.NIGHT_TOILETING_FREQUENCY.value   : "night_toileting_frequency",
+    EventType.TOILETING_FREQUENCY_ABNORMAL.value: "toileting_frequency_abnormal",
+    EventType.NIGHT_TOILETING_FREQUENCY_ABNORMAL.value  : "night_toileting_frequency_abnormal",
+    EventType.BATHROOM_STAY_FREQUENCY.value     : "bathroom_stay_frequency",
+    EventType.TARGET_ABSENCE.value              : "target_absence"
+}

+ 7 - 0
core/g_LAS.py

@@ -0,0 +1,7 @@
+# from core.alarm_plan import  AlarmPlan
+# from core.alarm_plan_manager import  AlarmPlanManager
+# from core.alarm_plan_manager import  EventDispatcher
+
+# 全局变量
+g_alarm_plan_mgr    = None
+g_event_dispatcher  = None

+ 5 - 0
core/linkage_action.py

@@ -0,0 +1,5 @@
+
+# 联动动作
+class LinkageAction:
+    def __init__(self):
+        return

+ 37 - 0
core/linkage_alarm_service.py

@@ -0,0 +1,37 @@
+from typing import List, Tuple, Optional
+from datetime import datetime, time, date
+from threading import Thread, Lock
+from enum import Enum
+
+from core.event_type import EventType
+from core.alarm_event import AlarmEvent
+from core.alarm_plan_manager import AlarmPlanManager
+
+
+class LinkageAlarmService:
+    def __init__(self, plan_mgr: AlarmPlanManager):
+        self.plan_mgr_ = plan_mgr
+        self.last_trigger_times_ = {}   # plan_uuid -> datetime
+
+    def process_event(self, event: AlarmEvent):
+        matched_plans = self.plan_mgr_.match_plans(event)
+        now = event.timestamp_
+
+        for plan in matched_plans:
+            plan_uuid  = plan.plan_uuid_
+            last_time = self.last_trigger_times_.get(plan_uuid)
+
+            # 归并事件判断
+            if last_time and (now - last_time).total_seconds() < plan.merge_time_:
+                print(f"归并")
+                continue
+            print(f"触发事件 {event.event_id_}, 命中计划[{plan.name_}]")
+            self.last_trigger_times_[plan_uuid] = now
+
+
+def simulate_event_input(service: LinkageAlarmService):
+    event = AlarmEvent(
+        event_id = "ev001",
+        dev_id = "00FFAABBCC11",
+        event_type = EventType.TOILETING_DETECTION
+    )

+ 63 - 0
core/time_plan.py

@@ -0,0 +1,63 @@
+from typing import List, Tuple, Optional
+from datetime import datetime, time, date
+from threading import Thread, Lock
+from enum import Enum
+
+from common.sys_comm import (
+    LOGDBG, LOGINFO, LOGWARN, LOGERR,
+    get_utc_time_ms
+)
+
+
+# 时间计划
+class TimePlan:
+    def __init__(self,
+                 time_range: list,
+                 start_date: str,
+                 stop_date: str,
+                 weekdays: Optional[List[int]] = None,
+                 month_days: Optional[List[int]] = None
+                 ):
+
+        self.start_date_    = start_date    # 开始日期
+        self.stop_date_     = stop_date     # 结束日期
+        self.time_range_    = time_range    # 生效时间
+        # [{"start_time": "00:00","end_time": "12:00"},{"start_time": "18:00","end_time": "23:59"}, ...]
+        self.month_days_    = month_days    # 每月的生效日期
+        self.weekdays_      = weekdays      # 每周的生效日期
+
+    def is_active_now(self, now: Optional[datetime] = None) -> bool:
+        now = now or datetime.now()
+
+        # 起止日期匹配
+        try:
+            if not (self.start_date_ <= now.strftime("%Y-%m-%d") <= self.stop_date_):
+                return False
+        except ValueError:
+            return False
+
+        # 周日期匹配
+        if self.weekdays_ is not None and now.weekday() not in self.weekdays_:
+            return False
+
+        # 月日期匹配
+        if self.month_days_ is not None and now.day not in self.month_days_:
+            return False
+
+
+        # 时间匹配
+        now_t = now.time()
+        for period in self.time_range_:
+            start_t = datetime.strptime(period["start_time"], "%H:%M").time()
+            end_t = datetime.strptime(period["end_time"], "%H:%M").time()
+            if start_t <= now_t <= end_t:
+                return True
+
+        return False
+
+    def example(self) -> bool:
+        if self.is_active_now():
+            return True
+        else:
+            return False
+

+ 190 - 0
db/db_process.py

@@ -0,0 +1,190 @@
+'''
+执行数据库操作的线程
+'''
+
+import threading
+import queue
+import time
+from sshtunnel import SSHTunnelForwarder, BaseSSHTunnelForwarderError
+import pymysql
+from DBUtils.PooledDB import PooledDB
+from concurrent.futures import ThreadPoolExecutor
+
+import json
+import shutil
+
+import common.sys_comm
+from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
+from common.sys_comm import get_bj_time_ms
+from common.sys_comm import g_sys_conf, g_sys_conf_mtx
+
+
+# ssh配置
+ssh_conf = {
+    "ssh_host": "119.45.12.173",
+    "ssh_port": 22,
+    "ssh_user": "root",
+    "ssh_pwd": "Hfln@667788",
+
+}
+
+# 数据库配置
+db_config = {
+    # 数据库相关参数
+    "host": "localhost",
+    "user": "root",
+    "password": "Hfln@1024",
+    "database": "lnxx_dev"
+}
+
+
+# 请求队列
+db_req_que = queue.Queue()
+# 记录 SSH 隧道和数据库连接
+ssh_server = None
+# 连接池对象
+db_pool = None
+
+# 数据库请求类
+class DBRequest:
+    def __init__(self, sql:str, params=None, callback=None, userdata=None):
+        self.sql = sql
+        self.params = params if params else ()
+        self.callback = callback
+        self.userdata = userdata
+
+
+# ========== 初始化配置 ==========
+def db_pro_init():
+    global ssh_conf, db_config
+    with g_sys_conf_mtx:
+        ssh_conf = {
+            "ssh_host": g_sys_conf["ssh_host"],
+            "ssh_port": g_sys_conf["ssh_port"],
+            "ssh_user": g_sys_conf["ssh_username"],
+            "ssh_pwd": g_sys_conf["ssh_password"],
+        }
+
+        db_config = {
+            "host": g_sys_conf["db_host"],
+            "user": g_sys_conf["db_username"],
+            "password": g_sys_conf["db_password"],
+            "database": "lnxx_dev"
+        }
+
+
+# ========== 初始化 SSH ==========
+def initialize_ssh_connection():
+    global ssh_server
+    if ssh_server is None or not ssh_server.is_active:
+        ssh_server = SSHTunnelForwarder(
+            (ssh_conf["ssh_host"], ssh_conf["ssh_port"]),
+            ssh_username=ssh_conf["ssh_user"],
+            ssh_password=ssh_conf["ssh_pwd"],
+            remote_bind_address=('127.0.0.1', 3306)
+        )
+        ssh_server.start()
+        LOGINFO("SSH connected")
+
+
+# ========== 初始化连接池 ==========
+def initialize_connection_pool():
+    global db_pool, ssh_server
+    if g_sys_conf["platform"] == 0:
+        initialize_ssh_connection()
+        port = ssh_server.local_bind_port
+        host = "127.0.0.1"
+    else:
+        port = 3306
+        host = db_config["host"]
+
+    db_pool = PooledDB(
+        creator=pymysql,
+        maxconnections=10,
+        mincached=2,
+        maxcached=5,
+        blocking=True,
+        host=host,
+        port=port,
+        user=db_config['user'],
+        password=db_config['password'],
+        database=db_config['database'],
+        charset='utf8mb4',
+        cursorclass=pymysql.cursors.DictCursor
+    )
+    LOGINFO("DB connection pool initialized")
+
+
+# ========== 执行数据库请求 ==========
+def handle_db_request(db_request: DBRequest):
+    conn = None
+    try:
+        conn = db_pool.connection()
+        with conn.cursor() as cursor:
+            cursor.execute(db_request.sql, db_request.params)
+            sql_lower = db_request.sql.strip().lower()
+            if sql_lower.startswith("select"):
+                result = cursor.fetchall()
+            elif sql_lower.startswith("insert"):
+                result = {"lastrowid": cursor.lastrowid}
+            else:
+                result = {"rowcount": cursor.rowcount}
+            if db_request.callback:
+                if db_request.userdata:
+                    db_request.callback(result, db_request.userdata)
+                else:
+                    db_request.callback(result)
+        conn.commit()
+    except Exception as e:
+        LOGERR(f"[DB ERROR] SQL执行失败: {e}")
+    finally:
+        if conn:
+            conn.close()
+        db_req_que.task_done()
+
+
+# ========== 主数据库线程 ==========
+def db_process():
+    db_pro_init()
+    initialize_connection_pool()
+
+    executor = ThreadPoolExecutor(max_workers=8)  # 限制线程并发数
+    while True:
+        try:
+            db_request: DBRequest = db_req_que.get()
+            if db_request is None:
+                break
+            executor.submit(handle_db_request, db_request)
+        except Exception as e:
+            LOGERR(f"[DB Thread Error] {e}")
+        time.sleep(0.01)
+
+
+def create_db_process():
+    # 启动数据库线程
+    return threading.Thread(target=db_process, daemon=True)
+
+
+# 处理数据库返回的结果
+def handle_device_data(results):
+    LOGDBG("Received results: {results}")
+
+
+# 示例请求生成器
+def request_generator():
+    while True:
+        sql_query = "SELECT * FROM dev_info"  # 示例查询
+        db_req_que.put(DBRequest(sql=sql_query, callback=handle_device_data))
+        time.sleep(1)  # 每秒生成一个请求
+
+
+def test_main():
+    # 启动数据库线程
+    db_thread = threading.Thread(target=db_process, daemon=True)
+    db_thread.start()
+
+    # 启动请求生成器
+    request_gen_thread = threading.Thread(target=request_generator)
+    request_gen_thread.start()
+
+

+ 9 - 0
db/db_scripts/create_alarm_time_plan.sql

@@ -0,0 +1,9 @@
+CREATE TABLE `lnxx_dev`.`alarm_time_plan`  (
+  `id` int(0) NOT NULL AUTO_INCREMENT COMMENT '索引',
+  `start_date` char(255) NULL COMMENT '开始日期',
+  `stop_date` char(255) NULL COMMENT '结束日期',
+  `time_range` char(255) NULL COMMENT '生效时间',
+  `month_days` char(255) NULL COMMENT '每月的生效日期',
+  `weekdays` char(255) NULL COMMENT '每周的生效日期',
+  PRIMARY KEY (`id`)
+);

+ 17 - 0
db/db_scripts/create_alram_plan.sql

@@ -0,0 +1,17 @@
+CREATE TABLE `lnxx_dev`.`alarm_plan`  (
+  `id` int(0) NOT NULL AUTO_INCREMENT COMMENT '索引',
+  `uuid` varchar(255) NULL COMMENT 'uuid',
+  `name` varchar(255) NULL COMMENT '计划名称',
+  `dev_id` varchar(255) NULL COMMENT '设备id',
+  `enable` int(0) NULL COMMENT '是否启用',
+  `region` varchar(255) NULL COMMENT '检测区域[left, top, width, height]',
+  `event_val` int(0) NULL COMMENT '事件类型值',
+  `alarm_time_plan_id` int(0) NULL COMMENT '时间计划表id',
+  `threshold_time` int(0) NULL COMMENT '触发阈值(秒)',
+  `merge_time` int(0) NULL COMMENT '归并时间(秒)',
+  `param` varchar(255) NULL COMMENT '参数',
+  `create_time` datetime(0) NULL COMMENT '创建时间',
+  `update_time` datetime(0) NULL COMMENT '更新时间',
+  `remark` varchar(255) NULL COMMENT 'remark',
+  PRIMARY KEY (`id`)
+);

+ 9 - 0
db/db_scripts/create_event_type.sql

@@ -0,0 +1,9 @@
+CREATE TABLE `lnxx_dev`.`event_type`  (
+  `id` int(0) NOT NULL AUTO_INCREMENT COMMENT '索引',
+  `event_val` int(0) NULL COMMENT '事件值',
+  `event_str` varchar(255) NULL COMMENT '事件字符串',
+  `event_desc` varchar(255) NULL COMMENT '事件描述',
+  `remark` varchar(255) NULL COMMENT '备注',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY uniq_event_val (`event_val`)
+);

+ 14 - 0
db/db_scripts/create_events.sql

@@ -0,0 +1,14 @@
+CREATE TABLE `lnxx_dev`.`events`  (
+  `id` int(0) NOT NULL AUTO_INCREMENT COMMENT '索引',
+  `dev_id` varchar(255) NULL COMMENT '设备id',
+  `uuid` varchar(255) NULL COMMENT 'uuid',
+  `plan_uuid` varchar(255) NULL COMMENT 'alarm_plan的uuid',
+  `event_type` varchar(255) NULL COMMENT '事件类型',
+  `info` varchar(255) NULL COMMENT '事件信息(json)',
+  `is_handle` int(255) NULL COMMENT '是否处理:0-未处理,1-已处理',
+  `create_time` datetime(0) NULL COMMENT '创建时间',
+  `update_time` datetime(0) NULL COMMENT '更新时间',
+  `is_deleted` int(0) NULL COMMENT '删除标记:0-未删除,1-已删除',
+  `remark` varchar(255) NULL COMMENT '备注',
+  PRIMARY KEY (`id`)
+);

+ 18 - 0
db/db_scripts/insert_into_event_type.sql

@@ -0,0 +1,18 @@
+INSERT INTO event_type (
+    event_val,
+    event_str,
+    event_desc,
+    remark
+) VALUES
+    (1, "stay_detection", "停留事件", NULL),
+    (2, "retention_detection", "滞留事件", NULL),
+    (3, "toileting_detection", "如厕事件", NULL),
+    (4, "toileting_frequency", "如厕频次统计", NULL),
+    (5, "night_toileting_frequency", "夜间如厕频次统计", NULL),
+    (6, "toileting_frequency_abnormal", "如厕频次异常", NULL),
+    (7, "night_toileting_frequency_abnormal", "起夜异常", NULL),
+    (8, "bathroom_stay_frequency", "卫生间频次统计", NULL),
+    (9, "target_absence", "异常消失", NULL)
+ON DUPLICATE KEY UPDATE
+    event_str = VALUES(event_str),
+    event_desc = VALUES(event_desc);

+ 112 - 0
db/db_sqls.py

@@ -0,0 +1,112 @@
+
+tmp_sql = """
+INSERT INTO dev_info (
+    dev_id, online, software, hardware, ssid, password, ip, mount_plain, 
+    start_x, start_y, start_z, stop_x, stop_y, stop_z, height
+) VALUES (
+    %(dev_id)s, %(online)s, %(software)s, %(hardware)s, %(ssid)s, %(password)s, %(ip)s, %(mount_plain)s,
+    %(start_x)s, %(start_y)s, %(start_z)s, %(stop_x)s, %(stop_y)s, %(stop_z)s, %(height)s
+)
+"""
+
+# 查询所有 dev_info 的设备信息
+sql_query_all_dev_info = "SELECT * FROM dev_info"
+sql_query_one_dev_info = "SELECT * FROM dev_info WHERE dev_id = %s"
+
+
+# 查询所有设备的信息
+sql_query_all_dev_detail_info = """
+SELECT 
+    dev_info.*,
+    user_info.phone AS ui_phone,
+    dev_room.room_params AS dr_roomparams,
+    dev_room.furnitures AS dr_furnitures,
+    dev_room.create_time AS dr_create_time,
+    dev_room.delete_tag AS dr_delete_tag
+FROM 
+    dev_info
+LEFT JOIN 
+    user_info ON dev_info.user_openid = user_info.openid
+LEFT JOIN 
+    dev_room ON dev_info.dev_id = dev_room.dev_id
+"""
+
+# 查询单个设备的信息
+sql_query_one_dev_detail_info = """
+SELECT 
+    dev_info.*,
+    user_info.phone AS ui_phone,
+    dev_room.room_params AS dr_roomparams,
+    dev_room.furnitures AS dr_furnitures,
+    dev_room.create_time AS dr_create_time,
+    dev_room.delete_tag AS dr_delete_tag
+FROM 
+    dev_info
+LEFT JOIN 
+    user_info ON dev_info.user_openid = user_info.openid
+LEFT JOIN 
+    dev_room ON dev_info.dev_id = dev_room.dev_id
+WHERE 
+    dev_info.dev_id = %(dev_id)s
+"""
+
+# 查询所有告警计划
+sql_query_all_alarm_plan = """
+SELECT
+    ap.id               AS plan_id,
+    ap.uuid             AS plan_uuid,
+    ap.name             AS plan_name,
+    ap.dev_id           AS dev_id,
+    ap.enable           AS enable,
+    ap.region           AS region,
+    ap.threshold_time   AS threshold_time,
+    ap.merge_time       AS merge_time,
+    ap.create_time      AS create_time,
+    ap.update_time      AS update_time,
+
+    et.event_val        AS event_val,
+    et.event_str        AS event_str,
+    et.event_desc       AS event_desc,
+
+    atp.id              AS time_plan_id,
+    atp.start_date      AS start_date,
+    atp.stop_date       AS stop_date,
+    atp.time_range      AS time_range,
+    atp.month_days      AS month_days,
+    atp.weekdays        AS weekdays+
+
+
+FROM alarm_plan ap
+LEFT JOIN event_type et 
+       ON ap.event_val = et.event_val
+LEFT JOIN alarm_time_plan atp 
+       ON ap.alarm_time_plan_id = atp.id
+WHERE ap.enable = 1;
+"""
+
+# 查询单个告警计划
+# todo
+
+
+# 插入events
+sql_insert_events = """
+INSERT INTO events (
+    dev_id,
+    uuid,
+    plan_uuid,
+    event_type,
+    info,
+    is_handle,
+    is_deleted)
+VALUES (
+    %(dev_id)s,
+    %(uuid)s,
+    %(plan_uuid)s,
+    %(event_type)s,
+    %(info)s,
+    %(is_handle)s,
+    %(is_deleted)s
+);
+"""
+
+

+ 228 - 0
device/dev_mng.py

@@ -0,0 +1,228 @@
+import threading
+from threading import Lock
+import queue
+import json
+import traceback
+from collections import deque
+
+from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
+from common.sys_comm import get_utc_time_ms, utc_to_bj_s, get_bj_time_ms, get_bj_time_s
+from common.sys_comm import g_sys_conf, g_sys_conf_mtx
+
+g_dev_map_lock = threading.Lock()
+g_dev_map = {}  # <dev_id: str, device: Device>
+
+
+
+
+
+# 跟踪区域类
+class TrackingRegion:
+    def __init__(self, x1=0, y1=0, z1=0, x2=0, y2=0, z2=0):
+        # 线程锁
+        self.lock_ = threading.Lock()
+        self.start_x: int = x1
+        self.start_y: int = y1
+        self.start_z: int = z1
+        self.stop_x: int = x2
+        self.stop_y: int = y2
+        self.stop_z: int = z2
+
+
+    def __repr__(self):
+        return (f"TrackingRegion(start=({self.start_x}, {self.start_y}, {self.start_z}), "
+                f"stop=({self.stop_x}, {self.stop_y}, {self.stop_z}))")
+
+
+# 安装参数类
+class InstallParam:
+    def __init__(self, mount_plain="", height=0, tracking_region=None):
+        # 线程锁
+        self.lock_ = threading.Lock()
+        self.mount_plain: str = mount_plain
+        self.isCeiling: int = 1
+        self.height = height
+        self.tracking_region = tracking_region if tracking_region else TrackingRegion()
+        self.north_angle_: float = 0
+
+    def __repr__(self):
+        return (f"InstallParam(mount_plain='{self.mount_plain}', isCeiling={self.isCeiling}, "
+                f"height={self.height}, north_angle={self.north_angle_}, "
+                f"tracking_region={repr(self.tracking_region)})")
+
+
+# 网络参数类
+class Network:
+    def __init__(self, ssid="", password="", ip=""):
+        # 线程锁
+        self.lock_ = threading.Lock()
+        self.ssid = ssid
+        self.password = password
+        self.ip = ip
+
+
+    def set_ssid(self, ssid:str):
+        with self.lock_:
+            self.ssid = ssid
+
+    def set_password(self, password:str):
+        with self.lock_:
+            self.password = password
+
+    def set_ip(self, ip:str):
+        with self.lock_:
+            self.ip = ip
+
+    def __repr__(self):
+        return f"Network(ssid='{self.ssid}', password='{self.password}' , ip='{self.ip}')"
+
+
+class Device():
+    def __init__(self,
+                 dev_id: str = "",
+                 dev_name: str = "",
+                 online: int = 0,
+                 dev_type: str = "",
+                 network: Network = None,
+                 install_param: InstallParam = None):
+        # 线程锁
+        self.lock_ = Lock()
+        # 基本属性
+        self.dev_id_        = dev_id
+        self.dev_name_      = dev_name
+        self.online_        = online
+        self.dev_type_      = dev_type
+        self.network_       = network if network else Network()
+        self.install_param_ = install_param if install_param else InstallParam()
+
+        # 实时数据队列
+        self.rtd_len_: int = 100
+        self.rtd_que_: deque = deque(maxlen=self.rtd_len_)
+        """
+{
+    "timestamp": 1727323744093,
+    "pose": 2,
+    "target_point": [
+        [
+            0.15537149991307939,
+            -0.17245136840002878,
+            0.5702038151877267,
+            1
+        ],
+        [
+            0.15537149991307939,
+            -0.17245136840002878,
+            0.5702038151877267,
+            2
+        ]
+    ]
+}
+        """
+
+
+    # 插入新的rtd单元
+    def put_rtd_unit(self, rtd_unit: object):
+        """
+        插入新的点云数据,超过最大长度自动丢弃旧数据
+        """
+        with self.lock_:
+            self.rtd_que_.append(rtd_unit)
+
+    # 获取最新的rtd_que_
+    def get_last_rtd_que(self) -> list:
+        """
+        获取当前最新的点云数据(全部缓存)
+        """
+        with self.lock_:
+            return list(self.rtd_que_)
+
+    # 获取rtd单元
+    def get_rtd_unit(self, index: int):
+        with self.lock_:
+            if not self.rtd_que_:
+                return None
+            try:
+                return self.rtd_que_[index]
+            except IndexError:
+                return None
+
+    # 获取最新的rtd单元
+    def get_last_rtd_unit(self) -> object:
+        with self.lock_:
+            if self.rtd_que_:
+                return self.rtd_que_[-1]
+            return None
+
+    # 获取 rtd_que_ 的副本
+    def get_rtd_que_copy(self) -> list:
+        with self.lock_:
+            return list(self.rtd_que_)
+
+
+
+def update_dev_info(dev_id:str, dev_instance:Device):
+    with g_dev_map_lock:
+        if dev_id in g_dev_map:
+            g_dev_map[dev_id] = None
+            LOGDBG(f"update dev: {dev_id}")
+        else:
+            LOGDBG(f"new dev: {dev_id}")
+
+        # todo 更新设备保活时间(伪)
+        # dev_instance.set_keepalive(get_utc_time_ms())
+        g_dev_map[dev_id] = dev_instance
+
+
+# 回调函数,处理查询结果:查询所有的设备信息
+def cb_handle_query_all_dev_info(result):
+    try:
+        if result:
+            for row in result:
+                dev_id = row["client_id"]
+
+                dev_instance = Device(
+                    dev_id=row["client_id"],
+                    dev_name=row["dev_name"],
+                    online=row["online"],
+                    dev_type=row["dev_type"]
+                )
+
+                # 更新设备信息
+                update_dev_info(dev_id, dev_instance)
+
+            LOGDBG(f"cb_handle_query_all_dev_info succeed")
+        else:
+            LOGDBG("cb_handle_query_all_dev_info, invalid result")
+
+    except json.JSONDecodeError as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+    except Exception as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+
+def dev_map_push(dev_id: str, device: Device) -> None:
+    with g_dev_map_lock:
+        g_dev_map[dev_id] = device
+
+
+def dev_map_pop(dev_id: str) -> Device:
+    with g_dev_map_lock:
+        return g_dev_map.pop(dev_id, None)
+
+
+def dev_map_find(dev_id: str) -> Device:
+    with g_dev_map_lock:
+        return g_dev_map.get(dev_id, None)
+
+
+def dev_map_delete(dev_id: str) -> bool:
+    with g_dev_map_lock:
+        if dev_id in g_dev_map:
+            del g_dev_map[dev_id]
+            return True
+        return False

+ 178 - 0
mqtt/mqtt_process.py

@@ -0,0 +1,178 @@
+'''
+处理mqtt消息相关
+'''
+
+import paho.mqtt.client as mqtt
+import time
+import threading
+import re
+import queue
+import sys
+from queue import Queue, Empty
+from concurrent.futures import ThreadPoolExecutor
+import signal
+
+import common.sys_comm as sys_comm
+from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
+from mqtt.mqtt_topics import TOPICS, Topic_Pattern
+
+import mqtt.mqtt_recv as mqtt_recv
+import mqtt.mqtt_send as mqtt_send
+from mqtt.mqtt_send import mqtt_send_que # 发送队列
+# 格式如下
+'''
+{
+    "topic":    "topic/xxxx",
+    "msg":      "msg to be send"
+}
+'''
+
+# ================================
+# MQTT 配置
+# ================================
+MQTT_BROKER = "119.45.12.173"   # MQTT BROKER 地址
+MQTT_PORT = 1883                # MQTT 端口
+MQTT_USERNAME = "lnradar"       # MQTT 用户名
+MQTT_PASSWD = "lnradar"         # MQTT 密码
+
+# ================================
+# 全局对象
+# ================================
+executor = ThreadPoolExecutor(max_workers=8)
+mqtt_queue = Queue()   # 消息队列
+shutting_down = False
+
+import atexit
+atexit.register(lambda: setattr(sys.modules[__name__], 'shutting_down', True))
+
+# ================================
+# 辅助函数
+# ================================
+def check_topic(pattern:str, topic:str) -> bool:
+    return bool(re.match(pattern, topic))
+
+
+# ================================
+# 消费者线程
+# ================================
+class MQTTConsumerThread(threading.Thread):
+    def __init__(self):
+        super().__init__()
+        self.running = True
+
+    def run(self):
+        global shutting_down
+        while self.running:
+            try:
+                msg_tuple = mqtt_queue.get(timeout=0.1)  # (client, userdata, msg)
+            except Empty:
+                if shutting_down:
+                    break
+                continue
+
+            client, userdata, msg = msg_tuple
+            if shutting_down:
+                break
+            try:
+                executor.submit(mqtt_recv.process_message, client, userdata, msg)
+            except RuntimeError:
+                # 线程池已关闭,忽略
+                break
+
+
+
+# ================================
+# MQTT 回调
+# ================================
+def on_connect(client, userdata, flags, rc):
+    if rc == 0:
+        LOGINFO("MQTT Connected successfully!")
+        client.subscribe(TOPICS.dev_dsp_data)
+        client.subscribe(TOPICS.mps_all)
+        client.subscribe(TOPICS.das_all)
+    else:
+        LOGERR(f"MQTT failed to connect, return code {rc}")
+
+def on_message(client, userdata, msg):
+    if not shutting_down:
+        mqtt_queue.put((client, userdata, msg))  # 放入队列,由消费者线程处理
+
+# ================================
+# MQTT 线程类
+# ================================
+class MQTTClientThread(threading.Thread):
+    def __init__(self,):
+        threading.Thread.__init__(self)
+        self.client:mqtt.Client = mqtt.Client()
+        self.publish_status = {}
+        self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWD)
+        self.client.on_connect = on_connect
+        self.client.on_message = on_message
+        self.client.on_publish = self.on_publish
+
+        self.running = True
+
+    def on_publish(self, client, userdata, mid):
+        self.publish_status[mid] = "success"
+
+    def send_msg_to_mqtt(self):
+        while True:
+            time.sleep(0.01)
+            try:
+                mqtt_msg: dict = mqtt_send_que.get(timeout=0.1)
+            except Exception:
+                if shutting_down:
+                    break
+                continue
+            try:
+                topic = mqtt_msg["topic"]
+                msg = mqtt_msg["msg"]
+                qos = mqtt_msg.get("qos", 0)
+                info = self.client.publish(topic, msg, qos=qos)
+                if info.rc == 0:
+                    self.publish_status[info.mid] = "pending"
+                    if qos == 0:
+                        self.publish_status[info.mid] = "success"
+                else:
+                    LOGDBG(f"send failed: {info.rc}, topic: {topic}, qos: {qos}\nmsg:{msg}")
+
+            except Exception as e:
+                LOGERR(f"send error: {e}, topic: {topic}, qos: {qos}\nmsg:{msg}")
+
+    def run(self):
+        global shutting_down
+        try:
+            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
+            LOGINFO("Connecting to MQTT broker...")
+            self.client.loop_start()
+            self.send_msg_to_mqtt()
+
+        except Exception as e:
+            LOGERR(f"MQTT thread encountered an error: {e}")
+        finally:
+            self.running = False
+            self.client.loop_stop()
+            self.client.disconnect()
+            shutting_down = True
+
+
+mqtt_client: MQTTClientThread = None
+mqtt_consumer: MQTTConsumerThread = None
+
+# ================================
+# 退出信号处理
+# ================================
+def signal_handler(sig, frame):
+    global shutting_down
+    LOGINFO("Exiting... shutting down MQTT and thread pool")
+    shutting_down = True
+    mqtt_client.running = False
+    mqtt_client.client.loop_stop()
+    mqtt_client.client.disconnect()
+    mqtt_consumer.running = False
+    executor.shutdown(wait=True)
+    sys.exit(0)
+
+signal.signal(signal.SIGINT, signal_handler)
+signal.signal(signal.SIGTERM, signal_handler)
+

+ 224 - 0
mqtt/mqtt_recv.py

@@ -0,0 +1,224 @@
+'''
+处理mqtt接收的消息
+'''
+
+import paho.mqtt.client as mqtt
+import queue
+import json
+import traceback
+import re
+
+import common.sys_comm as sys_comm
+from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
+from common.sys_comm import get_tracker_targets, get_utc_time_ms, get_utc_time_s
+from common.sys_comm import POSE_E, DEV_EC
+from common.sys_comm import g_sys_conf, g_sys_conf_mtx
+
+import db.db_process as db_process
+from db.db_process import db_req_que
+from db.db_process import DBRequest
+
+from mqtt.mqtt_topics import Topic_Pattern
+import mqtt.mqtt_send as mqtt_send
+
+import device.dev_mng as dev_mng
+from device.dev_mng import g_dev_map, g_dev_map_lock, Device
+
+
+
+# 数据队列
+raw_points_que_forpost = queue.Queue()      # 实时姿态队列
+'''
+{
+    "dev_id": "xxxxxx",
+    "raw_points": [...]
+}
+'''
+
+# 检查topic
+def check_topic(topic:str, pattern:str) -> bool:
+    topic = topic.strip()
+    return bool(re.match(pattern, topic))
+
+
+"""
+{
+    "timestamp": 1727323744093,
+    "pose": 2,
+    "target_point": [
+        [
+            0.15537149991307939,
+            -0.17245136840002878,
+            0.5702038151877267,
+            1
+        ],
+        [
+            0.15537149991307939,
+            -0.17245136840002878,
+            0.5702038151877267,
+            2
+        ]
+    ]
+}
+"""
+# 处理来自设备的点云消息: /dev/{device_id}/dsp_data
+def deal_dsp_data(msg:mqtt.MQTTMessage):
+    try:
+        parts = msg.topic.split('/')
+        dev_id = parts[2]
+
+        # 未注册的设备,不处理
+        with g_dev_map_lock:
+            if dev_id not in g_dev_map:
+                return
+            device:Device = g_dev_map[dev_id]
+
+        payload = json.loads(msg.payload.decode('utf-8'))
+
+        # 处理 target
+        if ("tracker_targets" in payload):
+            tracker_targets = payload["tracker_targets"]
+
+            timestamp = get_utc_time_s()
+            pose = POSE_E.POSE_4.value
+            rtd_unit = {
+                "timestamp": timestamp,
+                "pose": pose,
+                "target_point": tracker_targets
+            }
+            device.put_rtd_unit(rtd_unit)
+
+    except json.JSONDecodeError as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+    except Exception as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+# 实时位置姿态
+def deal_realtime_pos(msg:mqtt.MQTTMessage):
+    try:
+        payload = json.loads(msg.payload.decode('utf-8'))
+        dev_id   = payload.get("dev_id")
+        # 未注册的设备,不处理
+        with g_dev_map_lock:
+            if dev_id not in g_dev_map:
+                return
+            device:Device = g_dev_map[dev_id]
+
+        timestamp   = payload.get("timestamp")
+        pose        = payload.get("pose")
+        target_point    = payload.get("target_point", [])
+
+        rtd_unit = {
+            "timestamp": timestamp,
+            "pose": pose,
+            "target_point": target_point
+        }
+        device.put_rtd_unit(rtd_unit)
+
+        return
+
+    except json.JSONDecodeError as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+    except Exception as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+# 设备消息分发处理:/dev/#
+def deal_dev_msg(client:mqtt.Client, userdaata, msg:mqtt.MQTTMessage):
+    try:
+        topic = msg.topic
+        parts = msg.topic.split('/')
+        dev_id = parts[2]
+
+        # 设备信息(注册)
+        if (check_topic(topic,Topic_Pattern.dev_login)):
+            return
+            deal_dev_login(msg)
+
+        # 设备心跳保活
+        elif (check_topic(topic,Topic_Pattern.dev_keepalive)):
+            return
+            deal_dev_keepalive(msg)
+
+        # 设备配置信息
+        elif (check_topic(topic,Topic_Pattern.dev_rep_dev_param)):
+            return
+            deal_report_device_param(msg)
+
+        # 设备实时数据
+        elif (check_topic(topic,Topic_Pattern.dev_dsp_data)):
+            deal_dsp_data(msg)
+
+        # 点云数据
+        elif (check_topic(topic,Topic_Pattern.dev_cloudpoint)):
+            return
+            deal_cloudpoint(msg)
+
+    except json.JSONDecodeError as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+    except Exception as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+# 设备代理分发处理:/das/#
+def deal_das_msg(client:mqtt.Client, userdaata, msg:mqtt.MQTTMessage):
+    try:
+        topic = msg.topic
+        parts = msg.topic.split('/')
+        dev_id = parts[2]
+        # if dev_id != "00FFAABBDDN":
+        #     return
+
+        # 实时位置姿态
+        if (check_topic(topic,Topic_Pattern.das_realtime_pos)):
+            deal_realtime_pos(msg)
+
+    except json.JSONDecodeError as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+    except Exception as e:
+        tb_info = traceback.extract_tb(e.__traceback__)
+        for frame in tb_info:
+            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+# 处理接收的消息入口
+def process_message(client:mqtt.Client, userdata, msg:mqtt.MQTTMessage):
+    topic = msg.topic
+    qos = msg.qos
+
+    if (check_topic(topic,Topic_Pattern.dev_all)):      # 设备
+        deal_dev_msg(client, userdata, msg)
+    elif (check_topic(topic,Topic_Pattern.mps_all)):     # 小程序服务
+        return
+        deal_mps_msg(client, userdata, msg)
+    elif (check_topic(topic,Topic_Pattern.opc_all)):     # 运维客户端
+        return
+        deal_opc_msg(client, userdata, msg)
+    elif (check_topic(topic,Topic_Pattern.das_all)):     # 设备接入
+        deal_das_msg(client, userdata, msg)
+    else:
+        # LOGDBG(f"recv invalid topic: {msg.topic}")
+        return
+
+
+
+
+
+
+
+

+ 76 - 0
mqtt/mqtt_send.py

@@ -0,0 +1,76 @@
+'''
+处理mqtt发送的消息
+'''
+
+import queue
+import json
+from enum import Enum
+import numpy
+
+mqtt_send_que = queue.Queue()   # 发送队列
+
+import common.sys_comm as sys_comm
+from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
+from common.sys_comm import MODEL_E, POSE_CLASS_E, POSE_E
+from common.sys_comm import g_sys_conf, g_sys_conf_mtx
+from mqtt.mqtt_topics import TOPICS
+
+# 消息类型
+class MSG_TYPE(Enum):
+    MSG_DEV_STATUS          = 0     # 设备状态变更
+    MSG_DEV_RAW_POINTS      = 1     # 实时点云
+    MSG_REALTIME_TARGET     = 2     # 实时目标位置
+    MSG_EVENT_FALL          = 3     # 跌倒事件
+    MSG_EVENT_EXIST         = 4     # 存在事件
+
+
+def get_target_point(point_cloud:list):
+    return numpy.mean(point_cloud, axis=0).tolist()
+
+
+# 创建消息相关  START
+
+# 准备将消息通过 MQTT 发送
+def send_msg(topic:str, format_json:dict, qos:int=0):
+    try:
+        parts = topic.split('/')
+        model = parts[1]
+
+        if model != "dev":
+            with g_sys_conf_mtx:
+                format_json["sp_id"] = g_sys_conf["sp_id"]
+        content:str = json.dumps(format_json)
+
+        mqtt_msg = {
+            "topic":    topic,
+            "msg":      content,
+            "qos":      qos
+        }
+        mqtt_send_que.put(mqtt_msg)
+
+    except Exception as e:
+        LOGERR(f"send_msg error: {e}")
+
+
+# 告警事件
+def alarm_event(
+        dev_id: str,
+        uuid: str,
+        plan_uuid: str,
+        event_type: str,
+        info: dict,
+        table: str):
+    format_json = dict()
+    format_json["dev_id"] = dev_id
+    format_json["uuid"] = uuid
+    format_json["plan_uuid"] =plan_uuid
+    format_json["event_type"] = event_type
+    format_json["info"] = info
+    format_json["table"] = table
+    send_msg(TOPICS.las_alarm_event, format_json)
+
+
+
+
+
+# 创建消息相关  END

+ 107 - 0
mqtt/mqtt_topics.py

@@ -0,0 +1,107 @@
+# TOPIC 定义
+class TOPICS(str):
+    # 设备 Device
+    dev_all             = "/dev/#"                          # 设备端消息
+    dev_login           = "/dev/+/login"                    # 设备注册(请求)
+    dev_keepalive       = "/dev/+/keepalive"                # 心跳保活(请求)
+    dev_rep_dev_info    = "/dev/+/report_device_info"       # 设备信息
+    dev_rep_dev_param   = "/dev/+/report_device_param"      # 设备参数
+    dev_dsp_data        = "/dev/+/dsp_data"                 # 设备实时数据
+    dev_cloudpoint      = "/dev/+/cloudpoint"               # 点云数据
+    dev_rep_fall_event  = "/dev/+/report_falling_event"     # 跌倒事件
+    dev_rep_pres_event  = "/dev/+/report_presence_event"    # 存在事件
+    dev_update_firmware = "/dev/+/update_firmware"          # OTA固件升级
+    dev_reboot          = "/dev/+/reboot"                   # 设备软重启
+
+    # 设备调试
+    dev_set_debug       = "/dev/+/set_debug_param"          # 设置debug配置
+    dev_get_debug       = "/dev/+/get_debug_param"          # 获取debug配置
+
+    # 设备接入 Device Access Service
+    das_all             = "/das/#"                          # 设备接入消息
+    das_login           = "/das/+/login"                    # 设备注册(响应)
+    das_keepalive       = "/das/+/keepalive"                # 心跳保活(响应)
+    das_status          = "/das/dev_status"                 # 设备状态变更
+    das_cloudpoint      = "/das/cloudpoint"                 # 点云消息
+    das_realtime_pos    = "/das/realtime_pos"               # 实时位置姿态
+    das_event           = "/das/event"                      # 跌倒事件
+    das_exist_event     = "/das/exist"                      # 存在事件
+    das_alarm_event     = "/das/alarm_event"                # 告警事件
+    das_set_dev_param   = "/das/set_device_param"           # 设置设备参数(请求)
+
+    das_debug_param     = "/das/+/debug_param"              # debug配置信息(响应)
+    das_report_alarm_param = "/das/report_alarm_param"      # 上报告警参数(响应)
+    das_set_alarm_param_ack = "/das/set_alarm_param_ack"    # 设置告警参数确认(响应)
+
+    # 小程序 Mini Program Service
+    mps_all             = "/mps/#"                          # 小程序服务消息
+    mps_get_dev_info    = "/mps/request_device_info"        # 请求设备信息
+    mps_get_dev_param   = "/mps/get_device_param"           # 请求设备参数
+    mps_set_dev_param   = "/mps/set_device_param"           # 设置设备参数
+    mps_set_dev_param   = "/mps/fall_event/ack"             # 跌倒确认
+
+    # 运维客户端
+    opc_all             = "/opc/#"                          # 运维客户端
+    opc_get_alarm_param = "/opc/get_alarm_param"            # 获取告警参数
+    opc_set_alarm_param = "/opc/set_alarm_param"            # 设置告警参数
+
+    # 告警联动服务
+    las_all             = "/las/#" 
+    las_alarm_event     = "/las/alarm_event"                # 上报告警事件
+
+
+# topic匹配规则
+class Topic_Pattern(str):
+    # 设备 Device
+    dev_all             = r"^/dev/.*$"                          # 设备端消息
+    dev_login           = r"^/dev/[^/]+/login$"                 # 设备注册(请求)
+    dev_keepalive       = r"^/dev/[^/]+/keepalive$"             # 心跳保活(请求)
+    dev_rep_dev_info    = r"^/dev/[^/]+/report_device_info$"    # 设备信息
+    dev_rep_dev_param   = r"^/dev/[^/]+/report_device_param$"   # 上报设备参数
+    dev_dsp_data        = r"^/dev/[^/]+/dsp_data$"              # 设备实时数据
+    dev_cloudpoint      = r"^/dev/[^/]+/cloudpoint$"            # 点云数据
+    dev_rep_fall_event  = r"^/dev/[^/]+/report_falling_event$"  # 跌倒事件
+    dev_rep_pres_event  = r"^/dev/[^/]+/report_presence_event$" # 存在事件
+    dev_update_firmware = r"^/dev/[^/]+/update_firmware$"       # OTA固件升级
+    dev_reboot          = r"^/dev/[^/]+/reboot$"                # 设备软重启
+
+    # 设备调试
+    dev_set_debug       = r"^/dev/[^/]+/set_debug_param$"       # 设置debug配置
+    dev_get_debug       = r"^/dev/[^/]+/get_debug_param$"       # 获取debug配置
+
+    # 设备接入 Device Access Service
+    das_all             = r"^/das/.*$"                          # 设备接入消息
+    das_login           = r"^/das/[^/]+/login$"                 # 设备注册(响应)
+    das_keepalive       = r"^/das/[^/]+/keepalive$"             # 心跳保活(响应)
+    das_status          = r"^/das/dev_status$"                  # 设备状态变更
+    das_cloudpoint      = r"^/das/cloudpoint$"                  # 点云消息
+    das_realtime_pos    = r"^/das/realtime_pos$"                # 实时位置姿态
+    das_event           = r"^/das/event$"                       # 跌倒事件
+    das_exist_event     = r"^/das/exist$"                       # 存在事件
+    das_alarm_event     = r"^/das/alarm_event$"                 # 告警事件
+    das_set_dev_param   = r"^/das/set_device_param$"           # 设置设备参数(请求)
+
+    das_debug_param     = r"^/das/+/debug_param$"               # debug配置信息(响应)
+    das_report_alarm_param = r"^/das/report_alarm_param$"       # 上报告警参数(响应)
+    das_set_alarm_param_ack = r"^/das/set_alarm_param_ack$"     # 设置告警参数确认(响应)
+
+    # 小程序 Mini Program Service
+    mps_all             = r"^/mps/.*$"                          # 小程序服务消息
+    mps_get_dev_info    = r"^/mps/get_device_info$"             # 获取设备信息
+    mps_get_dev_param   = r"^/mps/get_device_param$"            # 请求设备参数
+    mps_set_dev_param   = r"^/mps/set_device_param$"            # 设置设备参数
+    mps_dev_reboot      = r"^/mps/[^/]+/reboot$"                # 设备软重启
+    mps_add_device      = r"^/mps/add_device"                   # 添加设备
+    mps_del_device      = r"^/mps/del_device"                   # 删除设备
+    mps_add_group       = r"^/mps/add_group"                    # 添加群组
+    mps_del_group       = r"^/mps/del_group"                    # 删除群组
+    mps_fall_event_ack  = r"^/mps/fall_event/ack$"              # 跌倒确认
+
+    # 运维客户端
+    opc_all             = r"^/opc/.*$"                          # 运维客户端
+    opc_get_alarm_param = r"^/opc/get_alarm_param$"             # 获取告警参数
+    opc_set_alarm_param = r"^/opc/set_alarm_param$"             # 设置告警参数
+
+    # 告警联动服务
+    las_all             = r"^/las/.*$"
+    las_alarm_event = r"^/las/alarm_eventr$"                # 上报告警事件

+ 15 - 0
restart.py

@@ -0,0 +1,15 @@
+#!/usr/bin/env python3
+import subprocess
+import stop
+import start
+import time
+
+def restart():
+    print("Restarting LAS...")
+    stop.stop()
+    time.sleep(1)  # 等待 1 秒,确保已完全停止
+    start.start()
+
+if __name__ == "__main__":
+    restart()
+

+ 42 - 0
start.py

@@ -0,0 +1,42 @@
+#!/usr/bin/env python3
+import subprocess
+import os
+import sys
+
+PRO_PATH = "/platform/LAS/"
+LOGFILE = PRO_PATH + "LAS.log"
+PIDFILE = PRO_PATH + "LAS.pid"
+
+def is_running(pid):
+    try:
+        os.kill(pid, 0)
+        return True
+    except OSError:
+        return False
+
+def start():
+    if os.path.exists(PIDFILE):
+        with open(PIDFILE, "r") as f:
+            pid = int(f.read())
+            if is_running(pid):
+                print(f"LAS is already running with PID {pid}")
+                return
+            else:
+                print("Found stale PID file. Cleaning up.")
+                os.remove(PIDFILE)
+
+    with open(LOGFILE, "a") as log:
+        process = subprocess.Popen(
+            [sys.executable, "LAS.py"],
+            stdout=log,
+            stderr=log,
+            stdin=subprocess.DEVNULL,
+            close_fds=True,
+        )
+        with open(PIDFILE, "w") as f:
+            f.write(str(process.pid))
+        print(f"LAS started with PID {process.pid}")
+
+if __name__ == "__main__":
+    start()
+

+ 30 - 0
status.py

@@ -0,0 +1,30 @@
+#!/usr/bin/env python3
+import os
+import signal
+
+PRO_PATH = "/platform/LAS/"
+PIDFILE = PRO_PATH + "LAS.pid"
+
+def is_running(pid):
+    try:
+        os.kill(pid, 0)
+        return True
+    except OSError:
+        return False
+
+def status():
+    if not os.path.exists(PIDFILE):
+        print("LAS is NOT running (no PID file).")
+        return
+
+    with open(PIDFILE, "r") as f:
+        pid = int(f.read())
+
+    if is_running(pid):
+        print(f"LAS is running with PID {pid}")
+    else:
+        print(f"LAS is NOT running (PID file found but process is gone).")
+
+if __name__ == "__main__":
+    status()
+

+ 26 - 0
stop.py

@@ -0,0 +1,26 @@
+#!/usr/bin/env python3
+import os
+import signal
+
+PRO_PATH = "/platform/LAS/"
+PIDFILE = PRO_PATH + "LAS.pid"
+
+def stop():
+    if not os.path.exists(PIDFILE):
+        print("No PID file found. Is LAS running?")
+        return
+
+    with open(PIDFILE, "r") as f:
+        pid = int(f.read())
+
+    try:
+        os.kill(pid, signal.SIGTERM)
+        print(f"Stopped LAS (PID {pid})")
+    except ProcessLookupError:
+        print(f"No process found with PID {pid}")
+    finally:
+        os.remove(PIDFILE)
+
+if __name__ == "__main__":
+    stop()
+