# db_process.py (8.3 KB)
  '''
  Worker thread that executes database operations.
  '''
  4. import threading
  5. import queue
  6. import time
  7. from sshtunnel import SSHTunnelForwarder, BaseSSHTunnelForwarderError
  8. import pymysql
  9. from DBUtils.PooledDB import PooledDB
  10. from concurrent.futures import ThreadPoolExecutor
  11. import json
  12. import shutil
  13. import common.sys_comm as sys_comm
  14. from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
  15. from common.sys_comm import get_bj_time_ms
  # Default SSH tunnel settings. db_pro_init() replaces this dict with values
  # read from sys_comm.g_sys_conf, so these literals are only fallbacks.
  # NOTE(review): credentials are hard-coded in source here; consider removing
  # them and relying solely on the system configuration.
  ssh_conf = {
      "ssh_host": "119.45.12.173",
      "ssh_port": 22,
      "ssh_user": "root",
      "ssh_pwd": "Hfln@667788",
  }
  # Default database settings; db_pro_init() replaces this dict as well.
  db_config = {
      # Database connection parameters
      "host": "localhost",
      "user": "root",
      "password": "Hfln@1024",
      "database": "lnxx_dev"
  }
  # ===================== Global objects =====================
  # Queue of pending requests; a None entry tells db_process() to exit.
  db_req_que = queue.Queue()
  # SSH tunnel object, created by initialize_ssh_connection()
  ssh_server = None
  # Connection pool object, created by initialize_connection_pool()
  db_pool = None
  # Flag telling whether the database worker thread is running
  db_worker_running = False
  # Database request classes
  class DBRequest_Async:
      """A fire-and-forget database request.

      Attributes:
          sql: SQL statement to execute.
          params: Statement parameters; any falsy value is stored as ``()``.
          callback: Optional callable invoked with ``(result, userdata)``.
          userdata: Opaque value handed back to ``callback``.
      """

      def __init__(self, sql:str, params=None, callback=None, userdata=None):
          self.sql = sql
          self.callback = callback
          self.userdata = userdata
          # None, an empty list, an empty tuple, ... all normalise to ().
          self.params = params or ()
  class DBRequest_Sync(DBRequest_Async):
      """A database request whose submitter can block until it completes.

      The executing side reports the outcome through ``set_result`` or
      ``set_exception``; the submitting side collects it with ``wait``.
      """

      def __init__(self, sql:str, params=None, callback=None, userdata=None):
          super().__init__(sql, params, callback, userdata)
          self._result = None
          self._exception = None
          # Set once either a result or an exception has been recorded.
          self._done_event = threading.Event()

      def wait(self, timeout=None):
          """Block until the request completes and return its result.

          Args:
              timeout: Seconds to wait; ``None`` waits indefinitely.

          Raises:
              TimeoutError: The request did not complete within ``timeout``.
              Exception: Whatever was recorded through ``set_exception``.
          """
          if not self._done_event.wait(timeout):
              raise TimeoutError("DBRequest_Sync timed out")
          error = self._exception
          if error:
              raise error
          return self._result

      def set_result(self, result):
          """Record a successful result and release any waiter."""
          self._result = result
          self._done_event.set()

      def set_exception(self, e):
          """Record a failure and release any waiter."""
          self._exception = e
          self._done_event.set()
  # ========== Load configuration ==========
  def db_pro_init():
      """Rebuild ``ssh_conf`` and ``db_config`` from ``sys_comm.g_sys_conf``.

      The system configuration is read while holding
      ``sys_comm.g_sys_conf_mtx``. The database name stays fixed at
      ``"lnxx_dev"``.
      """
      global ssh_conf, db_config
      with sys_comm.g_sys_conf_mtx:
          conf = sys_comm.g_sys_conf
          ssh_conf = dict(
              ssh_host=conf["ssh_host"],
              ssh_port=conf["ssh_port"],
              ssh_user=conf["ssh_username"],
              ssh_pwd=conf["ssh_password"],
          )
          db_config = dict(
              host=conf["db_host"],
              user=conf["db_username"],
              password=conf["db_password"],
              database="lnxx_dev",
          )
  # ========== Set up the SSH tunnel ==========
  def initialize_ssh_connection():
      """Start the SSH tunnel unless an active one already exists.

      The tunnel forwards to 127.0.0.1:3306 on the remote side and is stored
      in the module-level ``ssh_server``.
      """
      global ssh_server
      if ssh_server is not None and ssh_server.is_active:
          # An active tunnel is already in place; nothing to do.
          return
      ssh_server = SSHTunnelForwarder(
          (ssh_conf["ssh_host"], ssh_conf["ssh_port"]),
          ssh_username=ssh_conf["ssh_user"],
          ssh_password=ssh_conf["ssh_pwd"],
          remote_bind_address=('127.0.0.1', 3306)
      )
      ssh_server.start()
      LOGINFO("SSH connected")
  # ========== Set up the connection pool ==========
  def initialize_connection_pool():
      """Create the module-level ``db_pool``.

      When ``sys_comm.g_sys_conf["platform"]`` is 0 the pool connects through
      the SSH tunnel's local bind port on 127.0.0.1; otherwise it connects to
      ``db_config["host"]`` on port 3306.
      """
      global db_pool, ssh_server
      if sys_comm.g_sys_conf["platform"] == 0:
          initialize_ssh_connection()
          host, port = "127.0.0.1", ssh_server.local_bind_port
      else:
          host, port = db_config["host"], 3306

      # Pool sizing options
      pool_options = dict(
          maxconnections=10,
          mincached=2,
          maxcached=5,
          blocking=True,
      )
      # Keyword arguments forwarded to the pymysql connection
      connect_options = dict(
          host=host,
          port=port,
          user=db_config['user'],
          password=db_config['password'],
          database=db_config['database'],
          charset='utf8mb4',
          cursorclass=pymysql.cursors.DictCursor,
      )
      db_pool = PooledDB(creator=pymysql, **pool_options, **connect_options)
      LOGINFO("DB connection pool initialized")
  # ========== Execute a database request ==========
  def handle_db_request(db_request):
      """Run one request on a pooled connection and deliver its outcome.

      The result depends on the statement's leading keyword:
      ``select`` -> ``cursor.fetchall()``; ``insert`` -> ``{"lastrowid": ...}``;
      anything else -> ``{"rowcount": ...}``.

      The transaction is committed and the connection released *before* the
      callback runs and before a waiting ``DBRequest_Sync`` is woken, so a
      caller never sees a result whose commit has not happened or later fails.

      Args:
          db_request: Request exposing ``sql``, ``params``, ``callback`` and
              ``userdata``. ``DBRequest_Sync`` instances additionally receive
              the result or the exception.

      Errors are logged and not re-raised. On an execution or commit failure
      the callback is not invoked; a ``DBRequest_Sync`` gets the exception.
      """
      conn = None
      try:
          conn = db_pool.connection()
          with conn.cursor() as cursor:
              cursor.execute(db_request.sql, db_request.params)
              sql_lower = db_request.sql.strip().lower()
              if sql_lower.startswith("select"):
                  result = cursor.fetchall()
              elif sql_lower.startswith("insert"):
                  result = {"lastrowid": cursor.lastrowid}
              else:
                  result = {"rowcount": cursor.rowcount}
          # Make the change durable before anyone is told it succeeded.
          conn.commit()
          # LOGINFO(f"[DB SUCCESS] SQL executed successfully: {db_request.sql}")
      except Exception as e:
          LOGERR(f"[DB ERROR] SQL执行失败: {e}, sql: {db_request.sql}")
          if isinstance(db_request, DBRequest_Sync):
              db_request.set_exception(e)
          return
      finally:
          if conn is not None:
              conn.close()

      # Run the callback first, then release a synchronous waiter.
      if db_request.callback:
          try:
              db_request.callback(result, db_request.userdata)
          except Exception as e:
              LOGERR(f"[DB ERROR] 回调执行失败: {e}, sql: {db_request.sql}")
      if isinstance(db_request, DBRequest_Sync):
          db_request.set_result(result)
  # ========== Public interface ==========
  # Synchronous execution
  def db_execute_sync(sql: str, params=None, callback=None, userdata=None, timeout=5):
      """
      Queue a request and block until it has been executed.

      If a callback is given it is executed first, then the result is returned.
      Without a callback the result is returned directly.
      Passing timeout=None waits indefinitely (not recommended).

      Returns -1 (after logging an error) when the DB worker is not running;
      otherwise returns whatever ``DBRequest_Sync.wait`` returns, and
      propagates whatever it raises.
      """
      if db_worker_running:
          sync_req = DBRequest_Sync(sql=sql, params=params, callback=callback, userdata=userdata)
          db_req_que.put(sync_req)
          return sync_req.wait(timeout=timeout)

      LOGERR("DB worker is not running, cannot execute sync request")
      return -1
  # Asynchronous execution
  def db_execute_async(sql: str, params=None, callback=None, userdata=None):
      """
      Queue a request without waiting for it to run.

      callback: optional, called once the database operation has completed
      userdata: optional, user data passed along to the callback

      Returns the queued DBRequest_Async, or None (after logging an error)
      when the DB worker is not running.
      """
      if db_worker_running:
          async_req = DBRequest_Async(sql=sql, params=params, callback=callback, userdata=userdata)
          db_req_que.put(async_req)
          return async_req

      LOGERR("DB worker is not running, cannot execute async request")
      return None
  # ========== Main database thread ==========
  def db_process():
      """Entry point of the database worker thread.

      Loads the configuration, builds the connection pool, then consumes
      ``db_req_que`` until a ``None`` sentinel arrives:

      * ``DBRequest_Sync`` requests are executed inline on this thread;
      * every other request is handed to a thread pool of 8 workers.

      ``db_worker_running`` is True while this function runs. It is reset to
      False on every exit path, including a failure during initialisation;
      such a failure still propagates to the caller.
      """
      global db_worker_running
      db_worker_running = True
      # Pool for asynchronous requests; caps the number of concurrent threads.
      async_executor = ThreadPoolExecutor(max_workers=8)
      try:
          # Initialisation sits inside the try so that a failure here cannot
          # leave db_worker_running stuck at True.
          db_pro_init()
          initialize_connection_pool()
          while True:
              db_request = db_req_que.get()
              try:
                  if db_request is None:
                      # Exit sentinel; the finally below still marks it done.
                      break
                  if isinstance(db_request, DBRequest_Sync):
                      # Synchronous operation: run on this thread
                      handle_db_request(db_request)
                  else:
                      # Asynchronous operation: run on the pool
                      async_executor.submit(handle_db_request, db_request)
              except Exception as e:
                  LOGERR(f"[DB Thread Error] {e}, sql: {db_request.sql}")
              finally:
                  # Every get() is matched by one task_done(), sentinel included,
                  # so db_req_que.join() cannot hang.
                  db_req_que.task_done()
      finally:
          # Let in-flight asynchronous requests finish before reporting exit.
          async_executor.shutdown(wait=True)
          db_worker_running = False
          LOGERR("DB process exit gracefully")
  # Create the database thread
  def create_db_process():
      """Create (but do not start) the daemon thread that runs ``db_process``.

      The thread is also stored in the module-level ``db_thread``.

      Returns:
          The newly created ``threading.Thread``.
      """
      global db_thread
      worker = threading.Thread(target=db_process, daemon=True)
      db_thread = worker
      return worker
  # Stop the database thread
  def stop_db_process():
      """Ask the database worker to exit and wait for it when possible.

      Does nothing when ``db_worker_running`` is False. Otherwise a ``None``
      sentinel is queued and, if a joinable thread is registered in the
      module-level ``db_thread``, this call blocks until it has finished.

      The join is skipped, with a warning, when ``db_thread`` was never
      created, is not alive, or is the calling thread itself. Each of those
      cases would otherwise raise.
      """
      if not db_worker_running:
          return
      db_req_que.put(None)

      # db_thread only exists after create_db_process() has been called.
      worker = globals().get("db_thread")
      joinable = (
          worker is not None
          and worker.is_alive()
          and worker is not threading.current_thread()
      )
      if not joinable:
          LOGWARN("DB worker stop requested, but there is no joinable worker thread")
          return
      worker.join()
      LOGINFO("DB worker stopped")
  # ========== Examples ==========
  # Handle the result returned by the database
  def cb_handle_device_data(results, userdata):
      """Example callback: log the results at debug level.

      Args:
          results: Result passed in by the request executor.
          userdata: User data attached to the request (unused here).
      """
      # The f prefix is required; without it the literal text "{results}"
      # was logged instead of the value.
      LOGDBG(f"Received results: {results}")
  # Example request generator
  def request_generator():
      """Put one example SELECT request on the queue every second, forever."""
      while True:
          # Example query; results go to cb_handle_device_data.
          db_req_que.put(
              DBRequest_Async(sql="SELECT * FROM dev_info", callback=cb_handle_device_data)
          )
          time.sleep(1)  # one request per second
  def test_main():
      """Start the database worker and the example request generator."""
      # Database worker (daemon thread)
      worker = threading.Thread(target=db_process, daemon=True)
      worker.start()

      # Request generator (non-daemon thread)
      producer = threading.Thread(target=request_generator)
      producer.start()