#!/usr/bin/env python3.9
"""Service entry point.

Loads ``./conf.ini``, publishes the log/system configuration through
``common.sys_comm``, starts the device manager, the alarm plan manager and
dispatcher, the database thread and the MQTT client/consumer threads, then
keeps the main thread alive in an idle loop.
"""
from threading import Thread
import configparser
import os
import time
import traceback
import sys
import json

import common.sys_comm as sys_comm
from common.sys_comm import (
    LOGDBG, LOGINFO, LOGWARN, LOGERR,
    EC, get_utc_time_ms
)

from mqtt import mqtt_process as mp
from mqtt.mqtt_process import (
    MQTTClientThread, RobustMQTTClient, MQTTConsumerThread,
    g_mqtt_client, g_mqtt_consumer
)

import db.db_process as db_process
from db.db_process import db_req_que
from db.db_process import DBRequest_Async
from db.db_process import (db_execute_sync, db_execute_async)
import db.db_sqls as sqls

import device.dev_mng as g_Dev
from device.dev_mng import (
    Device, g_dev_mgr, init_dev_mng
)

import core.alarm_plan_manager as ap_mgr
from core.alarm_plan_manager import (
    AlarmPlanManager,
    init_alarm_plan_mgr,
    start_alarm_plan_mgr,
    cb_handle_query_all_alarm_plan_info
)
from core.alarm_plan_dispatcher import (
    init_alarm_plan_disp,
    start_alarm_plan_dispatcher
)
import core.g_LAS as g_las


# (section, option) pairs that must be present in conf.ini.
_REQUIRED_GENERAL_OPTIONS = (
    ("conf", "module_name"),
    ("conf", "platform"),
    ("conf", "log_lvl"),
    ("conf", "max_log_size"),
    ("conf", "max_log_files"),
)
_REQUIRED_DB_OPTIONS = (
    ("conf", "db_host"),
)
# NOTE(review): both platform sections are required regardless of the
# selected platform, as in the original check - confirm this is intended.
_REQUIRED_HOST_OPTIONS = (
    ("linux", "host_ip"),
    ("windows", "host_ip"),
    ("windows", "server_ip"),
    ("windows", "ssh_host"),
    ("windows", "ssh_port"),
)

# Fallback for the event retention range when the database has no usable value.
_DEFAULT_EVENT_SAVE_RANGE = 90


def _has_all_options(config, required):
    """Return True when every (section, option) pair of *required* exists in *config*."""
    return all(config.has_option(section, option) for section, option in required)


# System initialisation
def sys_init():
    """Read ``./conf.ini`` and populate the shared log and system configuration.

    Returns:
        0 on success; ``EC.EC_FAILED`` when a required option is missing or
        any exception is raised while reading or parsing the configuration.
    """
    try:
        # Create the log directory on first start.
        if not os.path.exists("./log/"):
            os.makedirs("./log/", exist_ok=True)
            print("create log dir succeed !")

        LOGDBG(" ================ system init ...")
        print(" ================ system init ...")

        # Read the configuration file.
        config = configparser.ConfigParser()
        with open('./conf.ini', 'r', encoding='utf-8') as f:
            config.read_file(f)

        # A missing option is a start-up failure: report it with the failure
        # code so that main() aborts instead of running unconfigured.
        if not _has_all_options(config, _REQUIRED_GENERAL_OPTIONS):
            LOGERR("sys_init failed, invalid conf.ini param")
            return EC.EC_FAILED

        if not _has_all_options(config, _REQUIRED_DB_OPTIONS):
            LOGERR("sys_init failed, invalid db param")
            return EC.EC_FAILED

        if not _has_all_options(config, _REQUIRED_HOST_OPTIONS):
            LOGERR("sys_init failed, invalid host_ip param")
            return EC.EC_FAILED

        LOGDBG("read conf.ini succeed !")

        # Parse every value once; a malformed number raises ValueError,
        # which is handled by the generic handler below.
        conf = config["conf"]
        module_name = str(conf["module_name"])
        platform = int(conf["platform"])
        db_host = str(conf["db_host"])
        log_lvl = int(conf["log_lvl"])
        max_log_size = int(conf["max_log_size"]) * 1024 * 1024  # MiB -> bytes
        max_log_files = int(conf["max_log_files"])

        # Initialise the log configuration.
        with sys_comm.g_log_conf_mtx:
            sys_comm.g_log_conf["module_name"] = module_name
            sys_comm.g_log_conf["log_lvl"] = log_lvl
            sys_comm.g_log_conf["max_log_size"] = max_log_size
            sys_comm.g_log_conf["max_log_files"] = max_log_files
        LOGDBG("log init succeed")

        # Initialise the system configuration.
        with sys_comm.g_sys_conf_mtx:
            sys_comm.g_sys_conf["module_name"] = module_name
            sys_comm.g_sys_conf["platform"] = platform
            sys_comm.g_sys_conf["db_host"] = db_host
            sys_comm.g_sys_conf["log_lvl"] = log_lvl
            sys_comm.g_sys_conf["max_log_size"] = max_log_size
            sys_comm.g_sys_conf["max_log_files"] = max_log_files

            if platform == 0:
                # Local Windows: the MQTT broker is the configured server.
                windows = config["windows"]
                sys_comm.g_sys_conf["host_ip"] = str(windows["host_ip"])
                sys_comm.g_sys_conf["server_ip"] = str(windows["server_ip"])
                sys_comm.g_sys_conf["ssh_host"] = str(windows["ssh_host"])
                sys_comm.g_sys_conf["ssh_port"] = int(windows["ssh_port"])
                mp.MQTT_BROKER = sys_comm.g_sys_conf["server_ip"]
            elif platform == 1:
                # Linux server: the MQTT broker is this host.
                sys_comm.g_sys_conf["host_ip"] = str(config["linux"]["host_ip"])
                mp.MQTT_BROKER = sys_comm.g_sys_conf["host_ip"]
            # NOTE(review): any other platform value leaves host_ip and
            # MQTT_BROKER untouched - confirm whether it should be rejected.

            sp_id = int(get_utc_time_ms())
            sys_comm.g_sys_conf["sp_id"] = sp_id

            # Alarm configuration
            sys_comm.g_sys_conf["alarm_conf"] = sys_comm.alarm_conf

            host_ip = sys_comm.g_sys_conf["host_ip"]

        # Start-up succeeded: report the effective settings on stdout and in the log.
        banner = (
            " ================ system init succeed !",
            f" ================ module : {module_name}",
            f" ================ platform : {platform}",
            f" ================ host_ip : {host_ip}",
            f" ================ max_log_files : {max_log_files}",
            f" ================ max_log_size : {max_log_size}",
            f" ================ log_lvl : {log_lvl}",
            f" ================ sp_id : {sp_id}",
            " ================ version : v B1.0",
        )
        for line in banner:
            print(line)
        for line in banner:
            LOGINFO(line)

        return 0

    except json.JSONDecodeError as e:
        tb_info = traceback.extract_tb(e.__traceback__)
        for frame in tb_info:
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
        return EC.EC_FAILED
    except Exception as e:
        tb_info = traceback.extract_tb(e.__traceback__)
        for frame in tb_info:
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
        return EC.EC_FAILED


# Periodic task loop
def run():
    """Keep the main thread alive by sleeping in one-second steps; never returns."""
    while True:
        time.sleep(1)


def query_events_expire_range():
    """Query the event retention range from the database.

    Returns:
        The ``param_value`` of the single row returned by
        ``sqls.sql_event_save_range``; the default (90) when the query fails,
        does not return exactly one row, or the value is ``None``.
    """
    try:
        result = db_execute_sync(sql=sqls.sql_event_save_range, timeout=15)
        if result and len(result) == 1:
            param_value = result[0].get("param_value")
            # Any value other than None stored in the database is used as is.
            if param_value is not None:
                return param_value
        LOGWARN("found invalid event_save_range, use default 90")
    except Exception as e:
        # Fall back to the default, as the message states, rather than
        # returning an error code that the caller would use as the range.
        LOGERR(f"query event_save_range failed: {e}, use default 90")
    return _DEFAULT_EVENT_SAVE_RANGE


# Create the event-expiry plan
def create_events_expire_plan():
    """Register the expired-event cleanup task with the alarm plan manager.

    Returns:
        0 on success, ``EC.EC_FAILED`` when the retention range is empty/zero.
    """
    event_save_range = query_events_expire_range()
    if not event_save_range:
        return EC.EC_FAILED

    g_las.g_alarm_plan_mgr.create_clean_expire_events_task(event_save_range)
    return 0


# Main thread
def main_process():
    """Queue the initial database queries, create the expiry plan and idle.

    Returns:
        ``EC.EC_FAILED`` (or the error code of the failing step) when start-up
        cannot complete. On success the function blocks in :func:`run`.
    """
    if not g_las.g_alarm_plan_mgr:
        LOGERR("error: g_alarm_plan_mgr not init")
        return EC.EC_FAILED

    if not g_las.g_alarm_plan_disp:
        LOGERR("error: g_alarm_plan_disp not init")
        return EC.EC_FAILED

    # Query all device information. The manager is looked up on the module at
    # call time because it is created by init_dev_mng().
    db_req_que.put(DBRequest_Async(sql=sqls.sql_query_all_dev_info,
                                   callback=g_Dev.g_dev_mgr.cb_handle_query_all_dev_info))

    # Query all alarm plans.
    db_req_que.put(DBRequest_Async(sql=sqls.sql_query_all_alarm_plan,
                                   callback=cb_handle_query_all_alarm_plan_info))

    # Create the task that cleans up expired events.
    ret = create_events_expire_plan()
    if ret:
        LOGERR("create_events_expire_plan failed, process termination")
        return ret

    # Periodic task loop (blocks forever).
    run()
    return 0


def main():
    """Initialise the system, start all worker threads and run the main loop."""
    # Initialisation
    if sys_init() != 0:
        sys.exit(EC.EC_FAILED)

    # Initialise the device manager.
    init_dev_mng()
    g_Dev.g_dev_mgr.start()

    # Initialise LAS.
    init_alarm_plan_mgr()
    init_alarm_plan_disp()
    start_alarm_plan_dispatcher()   # event dispatcher
    start_alarm_plan_mgr()          # alarm plan manager

    # Database thread
    db_thread = db_process.create_db_process()
    db_thread.start()

    # MQTT message threads
    mp.g_mqtt_client = RobustMQTTClient()
    mp.g_mqtt_client.start()
    mp.g_mqtt_consumer = MQTTConsumerThread()
    mp.g_mqtt_consumer.start()

    LOGDBG(" ================ LAS start success ...")

    # Main thread: main_process() only returns (non-zero) when start-up failed.
    if main_process():
        LOGERR("main_process error, process termination")
        sys.exit(EC.EC_FAILED)


if __name__ == "__main__":
    main()