connection.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. """数据库连接管理
  2. 提供统一的数据库连接接口,支持环境变量配置
  3. 参考 content_finder/db/connection.py
  4. """
  5. import os
  6. import logging
  7. from typing import Optional
  8. try:
  9. import pymysql
  10. import pymysql.cursors
  11. except ImportError:
  12. pymysql = None
  13. logger = logging.getLogger(__name__)
  14. def get_connection():
  15. """获取数据库连接
  16. 从环境变量读取配置:
  17. - DB_HOST: 数据库主机地址
  18. - DB_PORT: 数据库端口(默认3306)
  19. - DB_USER: 数据库用户名
  20. - DB_PASSWORD: 数据库密码
  21. - DB_NAME: 数据库名称
  22. Returns:
  23. pymysql.Connection: 数据库连接对象
  24. Raises:
  25. ImportError: pymysql 未安装
  26. ValueError: 数据库配置缺失
  27. Exception: 连接失败
  28. """
  29. if pymysql is None:
  30. raise ImportError(
  31. "pymysql 未安装,请运行: pip install pymysql\n"
  32. "或在 requirements.txt 中添加 pymysql>=1.0.0"
  33. )
  34. # 读取环境变量
  35. host = os.getenv("DB_HOST", "").strip()
  36. port = int(os.getenv("DB_PORT", "3306"))
  37. user = os.getenv("DB_USER", "").strip()
  38. password = os.getenv("DB_PASSWORD", "")
  39. database = os.getenv("DB_NAME", "").strip()
  40. # 验证必需配置
  41. if not all([host, user, database]):
  42. raise ValueError(
  43. "数据库配置缺失!请在 .env 文件或环境变量中设置:\n"
  44. " DB_HOST=数据库主机地址\n"
  45. " DB_USER=数据库用户名\n"
  46. " DB_PASSWORD=数据库密码\n"
  47. " DB_NAME=数据库名称\n"
  48. " DB_PORT=3306 # 可选,默认3306"
  49. )
  50. try:
  51. conn = pymysql.connect(
  52. host=host,
  53. port=port,
  54. user=user,
  55. password=password,
  56. database=database,
  57. charset="utf8mb4",
  58. cursorclass=pymysql.cursors.DictCursor, # 返回字典格式
  59. autocommit=True, # 自动提交(简化事务管理)
  60. )
  61. logger.debug(f"数据库连接成功: {user}@{host}:{port}/{database}")
  62. return conn
  63. except Exception as e:
  64. logger.error(f"数据库连接失败: {e}")
  65. raise
  66. def test_connection() -> bool:
  67. """测试数据库连接
  68. Returns:
  69. bool: 连接成功返回 True,失败返回 False
  70. """
  71. try:
  72. conn = get_connection()
  73. with conn.cursor() as cursor:
  74. cursor.execute("SELECT 1")
  75. result = cursor.fetchone()
  76. conn.close()
  77. logger.info("数据库连接测试成功")
  78. return True
  79. except Exception as e:
  80. logger.error(f"数据库连接测试失败: {e}")
  81. return False
  82. if __name__ == "__main__":
  83. # 测试数据库连接
  84. logging.basicConfig(level=logging.INFO)
  85. if test_connection():
  86. print("✅ 数据库连接正常")
  87. else:
  88. print("❌ 数据库连接失败,请检查配置")