# mysql_pools.py
  1. import logging
  2. from aiomysql import create_pool
  3. from aiomysql.cursors import DictCursor
  4. from app.core.config import GlobalConfigSettings
  5. from app.core.observability import LogService
  6. logging.basicConfig(level=logging.INFO)
  7. class DatabaseManager(LogService):
  8. def __init__(self, config: GlobalConfigSettings):
  9. super().__init__(config.aliyun_log)
  10. self.database_mapper = {
  11. "aigc": config.aigc_db,
  12. "growth": config.growth_db,
  13. "long_video": config.long_video_db,
  14. "long_articles": config.long_articles_db,
  15. "piaoquan_crawler": config.piaoquan_crawler_db,
  16. "content_decode": config.content_decode_db,
  17. }
  18. self.pools = {}
  19. async def init_pools(self):
  20. # 从配置获取数据库配置,也可以直接在这里配置
  21. for db_name, config in self.database_mapper.items():
  22. try:
  23. pool = await create_pool(
  24. host=config.host,
  25. port=config.port,
  26. user=config.user,
  27. password=config.password,
  28. db=config.db,
  29. minsize=config.minsize,
  30. maxsize=config.maxsize,
  31. cursorclass=DictCursor,
  32. autocommit=True,
  33. )
  34. self.pools[db_name] = pool
  35. logging.info(f"{db_name} MYSQL连接池 created successfully")
  36. except Exception as e:
  37. await self.log(
  38. contents={
  39. "db_name": db_name,
  40. "error": str(e),
  41. "message": f"Failed to create pool for {db_name}",
  42. }
  43. )
  44. self.pools[db_name] = None
  45. async def close_pools(self):
  46. for name, pool in self.pools.items():
  47. if pool:
  48. pool.close()
  49. await pool.wait_closed()
  50. logging.info(f"{name} MYSQL连接池 closed successfully")
  51. async def async_fetch(
  52. self, query, db_name="long_articles", params=None, cursor_type=DictCursor
  53. ):
  54. pool = self.pools[db_name]
  55. if not pool:
  56. await self.init_pools()
  57. # fetch from db
  58. try:
  59. async with pool.acquire() as conn:
  60. async with conn.cursor(cursor_type) as cursor:
  61. await cursor.execute(query, params)
  62. fetch_response = await cursor.fetchall()
  63. return fetch_response
  64. except Exception as e:
  65. await self.log(
  66. contents={
  67. "task": "async_fetch",
  68. "db_name": db_name,
  69. "error": str(e),
  70. "message": f"Failed to fetch data from {db_name}",
  71. "query": query,
  72. "params": params,
  73. }
  74. )
  75. return None
  76. async def async_save(
  77. self, query, params, db_name="long_articles", batch: bool = False
  78. ):
  79. pool = self.pools[db_name]
  80. if not pool:
  81. await self.init_pools()
  82. async with pool.acquire() as connection:
  83. async with connection.cursor() as cursor:
  84. try:
  85. if batch:
  86. await cursor.executemany(query, params)
  87. else:
  88. await cursor.execute(query, params)
  89. affected_rows = cursor.rowcount
  90. await connection.commit()
  91. return affected_rows
  92. except Exception as e:
  93. await connection.rollback()
  94. await self.log(
  95. contents={
  96. "task": "async_save",
  97. "db_name": db_name,
  98. "error": str(e),
  99. "message": f"Failed to save data to {db_name}",
  100. "query": query,
  101. "params": params,
  102. }
  103. )
  104. raise e
  105. def get_pool(self, db_name):
  106. return self.pools.get(db_name)
  107. def list_databases(self):
  108. return list(self.database_mapper.keys())