123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- '''
- 执行数据库操作的线程
- '''
- import threading
- import queue
- import time
- from sshtunnel import SSHTunnelForwarder, BaseSSHTunnelForwarderError
- import pymysql
- from DBUtils.PooledDB import PooledDB
- from concurrent.futures import ThreadPoolExecutor
- import json
- import shutil
- import common.sys_comm
- from common.sys_comm import LOGDBG, LOGINFO, LOGWARN, LOGERR
- from common.sys_comm import get_bj_time_ms
- from common.sys_comm import g_sys_conf, g_sys_conf_mtx
# SSH tunnel settings. These are only the import-time defaults:
# db_pro_init() rebinds ssh_conf with values read from g_sys_conf.
# NOTE(review): credentials are hard-coded in source here — consider
# loading them from configuration or the environment instead.
ssh_conf = dict(
    ssh_host="119.45.12.173",
    ssh_port=22,
    ssh_user="root",
    ssh_pwd="Hfln@667788",
)

# Database connection settings. Also import-time defaults that
# db_pro_init() rebinds from g_sys_conf.
db_config = dict(
    host="localhost",
    user="root",
    password="Hfln@1024",
    database="lnxx_dev",
)
# Queue of pending DBRequest objects; db_process() stops when it
# dequeues None.
db_req_que = queue.Queue()

# The SSH tunnel object created by initialize_ssh_connection(), or None
# while no tunnel has been created.
ssh_server = None

# The PooledDB connection pool created by initialize_connection_pool(),
# or None before that function has run.
db_pool = None
- # 数据库请求类
class DBRequest:
    """A single SQL statement queued for execution by the database worker.

    Attributes:
        sql: The SQL text to execute.
        params: Parameters passed to ``cursor.execute``; any falsy value
            (``None``, empty list/tuple/dict) is stored as ``()``.
        callback: Optional callable invoked with the statement's result.
        userdata: Optional extra value stored for the callback's use.
    """

    def __init__(self, sql: str, params=None, callback=None, userdata=None):
        self.sql = sql
        # Normalize "no parameters" to an empty tuple.
        self.params = params or ()
        self.callback = callback
        self.userdata = userdata
- # ========== 初始化配置 ==========
def db_pro_init():
    """Reload the SSH and database settings from the shared system config.

    Rebinds the module-level ``ssh_conf`` and ``db_config`` dicts with
    values looked up in ``g_sys_conf`` while ``g_sys_conf_mtx`` is held.
    The database name is always ``"lnxx_dev"``. A key missing from
    ``g_sys_conf`` propagates as the lookup error raised by ``g_sys_conf``.
    """
    global ssh_conf, db_config

    # (key in our dict, key in g_sys_conf)
    ssh_key_map = (
        ("ssh_host", "ssh_host"),
        ("ssh_port", "ssh_port"),
        ("ssh_user", "ssh_username"),
        ("ssh_pwd", "ssh_password"),
    )
    db_key_map = (
        ("host", "db_host"),
        ("user", "db_username"),
        ("password", "db_password"),
    )

    with g_sys_conf_mtx:
        ssh_conf = {ours: g_sys_conf[theirs] for ours, theirs in ssh_key_map}
        db_config = {
            **{ours: g_sys_conf[theirs] for ours, theirs in db_key_map},
            "database": "lnxx_dev",
        }
- # ========== 初始化 SSH ==========
def initialize_ssh_connection():
    """Open the SSH tunnel to the remote MySQL port if none is active.

    Does nothing when ``ssh_server`` already exists and reports
    ``is_active``. Otherwise a new ``SSHTunnelForwarder`` is built from
    ``ssh_conf``, forwarding to 127.0.0.1:3306 on the remote side, stored
    in the module-level ``ssh_server`` and started.
    """
    global ssh_server

    if ssh_server is not None and ssh_server.is_active:
        return

    gateway = (ssh_conf["ssh_host"], ssh_conf["ssh_port"])
    # Bind the global before starting, as callers read ssh_server afterwards.
    ssh_server = SSHTunnelForwarder(
        gateway,
        ssh_username=ssh_conf["ssh_user"],
        ssh_password=ssh_conf["ssh_pwd"],
        remote_bind_address=('127.0.0.1', 3306),
    )
    ssh_server.start()
    LOGINFO("SSH connected")
- # ========== 初始化连接池 ==========
def initialize_connection_pool():
    """Create the module-level PooledDB connection pool.

    When ``g_sys_conf["platform"] == 0`` the SSH tunnel is (re)established
    first and the pool connects to the tunnel's local port on 127.0.0.1;
    otherwise it connects directly to ``db_config["host"]`` on port 3306.
    The resulting pool is stored in ``db_pool``.
    """
    global db_pool

    if g_sys_conf["platform"] == 0:
        initialize_ssh_connection()
        endpoint = ("127.0.0.1", ssh_server.local_bind_port)
    else:
        endpoint = (db_config["host"], 3306)
    target_host, target_port = endpoint

    pool_options = {
        "creator": pymysql,
        "maxconnections": 10,
        "mincached": 2,
        "maxcached": 5,
        "blocking": True,
        # Everything below is handed to the pymysql connection.
        "host": target_host,
        "port": target_port,
        "user": db_config['user'],
        "password": db_config['password'],
        "database": db_config['database'],
        "charset": 'utf8mb4',
        "cursorclass": pymysql.cursors.DictCursor,
    }
    db_pool = PooledDB(**pool_options)
    LOGINFO("DB connection pool initialized")
- # ========== 执行数据库请求 ==========
def handle_db_request(db_request: DBRequest):
    """Execute one queued request and deliver its result to the callback.

    The statement is executed and committed first; only then is the
    callback invoked, so a failing callback can no longer discard a
    successful write or be reported as an SQL failure. The pooled
    connection is closed before the callback runs, so slow callbacks do
    not hold it.

    The result passed to the callback is:
      * the rows from ``cursor.fetchall()`` for statements starting with
        ``select``;
      * ``{"lastrowid": ...}`` for statements starting with ``insert``;
      * ``{"rowcount": ...}`` for everything else.

    ``userdata`` is forwarded as a second argument whenever it is not
    ``None`` (falsy values such as ``0`` or ``{}`` are forwarded too).
    Errors are logged, never raised. ``db_req_que.task_done()`` is always
    called exactly once.

    Args:
        db_request: The request to run.
    """
    try:
        try:
            result = _run_db_statement(db_request)
        except Exception as e:
            LOGERR(f"[DB ERROR] SQL执行失败: {e}")
            return

        if not db_request.callback:
            return

        try:
            if db_request.userdata is not None:
                db_request.callback(result, db_request.userdata)
            else:
                db_request.callback(result)
        except Exception as e:
            # Kept separate from SQL failures: the statement itself succeeded.
            LOGERR(f"[DB ERROR] callback failed: {e}")
    finally:
        db_req_que.task_done()


def _run_db_statement(db_request):
    """Run and commit ``db_request`` on a pooled connection; return its result."""
    conn = db_pool.connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(db_request.sql, db_request.params)
            statement = db_request.sql.strip().lower()
            if statement.startswith("select"):
                result = cursor.fetchall()
            elif statement.startswith("insert"):
                result = {"lastrowid": cursor.lastrowid}
            else:
                result = {"rowcount": cursor.rowcount}
        conn.commit()
        return result
    finally:
        # Always hand the connection back, on success and on failure.
        conn.close()
- # ========== 主数据库线程 ==========
def db_process():
    """Main loop of the database thread.

    Loads the configuration, creates the connection pool, then forwards
    every request taken from ``db_req_que`` to a pool of up to 8 worker
    threads running ``handle_db_request``. A ``None`` entry in the queue
    stops the loop.

    On shutdown the executor is closed via its context manager, which
    waits for already-submitted requests to finish. Every dequeued item
    is accounted for with ``task_done()`` — including the ``None``
    sentinel and requests that could not be submitted — so
    ``db_req_que.join()`` cannot hang on them.
    """
    db_pro_init()
    initialize_connection_pool()

    # Cap the number of concurrently executing requests.
    with ThreadPoolExecutor(max_workers=8) as executor:
        while True:
            # Blocks until an item is available.
            db_request = db_req_que.get()
            if db_request is None:
                db_req_que.task_done()
                break
            try:
                executor.submit(handle_db_request, db_request)
            except Exception as e:
                LOGERR(f"[DB Thread Error] {e}")
                # handle_db_request will never run for this item, so mark
                # it done here to keep the queue's bookkeeping balanced.
                db_req_que.task_done()
                # Brief back-off so a persistent failure does not spin.
                time.sleep(0.01)
def create_db_process():
    """Return a daemon thread targeting ``db_process``; it is not started here."""
    db_worker = threading.Thread(target=db_process, daemon=True)
    return db_worker
- # 处理数据库返回的结果
def handle_device_data(results):
    """Example callback: log the result of a database request at debug level.

    Args:
        results: The value delivered by the database worker.
    """
    # The f-prefix was missing, so the literal text "{results}" was logged.
    LOGDBG(f"Received results: {results}")
- # 示例请求生成器
def request_generator():
    """Example producer: enqueue a ``dev_info`` query once per second, forever."""
    while True:
        demo_request = DBRequest(
            sql="SELECT * FROM dev_info",  # sample query
            callback=handle_device_data,
        )
        db_req_que.put(demo_request)
        time.sleep(1)  # one request per second
def test_main():
    """Manual smoke test: start the database thread and the example producer."""
    # Database worker (daemon thread).
    threading.Thread(target=db_process, daemon=True).start()
    # Example request producer (non-daemon thread).
    threading.Thread(target=request_generator).start()
|