123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- # -*- coding: utf-8 -*-
- """
- 数据库连接及操作
- """
- import pymysql
- from loguru import logger
- class MysqlHelper:
- @classmethod
- def connect_mysql(cls):
- # 创建一个 Connection 对象,代表了一个数据库连接
- connection = pymysql.connect(
- host="rm-bp13g3ra2f59q49xs.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址
- port=3306, # 端口号
- user="wqsd", # mysql用户名
- passwd="wqsd@2025", # mysql用户登录密码
- db="ai_knowledge", # 数据库名
- # 如果数据库里面的文本是utf8编码的,charset指定是utf8
- charset="utf8",
- # 超时设置
- connect_timeout=30, # 连接超时时间(秒)
- read_timeout=30, # 读取超时时间(秒)
- write_timeout=30 # 写入超时时间(秒)
- )
- return connection
- @classmethod
- def get_values(cls, sql, params=None):
- try:
- # 连接数据库
- connect = cls.connect_mysql()
- # 返回一个 Cursor对象
- mysql = connect.cursor()
- if params:
- # 如果传递了 params 参数
- mysql.execute(sql, params)
- else:
- # 如果没有传递 params 参数
- mysql.execute(sql)
- # fetchall方法返回的是一个元组,里面每个元素也是元组,代表一行记录
- data = mysql.fetchall()
- # 关闭数据库连接
- connect.close()
- # 返回查询结果,元组
- return data
- except Exception as e:
- print(f"get_values异常:{e}\n")
- @classmethod
- def update_values(cls, sql, params=None):
- """
- 执行更新操作(INSERT/UPDATE/DELETE)
-
- 参数:
- sql: 要执行的SQL语句
- params: SQL参数(可选,元组或字典)
-
- 返回:
- 成功时返回影响的行数,失败返回None
- """
- connect = None
- cursor = None
-
- try:
- connect = cls.connect_mysql()
- cursor = connect.cursor()
-
- # 执行SQL语句
- if params:
- affected_rows = cursor.execute(sql, params)
- else:
- affected_rows = cursor.execute(sql)
-
- connect.commit()
- return affected_rows
-
- except Exception as e:
- logger.error(f"SQL执行失败: {e}")
- logger.error(f"SQL语句: {sql}")
- if params:
- logger.error(f"参数: {params}")
-
- if connect:
- connect.rollback()
- return None
-
- finally:
- # 确保资源关闭
- if cursor:
- cursor.close()
- if connect:
- connect.close()
|