"""Database worker thread.

Other modules push ``DBRequest`` objects onto ``db_req_que``; ``db_process``
pulls them off and runs each one on a small thread pool, using a pooled MySQL
connection (reached through an SSH tunnel when ``g_sys_conf["platform"] == 0``).
"""
import threading
import queue
import time
from sshtunnel import SSHTunnelForwarder, BaseSSHTunnelForwarderError
import pymysql
from DBUtils.PooledDB import PooledDB
from concurrent.futures import ThreadPoolExecutor
import json
import shutil

import common.sys_comm
from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
from common.sys_comm import get_bj_time_ms
from common.sys_comm import g_sys_conf, g_sys_conf_mtx

# NOTE(review): the two dicts below embed real-looking credentials in source
# control. db_pro_init() replaces both from g_sys_conf before they are used by
# db_process(); consider removing the literals and rotating the secrets.

# SSH configuration (default values; overwritten by db_pro_init()).
ssh_conf = {
    "ssh_host": "119.45.12.173",
    "ssh_port": 22,
    "ssh_user": "root",
    "ssh_pwd": "Hfln@667788",
}

# Database configuration (default values; overwritten by db_pro_init()).
db_config = {
    # Database connection parameters.
    "host": "localhost",
    "user": "root",
    "password": "Hfln@1024",
    "database": "lnxx_dev"
}

# Request queue consumed by db_process(). Putting None asks the loop to stop.
db_req_que = queue.Queue()

# The SSH tunnel object, created lazily by initialize_ssh_connection().
ssh_server = None

# The connection pool object, created by initialize_connection_pool().
db_pool = None


# Database request class.
class DBRequest:
    """One SQL statement to execute, plus an optional completion callback.

    Args:
        sql: SQL text, passed to ``cursor.execute`` as the query.
        params: Query parameters for ``cursor.execute``; any falsy value
            (``None``, empty list, ...) is stored as an empty tuple.
        callback: Optional callable invoked as ``callback(result, userdata)``
            after the statement has been executed.
        userdata: Opaque value handed back to ``callback`` unchanged.
    """

    def __init__(self, sql: str, params=None, callback=None, userdata=None):
        self.sql = sql
        self.params = params if params else ()
        self.callback = callback
        self.userdata = userdata


# ========== Load configuration ==========
def db_pro_init():
    """Rebuild ``ssh_conf`` and ``db_config`` from ``g_sys_conf``.

    The read happens while holding ``g_sys_conf_mtx``. Both module globals are
    rebound to fresh dicts (the previous dict objects are not mutated).
    """
    global ssh_conf, db_config
    with g_sys_conf_mtx:
        ssh_conf = {
            "ssh_host": g_sys_conf["ssh_host"],
            "ssh_port": g_sys_conf["ssh_port"],
            "ssh_user": g_sys_conf["ssh_username"],
            "ssh_pwd": g_sys_conf["ssh_password"],
        }
        db_config = {
            "host": g_sys_conf["db_host"],
            "user": g_sys_conf["db_username"],
            "password": g_sys_conf["db_password"],
            "database": "lnxx_dev"
        }


# ========== Set up the SSH tunnel ==========
def initialize_ssh_connection():
    """Start the SSH tunnel to the remote MySQL port if none is active.

    Does nothing when ``ssh_server`` already exists and reports ``is_active``.
    Otherwise a new ``SSHTunnelForwarder`` targeting ``127.0.0.1:3306`` on the
    remote side is created from ``ssh_conf``, started, and stored in the
    module-level ``ssh_server``.
    """
    global ssh_server
    if ssh_server is None or not ssh_server.is_active:
        ssh_server = SSHTunnelForwarder(
            (ssh_conf["ssh_host"], ssh_conf["ssh_port"]),
            ssh_username=ssh_conf["ssh_user"],
            ssh_password=ssh_conf["ssh_pwd"],
            remote_bind_address=('127.0.0.1', 3306)
        )
        ssh_server.start()
        LOGINFO("SSH connected")


# ========== Set up the connection pool ==========
def initialize_connection_pool():
    """Create the module-level ``db_pool``.

    When ``g_sys_conf["platform"] == 0`` the pool connects to the local end of
    the SSH tunnel (which is started first); otherwise it connects directly to
    ``db_config["host"]`` on port 3306.
    """
    global db_pool, ssh_server
    if g_sys_conf["platform"] == 0:
        initialize_ssh_connection()
        port = ssh_server.local_bind_port
        host = "127.0.0.1"
    else:
        port = 3306
        host = db_config["host"]

    db_pool = PooledDB(
        creator=pymysql,
        maxconnections=10,
        mincached=2,
        maxcached=5,
        blocking=True,
        host=host,
        port=port,
        user=db_config['user'],
        password=db_config['password'],
        database=db_config['database'],
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    LOGINFO("DB connection pool initialized")


def _run_sql(db_request: DBRequest):
    """Execute one request on a pooled connection, commit, and return its result.

    Returns:
        ``cursor.fetchall()`` for statements starting with ``select``;
        ``{"lastrowid": ...}`` for statements starting with ``insert``;
        ``{"rowcount": ...}`` for everything else.

    Raises:
        Whatever the pool or the driver raises; the connection is always
        closed (handed back to the pool, per PooledDB's ``close()``) first.
    """
    conn = db_pool.connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(db_request.sql, db_request.params)
            sql_lower = db_request.sql.strip().lower()
            if sql_lower.startswith("select"):
                result = cursor.fetchall()
            elif sql_lower.startswith("insert"):
                result = {"lastrowid": cursor.lastrowid}
            else:
                result = {"rowcount": cursor.rowcount}
        conn.commit()
        return result
    finally:
        conn.close()


# ========== Execute a database request ==========
def handle_db_request(db_request: DBRequest):
    """Run one queued request and report its result to the callback.

    The statement is committed and the connection released *before* the
    callback runs, so a failing or slow callback can neither prevent the
    commit nor keep a pooled connection busy. SQL errors and callback errors
    are logged separately and never propagate. ``db_req_que.task_done()`` is
    always called exactly once, so this must only be used for items that were
    obtained from ``db_req_que``.
    """
    try:
        try:
            result = _run_sql(db_request)
        except Exception as e:
            LOGERR(f"[DB ERROR] SQL执行失败: {e}")
            return

        if db_request.callback:
            try:
                db_request.callback(result, db_request.userdata)
            except Exception as e:
                LOGERR(f"[DB ERROR] callback failed: {e}")
    finally:
        db_req_que.task_done()


# ========== Main database thread ==========
def db_process():
    """Thread body: initialise configuration and pool, then dispatch requests.

    Blocks on ``db_req_que`` and hands every ``DBRequest`` to a pool of at
    most 8 worker threads. A ``None`` item ends the loop; the executor is then
    shut down, which waits for requests that are already running.
    """
    db_pro_init()
    initialize_connection_pool()

    # Cap the number of concurrently executing requests.
    with ThreadPoolExecutor(max_workers=8) as executor:
        while True:
            db_request: DBRequest = db_req_que.get()
            if db_request is None:
                # Acknowledge the sentinel too, otherwise db_req_que.join()
                # would never return after shutdown.
                db_req_que.task_done()
                break
            try:
                executor.submit(handle_db_request, db_request)
            except Exception as e:
                LOGERR(f"[DB Thread Error] {e}")
                # The worker will never run, so account for the item here.
                db_req_que.task_done()
                # Brief back-off on failure only; get() already blocks, so no
                # delay is needed on the normal path.
                time.sleep(0.01)


def create_db_process():
    """Return a daemon thread targeting ``db_process``; the caller starts it."""
    return threading.Thread(target=db_process, daemon=True)


# Handle the result returned by the database.
def handle_device_data(results, userdata):
    """Example callback: log the query results at debug level."""
    LOGDBG(f"Received results: {results}")


# Example request generator.
def request_generator():
    """Enqueue an example SELECT once per second, forever."""
    while True:
        sql_query = "SELECT * FROM dev_info"  # example query
        db_req_que.put(DBRequest(sql=sql_query, callback=handle_device_data))
        time.sleep(1)  # one request per second


def test_main():
    """Manual smoke test: start the DB thread and the example generator."""
    # Start the database thread.
    db_thread = threading.Thread(target=db_process, daemon=True)
    db_thread.start()

    # Start the request generator.
    request_gen_thread = threading.Thread(target=request_generator)
    request_gen_thread.start()