123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- from aiomysql import create_pool, DictCursor
- from applications.config import *
class DatabaseManager:
    """Own one aiomysql connection pool per configured database.

    Pools are stored in ``self.pools`` under a pool name (``"aigc_db_pool"``,
    ``"long_video_db_pool"``). A pool whose creation failed is stored as
    ``None`` so that the failure is visible to callers of ``get_pool``.
    """

    def __init__(self):
        # Pool name -> connection settings; stays None until init_pools() runs.
        self.databases = None
        # Pool name -> aiomysql pool, or None when creating that pool failed.
        self.pools = {}

    async def init_pools(self):
        """Build the settings table and create a pool for every entry.

        A failure to create one pool is reported on stdout and recorded as
        ``None``; it does not stop the remaining pools from being created.
        """
        # Settings are read from the config mappings that the file's star
        # import is expected to provide; they could also be hard-coded here.
        # NOTE(review): the port is fixed at 3306 and is not read from config.
        self.databases = {
            "aigc_db_pool": {
                "host": aigc_db_config.get("host", "localhost"),
                "port": 3306,
                "user": aigc_db_config.get("user", "root"),
                "password": aigc_db_config.get("password", ""),
                "db": aigc_db_config.get("db", "database1"),
                "minsize": int(aigc_db_config.get("min_size", 1)),
                "maxsize": int(aigc_db_config.get("max_size", 5)),
            },
            "long_video_db_pool": {
                "host": long_video_db_config.get("host", "localhost"),
                "port": 3306,
                "user": long_video_db_config.get("user", "root"),
                "password": long_video_db_config.get("password", ""),
                "db": long_video_db_config.get("db", "database1"),
                "minsize": int(long_video_db_config.get("min_size", 1)),
                "maxsize": int(long_video_db_config.get("max_size", 5)),
            },
        }
        for db_name, config in self.databases.items():
            await self._create_pool(db_name, config)

    async def _create_pool(self, db_name, config):
        """Create the pool for ``db_name`` and record it in ``self.pools``.

        Returns the new pool, or ``None`` when creation raised. The outcome
        is printed either way.
        """
        try:
            pool = await create_pool(
                host=config["host"],
                port=config["port"],
                user=config["user"],
                password=config["password"],
                db=config["db"],
                minsize=config["minsize"],
                maxsize=config["maxsize"],
                cursorclass=DictCursor,
                autocommit=True,
            )
        except Exception as e:
            # Best effort: keep going so the other pools can still be used.
            print(f"❌ Failed to create pool for {db_name}: {str(e)}")
            pool = None
        else:
            print(f"✅ Created connection pool for {db_name}")
        self.pools[db_name] = pool
        return pool

    async def close_pools(self):
        """Close every pool that was created and wait for it to shut down.

        Entries are left in ``self.pools`` after closing.
        """
        for name, pool in self.pools.items():
            if pool:
                pool.close()
                await pool.wait_closed()
                print(f"🔌 Closed connection pool for {name}")

    async def async_fetch(self, db_name, query, cursor_type=DictCursor):
        """Run ``query`` on the pool named ``db_name`` and fetch all rows.

        Args:
            db_name: Pool name, e.g. ``"aigc_db_pool"``.
            query: SQL text passed unchanged to ``cursor.execute``.
            cursor_type: Cursor class used for this query.

        Returns:
            ``(rows, None)`` on success, or ``(None, error_message)`` when
            the pool is unknown or unavailable, or when the query raised.
        """
        pool = self.pools.get(db_name)
        if pool is None:
            if self.databases is None:
                # Never initialised: build the settings and all pools once.
                await self.init_pools()
            elif db_name in self.databases:
                # Retry only the missing pool; re-running init_pools() here
                # would replace the healthy pools without closing them.
                await self._create_pool(db_name, self.databases[db_name])
            # Re-read: the attempt above may have produced a usable pool.
            pool = self.pools.get(db_name)
        if pool is None:
            if db_name not in self.databases:
                return None, f"Unknown database pool: {db_name}"
            return None, f"Connection pool for {db_name} is unavailable"

        # fetch from db
        try:
            async with pool.acquire() as conn:
                async with conn.cursor(cursor_type) as cursor:
                    await cursor.execute(query)
                    fetch_response = await cursor.fetchall()
            return fetch_response, None
        except Exception as e:
            # Errors are reported through the return value, not raised.
            return None, str(e)

    def get_pool(self, db_name):
        """Return the pool stored under ``db_name``, or ``None`` if absent."""
        return self.pools.get(db_name)

    def list_databases(self):
        """Return the configured pool names; empty before ``init_pools``."""
        return list(self.databases or {})
|