import logging from aiomysql import create_pool from aiomysql.cursors import DictCursor from app.core.config import GlobalConfigSettings from app.core.observability import LogService logger = logging.getLogger(__name__) class DatabaseManager: def __init__(self, config: GlobalConfigSettings, log_service: LogService): self.log_service = log_service self.database_mapper = { "aigc": config.aigc_db, "growth": config.growth_db, "long_video": config.long_video_db, "long_articles": config.long_articles_db, "piaoquan_crawler": config.piaoquan_crawler_db, } self.pools = {} async def _log(self, contents: dict): await self.log_service.log(contents) async def init_pools(self): # 从配置获取数据库配置,也可以直接在这里配置 for db_name, config in self.database_mapper.items(): try: pool = await create_pool( host=config.host, port=config.port, user=config.user, password=config.password, db=config.db, minsize=config.minsize, maxsize=config.maxsize, cursorclass=DictCursor, autocommit=True, ) self.pools[db_name] = pool logger.info(f"{db_name} MYSQL连接池 created successfully") except Exception as e: await self._log( contents={ "db_name": db_name, "error": str(e), "message": f"Failed to create pool for {db_name}", } ) self.pools[db_name] = None async def close_pools(self): for name, pool in self.pools.items(): if pool: pool.close() await pool.wait_closed() logger.info(f"{name} MYSQL连接池 closed successfully") async def async_fetch( self, query, db_name="long_articles", params=None, cursor_type=DictCursor ): pool = self.pools.get(db_name) if not pool: await self.init_pools() pool = self.pools.get(db_name) if not pool: raise RuntimeError(f"Database pool '{db_name}' not available after init") try: async with pool.acquire() as conn: async with conn.cursor(cursor_type) as cursor: await cursor.execute(query, params) fetch_response = await cursor.fetchall() return fetch_response except Exception as e: await self._log( contents={ "task": "async_fetch", "db_name": db_name, "error": str(e), "message": f"Failed to fetch data from {db_name}", "query": query, "params": params, } ) raise async def async_fetch_one( self, query, db_name="long_articles", params=None, cursor_type=DictCursor ): """查询单条记录,不存在返回 None,出错抛异常""" pool = self.pools.get(db_name) if not pool: await self.init_pools() pool = self.pools.get(db_name) if not pool: raise RuntimeError(f"Database pool '{db_name}' not available after init") try: async with pool.acquire() as conn: async with conn.cursor(cursor_type) as cursor: await cursor.execute(query, params) return await cursor.fetchone() except Exception as e: await self._log( contents={ "task": "async_fetch_one", "db_name": db_name, "error": str(e), "message": f"Failed to fetch one from {db_name}", "query": query, "params": params, } ) raise async def async_save( self, query, params, db_name="long_articles", batch: bool = False ): pool = self.pools.get(db_name) if not pool: await self.init_pools() pool = self.pools.get(db_name) if not pool: raise RuntimeError(f"Database pool '{db_name}' not available after init") async with pool.acquire() as connection: async with connection.cursor() as cursor: try: if batch: await cursor.executemany(query, params) else: await cursor.execute(query, params) affected_rows = cursor.rowcount await connection.commit() return affected_rows except Exception as e: await connection.rollback() await self._log( contents={ "task": "async_save", "db_name": db_name, "error": str(e), "message": f"Failed to save data to {db_name}", "query": query, "params": params, } ) raise def get_pool(self, db_name): return self.pools.get(db_name) def list_databases(self): return list(self.database_mapper.keys())