mysql_pools.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import logging
  2. from aiomysql import create_pool
  3. from aiomysql.cursors import DictCursor
  4. from app.core.config import GlobalConfigSettings
  5. from app.core.observability import LogService
  6. logger = logging.getLogger(__name__)
  7. class DatabaseManager:
  8. def __init__(self, config: GlobalConfigSettings, log_service: LogService):
  9. self.log_service = log_service
  10. self.database_mapper = {
  11. "aigc": config.aigc_db,
  12. "growth": config.growth_db,
  13. "long_video": config.long_video_db,
  14. "long_articles": config.long_articles_db,
  15. "piaoquan_crawler": config.piaoquan_crawler_db,
  16. }
  17. self.pools = {}
  18. async def _log(self, contents: dict):
  19. await self.log_service.log(contents)
  20. async def init_pools(self):
  21. # 从配置获取数据库配置,也可以直接在这里配置
  22. for db_name, config in self.database_mapper.items():
  23. try:
  24. pool = await create_pool(
  25. host=config.host,
  26. port=config.port,
  27. user=config.user,
  28. password=config.password,
  29. db=config.db,
  30. minsize=config.minsize,
  31. maxsize=config.maxsize,
  32. cursorclass=DictCursor,
  33. autocommit=True,
  34. )
  35. self.pools[db_name] = pool
  36. logger.info(f"{db_name} MYSQL连接池 created successfully")
  37. except Exception as e:
  38. await self._log(
  39. contents={
  40. "db_name": db_name,
  41. "error": str(e),
  42. "message": f"Failed to create pool for {db_name}",
  43. }
  44. )
  45. self.pools[db_name] = None
  46. async def close_pools(self):
  47. for name, pool in self.pools.items():
  48. if pool:
  49. pool.close()
  50. await pool.wait_closed()
  51. logger.info(f"{name} MYSQL连接池 closed successfully")
  52. async def async_fetch(
  53. self, query, db_name="long_articles", params=None, cursor_type=DictCursor
  54. ):
  55. pool = self.pools.get(db_name)
  56. if not pool:
  57. await self.init_pools()
  58. pool = self.pools.get(db_name)
  59. if not pool:
  60. raise RuntimeError(f"Database pool '{db_name}' not available after init")
  61. try:
  62. async with pool.acquire() as conn:
  63. async with conn.cursor(cursor_type) as cursor:
  64. await cursor.execute(query, params)
  65. fetch_response = await cursor.fetchall()
  66. return fetch_response
  67. except Exception as e:
  68. await self._log(
  69. contents={
  70. "task": "async_fetch",
  71. "db_name": db_name,
  72. "error": str(e),
  73. "message": f"Failed to fetch data from {db_name}",
  74. "query": query,
  75. "params": params,
  76. }
  77. )
  78. raise
  79. async def async_fetch_one(
  80. self, query, db_name="long_articles", params=None, cursor_type=DictCursor
  81. ):
  82. """查询单条记录,不存在返回 None,出错抛异常"""
  83. pool = self.pools.get(db_name)
  84. if not pool:
  85. await self.init_pools()
  86. pool = self.pools.get(db_name)
  87. if not pool:
  88. raise RuntimeError(f"Database pool '{db_name}' not available after init")
  89. try:
  90. async with pool.acquire() as conn:
  91. async with conn.cursor(cursor_type) as cursor:
  92. await cursor.execute(query, params)
  93. return await cursor.fetchone()
  94. except Exception as e:
  95. await self._log(
  96. contents={
  97. "task": "async_fetch_one",
  98. "db_name": db_name,
  99. "error": str(e),
  100. "message": f"Failed to fetch one from {db_name}",
  101. "query": query,
  102. "params": params,
  103. }
  104. )
  105. raise
  106. async def async_save(
  107. self, query, params, db_name="long_articles", batch: bool = False
  108. ):
  109. pool = self.pools.get(db_name)
  110. if not pool:
  111. await self.init_pools()
  112. pool = self.pools.get(db_name)
  113. if not pool:
  114. raise RuntimeError(f"Database pool '{db_name}' not available after init")
  115. async with pool.acquire() as connection:
  116. async with connection.cursor() as cursor:
  117. try:
  118. if batch:
  119. await cursor.executemany(query, params)
  120. else:
  121. await cursor.execute(query, params)
  122. affected_rows = cursor.rowcount
  123. await connection.commit()
  124. return affected_rows
  125. except Exception as e:
  126. await connection.rollback()
  127. await self._log(
  128. contents={
  129. "task": "async_save",
  130. "db_name": db_name,
  131. "error": str(e),
  132. "message": f"Failed to save data to {db_name}",
  133. "query": query,
  134. "params": params,
  135. }
  136. )
  137. raise
  138. def get_pool(self, db_name):
  139. return self.pools.get(db_name)
  140. def list_databases(self):
  141. return list(self.database_mapper.keys())