db_process.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. '''
  2. 执行数据库操作的线程
  3. '''
  4. import threading
  5. import queue
  6. import time
  7. from sshtunnel import SSHTunnelForwarder, BaseSSHTunnelForwarderError
  8. import pymysql
  9. from DBUtils.PooledDB import PooledDB
  10. from concurrent.futures import ThreadPoolExecutor
  11. import json
  12. import shutil
  13. import common.sys_comm
  14. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  15. from common.sys_comm import get_bj_time_ms
  16. from common.sys_comm import g_sys_conf, g_sys_conf_mtx
  17. # ssh配置
  18. ssh_conf = {
  19. "ssh_host": "119.45.12.173",
  20. "ssh_port": 22,
  21. "ssh_user": "root",
  22. "ssh_pwd": "Hfln@667788",
  23. }
  24. # 数据库配置
  25. db_config = {
  26. # 数据库相关参数
  27. "host": "localhost",
  28. "user": "root",
  29. "password": "Hfln@1024",
  30. "database": "lnxx_dev"
  31. }
  32. # 请求队列
  33. db_req_que = queue.Queue()
  34. # 记录 SSH 隧道和数据库连接
  35. ssh_server = None
  36. # 连接池对象
  37. db_pool = None
  38. # 数据库请求类
  39. class DBRequest:
  40. def __init__(self, sql:str, params=None, callback=None, userdata=None):
  41. self.sql = sql
  42. self.params = params if params else ()
  43. self.callback = callback
  44. self.userdata = userdata
  45. # ========== 初始化配置 ==========
  46. def db_pro_init():
  47. global ssh_conf, db_config
  48. with g_sys_conf_mtx:
  49. ssh_conf = {
  50. "ssh_host": g_sys_conf["ssh_host"],
  51. "ssh_port": g_sys_conf["ssh_port"],
  52. "ssh_user": g_sys_conf["ssh_username"],
  53. "ssh_pwd": g_sys_conf["ssh_password"],
  54. }
  55. db_config = {
  56. "host": g_sys_conf["db_host"],
  57. "user": g_sys_conf["db_username"],
  58. "password": g_sys_conf["db_password"],
  59. "database": "lnxx_dev"
  60. }
  61. # ========== 初始化 SSH ==========
  62. def initialize_ssh_connection():
  63. global ssh_server
  64. if ssh_server is None or not ssh_server.is_active:
  65. ssh_server = SSHTunnelForwarder(
  66. (ssh_conf["ssh_host"], ssh_conf["ssh_port"]),
  67. ssh_username=ssh_conf["ssh_user"],
  68. ssh_password=ssh_conf["ssh_pwd"],
  69. remote_bind_address=('127.0.0.1', 3306)
  70. )
  71. ssh_server.start()
  72. LOGINFO("SSH connected")
  73. # ========== 初始化连接池 ==========
  74. def initialize_connection_pool():
  75. global db_pool, ssh_server
  76. if g_sys_conf["platform"] == 0:
  77. initialize_ssh_connection()
  78. port = ssh_server.local_bind_port
  79. host = "127.0.0.1"
  80. else:
  81. port = 3306
  82. host = db_config["host"]
  83. db_pool = PooledDB(
  84. creator=pymysql,
  85. maxconnections=10,
  86. mincached=2,
  87. maxcached=5,
  88. blocking=True,
  89. host=host,
  90. port=port,
  91. user=db_config['user'],
  92. password=db_config['password'],
  93. database=db_config['database'],
  94. charset='utf8mb4',
  95. cursorclass=pymysql.cursors.DictCursor
  96. )
  97. LOGINFO("DB connection pool initialized")
  98. # ========== 执行数据库请求 ==========
  99. def handle_db_request(db_request: DBRequest):
  100. conn = None
  101. try:
  102. conn = db_pool.connection()
  103. with conn.cursor() as cursor:
  104. cursor.execute(db_request.sql, db_request.params)
  105. sql_lower = db_request.sql.strip().lower()
  106. if sql_lower.startswith("select"):
  107. result = cursor.fetchall()
  108. elif sql_lower.startswith("insert"):
  109. result = {"lastrowid": cursor.lastrowid}
  110. else:
  111. result = {"rowcount": cursor.rowcount}
  112. if db_request.callback:
  113. if db_request.userdata:
  114. db_request.callback(result, db_request.userdata)
  115. else:
  116. db_request.callback(result)
  117. conn.commit()
  118. except Exception as e:
  119. LOGERR(f"[DB ERROR] SQL执行失败: {e}")
  120. finally:
  121. if conn:
  122. conn.close()
  123. db_req_que.task_done()
  124. # ========== 主数据库线程 ==========
  125. def db_process():
  126. db_pro_init()
  127. initialize_connection_pool()
  128. executor = ThreadPoolExecutor(max_workers=8) # 限制线程并发数
  129. while True:
  130. try:
  131. db_request: DBRequest = db_req_que.get()
  132. if db_request is None:
  133. break
  134. executor.submit(handle_db_request, db_request)
  135. except Exception as e:
  136. LOGERR(f"[DB Thread Error] {e}")
  137. time.sleep(0.01)
  138. def create_db_process():
  139. # 启动数据库线程
  140. return threading.Thread(target=db_process, daemon=True)
  141. # 处理数据库返回的结果
  142. def handle_device_data(results):
  143. LOGDBG("Received results: {results}")
  144. # 示例请求生成器
  145. def request_generator():
  146. while True:
  147. sql_query = "SELECT * FROM dev_info" # 示例查询
  148. db_req_que.put(DBRequest(sql=sql_query, callback=handle_device_data))
  149. time.sleep(1) # 每秒生成一个请求
  150. def test_main():
  151. # 启动数据库线程
  152. db_thread = threading.Thread(target=db_process, daemon=True)
  153. db_thread.start()
  154. # 启动请求生成器
  155. request_gen_thread = threading.Thread(target=request_generator)
  156. request_gen_thread.start()