123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- #!/usr/bin/env python3.9
- from threading import Thread
- import configparser
- import os
- import time
- import traceback
- import sys
- import json
- import common.sys_comm as sys_comm
- from common.sys_comm import (
- LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
- get_utc_time_ms
- )
- from mqtt import mqtt_process
- from mqtt.mqtt_process import MQTTClientThread, MQTTConsumerThread, mqtt_client, mqtt_consumer
- import db.db_process as db_process
- from db.db_process import db_req_que
- from db.db_process import DBRequest_Async
- from db.db_process import (db_execute_sync, db_execute_async)
- import db.db_sqls as sqls
- import device.dev_mng as g_Dev
- from device.dev_mng import (
- Device, g_dev_mgr, init_dev_mng
- )
- import core.alarm_plan_manager as ap_mgr
- from core.alarm_plan_manager import (
- AlarmPlanManager,
- init_alarm_plan_mgr,
- start_alarm_plan_mgr,
- cb_handle_query_all_alarm_plan_info
- )
- from core.alarm_plan_dispatcher import (
- init_alarm_plan_disp,
- start_alarm_plan_dispatcher
- )
- import core.g_LAS as g_las
# System initialisation
def sys_init():
    """Load ./conf.ini and populate the global log and system configuration.

    Steps, in order:
      1. create the ./log/ directory if it does not exist;
      2. read ./conf.ini and verify that every option read below is present;
      3. copy the log settings into sys_comm.g_log_conf (under g_log_conf_mtx);
      4. copy the system settings into sys_comm.g_sys_conf (under g_sys_conf_mtx),
         pick the MQTT broker address according to ``platform`` and store a
         start-up ``sp_id`` taken from get_utc_time_ms();
      5. print and log a start-up banner.

    Returns:
        0 on success, EC.EC_FAILED on any failure (missing option, unreadable
        file, non-numeric value, ...). Callers compare the result against 0.
    """
    try:
        # Create the log directory on first start.
        if not os.path.exists("./log/"):
            # exist_ok: tolerate the directory appearing between check and create.
            os.makedirs("./log/", exist_ok=True)
            print("create log dir succeed !")

        LOGDBG(" ================ system init ...")
        print(" ================ system init ...")

        # Read the configuration file.
        config = configparser.ConfigParser()
        with open('./conf.ini', 'r', encoding='utf-8') as f:
            config.read_file(f)

        # Every [conf] option that is read further down must be present.
        required_conf = (
            'module_name', 'platform', 'db_host',
            'log_lvl', 'max_log_size', 'max_log_files',
        )
        missing = [opt for opt in required_conf
                   if not config.has_option('conf', opt)]
        if missing:
            LOGERR(f"sys_init failed, invalid conf.ini param, missing: {missing}")
            return EC.EC_FAILED

        # Host options of both platform sections are required, whichever
        # platform is selected.
        required_hosts = (
            ('linux', 'host_ip'),
            ('windows', 'host_ip'),
            ('windows', 'server_ip'),
            ('windows', 'ssh_host'),
            ('windows', 'ssh_port'),
        )
        missing = [f"{section}.{opt}" for section, opt in required_hosts
                   if not config.has_option(section, opt)]
        if missing:
            LOGERR(f"sys_init failed, invalid host_ip param, missing: {missing}")
            return EC.EC_FAILED
        LOGDBG("read conf.ini succeed !")

        conf = config["conf"]

        # Initialise the log configuration.
        with sys_comm.g_log_conf_mtx:
            sys_comm.g_log_conf["module_name"] = str(conf["module_name"])
            sys_comm.g_log_conf["log_lvl"] = int(conf["log_lvl"])
            # The configured size is scaled by 1024 * 1024 before it is stored.
            sys_comm.g_log_conf["max_log_size"] = int(conf["max_log_size"]) * 1024 * 1024
            sys_comm.g_log_conf["max_log_files"] = int(conf["max_log_files"])
        LOGDBG("log init succeed")

        platform = int(conf["platform"])
        if platform not in (0, 1):
            # Only 0 (windows, local) and 1 (linux, server) are handled below.
            LOGWARN(f"sys_init: unsupported platform {platform}, "
                    f"host_ip and MQTT broker are left unchanged")

        # Initialise the system configuration.
        with sys_comm.g_sys_conf_mtx:
            sys_conf = sys_comm.g_sys_conf
            sys_conf["module_name"] = str(conf["module_name"])
            sys_conf["platform"] = platform
            sys_conf["db_host"] = str(conf["db_host"])
            sys_conf["log_lvl"] = int(conf["log_lvl"])
            sys_conf["max_log_size"] = int(conf["max_log_size"]) * 1024 * 1024
            sys_conf["max_log_files"] = int(conf["max_log_files"])

            if platform == 0:
                # windows, local: the broker is the configured server_ip.
                sys_conf["host_ip"] = str(config["windows"]["host_ip"])
                sys_conf["server_ip"] = str(config["windows"]["server_ip"])
                sys_conf["ssh_host"] = str(config["windows"]["ssh_host"])
                sys_conf["ssh_port"] = int(config["windows"]["ssh_port"])
                mqtt_process.MQTT_BROKER = sys_conf["server_ip"]
            elif platform == 1:
                # linux, server: the broker is the host itself.
                sys_conf["host_ip"] = str(config["linux"]["host_ip"])
                mqtt_process.MQTT_BROKER = sys_conf["host_ip"]

            sys_conf["sp_id"] = int(get_utc_time_ms())
            # Alarm configuration
            sys_conf["alarm_conf"] = sys_comm.alarm_conf

            # Snapshot the values shown in the start-up banner.
            module_name = sys_conf["module_name"]
            host_ip = sys_conf["host_ip"]
            max_log_files = sys_conf["max_log_files"]
            max_log_size = sys_conf["max_log_size"]
            log_lvl = sys_conf["log_lvl"]
            sp_id = sys_conf["sp_id"]

        # Start-up succeeded: show the system information on stdout and in the log.
        banner = (
            " ================ system init succeed !",
            f" ================ module : {module_name}",
            f" ================ platform : {platform}",
            f" ================ host_ip : {host_ip}",
            f" ================ max_log_files : {max_log_files}",
            f" ================ max_log_size : {max_log_size}",
            f" ================ log_lvl : {log_lvl}",
            f" ================ sp_id : {sp_id}",
            " ================ version : v B1.0",
        )
        for line in banner:
            print(line)
        for line in banner:
            LOGINFO(line)
        return 0
    except json.JSONDecodeError as e:
        tb_info = traceback.extract_tb(e.__traceback__)
        for frame in tb_info:
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
        return EC.EC_FAILED
    except Exception as e:
        tb_info = traceback.extract_tb(e.__traceback__)
        for frame in tb_info:
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
        return EC.EC_FAILED
# Polling loop for periodic tasks
def run():
    """Block the calling thread forever, waking up once per second.

    No periodic work is performed in the loop body; it only sleeps.
    """
    poll_interval_s = 1
    while True:
        time.sleep(poll_interval_s)
def query_events_expire_range():
    """Return the event retention range stored in the database.

    Runs sqls.sql_event_save_range synchronously with a 15 second timeout.
    When exactly one row comes back and its "param_value" is not None, that
    value is returned as stored (no type conversion is applied here).

    In every other case - no row, several rows, a None value, or an exception
    while querying - the default of 90 is returned, matching what the log
    messages announce. An error code is never returned, so callers can use
    the result directly.
    """
    default_save_range = 90
    try:
        result = db_execute_sync(sql=sqls.sql_event_save_range, timeout=15)
        if result and len(result) == 1:
            param_value = result[0].get("param_value")
            # Any value other than None is taken from the database as is.
            if param_value is not None:
                return param_value
        LOGWARN("found invalid event_save_range, use default 90")
        return default_save_range
    except Exception as e:
        LOGERR(f"query event_save_range failed: {e}, use default 90")
        return default_save_range
# Create the plan that cleans up expired events
def create_events_expire_plan():
    """Schedule the clean-up task for expired events.

    Looks up the retention range with query_events_expire_range() and hands it
    to g_las.g_alarm_plan_mgr.create_clean_expire_events_task().

    Returns:
        0 on success, EC.EC_FAILED when no usable retention range is available.
    """
    event_save_range = query_events_expire_range()
    # The lookup may report failure as EC.EC_FAILED; a non-zero error code is
    # truthy, so it has to be checked explicitly before the emptiness test.
    if event_save_range == EC.EC_FAILED or not event_save_range:
        LOGERR(f"create_events_expire_plan failed, invalid event_save_range: {event_save_range}")
        return EC.EC_FAILED
    g_las.g_alarm_plan_mgr.create_clean_expire_events_task(event_save_range)
    return 0
# Main thread
def main_process():
    """Queue the initial database queries, schedule event clean-up, then poll.

    Returns:
        EC.EC_FAILED when the alarm plan manager or dispatcher in g_las is not
        initialised, or the status returned by create_events_expire_plan()
        when that status is truthy. Otherwise control is handed to run().
    """
    # Both LAS singletons must exist before any work is queued.
    for required in ("g_alarm_plan_mgr", "g_alarm_plan_disp"):
        if not getattr(g_las, required):
            LOGERR(f"error: {required} not init")
            return EC.EC_FAILED

    # Query all device information.
    dev_info_req = DBRequest_Async(
        sql=sqls.sql_query_all_dev_info,
        callback=g_Dev.g_dev_mgr.cb_handle_query_all_dev_info,
    )
    db_req_que.put(dev_info_req)

    # Query all alarm plans.
    alarm_plan_req = DBRequest_Async(
        sql=sqls.sql_query_all_alarm_plan,
        callback=cb_handle_query_all_alarm_plan_info,
    )
    db_req_que.put(alarm_plan_req)

    # Create the task that expires old events.
    plan_status = create_events_expire_plan()
    if plan_status:
        LOGERR("create_events_expire_plan failed, process termination")
        return plan_status

    # Polling loop.
    run()
def main():
    """Start the service: configuration, managers, worker threads, main loop.

    Exits the process with EC.EC_FAILED when sys_init() does not return 0 or
    when main_process() reports an error.
    """
    # Initialisation
    if 0 != sys_init():
        sys.exit(EC.EC_FAILED)

    # Initialise the device manager.
    init_dev_mng()
    g_Dev.g_dev_mgr.start()

    # Initialise LAS.
    init_alarm_plan_mgr()
    init_alarm_plan_disp()
    start_alarm_plan_dispatcher()   # event dispatcher
    start_alarm_plan_mgr()          # alarm plan manager

    # Database thread
    db_thread = db_process.create_db_process()
    db_thread.start()

    # MQTT message threads. The local names are kept distinct from the
    # module-level mqtt_client / mqtt_consumer imports so they are not shadowed.
    client_thread = MQTTClientThread()
    client_thread.start()
    consumer_thread = MQTTConsumerThread()
    consumer_thread.start()

    # Main thread. main_process() follows the 0-is-success convention: a
    # truthy return value is an error code.
    if main_process():
        LOGERR("main_process error, process termination")
        sys.exit(EC.EC_FAILED)
# Start the service only when the file is executed as a script, so that
# importing the module does not launch threads as a side effect.
if __name__ == "__main__":
    main()
|