LAS.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. #!/usr/bin/env python3.9
  2. from threading import Thread
  3. import configparser
  4. import os
  5. import time
  6. import traceback
  7. import sys
  8. import json
  9. import g_config
  10. import common.sys_comm as sys_comm
  11. from common.sys_comm import (
  12. LOGDBG, LOGINFO, LOGWARN, LOGERR, EC,
  13. get_utc_time_ms
  14. )
  15. import mqtt.mqtt_process as mp
  16. from mqtt.mqtt_process import (
  17. MQTTClientThread, RobustMQTTClient, MQTTConsumerThread, g_mqtt_client, g_mqtt_consumer
  18. )
  19. import db.db_process as db_process
  20. from db.db_process import db_req_que
  21. from db.db_process import DBRequest_Async
  22. from db.db_process import (
  23. db_execute_sync, db_execute_async
  24. )
  25. import db.db_sqls as sqls
  26. import device.dev_mng as g_Dev
  27. from device.dev_mng import (
  28. Device, g_dev_mgr, init_dev_mng
  29. )
  30. import core.alarm_plan_manager as ap_mgr
  31. from core.alarm_plan_manager import (
  32. AlarmPlanManager,
  33. init_alarm_plan_mgr,
  34. start_alarm_plan_mgr,
  35. cb_handle_query_all_alarm_plan_info
  36. )
  37. from core.alarm_plan_dispatcher import (
  38. init_alarm_plan_disp,
  39. start_alarm_plan_dispatcher
  40. )
  41. import core.g_LAS as g_las
  42. # 系统初始化
  43. def sys_init():
  44. try:
  45. # 创建日志目录
  46. if not os.path.exists("./log/"):
  47. os.makedirs("./log/")
  48. print("create log dir succeed !")
  49. # 读取配置文件
  50. config = configparser.ConfigParser()
  51. with open('./conf.ini', 'r', encoding='utf-8') as f:
  52. config.read_file(f)
  53. if not (config.has_option('conf', 'module_name') and
  54. config.has_option('conf', 'server_ip')):
  55. LOGDBG("sys_init failed, invalid conf.ini param")
  56. return 0
  57. LOGDBG("read conf.ini succeed !")
  58. server_ip = config["conf"]["server_ip"].strip()
  59. module_name = config["conf"]["module_name"].strip()
  60. sp_id = module_name + "_" + str(get_utc_time_ms())
  61. if g_config.init_g_sys_conf(server_ip, module_name, sp_id):
  62. print(f" ================ system init failed, invalid server_ip: {server_ip}")
  63. LOGERR(f" ================ system init failed, invalid server_ip: {server_ip}")
  64. return -1
  65. # 启动成功,打印系统信息
  66. module_name = g_config.g_sys_conf["module_name"]
  67. platform = g_config.g_sys_conf["platform"]
  68. host_ip = g_config.g_sys_conf["host_ip"]
  69. service = g_config.g_sys_conf["service"]["ip"]
  70. max_log_files = sys_comm.g_log_conf["max_log_files"]
  71. max_log_size = sys_comm.g_log_conf["max_log_size"]
  72. log_lvl = sys_comm.g_log_conf["log_lvl"]
  73. sp_id = g_config.g_sys_conf["sp_id"]
  74. print(f" ================ system init succeed !")
  75. print(f" ================ module : {module_name}")
  76. print(f" ================ platform : {platform}")
  77. print(f" ================ host_ip : {host_ip}")
  78. print(f" ================ service : {service}")
  79. print(f" ================ max_log_files : {max_log_files}")
  80. print(f" ================ max_log_size : {max_log_size}")
  81. print(f" ================ log_lvl : {log_lvl}")
  82. print(f" ================ sp_id : {sp_id}")
  83. print(f" ================ version : v B1.0")
  84. LOGINFO(f" ================ system init succeed !")
  85. LOGINFO(f" ================ module : {module_name}")
  86. LOGINFO(f" ================ platform : {platform}")
  87. LOGINFO(f" ================ host_ip : {host_ip}")
  88. print(f" ================ service : {service}")
  89. LOGINFO(f" ================ max_log_files : {max_log_files}")
  90. LOGINFO(f" ================ max_log_size : {max_log_size}")
  91. LOGINFO(f" ================ log_lvl : {log_lvl}")
  92. LOGINFO(f" ================ sp_id : {sp_id}")
  93. LOGINFO(f" ================ version : v B1.0")
  94. return 0
  95. except json.JSONDecodeError as e:
  96. tb_info = traceback.extract_tb(e.__traceback__)
  97. for frame in tb_info:
  98. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error:{e}, {e.doc}")
  99. return EC.EC_FAILED
  100. except Exception as e:
  101. tb_info = traceback.extract_tb(e.__traceback__)
  102. for frame in tb_info:
  103. LOGERR(f"[{frame.filename}:{frame.lineno}] @{frame.name}(), error: {e}")
  104. return EC.EC_FAILED
  105. # 轮循执行一些任务
  106. def run():
  107. # 轮循处理任务
  108. while True:
  109. time.sleep(1)
  110. def query_events_expire_range():
  111. event_save_range = 90
  112. try:
  113. result = db_execute_sync(sql=sqls.sql_event_save_range, timeout=15)
  114. if result and len(result) == 1:
  115. row = result[0]
  116. param_value = row.get("param_value")
  117. if param_value is not None: # 只要不是 None,就用数据库的值
  118. event_save_range = param_value
  119. else:
  120. LOGWARN("found invalid event_save_range, use default 90")
  121. return event_save_range
  122. except Exception as e:
  123. LOGERR(f"query event_save_range failed: {e}, use default 90")
  124. return EC.EC_FAILED
  125. # 创建事件过期计划
  126. def create_events_expire_plan():
  127. event_save_range = query_events_expire_range()
  128. if not event_save_range:
  129. return EC.EC_FAILED
  130. g_las.g_alarm_plan_mgr.create_clean_expire_events_task(event_save_range)
  131. return 0
  132. # 主线程
  133. def main_process():
  134. if not g_las.g_alarm_plan_mgr:
  135. LOGERR(f"error: g_alarm_plan_mgr not init")
  136. return EC.EC_FAILED
  137. if not g_las.g_alarm_plan_disp:
  138. LOGERR(f"error: g_alarm_plan_disp not init")
  139. return EC.EC_FAILED
  140. # 查询所有设备信息
  141. db_req_que.put(DBRequest_Async(sql=sqls.sql_query_all_dev_info,
  142. callback=g_Dev.g_dev_mgr.cb_handle_query_all_dev_info))
  143. # 查询所有告警计划
  144. db_req_que.put(DBRequest_Async(sql=sqls.sql_query_all_alarm_plan,
  145. callback=cb_handle_query_all_alarm_plan_info))
  146. # 创建任务:创建事件过期计划
  147. iRet = create_events_expire_plan()
  148. if iRet:
  149. LOGERR(f"create_events_expire_plan failed, process termination")
  150. return iRet
  151. # 轮循任务
  152. run()
  153. def main():
  154. # 初始化
  155. if (0 != sys_init()):
  156. sys.exit(EC.EC_FAILED)
  157. # 初始化 dev_mng
  158. init_dev_mng()
  159. g_Dev.g_dev_mgr.start()
  160. # 初始化LAS
  161. init_alarm_plan_mgr()
  162. init_alarm_plan_disp()
  163. start_alarm_plan_dispatcher() # 事件分发器
  164. start_alarm_plan_mgr() # 告警计划管理器
  165. # 数据库线程
  166. db_thread = db_process.create_db_process()
  167. db_thread.start()
  168. # MQTT 消息线程
  169. mp.g_mqtt_client = RobustMQTTClient()
  170. mp.g_mqtt_client.start()
  171. mp.g_mqtt_consumer = MQTTConsumerThread()
  172. mp.g_mqtt_consumer.start()
  173. LOGDBG(f" ================ LAS start success ...")
  174. # 主线程
  175. if not main_process():
  176. LOGERR(f"main_process error, process termination")
  177. sys.exit(EC.EC_FAILED)
  178. main()