# mysql_pools.py
from aiomysql import create_pool, DictCursor

from applications.config import *
  3. class DatabaseManager:
  4. def __init__(self):
  5. self.databases = None
  6. self.pools = {}
  7. async def init_pools(self):
  8. # 从环境变量获取数据库配置,也可以直接在这里配置
  9. self.databases = {
  10. "aigc_db_pool": {
  11. "host": aigc_db_config.get("host", "localhost"),
  12. "port": 3306,
  13. "user": aigc_db_config.get("user", "root"),
  14. "password": aigc_db_config.get("password", ""),
  15. "db": aigc_db_config.get("db", "database1"),
  16. "minsize": int(aigc_db_config.get("min_size", 1)),
  17. "maxsize": int(aigc_db_config.get("max_size", 5)),
  18. },
  19. "long_video_db_pool": {
  20. "host": long_video_db_config.get("host", "localhost"),
  21. "port": 3306,
  22. "user": long_video_db_config.get("user", "root"),
  23. "password": long_video_db_config.get("password", ""),
  24. "db": long_video_db_config.get("db", "database1"),
  25. "minsize": int(long_video_db_config.get("min_size", 1)),
  26. "maxsize": int(long_video_db_config.get("max_size", 5)),
  27. },
  28. }
  29. for db_name, config in self.databases.items():
  30. try:
  31. pool = await create_pool(
  32. host=config["host"],
  33. port=config["port"],
  34. user=config["user"],
  35. password=config["password"],
  36. db=config["db"],
  37. minsize=config["minsize"],
  38. maxsize=config["maxsize"],
  39. cursorclass=DictCursor,
  40. autocommit=True,
  41. )
  42. self.pools[db_name] = pool
  43. print(f"✅ Created connection pool for {db_name}")
  44. except Exception as e:
  45. print(f"❌ Failed to create pool for {db_name}: {str(e)}")
  46. self.pools[db_name] = None
  47. async def close_pools(self):
  48. for name, pool in self.pools.items():
  49. if pool:
  50. pool.close()
  51. await pool.wait_closed()
  52. print(f"🔌 Closed connection pool for {name}")
  53. async def async_fetch(self, db_name, query, cursor_type=DictCursor):
  54. pool = self.pools[db_name]
  55. if not pool:
  56. await self.init_pools()
  57. # fetch from db
  58. try:
  59. async with pool.acquire() as conn:
  60. async with conn.cursor(cursor_type) as cursor:
  61. await cursor.execute(query)
  62. fetch_response = await cursor.fetchall()
  63. return fetch_response, None
  64. except Exception as e:
  65. return None, str(e)
  66. def get_pool(self, db_name):
  67. return self.pools.get(db_name)
  68. def list_databases(self):
  69. return list(self.databases.keys())