mysql_pools.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. import logging
  2. from aiomysql import create_pool
  3. from aiomysql.cursors import DictCursor
  4. from app.core.config import GlobalConfigSettings
  5. from app.core.observability import LogService
  6. logging.basicConfig(level=logging.INFO)
  7. class DatabaseManager(LogService):
  8. def __init__(self, config: GlobalConfigSettings):
  9. super().__init__(config.aliyun_log)
  10. self.database_mapper = {
  11. "aigc": config.aigc_db,
  12. "growth": config.growth_db,
  13. "long_video": config.long_video_db,
  14. "long_articles": config.long_articles_db,
  15. "piaoquan_crawler": config.piaoquan_crawler_db,
  16. }
  17. self.pools = {}
  18. async def init_pools(self):
  19. # 从配置获取数据库配置,也可以直接在这里配置
  20. for db_name, config in self.database_mapper.items():
  21. try:
  22. pool = await create_pool(
  23. host=config.host,
  24. port=config.port,
  25. user=config.user,
  26. password=config.password,
  27. db=config.db,
  28. minsize=config.minsize,
  29. maxsize=config.maxsize,
  30. cursorclass=DictCursor,
  31. autocommit=True,
  32. )
  33. self.pools[db_name] = pool
  34. logging.info(f"{db_name} MYSQL连接池 created successfully")
  35. except Exception as e:
  36. await self.log(
  37. contents={
  38. "db_name": db_name,
  39. "error": str(e),
  40. "message": f"Failed to create pool for {db_name}",
  41. }
  42. )
  43. self.pools[db_name] = None
  44. async def close_pools(self):
  45. for name, pool in self.pools.items():
  46. if pool:
  47. pool.close()
  48. await pool.wait_closed()
  49. logging.info(f"{name} MYSQL连接池 closed successfully")
  50. async def async_fetch(
  51. self, query, db_name="long_articles", params=None, cursor_type=DictCursor
  52. ):
  53. pool = self.pools[db_name]
  54. if not pool:
  55. await self.init_pools()
  56. # fetch from db
  57. try:
  58. async with pool.acquire() as conn:
  59. async with conn.cursor(cursor_type) as cursor:
  60. await cursor.execute(query, params)
  61. fetch_response = await cursor.fetchall()
  62. return fetch_response
  63. except Exception as e:
  64. await self.log(
  65. contents={
  66. "task": "async_fetch",
  67. "db_name": db_name,
  68. "error": str(e),
  69. "message": f"Failed to fetch data from {db_name}",
  70. "query": query,
  71. "params": params,
  72. }
  73. )
  74. return None
  75. async def async_save(
  76. self, query, params, db_name="long_articles", batch: bool = False
  77. ):
  78. pool = self.pools[db_name]
  79. if not pool:
  80. await self.init_pools()
  81. async with pool.acquire() as connection:
  82. async with connection.cursor() as cursor:
  83. try:
  84. if batch:
  85. await cursor.executemany(query, params)
  86. else:
  87. await cursor.execute(query, params)
  88. affected_rows = cursor.rowcount
  89. await connection.commit()
  90. return affected_rows
  91. except Exception as e:
  92. await connection.rollback()
  93. await self.log(
  94. contents={
  95. "task": "async_save",
  96. "db_name": db_name,
  97. "error": str(e),
  98. "message": f"Failed to save data to {db_name}",
  99. "query": query,
  100. "params": params,
  101. }
  102. )
  103. raise e
  104. def get_pool(self, db_name):
  105. return self.pools.get(db_name)
  106. def list_databases(self):
  107. return list(self.database_mapper.keys())