mysql_pools.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import logging
  2. from aiomysql import create_pool
  3. from aiomysql.cursors import DictCursor
  4. from applications.config import *
  5. logging.basicConfig(level=logging.INFO)
  6. class DatabaseManager:
  7. def __init__(self):
  8. self.databases = None
  9. self.pools = {}
  10. async def init_pools(self):
  11. # 从配置获取数据库配置,也可以直接在这里配置
  12. self.databases = {
  13. "aigc": aigc_db_config,
  14. "long_video": long_video_db_config,
  15. "long_articles": long_articles_db_config,
  16. "piaoquan_crawler": piaoquan_crawler_db_config,
  17. }
  18. for db_name, config in self.databases.items():
  19. try:
  20. pool = await create_pool(
  21. host=config["host"],
  22. port=config["port"],
  23. user=config["user"],
  24. password=config["password"],
  25. db=config["db"],
  26. minsize=config["minsize"],
  27. maxsize=config["maxsize"],
  28. cursorclass=DictCursor,
  29. autocommit=True,
  30. )
  31. self.pools[db_name] = pool
  32. logging.info(f"Created connection pool for {db_name}")
  33. except Exception as e:
  34. logging.error(f"Failed to create pool for {db_name}: {str(e)}")
  35. self.pools[db_name] = None
  36. async def close_pools(self):
  37. for name, pool in self.pools.items():
  38. if pool:
  39. pool.close()
  40. await pool.wait_closed()
  41. logging.info(f"Closed connection pool for {name}")
  42. async def async_fetch(
  43. self, query, db_name="long_articles", params=None, cursor_type=DictCursor
  44. ):
  45. pool = self.pools[db_name]
  46. if not pool:
  47. await self.init_pools()
  48. # fetch from db
  49. try:
  50. async with pool.acquire() as conn:
  51. async with conn.cursor(cursor_type) as cursor:
  52. await cursor.execute(query, params)
  53. fetch_response = await cursor.fetchall()
  54. return fetch_response
  55. except Exception as e:
  56. logging.error(f"Failed to fetch {query}: {str(e)}")
  57. return None
  58. async def async_save(self, query, params, db_name="long_articles"):
  59. pool = self.pools[db_name]
  60. if not pool:
  61. await self.init_pools()
  62. async with pool.acquire() as connection:
  63. async with connection.cursor() as cursor:
  64. try:
  65. await cursor.execute(query, params)
  66. affected_rows = cursor.rowcount
  67. await connection.commit()
  68. return affected_rows
  69. except Exception as e:
  70. await connection.rollback()
  71. raise e
  72. def get_pool(self, db_name):
  73. return self.pools.get(db_name)
  74. def list_databases(self):
  75. return list(self.databases.keys())