123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- #!/usr/bin/env python3.9
- from threading import Thread
- import configparser
- import os
- import time
- import traceback
- import sys
- import json
- import common.sys_comm as sys_comm
- from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
- from common.sys_comm import get_utc_time_ms
- from common.sys_comm import g_sys_conf, g_sys_conf_mtx, g_log_conf, g_log_conf_mtx
- from mqtt import mqtt_process
- from mqtt.mqtt_process import MQTTClientThread, MQTTConsumerThread, mqtt_client, mqtt_consumer
- import db.db_process as db_process
- from db.db_process import db_req_que
- from db.db_process import DBRequest
- import db.db_sqls as sqls
- import device.dev_mng as g_Dev
- from device.dev_mng import (
- Device, g_dev_mgr, init_dev_mng
- )
- import core.alarm_plan_manager as ap_mgr
- from core.alarm_plan_manager import (
- AlarmPlanManager,
- init_alarm_plan_mgr,
- start_alarm_plan_mgr,
- cb_handle_query_all_alarm_plan_info
- )
- from core.alarm_plan_dispatcher import (
- init_alarm_plan_disp,
- start_alarm_plan_dispatcher
- )
- import core.g_LAS as g_las
- # 系统初始化
- def sys_init():
- try:
- # 创建日志目录
- if not os.path.exists("./log/"):
- os.makedirs("./log/")
- print("create log dir succeed !")
- LOGDBG(f" ================ system init ...")
- print(f" ================ system init ...")
- # 读取配置文件
- config = configparser.ConfigParser()
- with open('./conf.ini', 'r', encoding='utf-8') as f:
- config.read_file(f)
- if not (config.has_option('conf', 'module_name') and
- config.has_option('conf', 'platform') and
- config.has_option('conf', 'db_host') and
- config.has_option('conf', 'log_lvl') and
- config.has_option('conf', 'max_log_files') and
- config.has_option('conf', 'max_log_files')):
- LOGDBG("sys_init failed, invalid conf.ini param")
- return 0
- if not (config.has_option('conf', 'db_host')):
- LOGDBG("sys_init failed, invalid db param")
- return 0
- if not (config.has_option('linux', 'host_ip') and
- config.has_option('windows', 'host_ip') and
- config.has_option('windows', 'server_ip') and
- config.has_option('windows', 'ssh_host') and
- config.has_option('windows', 'ssh_port')):
- LOGDBG("sys_init failed, invalid host_ip param")
- return 0
- LOGDBG("read conf.ini succeed !")
- # 初始化日志配置
- with g_log_conf_mtx:
- g_log_conf["module_name"] = str(config["conf"]["module_name"])
- g_log_conf["log_lvl"] = int(config["conf"]["log_lvl"])
- g_log_conf["max_log_size"] = int(config["conf"]["max_log_size"]) * 1024 * 1024
- g_log_conf["max_log_files"] = int(config["conf"]["max_log_files"])
- LOGDBG("log init succeed")
- # 初始化系统配置
- with g_sys_conf_mtx:
- g_sys_conf["module_name"] = str(config["conf"]["module_name"])
- g_sys_conf["platform"] = int(config["conf"]["platform"])
- g_sys_conf["db_host"] = str(config["conf"]["db_host"])
- g_sys_conf["log_lvl"] = int(config["conf"]["log_lvl"])
- g_sys_conf["max_log_size"] = int(config["conf"]["max_log_size"]) * 1024 * 1024
- g_sys_conf["max_log_files"] = int(config["conf"]["max_log_files"])
- # windows 本地
- if g_sys_conf["platform"] == 0:
- g_sys_conf["host_ip"] = str(config["windows"]["host_ip"])
- g_sys_conf["server_ip"] = str(config["windows"]["server_ip"])
- g_sys_conf["ssh_host"] = str(config["windows"]["ssh_host"])
- g_sys_conf["ssh_port"] = int(config["windows"]["ssh_port"])
- mqtt_process.MQTT_BROKER = g_sys_conf["server_ip"]
- # linux 服务器
- elif g_sys_conf["platform"] == 1:
- g_sys_conf["host_ip"] = str(config["linux"]["host_ip"])
- mqtt_process.MQTT_BROKER = g_sys_conf["host_ip"]
- g_sys_conf["sp_id"] = int(get_utc_time_ms())
- # 报警配置
- g_sys_conf["alarm_conf"] = sys_comm.alarm_conf
- # 启动成功,打印系统信息
- module_name = g_sys_conf["module_name"]
- platform = g_sys_conf["platform"]
- host_ip = g_sys_conf["host_ip"]
- max_log_files = g_sys_conf["max_log_files"]
- max_log_size = g_sys_conf["max_log_size"]
- log_lvl = g_sys_conf["log_lvl"]
- sp_id = g_sys_conf["sp_id"]
- print(f" ================ system init succeed !")
- print(f" ================ module : {module_name}")
- print(f" ================ platform : {platform}")
- print(f" ================ host_ip : {host_ip}")
- print(f" ================ max_log_files : {max_log_files}")
- print(f" ================ max_log_size : {max_log_size}")
- print(f" ================ log_lvl : {log_lvl}")
- print(f" ================ sp_id : {sp_id}")
- print(f" ================ version : v B1.0")
- LOGINFO(f" ================ system init succeed !")
- LOGINFO(f" ================ module : {module_name}")
- LOGINFO(f" ================ platform : {platform}")
- LOGINFO(f" ================ host_ip : {host_ip}")
- LOGINFO(f" ================ max_log_files : {max_log_files}")
- LOGINFO(f" ================ max_log_size : {max_log_size}")
- LOGINFO(f" ================ log_lvl : {log_lvl}")
- LOGINFO(f" ================ sp_id : {sp_id}")
- LOGINFO(f" ================ version : v B1.0")
- return 0
- except json.JSONDecodeError as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
- return -1
- except Exception as e:
- tb_info = traceback.extract_tb(e.__traceback__)
- for frame in tb_info:
- LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
- return -1
- # 轮循函数,定期执行一些任务
- def run():
- # 轮循处理任务
- while True:
- time.sleep(1)
- # 主线程
- def main_process():
- if not g_las.g_alarm_plan_mgr:
- return -1
- # 查询所有设备信息
- db_req_que.put(DBRequest(sql=sqls.sql_query_all_dev_info,
- callback=g_Dev.g_dev_mgr.cb_handle_query_all_dev_info))
- # 查询所有告警计划
- db_req_que.put(DBRequest(sql=sqls.sql_query_all_alarm_plan,
- callback=cb_handle_query_all_alarm_plan_info))
- # 轮循函数
- run()
- def main():
- # 初始化
- if (0 != sys_init()):
- sys.exit(-1)
- # 初始化 dev_mng
- init_dev_mng()
- g_Dev.g_dev_mgr.start()
- # 初始化LAS
- init_alarm_plan_mgr()
- init_alarm_plan_disp()
- start_alarm_plan_dispatcher() # 事件分发器
- start_alarm_plan_mgr() # 告警计划管理器
- # 数据库线程
- db_process.create_db_process().start()
- # MQTT 消息线程
- mqtt_client = MQTTClientThread()
- mqtt_client.start()
- mqtt_consumer = MQTTConsumerThread()
- mqtt_consumer.start()
- # 主线程
- main_process()
- main()
|