from typing import Any, Dict, List, Optional, Tuple

from aiomysql import create_pool, DictCursor

from applications.config import *


class DatabaseManager:
    """Create, hand out and close one aiomysql connection pool per database.

    Pools are keyed by a logical name ("aigc_db_pool", "long_video_db_pool").
    A slot in ``self.pools`` holds ``None`` when its pool could not be created
    or has been closed.
    """

    def __init__(self):
        # Per-database connection settings; populated by ``init_pools``.
        self.databases = None
        # Logical database name -> pool object, or None when unavailable.
        self.pools = {}

    @staticmethod
    def _pool_settings(config) -> Dict[str, Any]:
        """Build the ``create_pool`` keyword arguments from one config mapping.

        Missing keys fall back to the defaults below. The size values are
        passed through ``int`` so string settings are accepted; a value that
        is not numeric raises ``ValueError``.
        """
        return {
            "host": config.get("host", "localhost"),
            "port": 3306,
            "user": config.get("user", "root"),
            "password": config.get("password", ""),
            "db": config.get("db", "database1"),
            "minsize": int(config.get("min_size", 1)),
            "maxsize": int(config.get("max_size", 5)),
        }

    async def init_pools(self):
        """Create a pool for every configured database that lacks one.

        Settings are read from ``aigc_db_config`` and ``long_video_db_config``
        (provided by ``applications.config``). Creation is best-effort: a
        failure is printed and the slot is set to ``None`` so the remaining
        databases are still attempted.
        """
        self.databases = {
            "aigc_db_pool": self._pool_settings(aigc_db_config),
            "long_video_db_pool": self._pool_settings(long_video_db_config),
        }

        for db_name, config in self.databases.items():
            # Keep a pool that is already up: recreating it would orphan the
            # old pool (and its open connections) without closing it.
            if self.pools.get(db_name) is not None:
                continue
            try:
                pool = await create_pool(
                    **config,
                    cursorclass=DictCursor,
                    autocommit=True,
                )
            except Exception as e:
                print(f"❌ Failed to create pool for {db_name}: {e}")
                self.pools[db_name] = None
            else:
                self.pools[db_name] = pool
                print(f"✅ Created connection pool for {db_name}")

    async def close_pools(self):
        """Close every open pool and wait until its connections are released.

        Closed slots are reset to ``None`` so a later ``init_pools`` call
        rebuilds them instead of keeping a closed pool around.
        """
        # Iterate over a snapshot because the dict is updated inside the loop.
        for name, pool in list(self.pools.items()):
            if pool is None:
                continue
            pool.close()
            await pool.wait_closed()
            self.pools[name] = None
            print(f"🔌 Closed connection pool for {name}")

    async def async_fetch(
        self, db_name, query, cursor_type=DictCursor, args=None
    ) -> Tuple[Optional[Any], Optional[str]]:
        """Run ``query`` against ``db_name`` and return all rows.

        Args:
            db_name: Logical pool name, e.g. "aigc_db_pool".
            query: SQL statement to execute.
            cursor_type: Cursor class used for the query.
            args: Optional parameters forwarded to ``cursor.execute`` so
                values can be bound instead of formatted into ``query``.

        Returns:
            ``(rows, None)`` on success, ``(None, error_message)`` on failure.
        """
        pool = self.pools.get(db_name)
        if pool is None:
            # Lazy (re)initialisation; look the pool up again afterwards so
            # the freshly created pool is the one actually used.
            await self.init_pools()
            pool = self.pools.get(db_name)
        if pool is None:
            return None, f"No connection pool available for {db_name}"

        try:
            async with pool.acquire() as conn:
                async with conn.cursor(cursor_type) as cursor:
                    await cursor.execute(query, args)
                    rows = await cursor.fetchall()
        except Exception as e:
            # Errors are reported through the tuple, not raised to the caller.
            return None, str(e)
        return rows, None

    def get_pool(self, db_name):
        """Return the pool stored under ``db_name``, or ``None`` if absent."""
        return self.pools.get(db_name)

    def list_databases(self) -> List[str]:
        """Return the configured database names (empty before ``init_pools``)."""
        return list(self.databases or {})