# mysql_pools.py (3.1 KB)
  1. import logging
  2. from aiomysql import create_pool
  3. from aiomysql.cursors import DictCursor
  4. from applications.config import *
  5. logging.basicConfig(level=logging.INFO)
  6. class DatabaseManager:
  7. def __init__(self):
  8. self.databases = None
  9. self.pools = {}
  10. async def init_pools(self):
  11. # 从配置获取数据库配置,也可以直接在这里配置
  12. self.databases = {
  13. "aigc": aigc_db_config,
  14. "long_video": long_video_db_config,
  15. "long_articles": long_articles_db_config,
  16. "piaoquan_crawler": piaoquan_crawler_db_config,
  17. "growth": growth_db_config,
  18. }
  19. for db_name, config in self.databases.items():
  20. try:
  21. pool = await create_pool(
  22. host=config["host"],
  23. port=config["port"],
  24. user=config["user"],
  25. password=config["password"],
  26. db=config["db"],
  27. minsize=config["minsize"],
  28. maxsize=config["maxsize"],
  29. cursorclass=DictCursor,
  30. autocommit=True,
  31. )
  32. self.pools[db_name] = pool
  33. logging.info(f"Created connection pool for {db_name}")
  34. except Exception as e:
  35. logging.error(f"Failed to create pool for {db_name}: {str(e)}")
  36. self.pools[db_name] = None
  37. async def close_pools(self):
  38. for name, pool in self.pools.items():
  39. if pool:
  40. pool.close()
  41. await pool.wait_closed()
  42. logging.info(f"Closed connection pool for {name}")
  43. async def async_fetch(
  44. self, query, db_name="long_articles", params=None, cursor_type=DictCursor
  45. ):
  46. pool = self.pools[db_name]
  47. if not pool:
  48. await self.init_pools()
  49. # fetch from db
  50. try:
  51. async with pool.acquire() as conn:
  52. async with conn.cursor(cursor_type) as cursor:
  53. await cursor.execute(query, params)
  54. fetch_response = await cursor.fetchall()
  55. return fetch_response
  56. except Exception as e:
  57. logging.error(f"Failed to fetch {query}: {str(e)}")
  58. return None
  59. async def async_save(
  60. self, query, params, db_name="long_articles", batch: bool = False
  61. ):
  62. pool = self.pools[db_name]
  63. if not pool:
  64. await self.init_pools()
  65. async with pool.acquire() as connection:
  66. async with connection.cursor() as cursor:
  67. try:
  68. if batch:
  69. await cursor.executemany(query, params)
  70. else:
  71. await cursor.execute(query, params)
  72. affected_rows = cursor.rowcount
  73. await connection.commit()
  74. return affected_rows
  75. except Exception as e:
  76. await connection.rollback()
  77. raise e
  78. def get_pool(self, db_name):
  79. return self.pools.get(db_name)
  80. def list_databases(self):
  81. return list(self.databases.keys())