liuzhiheng 98910242ae knowledge_v2 骨架代码 9 hours ago
..
README.md 98910242ae knowledge_v2 骨架代码 9 hours ago
__init__.py 98910242ae knowledge_v2 骨架代码 9 hours ago
db_config.py 98910242ae knowledge_v2 骨架代码 9 hours ago
example_usage.py 98910242ae knowledge_v2 骨架代码 9 hours ago
mysql_advanced.py 98910242ae knowledge_v2 骨架代码 9 hours ago
mysql_exceptions.py 98910242ae knowledge_v2 骨架代码 9 hours ago
mysql_helper.py 98910242ae knowledge_v2 骨架代码 9 hours ago
mysql_pool.py 98910242ae knowledge_v2 骨架代码 9 hours ago
mysql_transaction.py 98910242ae knowledge_v2 骨架代码 9 hours ago
test_mysql_utils.py 98910242ae knowledge_v2 骨架代码 9 hours ago

README.md

MySQL数据库工具库

基于pymysql的MySQL数据库操作工具库,提供连接池管理、CRUD操作、高级查询、事务管理等功能。

功能特性

  • 🔗 连接池管理 - 自动管理数据库连接,支持连接复用和自动回收
  • 📝 基础CRUD操作 - 提供简洁的增删改查接口
  • 🔍 高级查询功能 - 支持分页、排序、聚合查询、模糊搜索
  • 🔄 事务管理 - 支持事务上下文管理器和手动事务控制
  • 📊 批量操作 - 支持批量插入、批量执行等高效操作
  • 🛡️ 异常处理 - 完善的异常处理机制和自定义异常类型
  • 📋 日志记录 - 可配置的日志记录,支持SQL日志和性能监控
  • 高性能 - 连接池、批量操作等性能优化

安装依赖

pip install pymysql

配置说明

在项目根目录的.env文件中配置数据库连接信息:

DB_INFO={
    "host": "your_host",
    "database": "your_database",
    "port": 3306,
    "user": "your_username",
    "passwd": "your_password",
    "charset": "utf8"
}

快速开始

基础使用

from utils.mysql import mysql_db

# 插入数据
user_id = mysql_db.insert('users', {
    'name': 'John Doe',
    'email': 'john@example.com',
    'age': 25
})

# 查询数据
users = mysql_db.select('users', where='age > %s', where_params=(20,))

# 更新数据
mysql_db.update('users', {'age': 26}, 'id = %s', (user_id,))

# 删除数据
mysql_db.delete('users', 'id = %s', (user_id,))

分页查询

# 分页查询
result = mysql_db.paginate('users', page=1, page_size=10, order_by='created_at DESC')
print(f"总记录数: {result['pagination']['total_count']}")
print(f"当前页数据: {result['data']}")

高级查询

# 聚合查询
stats = mysql_db.aggregate('users', {
    'total_count': 'COUNT(*)',
    'avg_age': 'AVG(age)',
    'max_age': 'MAX(age)'
})

# 模糊搜索
results = mysql_db.search('users', ['name', 'email'], 'john')

# 排序查询
users = mysql_db.select_with_sort('users', sort_field='age', sort_order='DESC')

事务操作

# 使用事务上下文管理器
with mysql_db.transaction():
    user_id = mysql_db.insert('users', {'name': 'John', 'age': 25})
    mysql_db.update('users', {'age': 26}, 'id = %s', (user_id,))
    # 如果发生异常,事务会自动回滚

批量操作

# 批量插入
users_data = [
    {'name': 'User1', 'email': 'user1@example.com', 'age': 25},
    {'name': 'User2', 'email': 'user2@example.com', 'age': 26}
]
mysql_db.insert_many('users', users_data)

API文档

基础CRUD操作

insert(table, data, connection=None)

插入数据

  • table: 表名
  • data: 数据字典
  • connection: 数据库连接(可选,用于事务)
  • 返回:插入记录的ID

select(table, columns="*", where="", where_params=None, order_by="", limit=None, connection=None)

查询数据

  • table: 表名
  • columns: 查询列,默认为*
  • where: WHERE条件
  • where_params: WHERE条件参数
  • order_by: 排序条件
  • limit: 限制数量
  • connection: 数据库连接(可选,用于事务)
  • 返回:查询结果列表

update(table, data, where, where_params=None, connection=None)

更新数据

  • table: 表名
  • data: 更新数据字典
  • where: WHERE条件
  • where_params: WHERE条件参数
  • connection: 数据库连接(可选,用于事务)
  • 返回:影响的行数

delete(table, where, where_params=None, connection=None)

删除数据

  • table: 表名
  • where: WHERE条件
  • where_params: WHERE条件参数
  • connection: 数据库连接(可选,用于事务)
  • 返回:影响的行数

高级查询功能

paginate(table, page=1, page_size=20, columns="*", where="", where_params=None, order_by="", connection=None)

分页查询

  • 返回包含datapagination信息的字典

aggregate(table, agg_functions, where="", where_params=None, group_by="", having="", having_params=None, connection=None)

聚合查询

  • agg_functions: 聚合函数字典,格式为 {'alias': 'function(column)'}

search(table, search_columns, keyword, columns="*", where="", where_params=None, order_by="", limit=None, connection=None)

模糊搜索

  • search_columns: 搜索的列名列表
  • keyword: 搜索关键字

事务管理

transaction(isolation_level=None)

事务上下文管理器

  • isolation_level: 事务隔离级别
with mysql_db.transaction():
    # 在事务中执行操作
    pass

execute_in_transaction(func, *args, isolation_level=None, **kwargs)

在事务中执行函数

  • func: 要执行的函数,第一个参数必须是connection

连接池管理

get_pool_status()

获取连接池状态

  • 返回包含连接池信息的字典

异常处理

工具库提供了以下自定义异常类型:

  • MySQLBaseException: 基础异常类
  • MySQLConnectionError: 连接异常
  • MySQLConfigError: 配置异常
  • MySQLQueryError: 查询异常
  • MySQLTransactionError: 事务异常
  • MySQLPoolError: 连接池异常
  • MySQLValidationError: 数据验证异常
from utils.mysql import mysql_db, MySQLConnectionError, MySQLQueryError

try:
    users = mysql_db.select('users')
except MySQLConnectionError as e:
    print(f"数据库连接失败: {e}")
except MySQLQueryError as e:
    print(f"查询失败: {e}")

日志配置

本模块使用 loguru 进行日志记录。你可以在项目中自定义 loguru 配置:

from loguru import logger

# 配置日志输出到文件
logger.add("logs/mysql.log", rotation="10 MB", retention="7 days", level="INFO")

# 配置 SQL 调试日志
logger.add("logs/mysql_sql.log", rotation="10 MB", retention="7 days", level="DEBUG", filter=lambda record: "SQL" in record["message"])

文件结构

utils/
├── __init__.py              # 主入口文件
├── db_config.py             # 数据库配置解析(使用 python-dotenv)
├── mysql_pool.py            # 连接池管理
├── mysql_helper.py          # 基础CRUD操作
├── mysql_advanced.py        # 高级查询功能
├── mysql_transaction.py     # 事务管理
├── mysql_exceptions.py      # 自定义异常
├── example_usage.py         # 使用示例
├── test_mysql_utils.py      # 测试用例
└── README.md               # 文档说明

运行测试

# 运行所有测试
python utils/test_mysql_utils.py

# 查看使用示例
python utils/example_usage.py

注意事项

  1. 数据库权限:确保数据库用户有足够的权限执行相应操作
  2. 连接配置:正确配置.env文件中的数据库连接信息
  3. 异常处理:生产环境中建议对所有数据库操作进行异常处理
  4. 事务使用:涉及多个操作的业务逻辑建议使用事务保证数据一致性
  5. 性能优化:大量数据操作时建议使用批量操作方法
  6. 日志监控:生产环境建议启用SQL日志和性能监控

版本信息

当前版本:1.0.0

许可证

MIT License