LAS.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. #!/usr/bin/env python3.9
  2. from threading import Thread
  3. import configparser
  4. import os
  5. import time
  6. import traceback
  7. import sys
  8. import json
  9. import common.sys_comm as sys_comm
  10. from common.sys_comm import (
  11. LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
  12. get_utc_time_ms
  13. )
  14. import mqtt.mqtt_process as mp
  15. from mqtt.mqtt_process import (
  16. MQTTClientThread, RobustMQTTClient, MQTTConsumerThread, g_mqtt_client, g_mqtt_consumer
  17. )
  18. import db.db_process as db_process
  19. from db.db_process import db_req_que
  20. from db.db_process import DBRequest_Async
  21. from db.db_process import (db_execute_sync, db_execute_async)
  22. import db.db_sqls as sqls
  23. import device.dev_mng as g_Dev
  24. from device.dev_mng import (
  25. Device, g_dev_mgr, init_dev_mng
  26. )
  27. import core.alarm_plan_manager as ap_mgr
  28. from core.alarm_plan_manager import (
  29. AlarmPlanManager,
  30. init_alarm_plan_mgr,
  31. start_alarm_plan_mgr,
  32. cb_handle_query_all_alarm_plan_info
  33. )
  34. from core.alarm_plan_dispatcher import (
  35. init_alarm_plan_disp,
  36. start_alarm_plan_dispatcher
  37. )
  38. import core.g_LAS as g_las
  39. # 系统初始化
  40. def sys_init():
  41. try:
  42. # 创建日志目录
  43. if not os.path.exists("./log/"):
  44. os.makedirs("./log/")
  45. print("create log dir succeed !")
  46. LOGDBG(f" ================ system init ...")
  47. print(f" ================ system init ...")
  48. # 读取配置文件
  49. config = configparser.ConfigParser()
  50. with open('./conf.ini', 'r', encoding='utf-8') as f:
  51. config.read_file(f)
  52. if not (config.has_option('conf', 'module_name') and
  53. config.has_option('conf', 'platform') and
  54. config.has_option('conf', 'db_host') and
  55. config.has_option('conf', 'log_lvl') and
  56. config.has_option('conf', 'max_log_files') and
  57. config.has_option('conf', 'max_log_files')):
  58. LOGDBG("sys_init failed, invalid conf.ini param")
  59. return 0
  60. if not (config.has_option('conf', 'db_host')):
  61. LOGDBG("sys_init failed, invalid db param")
  62. return 0
  63. if not (config.has_option('linux', 'host_ip') and
  64. config.has_option('windows', 'host_ip') and
  65. config.has_option('windows', 'server_ip') and
  66. config.has_option('windows', 'ssh_host') and
  67. config.has_option('windows', 'ssh_port')):
  68. LOGDBG("sys_init failed, invalid host_ip param")
  69. return 0
  70. LOGDBG("read conf.ini succeed !")
  71. # 初始化日志配置
  72. with sys_comm.g_log_conf_mtx:
  73. sys_comm.g_log_conf["module_name"] = str(config["conf"]["module_name"])
  74. sys_comm.g_log_conf["log_lvl"] = int(config["conf"]["log_lvl"])
  75. sys_comm.g_log_conf["max_log_size"] = int(config["conf"]["max_log_size"]) * 1024 * 1024
  76. sys_comm.g_log_conf["max_log_files"] = int(config["conf"]["max_log_files"])
  77. LOGDBG("log init succeed")
  78. # 初始化系统配置
  79. with sys_comm.g_sys_conf_mtx:
  80. sys_comm.g_sys_conf["module_name"] = str(config["conf"]["module_name"])
  81. sys_comm.g_sys_conf["platform"] = int(config["conf"]["platform"])
  82. sys_comm.g_sys_conf["db_host"] = str(config["conf"]["db_host"])
  83. sys_comm.g_sys_conf["log_lvl"] = int(config["conf"]["log_lvl"])
  84. sys_comm.g_sys_conf["max_log_size"] = int(config["conf"]["max_log_size"]) * 1024 * 1024
  85. sys_comm.g_sys_conf["max_log_files"] = int(config["conf"]["max_log_files"])
  86. # windows 本地
  87. if sys_comm.g_sys_conf["platform"] == 0:
  88. sys_comm.g_sys_conf["host_ip"] = str(config["windows"]["host_ip"])
  89. sys_comm.g_sys_conf["server_ip"] = str(config["windows"]["server_ip"])
  90. sys_comm.g_sys_conf["ssh_host"] = str(config["windows"]["ssh_host"])
  91. sys_comm.g_sys_conf["ssh_port"] = int(config["windows"]["ssh_port"])
  92. mp.MQTT_BROKER = sys_comm.g_sys_conf["server_ip"]
  93. # linux 服务器
  94. elif sys_comm.g_sys_conf["platform"] == 1:
  95. sys_comm.g_sys_conf["host_ip"] = str(config["linux"]["host_ip"])
  96. mp.MQTT_BROKER = sys_comm.g_sys_conf["host_ip"]
  97. sys_comm.g_sys_conf["sp_id"] = int(get_utc_time_ms())
  98. # 报警配置
  99. sys_comm.g_sys_conf["alarm_conf"] = sys_comm.alarm_conf
  100. # 启动成功,打印系统信息
  101. module_name = sys_comm.g_sys_conf["module_name"]
  102. platform = sys_comm.g_sys_conf["platform"]
  103. host_ip = sys_comm.g_sys_conf["host_ip"]
  104. max_log_files = sys_comm.g_sys_conf["max_log_files"]
  105. max_log_size = sys_comm.g_sys_conf["max_log_size"]
  106. log_lvl = sys_comm.g_sys_conf["log_lvl"]
  107. sp_id = sys_comm.g_sys_conf["sp_id"]
  108. print(f" ================ system init succeed !")
  109. print(f" ================ module : {module_name}")
  110. print(f" ================ platform : {platform}")
  111. print(f" ================ host_ip : {host_ip}")
  112. print(f" ================ max_log_files : {max_log_files}")
  113. print(f" ================ max_log_size : {max_log_size}")
  114. print(f" ================ log_lvl : {log_lvl}")
  115. print(f" ================ sp_id : {sp_id}")
  116. print(f" ================ version : v B1.0")
  117. LOGINFO(f" ================ system init succeed !")
  118. LOGINFO(f" ================ module : {module_name}")
  119. LOGINFO(f" ================ platform : {platform}")
  120. LOGINFO(f" ================ host_ip : {host_ip}")
  121. LOGINFO(f" ================ max_log_files : {max_log_files}")
  122. LOGINFO(f" ================ max_log_size : {max_log_size}")
  123. LOGINFO(f" ================ log_lvl : {log_lvl}")
  124. LOGINFO(f" ================ sp_id : {sp_id}")
  125. LOGINFO(f" ================ version : v B1.0")
  126. return 0
  127. except json.JSONDecodeError as e:
  128. tb_info = traceback.extract_tb(e.__traceback__)
  129. for frame in tb_info:
  130. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  131. return EC.EC_FAILED
  132. except Exception as e:
  133. tb_info = traceback.extract_tb(e.__traceback__)
  134. for frame in tb_info:
  135. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  136. return EC.EC_FAILED
  137. # 轮循执行一些任务
  138. def run():
  139. # 轮循处理任务
  140. while True:
  141. time.sleep(1)
  142. def query_events_expire_range():
  143. event_save_range = 90
  144. try:
  145. result = db_execute_sync(sql=sqls.sql_event_save_range, timeout=15)
  146. if result and len(result) == 1:
  147. row = result[0]
  148. param_value = row.get("param_value")
  149. if param_value is not None: # 只要不是 None,就用数据库的值
  150. event_save_range = param_value
  151. else:
  152. LOGWARN("found invalid event_save_range, use default 90")
  153. return event_save_range
  154. except Exception as e:
  155. LOGERR(f"query event_save_range failed: {e}, use default 90")
  156. return EC.EC_FAILED
  157. # 创建事件过期计划
  158. def create_events_expire_plan():
  159. event_save_range = query_events_expire_range()
  160. if not event_save_range:
  161. return EC.EC_FAILED
  162. g_las.g_alarm_plan_mgr.create_clean_expire_events_task(event_save_range)
  163. return 0
  164. # 主线程
  165. def main_process():
  166. if not g_las.g_alarm_plan_mgr:
  167. LOGERR(f"error: g_alarm_plan_mgr not init")
  168. return EC.EC_FAILED
  169. if not g_las.g_alarm_plan_disp:
  170. LOGERR(f"error: g_alarm_plan_disp not init")
  171. return EC.EC_FAILED
  172. # 查询所有设备信息
  173. db_req_que.put(DBRequest_Async(sql=sqls.sql_query_all_dev_info,
  174. callback=g_Dev.g_dev_mgr.cb_handle_query_all_dev_info))
  175. # 查询所有告警计划
  176. db_req_que.put(DBRequest_Async(sql=sqls.sql_query_all_alarm_plan,
  177. callback=cb_handle_query_all_alarm_plan_info))
  178. # 创建任务:创建事件过期计划
  179. iRet = create_events_expire_plan()
  180. if iRet:
  181. LOGERR(f"create_events_expire_plan failed, process termination")
  182. return iRet
  183. # 轮循任务
  184. run()
  185. def main():
  186. # 初始化
  187. if (0 != sys_init()):
  188. sys.exit(EC.EC_FAILED)
  189. # 初始化 dev_mng
  190. init_dev_mng()
  191. g_Dev.g_dev_mgr.start()
  192. # 初始化LAS
  193. init_alarm_plan_mgr()
  194. init_alarm_plan_disp()
  195. start_alarm_plan_dispatcher() # 事件分发器
  196. start_alarm_plan_mgr() # 告警计划管理器
  197. # 数据库线程
  198. db_thread = db_process.create_db_process()
  199. db_thread.start()
  200. # MQTT 消息线程
  201. mp.g_mqtt_client = RobustMQTTClient()
  202. mp.g_mqtt_client.start()
  203. mp.g_mqtt_consumer = MQTTConsumerThread()
  204. mp.g_mqtt_consumer.start()
  205. LOGDBG(f" ================ LAS start success ...")
  206. # 主线程
  207. if not main_process():
  208. LOGERR(f"main_process error, process termination")
  209. sys.exit(EC.EC_FAILED)
  210. main()