db_process.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
"""Worker thread that executes database operations."""
  4. import threading
  5. import queue
  6. import time
  7. from sshtunnel import SSHTunnelForwarder, BaseSSHTunnelForwarderError
  8. import pymysql
  9. from DBUtils.PooledDB import PooledDB
  10. from concurrent.futures import ThreadPoolExecutor
  11. import json
  12. import shutil
  13. import common.sys_comm as sys_comm
  14. from common.sys_comm import (
  15. LOGDBG, LOGINFO, LOGWARN, LOGERR, EC
  16. )
  17. from common.sys_comm import get_bj_time_ms
  18. # ssh配置
  19. ssh_conf = {
  20. "ssh_host": "119.45.12.173",
  21. "ssh_port": 22,
  22. "ssh_user": "root",
  23. "ssh_pwd": "Hfln@147888",
  24. }
  25. # 数据库配置
  26. db_config = {
  27. # 数据库相关参数
  28. "host": "localhost",
  29. "user": "root",
  30. "password": "Hfln@147888",
  31. "database": "lnxx_dev"
  32. }
  33. # ===================== 全局对象 =====================
  34. # 请求队列
  35. db_req_que = queue.Queue()
  36. # 记录 SSH 隧道和数据库连接
  37. ssh_server = None
  38. # 连接池对象
  39. db_pool = None
  40. # 数据库线程是否运行标记
  41. db_worker_running = False
  42. # 数据库请求类
  43. class DBRequest_Async:
  44. def __init__(self, sql:str, params=None, callback=None, userdata=None):
  45. self.sql = sql
  46. self.params = params if params else ()
  47. self.callback = callback
  48. self.userdata = userdata
  49. class DBRequest_Sync(DBRequest_Async):
  50. def __init__(self, sql:str, params=None, callback=None, userdata=None):
  51. super().__init__(sql, params, callback, userdata)
  52. self._done_event = threading.Event()
  53. self._result = None
  54. self._exception = None
  55. def wait(self, timeout=None):
  56. """阻塞等待执行完成"""
  57. finished = self._done_event.wait(timeout)
  58. if not finished:
  59. raise TimeoutError("DBRequest_Sync timed out")
  60. if self._exception:
  61. raise self._exception
  62. return self._result
  63. def set_result(self, result):
  64. self._result = result
  65. self._done_event.set()
  66. def set_exception(self, e):
  67. self._exception = e
  68. self._done_event.set()
  69. # ========== 初始化配置 ==========
  70. def db_pro_init():
  71. global ssh_conf, db_config
  72. with sys_comm.g_sys_conf_mtx:
  73. ssh_conf = {
  74. "ssh_host": sys_comm.g_sys_conf["ssh_host"],
  75. "ssh_port": sys_comm.g_sys_conf["ssh_port"],
  76. "ssh_user": sys_comm.g_sys_conf["ssh_username"],
  77. "ssh_pwd": sys_comm.g_sys_conf["ssh_password"],
  78. }
  79. db_config = {
  80. "host": sys_comm.g_sys_conf["db_host"],
  81. "user": sys_comm.g_sys_conf["db_username"],
  82. "password": sys_comm.g_sys_conf["db_password"],
  83. "database": "lnxx_dev"
  84. }
  85. # ========== 初始化 SSH ==========
  86. def initialize_ssh_connection():
  87. global ssh_server
  88. if ssh_server is None or not ssh_server.is_active:
  89. ssh_server = SSHTunnelForwarder(
  90. (ssh_conf["ssh_host"], ssh_conf["ssh_port"]),
  91. ssh_username=ssh_conf["ssh_user"],
  92. ssh_password=ssh_conf["ssh_pwd"],
  93. remote_bind_address=('127.0.0.1', 3306)
  94. )
  95. ssh_server.start()
  96. LOGINFO("SSH connected")
  97. # ========== 初始化连接池 ==========
  98. def initialize_connection_pool():
  99. global db_pool, ssh_server
  100. if sys_comm.g_sys_conf["platform"] == 0:
  101. initialize_ssh_connection()
  102. port = ssh_server.local_bind_port
  103. host = "127.0.0.1"
  104. else:
  105. port = 3306
  106. host = db_config["host"]
  107. db_pool = PooledDB(
  108. creator=pymysql,
  109. maxconnections=10,
  110. mincached=2,
  111. maxcached=5,
  112. blocking=True,
  113. host=host,
  114. port=port,
  115. user=db_config['user'],
  116. password=db_config['password'],
  117. database=db_config['database'],
  118. charset='utf8mb4',
  119. cursorclass=pymysql.cursors.DictCursor
  120. )
  121. LOGINFO("DB connection pool initialized")
  122. # ========== 执行数据库请求 ==========
  123. def handle_db_request(db_request):
  124. conn = None
  125. try:
  126. conn = db_pool.connection()
  127. with conn.cursor() as cursor:
  128. cursor.execute(db_request.sql, db_request.params)
  129. sql_lower = db_request.sql.strip().lower()
  130. if sql_lower.startswith("select"):
  131. result = cursor.fetchall()
  132. elif sql_lower.startswith("insert"):
  133. result = {"lastrowid": cursor.lastrowid}
  134. else:
  135. result = {"rowcount": cursor.rowcount}
  136. # 执行回调
  137. if db_request.callback:
  138. try:
  139. db_request.callback(result, db_request.userdata)
  140. except Exception as e:
  141. LOGERR(f"[DB ERROR] 回调执行失败: {e}, sql: {db_request.sql}")
  142. if isinstance(db_request, DBRequest_Sync):
  143. db_request.set_result(result)
  144. # LOGINFO(f"[DB SUCCESS] SQL executed successfully: {db_request.sql}")
  145. conn.commit()
  146. except Exception as e:
  147. LOGERR(f"[DB ERROR] SQL执行失败: {e}, sql: {db_request.sql}")
  148. if isinstance(db_request, DBRequest_Sync):
  149. db_request.set_exception(e)
  150. finally:
  151. if conn:
  152. conn.close()
  153. # ========== 封装接口 ==========
  154. # 同步执行
  155. def db_execute_sync(sql: str, params=None, callback=None, userdata=None, timeout=5):
  156. """
  157. 如果传了 callback,会先执行 callback,再返回结果。
  158. 如果不传 callback,直接返回查询结果。
  159. 若timeout传入None会无限期等待(不建议)
  160. """
  161. if not db_worker_running:
  162. LOGERR("DB worker is not running, cannot execute sync request")
  163. return EC.EC_FAILED
  164. req = DBRequest_Sync(sql=sql, params=params, callback=callback, userdata=userdata)
  165. db_req_que.put(req)
  166. return req.wait(timeout=timeout)
  167. # 异步执行
  168. def db_execute_async(sql: str, params=None, callback=None, userdata=None):
  169. """
  170. callback: 可选,数据库操作完成后调用
  171. userdata: 可选,回调附带的用户数据
  172. """
  173. if not db_worker_running:
  174. LOGERR("DB worker is not running, cannot execute async request")
  175. return None
  176. req = DBRequest_Async(sql=sql, params=params, callback=callback, userdata=userdata)
  177. db_req_que.put(req)
  178. return req
  179. # ========== 主数据库线程 ==========
  180. def db_process():
  181. global db_worker_running
  182. db_worker_running = True
  183. db_pro_init()
  184. initialize_connection_pool()
  185. # 多线程执行器
  186. async_executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="AsyncDBWorker")
  187. try:
  188. while True:
  189. db_request = db_req_que.get()
  190. if db_request is None:
  191. break
  192. try:
  193. if isinstance(db_request, DBRequest_Sync):
  194. # 同步操作
  195. handle_db_request(db_request)
  196. else:
  197. # 异步操作
  198. async_executor.submit(handle_db_request, db_request)
  199. except Exception as e:
  200. LOGERR(f"[DB Thread Error] {e}, sql: {db_request.sql}")
  201. finally:
  202. db_req_que.task_done()
  203. finally:
  204. # 收到退出信号后,关闭执行器
  205. async_executor.shutdown(wait=True)
  206. db_worker_running = False
  207. LOGERR("DB process exit gracefully")
  208. # 创建数据库线程
  209. def create_db_process():
  210. global db_thread
  211. db_thread = threading.Thread(target=db_process, daemon=True, name="DBWorkerThread")
  212. return db_thread
  213. # 停止数据库线程
  214. def stop_db_process():
  215. if db_worker_running:
  216. db_req_que.put(None)
  217. db_thread.join()
  218. LOGINFO("DB worker stopped")
  219. # ========== 示例 ==========
  220. # 处理数据库返回的结果
  221. def cb_handle_device_data(results, userdata):
  222. LOGDBG("Received results: {results}")
  223. # 示例请求生成器
  224. def request_generator():
  225. while True:
  226. sql_query = "SELECT * FROM dev_info" # 示例查询
  227. db_req_que.put(DBRequest_Async(sql=sql_query, callback=cb_handle_device_data))
  228. time.sleep(1) # 每秒生成一个请求
  229. def test_main():
  230. # 启动数据库线程
  231. db_thread = threading.Thread(target=db_process, daemon=True)
  232. db_thread.start()
  233. # 启动请求生成器
  234. request_gen_thread = threading.Thread(target=request_generator)
  235. request_gen_thread.start()