mysql_pools.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. from aiomysql import create_pool, DictCursor
  2. from applications.config import *
  3. class DatabaseManager:
  4. def __init__(self):
  5. self.databases = None
  6. self.pools = {}
  7. async def init_pools(self):
  8. # 从配置获取数据库配置,也可以直接在这里配置
  9. self.databases = {
  10. "aigc_db_pool": aigc_db_config,
  11. "long_video_db_pool": long_video_db_config,
  12. "long_articles": long_articles_db_config,
  13. "piaoquan_crawler_db": piaoquan_crawler_db_config,
  14. }
  15. for db_name, config in self.databases.items():
  16. try:
  17. pool = await create_pool(
  18. host=config["host"],
  19. port=config["port"],
  20. user=config["user"],
  21. password=config["password"],
  22. db=config["db"],
  23. minsize=config["minsize"],
  24. maxsize=config["maxsize"],
  25. cursorclass=DictCursor,
  26. autocommit=True,
  27. )
  28. self.pools[db_name] = pool
  29. print(f"✅ Created connection pool for {db_name}")
  30. except Exception as e:
  31. print(f"❌ Failed to create pool for {db_name}: {str(e)}")
  32. self.pools[db_name] = None
  33. async def close_pools(self):
  34. for name, pool in self.pools.items():
  35. if pool:
  36. pool.close()
  37. await pool.wait_closed()
  38. print(f"🔌 Closed connection pool for {name}")
  39. async def async_fetch(
  40. self, query, db_name="long_articles", params=None, cursor_type=DictCursor
  41. ):
  42. pool = self.pools[db_name]
  43. if not pool:
  44. await self.init_pools()
  45. # fetch from db
  46. try:
  47. async with pool.acquire() as conn:
  48. async with conn.cursor(cursor_type) as cursor:
  49. await cursor.execute(query, params)
  50. fetch_response = await cursor.fetchall()
  51. return fetch_response, None
  52. except Exception as e:
  53. return None, str(e)
  54. async def async_save(self, query, params, db_name="long_articles"):
  55. pool = self.pools[db_name]
  56. if not pool:
  57. await self.init_pools()
  58. async with pool.acquire() as connection:
  59. async with connection.cursor() as cursor:
  60. try:
  61. await cursor.execute(query, params)
  62. affected_rows = cursor.rowcount
  63. await connection.commit()
  64. return affected_rows
  65. except Exception as e:
  66. await connection.rollback()
  67. raise e
  68. def get_pool(self, db_name):
  69. return self.pools.get(db_name)
  70. def list_databases(self):
  71. return list(self.databases.keys())