#!/usr/bin/env python3
"""Service entry point: load conf.ini, start worker threads, then idle.

Start-up order (see ``main``): system/log configuration, alarm plan manager
initialisation, DB worker, MQTT client and consumer threads, event
dispatcher, alarm plan manager, and finally the main polling loop.
"""
from threading import Thread
import configparser
import os
import time
import traceback
import sys
import json

import common.sys_comm as sys_comm
from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
from common.sys_comm import get_utc_time_ms
from common.sys_comm import g_sys_conf, g_sys_conf_mtx, g_log_conf, g_log_conf_mtx

from mqtt import mqtt_process
from mqtt.mqtt_process import MQTTClientThread, MQTTConsumerThread, mqtt_client, mqtt_consumer

import db.db_process as db_process
from db.db_process import db_req_que
from db.db_process import DBRequest
import db.db_sqls as sqls

from device.dev_mng import (
    cb_handle_query_all_dev_info
)

import core.alarm_plan_manager as ap_mgr
from core.alarm_plan_manager import(
    AlarmPlanManager,
    init_alarm_plan_mgr,
    start_event_dispatcher,
    start_alarm_plan_mgr,
    cb_handle_query_all_alarm_plan_info
)

import core.g_LAS as g_las


# System initialisation
def sys_init():
    """Load ``./conf.ini`` and populate the global log/system configuration.

    Creates ``./log/`` if it does not exist, validates that every option read
    below is present, fills ``g_log_conf`` and ``g_sys_conf`` under their
    respective mutexes, selects the MQTT broker address by platform, and
    prints/logs a start-up banner.

    Returns:
        0 on success, -1 on any failure (missing option, unreadable or
        malformed config file, or any other exception raised while
        initialising).
    """
    try:
        # Create the log directory
        if not os.path.exists("./log/"):
            os.makedirs("./log/")
            print("create log dir succeed !")

        LOGDBG(f" ================ system init ...")
        print(f" ================ system init ...")

        # Read the configuration file
        config = configparser.ConfigParser()
        with open('./conf.ini', 'r', encoding='utf-8') as f:
            config.read_file(f)

        # Every option read further down must exist. The original check
        # tested 'max_log_files' twice and never 'max_log_size'.
        required_conf = (
            'module_name',
            'platform',
            'db_host',
            'log_lvl',
            'max_log_size',
            'max_log_files',
        )
        if not all(config.has_option('conf', key) for key in required_conf):
            LOGERR("sys_init failed, invalid conf.ini param")
            # Failure must be non-zero: main() treats 0 as success.
            return -1

        # Redundant with the check above (db_host is already required);
        # kept so the dedicated message survives if that list changes.
        if not config.has_option('conf', 'db_host'):
            LOGERR("sys_init failed, invalid db param")
            return -1

        # Both platform sections are required regardless of which platform
        # is selected, as in the original validation.
        required_hosts = (
            ('linux', 'host_ip'),
            ('windows', 'host_ip'),
            ('windows', 'server_ip'),
            ('windows', 'ssh_host'),
            ('windows', 'ssh_port'),
        )
        if not all(config.has_option(sec, key) for sec, key in required_hosts):
            LOGERR("sys_init failed, invalid host_ip param")
            return -1

        LOGDBG("read conf.ini succeed !")

        conf = config["conf"]

        # Initialise the log configuration
        with g_log_conf_mtx:
            g_log_conf["module_name"] = str(conf["module_name"])
            g_log_conf["log_lvl"] = int(conf["log_lvl"])
            # max_log_size is given in MiB in conf.ini; store bytes.
            g_log_conf["max_log_size"] = int(conf["max_log_size"]) * 1024 * 1024
            g_log_conf["max_log_files"] = int(conf["max_log_files"])
        # Logged after the mutex is released so a logger that takes
        # g_log_conf_mtx itself cannot deadlock here.
        LOGDBG("log init succeed")

        # Initialise the system configuration
        with g_sys_conf_mtx:
            g_sys_conf["module_name"] = str(conf["module_name"])
            g_sys_conf["platform"] = int(conf["platform"])
            g_sys_conf["db_host"] = str(conf["db_host"])
            g_sys_conf["log_lvl"] = int(conf["log_lvl"])
            g_sys_conf["max_log_size"] = int(conf["max_log_size"]) * 1024 * 1024
            g_sys_conf["max_log_files"] = int(conf["max_log_files"])

            # NOTE(review): any other platform value leaves host_ip and the
            # MQTT broker untouched; confirm whether that should be rejected.
            if g_sys_conf["platform"] == 0:
                # Windows (local): broker is the remote server_ip
                windows = config["windows"]
                g_sys_conf["host_ip"] = str(windows["host_ip"])
                g_sys_conf["server_ip"] = str(windows["server_ip"])
                g_sys_conf["ssh_host"] = str(windows["ssh_host"])
                g_sys_conf["ssh_port"] = int(windows["ssh_port"])
                mqtt_process.MQTT_BROKER = g_sys_conf["server_ip"]
            elif g_sys_conf["platform"] == 1:
                # Linux (server): broker is this host
                g_sys_conf["host_ip"] = str(config["linux"]["host_ip"])
                mqtt_process.MQTT_BROKER = g_sys_conf["host_ip"]

            # sp_id is taken from the current UTC time in milliseconds
            g_sys_conf["sp_id"] = int(get_utc_time_ms())

            # Alarm configuration
            g_sys_conf["alarm_conf"] = sys_comm.alarm_conf

            # Snapshot the values for the start-up banner
            module_name = g_sys_conf["module_name"]
            platform = g_sys_conf["platform"]
            host_ip = g_sys_conf["host_ip"]
            max_log_files = g_sys_conf["max_log_files"]
            max_log_size = g_sys_conf["max_log_size"]
            log_lvl = g_sys_conf["log_lvl"]
            sp_id = g_sys_conf["sp_id"]

        # Start-up succeeded: report system information to log and stdout
        banner = (
            f" ================ system init succeed !",
            f" ================ module : {module_name}",
            f" ================ platform : {platform}",
            f" ================ host_ip : {host_ip}",
            f" ================ max_log_files : {max_log_files}",
            f" ================ max_log_size : {max_log_size}",
            f" ================ log_lvl : {log_lvl}",
            f" ================ sp_id : {sp_id}",
            f" ================ version : v B1.0",
        )
        for line in banner:
            LOGINFO(line)
        for line in banner:
            print(line)

        return 0

    except json.JSONDecodeError as e:
        tb_info = traceback.extract_tb(e.__traceback__)
        for frame in tb_info:
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
        return -1
    except Exception as e:
        tb_info = traceback.extract_tb(e.__traceback__)
        for frame in tb_info:
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
        return -1


# Polling function, reserved for periodic tasks
def run():
    """Block forever, waking once per second.

    The loop body currently does nothing besides sleeping; it keeps the main
    thread alive while the worker threads run.
    """
    # Polling loop
    while True:
        time.sleep(1)


# Main thread
def main_process():
    """Queue the initial DB queries and enter the polling loop.

    Returns:
        -1 if ``g_las.g_alarm_plan_mgr`` is falsy (nothing is queued);
        otherwise does not return because ``run()`` loops forever.
    """
    if not g_las.g_alarm_plan_mgr:
        LOGERR("main_process failed, alarm plan manager not initialized")
        return -1

    # Query all device information
    db_req_que.put(DBRequest(sql=sqls.sql_query_all_dev_info,
                             callback=cb_handle_query_all_dev_info))

    # Query all alarm plans
    db_req_que.put(DBRequest(sql=sqls.sql_query_all_alarm_plan,
                             callback=cb_handle_query_all_alarm_plan_info))

    # Polling loop
    run()


def main():
    """Initialise the system and start every worker in order.

    Exits the process with status -1 when ``sys_init()`` fails.
    """
    # Initialisation
    if sys_init() != 0:
        sys.exit(-1)

    # Initialise LAS
    init_alarm_plan_mgr()

    # Database worker thread
    db_process.create_db_process().start()

    # MQTT message threads
    # NOTE(review): the original bound these to locals named mqtt_client /
    # mqtt_consumer, shadowing the names imported from mqtt.mqtt_process
    # without updating them; confirm whether the module-level objects were
    # meant to be replaced.
    client_thread = MQTTClientThread()
    client_thread.start()
    consumer_thread = MQTTConsumerThread()
    consumer_thread.start()

    # Event dispatcher
    start_event_dispatcher()

    # Alarm plan manager
    start_alarm_plan_mgr()

    # Main thread
    main_process()


if __name__ == "__main__":
    main()