# -*- coding: utf-8 -*- """ 数据库连接及操作 """ import pymysql from loguru import logger class MysqlHelper: @classmethod def connect_mysql(cls): # 创建一个 Connection 对象,代表了一个数据库连接 connection = pymysql.connect( host="rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址 port=3306, # 端口号 user="wqsd", # mysql用户名 passwd="wqsd@2025", # mysql用户登录密码 db="ai_knowledge", # 数据库名 # 如果数据库里面的文本是utf8编码的,charset指定是utf8 charset="utf8", # 超时设置 connect_timeout=30, # 连接超时时间(秒) read_timeout=30, # 读取超时时间(秒) write_timeout=30 # 写入超时时间(秒) ) return connection @classmethod def get_values(cls, sql, params=None): try: # 连接数据库 connect = cls.connect_mysql() # 返回一个 Cursor对象 mysql = connect.cursor() if params: # 如果传递了 params 参数 mysql.execute(sql, params) else: # 如果没有传递 params 参数 mysql.execute(sql) # fetchall方法返回的是一个元组,里面每个元素也是元组,代表一行记录 data = mysql.fetchall() # 关闭数据库连接 connect.close() # 返回查询结果,元组 return data except Exception as e: print(f"get_values异常:{e}\n") @classmethod def update_values(cls, sql, params=None): """ 执行更新操作(INSERT/UPDATE/DELETE) 参数: sql: 要执行的SQL语句 params: SQL参数(可选,元组或字典) 返回: 成功时返回影响的行数,失败返回None """ connect = None cursor = None try: connect = cls.connect_mysql() cursor = connect.cursor() # 执行SQL语句 if params: affected_rows = cursor.execute(sql, params) else: affected_rows = cursor.execute(sql) connect.commit() return affected_rows except Exception as e: logger.error(f"SQL执行失败: {e}") logger.error(f"SQL语句: {sql}") if params: logger.error(f"参数: {params}") if connect: connect.rollback() return None finally: # 确保资源关闭 if cursor: cursor.close() if connect: connect.close()