Przeglądaj źródła

dev_mgr封装为类,创建全局实例

nifangxu 1 miesiąc temu
rodzic
commit
6fc2ab022a
4 zmienionych plików z 110 dodań i 107 usunięć
  1. 6 2
      LAS.py
  2. 13 15
      core/alarm_plan.py
  3. 79 73
      device/dev_mng.py
  4. 12 17
      mqtt/mqtt_recv.py

+ 6 - 2
LAS.py

@@ -21,8 +21,9 @@ from db.db_process import db_req_que
 from db.db_process import DBRequest
 import db.db_sqls as sqls
 
+import device.dev_mng as g_Dev
 from device.dev_mng import (
-    cb_handle_query_all_dev_info
+    Device, g_dev_mgr, init_dev_mng
 )
 
 
@@ -176,7 +177,7 @@ def main_process():
 
     # 查询所有设备信息
     db_req_que.put(DBRequest(sql=sqls.sql_query_all_dev_info,
-                             callback=cb_handle_query_all_dev_info))
+                             callback=g_Dev.g_dev_mgr.cb_handle_query_all_dev_info))
 
     # 查询所有告警计划
     db_req_que.put(DBRequest(sql=sqls.sql_query_all_alarm_plan,
@@ -192,6 +193,9 @@ def main():
     if (0 != sys_init()):
         sys.exit(-1)
 
+    # 初始化 dev_mng
+    init_dev_mng()
+
     # 初始化LAS
     init_alarm_plan_mgr()
     init_alarm_plan_disp()

+ 13 - 15
core/alarm_plan.py

@@ -18,17 +18,15 @@ from core.event_type import EventType, event_desc_map
 import core.g_LAS as g_las
 import core.alarm_plan_helper as helper
 
-from device.dev_mng import (
-    Device,
-    dev_map_push, dev_map_pop, dev_map_find, dev_map_delete
-)
-
 from db.db_process import (
     db_req_que, DBRequest
 )
 import db.db_sqls as sqls
 import mqtt.mqtt_send as mqtt_send
-from device.dev_mng import g_dev_map, g_dev_map_lock
+import device.dev_mng as g_Dev
+from device.dev_mng import (
+    Device, g_dev_mgr
+)
 
 
 class EventAttr_Base:
@@ -304,7 +302,7 @@ class AlarmPlan:
     def handle_stay_detection(self):
         try:
             dev_id = self.dev_id_
-            device:Device = dev_map_find(dev_id)
+            device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
             if not device:
                 return
             now = get_utc_time_s()
@@ -377,7 +375,7 @@ class AlarmPlan:
     def handle_retention_detection(self):
         try:
             dev_id = self.dev_id_
-            device:Device = dev_map_find(dev_id)
+            device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
             if not device:
                 return
             now = get_utc_time_s()
@@ -450,7 +448,7 @@ class AlarmPlan:
     def handle_toileting_detection(self):
         try:
             dev_id = self.dev_id_
-            device:Device = dev_map_find(dev_id)
+            device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
             if not device:
                 return
             now = get_utc_time_s()
@@ -524,7 +522,7 @@ class AlarmPlan:
     def handle_toileting_frequency(self):
         try:
             dev_id = self.dev_id_
-            device:Device = dev_map_find(dev_id)
+            device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
             if not device:
                 return
 
@@ -560,7 +558,7 @@ class AlarmPlan:
     def handle_night_toileting_frequency(self):
         try:
             dev_id = self.dev_id_
-            device:Device = dev_map_find(dev_id)
+            device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
             if not device:
                 return
 
@@ -596,7 +594,7 @@ class AlarmPlan:
     def handle_toileting_frequency_abnormal(self):
         try:
             dev_id = self.dev_id_
-            device:Device = dev_map_find(dev_id)
+            device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
             if not device:
                 return
 
@@ -632,7 +630,7 @@ class AlarmPlan:
     def handle_night_toileting_frequency_abnormal(self):
         try:
             dev_id = self.dev_id_
-            device:Device = dev_map_find(dev_id)
+            device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
             if not device:
                 return
 
@@ -667,7 +665,7 @@ class AlarmPlan:
     def handle_bathroom_stay_frequency(self):
         try:
             dev_id = self.dev_id_
-            device:Device = dev_map_find(dev_id)
+            device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
             if not device:
                 return
 
@@ -703,7 +701,7 @@ class AlarmPlan:
     def handle_target_absence(self):
         try:
             dev_id = self.dev_id_
-            device:Device = dev_map_find(dev_id)
+            device:Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
             if not device:
                 return
             now = get_utc_time_s()

+ 79 - 73
device/dev_mng.py

@@ -4,17 +4,15 @@ import queue
 import json
 import traceback
 from collections import deque
+from typing import Optional, List
 
 from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
-from common.sys_comm import get_utc_time_ms, utc_to_bj_s, get_bj_time_ms, get_bj_time_s
+from common.sys_comm import (
+    get_utc_time_ms, get_utc_time_s,
+    get_bj_time_ms, get_bj_time_s,
+    utc_to_bj_s)
 from common.sys_comm import g_sys_conf, g_sys_conf_mtx
 
-g_dev_map_lock = threading.Lock()
-g_dev_map = {}  # <dev_id: str, device: Device>
-
-
-
-
 
 # 跟踪区域类
 class TrackingRegion:
@@ -95,6 +93,8 @@ class Device():
         self.network_       = network if network else Network()
         self.install_param_ = install_param if install_param else InstallParam()
 
+        self.keepalive_: int    = get_utc_time_s()
+
         # 实时数据队列
         self.rtd_len_: int = 100
         self.rtd_que_: deque = deque(maxlen=self.rtd_len_)
@@ -119,7 +119,6 @@ class Device():
 }
         """
 
-
     # 插入新的rtd单元
     def put_rtd_unit(self, rtd_unit: object):
         """
@@ -159,70 +158,77 @@ class Device():
             return list(self.rtd_que_)
 
 
+    def update_keepalive(self):
+        with self.lock_:
+            self.keepalive_ = get_utc_time_s()
 
-def update_dev_info(dev_id:str, dev_instance:Device):
-    with g_dev_map_lock:
-        if dev_id in g_dev_map:
-            g_dev_map[dev_id] = None
-        #     LOGDBG(f"update dev: {dev_id}")
-        # else:
-        #     LOGDBG(f"new dev: {dev_id}")
-
-        # todo 更新设备保活时间(伪)
-        # dev_instance.set_keepalive(get_utc_time_ms())
-        g_dev_map[dev_id] = dev_instance
-
-
-# 回调函数,处理查询结果:查询所有的设备信息
-def cb_handle_query_all_dev_info(result):
-    try:
-        if result:
-            for row in result:
-                dev_id = row["client_id"]
-
-                dev_instance = Device(
-                    dev_id=row["client_id"],
-                    dev_name=row["dev_name"],
-                    online=row["online"],
-                    dev_type=row["dev_type"]
-                )
-
-                # 更新设备信息
-                update_dev_info(dev_id, dev_instance)
-
-            LOGDBG(f"cb_handle_query_all_dev_info succeed")
-        else:
-            LOGDBG("cb_handle_query_all_dev_info, invalid result")
-
-    except json.JSONDecodeError as e:
-        tb_info = traceback.extract_tb(e.__traceback__)
-        for frame in tb_info:
-            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
-    except Exception as e:
-        tb_info = traceback.extract_tb(e.__traceback__)
-        for frame in tb_info:
-            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
-
-
-
-def dev_map_push(dev_id: str, device: Device) -> None:
-    with g_dev_map_lock:
-        g_dev_map[dev_id] = device
-
-
-def dev_map_pop(dev_id: str) -> Device:
-    with g_dev_map_lock:
-        return g_dev_map.pop(dev_id, None)
-
-
-def dev_map_find(dev_id: str) -> Device:
-    with g_dev_map_lock:
-        return g_dev_map.get(dev_id, None)
-
+    def update_keepalive(self, now:int):
+        with self.lock_:
+            self.keepalive_ = now
 
-def dev_map_delete(dev_id: str) -> bool:
-    with g_dev_map_lock:
-        if dev_id in g_dev_map:
-            del g_dev_map[dev_id]
-            return True
-        return False
+    def get_keepalive(self):
+        with self.lock_:
+            return self.keepalive_
+
+
+class DeviceManager():
+    def __init__(self):
+        self.g_dev_map_lock = threading.Lock()
+        self.g_dev_map = {}  # <dev_id: str, device: Device>
+
+    def push_dev_map(self, dev_id:str, dev_instance:Device):
+        with self.g_dev_map_lock:
+            self.g_dev_map[dev_id] = dev_instance
+
+    def pop_dev_map(self, dev_id: str):
+        with self.g_dev_map_lock:
+            return self.g_dev_map.pop(dev_id, None)
+
+    def find_dev_map(self, dev_id: str):
+        with self.g_dev_map_lock:
+            return self.g_dev_map.get(dev_id, None)
+
+    def delete_dev_map(self, dev_id: str):
+        with self.g_dev_map_lock:
+            if dev_id in self.g_dev_map:
+                del self.g_dev_map[dev_id]
+                return True
+            return False
+
+    def list_all_dev(self,) -> List["Device"]:
+        with self.g_dev_map_lock:
+            return list(self.g_dev_map.values())
+
+    # 回调函数,处理查询结果:查询所有的设备信息
+    def cb_handle_query_all_dev_info(self, result, userdata):
+        try:
+            if result:
+                for row in result:
+                    dev_id = row["client_id"]
+                    dev_instance = Device(
+                        dev_id=row["client_id"],
+                        dev_name=row["dev_name"],
+                        online=row["online"],
+                        dev_type=row["dev_type"]
+                    )
+                    # 更新设备信息
+                    self.push_dev_map(dev_id, dev_instance)
+    
+                LOGDBG(f"cb_handle_query_all_dev_info succeed")
+            else:
+                LOGDBG("cb_handle_query_all_dev_info, invalid result")
+    
+        except json.JSONDecodeError as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
+        except Exception as e:
+            tb_info = traceback.extract_tb(e.__traceback__)
+            for frame in tb_info:
+                LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
+
+
+g_dev_mgr: DeviceManager = None
+def init_dev_mng():
+    global g_dev_mgr
+    g_dev_mgr = DeviceManager()

+ 12 - 17
mqtt/mqtt_recv.py

@@ -21,8 +21,10 @@ from db.db_process import DBRequest
 from mqtt.mqtt_topics import Topic_Pattern
 import mqtt.mqtt_send as mqtt_send
 
-import device.dev_mng as dev_mng
-from device.dev_mng import g_dev_map, g_dev_map_lock, Device
+import device.dev_mng as g_Dev
+from device.dev_mng import (
+    Device,
+)
 
 import core.g_LAS as g_las
 
@@ -68,11 +70,10 @@ def deal_dsp_data(msg:mqtt.MQTTMessage):
         parts = msg.topic.split('/')
         dev_id = parts[2]
 
-        # 未注册的设备,不处理
-        with g_dev_map_lock:
-            if dev_id not in g_dev_map:
-                return
-            device:Device = g_dev_map[dev_id]
+        device: Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
+        if not g_Dev.g_dev_mgr.find_dev_map(dev_id):
+            device = Device(dev_id)
+            g_Dev.g_dev_mgr.push_dev_map(dev_id, device)
 
         payload = json.loads(msg.payload.decode('utf-8'))
 
@@ -104,11 +105,10 @@ def deal_realtime_pos(msg:mqtt.MQTTMessage):
     try:
         payload = json.loads(msg.payload.decode('utf-8'))
         dev_id   = payload.get("dev_id")
-        # 未注册的设备,不处理
-        with g_dev_map_lock:
-            if dev_id not in g_dev_map:
-                return
-            device:Device = g_dev_map[dev_id]
+
+        device: Device = g_Dev.g_dev_mgr.find_dev_map(dev_id)
+        if not g_Dev.g_dev_mgr.find_dev_map(dev_id):
+            return
 
         timestamp   = payload.get("timestamp")
         pose        = payload.get("pose")
@@ -173,11 +173,6 @@ def deal_dev_msg(client:mqtt.Client, userdaata, msg:mqtt.MQTTMessage):
             return
             deal_dev_keepalive(msg)
 
-        # 设备配置信息
-        elif (check_topic(topic,Topic_Pattern.dev_rep_dev_param)):
-            return
-            deal_report_device_param(msg)
-
         # 设备实时数据
         elif (check_topic(topic,Topic_Pattern.dev_dsp_data)):
             deal_dsp_data(msg)