mysql_pools.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. from aiomysql import create_pool
  2. from aiomysql.cursors import DictCursor
  3. from applications.config import *
  4. class DatabaseManager:
  5. def __init__(self):
  6. self.databases = None
  7. self.pools = {}
  8. async def init_pools(self):
  9. # 从配置获取数据库配置,也可以直接在这里配置
  10. self.databases = {
  11. "aigc_db_pool": aigc_db_config,
  12. "long_video_db_pool": long_video_db_config,
  13. "long_articles": long_articles_db_config,
  14. "piaoquan_crawler_db": piaoquan_crawler_db_config,
  15. }
  16. for db_name, config in self.databases.items():
  17. try:
  18. pool = await create_pool(
  19. host=config["host"],
  20. port=config["port"],
  21. user=config["user"],
  22. password=config["password"],
  23. db=config["db"],
  24. minsize=config["minsize"],
  25. maxsize=config["maxsize"],
  26. cursorclass=DictCursor,
  27. autocommit=True,
  28. )
  29. self.pools[db_name] = pool
  30. print(f"✅ Created connection pool for {db_name}")
  31. except Exception as e:
  32. print(f"❌ Failed to create pool for {db_name}: {str(e)}")
  33. self.pools[db_name] = None
  34. async def close_pools(self):
  35. for name, pool in self.pools.items():
  36. if pool:
  37. pool.close()
  38. await pool.wait_closed()
  39. print(f"🔌 Closed connection pool for {name}")
  40. async def async_fetch(
  41. self, query, db_name="long_articles", params=None, cursor_type=DictCursor
  42. ):
  43. pool = self.pools[db_name]
  44. if not pool:
  45. await self.init_pools()
  46. # fetch from db
  47. try:
  48. async with pool.acquire() as conn:
  49. async with conn.cursor(cursor_type) as cursor:
  50. await cursor.execute(query, params)
  51. fetch_response = await cursor.fetchall()
  52. return fetch_response, None
  53. except Exception as e:
  54. return None, str(e)
  55. async def async_save(self, query, params, db_name="long_articles"):
  56. pool = self.pools[db_name]
  57. if not pool:
  58. await self.init_pools()
  59. async with pool.acquire() as connection:
  60. async with connection.cursor() as cursor:
  61. try:
  62. await cursor.execute(query, params)
  63. affected_rows = cursor.rowcount
  64. await connection.commit()
  65. return affected_rows
  66. except Exception as e:
  67. await connection.rollback()
  68. raise e
  69. def get_pool(self, db_name):
  70. return self.pools.get(db_name)
  71. def list_databases(self):
  72. return list(self.databases.keys())