123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- #!/usr/bin/env python3.9
- from threading import Thread
- import configparser
- import os
- import time
- import traceback
- import sys
- import json
- import g_config
- import common.sys_comm as sys_comm
- from common.sys_comm import (
- LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
- get_utc_time_ms
- )
- import mqtt.mqtt_process as mp
- from mqtt.mqtt_process import (
- MQTTClientThread, RobustMQTTClient, MQTTConsumerThread, g_mqtt_client, g_mqtt_consumer
- )
- import db.db_process as db_process
- from db.db_process import db_req_que
- from db.db_process import DBRequest_Async
- from db.db_process import (
- db_execute_sync, db_execute_async
- )
- import db.db_sqls as sqls
- import device.dev_mng as g_Dev
- from device.dev_mng import (
- Device, g_dev_mgr, init_dev_mng
- )
- import core.alarm_plan_manager as ap_mgr
- from core.alarm_plan_manager import (
- AlarmPlanManager,
- init_alarm_plan_mgr,
- start_alarm_plan_mgr,
- cb_handle_query_all_alarm_plan_info
- )
- from core.alarm_plan_dispatcher import (
- init_alarm_plan_disp,
- start_alarm_plan_dispatcher
- )
- import core.g_LAS as g_las
- # 系统初始化
def sys_init():
    """Initialise the process: log directory, conf.ini and global system config.

    Steps, in order:
      1. create ``./log/`` when it does not exist;
      2. read ``./conf.ini`` and require the ``module_name`` and ``server_ip``
         options in its ``[conf]`` section;
      3. pass both values plus a generated ``sp_id``
         (``<module_name>_<utc time in ms>``) to ``g_config.init_g_sys_conf``;
      4. print and log a start-up banner built from ``g_config.g_sys_conf``
         and ``sys_comm.g_log_conf``.

    Returns:
        0 on success. A non-zero code on failure: ``EC.EC_FAILED`` when a
        required option is missing or any exception is raised, ``-1`` when
        ``g_config.init_g_sys_conf`` reports an error.
    """
    try:
        # Create the log directory if it is missing. exist_ok guards against
        # another process creating it between the check and the call.
        if not os.path.exists("./log/"):
            os.makedirs("./log/", exist_ok=True)
            print("create log dir succeed !")

        # Load the configuration file.
        config = configparser.ConfigParser()
        with open('./conf.ini', 'r', encoding='utf-8') as f:
            config.read_file(f)

        required_options = ('module_name', 'server_ip')
        if not all(config.has_option('conf', opt) for opt in required_options):
            # This path previously returned 0, which the caller treats as
            # success; a missing option must abort start-up instead.
            LOGERR("sys_init failed, invalid conf.ini param")
            return EC.EC_FAILED
        LOGDBG("read conf.ini succeed !")

        server_ip = config["conf"]["server_ip"].strip()
        module_name = config["conf"]["module_name"].strip()
        sp_id = module_name + "_" + str(get_utc_time_ms())
        # init_g_sys_conf signals failure with a truthy return value.
        if g_config.init_g_sys_conf(server_ip, module_name, sp_id):
            print(f" ================ system init failed, invalid server_ip: {server_ip}")
            LOGERR(f" ================ system init failed, invalid server_ip: {server_ip}")
            return -1

        # Start-up succeeded: report the effective settings as stored in the
        # global configuration (not the raw values read from conf.ini).
        sys_conf = g_config.g_sys_conf
        log_conf = sys_comm.g_log_conf
        banner = [
            " ================ system init succeed !",
            f" ================ module : {sys_conf['module_name']}",
            f" ================ platform : {sys_conf['platform']}",
            f" ================ host_ip : {sys_conf['host_ip']}",
            f" ================ service : {sys_conf['service']['ip']}",
            f" ================ max_log_files : {log_conf['max_log_files']}",
            f" ================ max_log_size : {log_conf['max_log_size']}",
            f" ================ log_lvl : {log_conf['log_lvl']}",
            f" ================ sp_id : {sys_conf['sp_id']}",
            " ================ version : v B1.0",
        ]
        # Console first, then the log. The "service" line used to be printed
        # a second time here instead of being logged.
        for line in banner:
            print(line)
        for line in banner:
            LOGINFO(line)
        return 0
    except json.JSONDecodeError as e:
        # Log every frame of the traceback together with the offending document.
        tb_info = traceback.extract_tb(e.__traceback__)
        for frame in tb_info:
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
        return EC.EC_FAILED
    except Exception as e:
        tb_info = traceback.extract_tb(e.__traceback__)
        for frame in tb_info:
            LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
        return EC.EC_FAILED
- # 轮循执行一些任务
def run():
    """Block the calling thread forever, sleeping one second per iteration.

    No work is performed inside the loop; the function never returns.
    """
    tick_seconds = 1
    while True:
        time.sleep(tick_seconds)
def query_events_expire_range():
    """Return the event retention range stored in the database, or the default.

    Runs ``sqls.sql_event_save_range`` synchronously with a 15 second timeout
    and reads ``param_value`` from the single row that is expected back.

    Returns:
        The stored ``param_value`` when exactly one row with a non-None value
        is returned; otherwise the default of 90. The default is also returned
        when the query (or reading its result) raises, matching the
        "use default 90" log message. Previously that path returned
        ``EC.EC_FAILED``, which a caller could mistake for a retention range.
    """
    default_range = 90
    try:
        result = db_execute_sync(sql=sqls.sql_event_save_range, timeout=15)
        if result and len(result) == 1:
            param_value = result[0].get("param_value")
            # Any value other than None is taken from the database as-is.
            if param_value is not None:
                return param_value
        # No row, several rows, or a NULL value: fall back to the default.
        LOGWARN("found invalid event_save_range, use default 90")
        return default_range
    except Exception as e:
        LOGERR(f"query event_save_range failed: {e}, use default 90")
        return default_range
- # 创建事件过期计划
def create_events_expire_plan():
    """Create the task that cleans up expired events.

    Obtains the retention range from ``query_events_expire_range()`` and
    passes it to ``g_las.g_alarm_plan_mgr.create_clean_expire_events_task``.

    Returns:
        0 when the task was created, ``EC.EC_FAILED`` when no usable
        retention range is available.
    """
    event_save_range = query_events_expire_range()
    # A falsy value (None, 0, "") is unusable. The failure sentinel must be
    # rejected explicitly: it is truthy, so a plain "not" test would let it
    # through and it would be used as the retention range.
    if not event_save_range or event_save_range == EC.EC_FAILED:
        return EC.EC_FAILED
    g_las.g_alarm_plan_mgr.create_clean_expire_events_task(event_save_range)
    return 0
- # 主线程
def main_process():
    """Queue the start-up queries, create the expiry plan, then loop forever.

    Returns:
        ``EC.EC_FAILED`` when ``g_las.g_alarm_plan_mgr`` or
        ``g_las.g_alarm_plan_disp`` is not set, or the non-zero result of
        ``create_events_expire_plan()`` when that call fails. On the success
        path the function hands over to ``run()``.
    """
    # Both LAS globals must be set before any work is queued; they are looked
    # up one at a time, so the second is only read if the first is set.
    for attr_name, message in (
        ("g_alarm_plan_mgr", "error: g_alarm_plan_mgr not init"),
        ("g_alarm_plan_disp", "error: g_alarm_plan_disp not init"),
    ):
        if not getattr(g_las, attr_name):
            LOGERR(message)
            return EC.EC_FAILED

    # Asynchronous request: load all device information.
    dev_info_request = DBRequest_Async(
        sql=sqls.sql_query_all_dev_info,
        callback=g_Dev.g_dev_mgr.cb_handle_query_all_dev_info,
    )
    db_req_que.put(dev_info_request)

    # Asynchronous request: load all alarm plans.
    alarm_plan_request = DBRequest_Async(
        sql=sqls.sql_query_all_alarm_plan,
        callback=cb_handle_query_all_alarm_plan_info,
    )
    db_req_que.put(alarm_plan_request)

    # Create the expired-event clean-up plan; a non-zero result aborts.
    status = create_events_expire_plan()
    if status:
        LOGERR("create_events_expire_plan failed, process termination")
        return status

    # Enter the polling loop.
    run()
def main():
    """Start every subsystem, then hand control to ``main_process()``.

    Start-up order: ``sys_init()``, device manager, alarm-plan manager and
    dispatcher, database thread, MQTT client and MQTT consumer. The process
    exits with ``EC.EC_FAILED`` when ``sys_init()`` returns non-zero or when
    ``main_process()`` returns a failure code.
    """
    # System initialisation; any non-zero result aborts start-up.
    if sys_init() != 0:
        sys.exit(EC.EC_FAILED)

    # Device manager.
    init_dev_mng()
    g_Dev.g_dev_mgr.start()

    # LAS: alarm-plan manager and dispatcher.
    init_alarm_plan_mgr()
    init_alarm_plan_disp()
    start_alarm_plan_dispatcher()  # event dispatcher
    start_alarm_plan_mgr()  # alarm-plan manager

    # Database thread.
    db_thread = db_process.create_db_process()
    db_thread.start()

    # MQTT threads; the instances are stored on the mqtt_process module.
    mp.g_mqtt_client = RobustMQTTClient()
    mp.g_mqtt_client.start()
    mp.g_mqtt_consumer = MQTTConsumerThread()
    mp.g_mqtt_consumer.start()
    LOGDBG(" ================ LAS start success ...")

    # main_process() only returns with a truthy failure code. The previous
    # "if not main_process()" test was inverted, so a failure was neither
    # logged nor turned into a failing exit status.
    result = main_process()
    if result:
        LOGERR("main_process error, process termination")
        sys.exit(EC.EC_FAILED)
# Start the service only when this file is executed as a script, so that
# importing the module does not launch threads or call sys.exit().
if __name__ == "__main__":
    main()
|