async_mysql_service.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. import asyncio
  2. import json
  3. from typing import List, Optional, Dict, Any, Tuple
  4. from config import settings
  5. from core.base.async_mysql_client import AsyncMySQLClient
  6. from core.models.crawler_account_info import CrawlerAccountInfo
  7. from core.utils.log.logger_manager import LoggerManager
  8. class AsyncMysqlService:
  9. """
  10. 异步业务数据库访问类(支持单例和async with)
  11. 功能特点:
  12. - 单例模式实现,相同配置共享连接池
  13. - 支持async with上下文管理,自动处理连接池生命周期
  14. - 封装业务相关的SQL操作
  15. - 完善的错误处理和日志记录
  16. """
  17. # 存储不同配置的单例实例,键为(platform, mode)元组
  18. _instances: Dict[Tuple[str, str], "AsyncMysqlService"] = {}
  19. def __new__(cls, platform: Optional[str] = None, mode: Optional[str] = None):
  20. """基于配置的单例模式,相同platform和mode共享同一个实例"""
  21. # 处理None值,设置默认值为"system"
  22. platform = platform or "system"
  23. mode = mode or "crawler"
  24. key = (platform, mode)
  25. if key not in cls._instances:
  26. instance = super().__new__(cls)
  27. instance._platform = platform
  28. instance._mode = mode
  29. instance._client = None
  30. instance._pool_initialized = False
  31. cls._instances[key] = instance
  32. return cls._instances[key]
  33. def __init__(self, platform: Optional[str] = None, mode: Optional[str] = None,trace_id: Optional[str] = None):
  34. """初始化数据库配置(仅在创建新实例时执行)"""
  35. # 避免重复初始化
  36. if self._client is not None:
  37. return
  38. self.platform = platform or "system"
  39. self.mode = mode or "crawler"
  40. self.trace_id = trace_id
  41. # 加载环境变量配置
  42. db_config = {
  43. "host": settings.DB_HOST,
  44. "port": settings.DB_PORT,
  45. "user": settings.DB_USER,
  46. "password": settings.DB_PASSWORD,
  47. "db": settings.DB_NAME,
  48. "charset": settings.DB_CHARSET
  49. }
  50. self.logger = LoggerManager.get_logger(platform=self.platform, mode=self.mode)
  51. self.aliyun_logr = LoggerManager.get_aliyun_logger(platform=self.platform, mode=self.mode)
  52. # 创建数据库客户端
  53. self._client = AsyncMySQLClient(
  54. host=db_config["host"],
  55. port=db_config["port"],
  56. user=db_config["user"],
  57. password=db_config["password"],
  58. db=db_config["db"],
  59. charset=db_config["charset"],
  60. minsize=1,
  61. maxsize=10,
  62. logger=self.logger,
  63. aliyun_logr=self.aliyun_logr
  64. )
  65. async def __aenter__(self):
  66. """支持async with上下文,初始化连接池"""
  67. if not self._pool_initialized:
  68. try:
  69. await self._client.init_pool()
  70. self._pool_initialized = True
  71. self.logger.info(f"连接池初始化成功: classname={self._platform}, mode={self._mode}")
  72. except Exception as e:
  73. self.logger.error(f"连接池初始化失败: {str(e)}")
  74. raise
  75. return self
  76. async def __aexit__(self, exc_type, exc_val, exc_tb):
  77. """支持async with上下文,关闭连接池"""
  78. if self._pool_initialized:
  79. try:
  80. await self._client.close()
  81. self._pool_initialized = False
  82. self.logger.info(f"连接池已关闭: classname={self._platform}, mode={self._mode}")
  83. except Exception as e:
  84. self.logger.warning(f"连接池关闭失败: {str(e)}")
  85. async def fetch_all(self, sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]:
  86. """执行查询并返回多行结果"""
  87. try:
  88. return await self._client.fetch_all(sql, params or [])
  89. except Exception as e:
  90. self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}")
  91. raise
  92. async def fetch_one(self, sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]:
  93. """执行查询并返回单行结果"""
  94. try:
  95. return await self._client.fetch_one(sql, params or [])
  96. except Exception as e:
  97. self.logger.error(f"查询失败 [SQL: {sql}]: {str(e)}")
  98. raise
  99. async def execute(self, sql: str, params: Optional[List[Any]] = None) -> int:
  100. """执行单条写操作(insert/update/delete)"""
  101. try:
  102. return await self._client.execute(sql, params or [])
  103. except Exception as e:
  104. self.logger.error(f"写操作失败 [SQL: {sql}]: {str(e)}")
  105. raise
  106. async def executemany(self, sql: str, params_list: List[List[Any]]) -> int:
  107. """批量执行写操作"""
  108. try:
  109. return await self._client.executemany(sql, params_list)
  110. except Exception as e:
  111. self.logger.error(f"批量写操作失败 [SQL: {sql}]: {str(e)}")
  112. raise
  113. # 业务相关方法保持不变...
  114. async def get_user_list(self, task_id: int) -> List[Dict[str, Any]]:
  115. sql = "SELECT uid, link, nick_name FROM crawler_user_v3 WHERE task_id = %s"
  116. return await self.fetch_all(sql, [task_id])
  117. async def get_rule_dict(self, rule_id: int) -> Optional[Dict[str, Any]]:
  118. sql = "SELECT rule FROM crawler_task_v3 WHERE id = %s"
  119. row = await self.fetch_one(sql, [rule_id])
  120. if not row or "rule" not in row:
  121. self.logger.warning(f"未找到规则: rule_id={rule_id}")
  122. return None
  123. try:
  124. rule_data = json.loads(row["rule"])
  125. return {k: v for item in rule_data for k, v in item.items()}
  126. except json.JSONDecodeError as e:
  127. self.logger.error(f"规则解析失败 [rule_id={rule_id}]: {str(e)}")
  128. return None
  129. async def get_today_videos(self) -> int:
  130. sql = """
  131. SELECT COUNT(*) as cnt
  132. FROM crawler_video
  133. WHERE DATE (create_time) = CURDATE()
  134. AND platform = %s
  135. AND strategy = %s \
  136. """
  137. result = await self.fetch_one(sql, [self.platform, self.mode])
  138. return result["cnt"] if result else 0
  139. async def get_xng_mid(self) -> int:
  140. # order by `create_time` desc
  141. sql = """select uid,link,nick_name from crawler_user_v3 where task_id=21"""
  142. result = await self.fetch_all(sql)
  143. return result if result else 0
  144. async def get_crawler_account_list(self, platform: str, mode: str) -> List[CrawlerAccountInfo]:
  145. """
  146. 获取指定平台和模式的爬虫账户列表
  147. Args:
  148. platform: 平台名称
  149. mode: 平台模式
  150. Returns:
  151. List[CrawlerAccountInfo]: 爬虫账户信息列表
  152. """
  153. sql = """
  154. SELECT id, platform, platform_mode, priority, created_at, updated_at, last_crawled_at
  155. FROM crawler_account_info
  156. WHERE platform = %s AND platform_mode = %s
  157. ORDER BY priority DESC, id ASC
  158. """
  159. rows = await self.fetch_all(sql, [platform, mode])
  160. return [CrawlerAccountInfo(**row) for row in rows]
  161. async def update_crawler_account_last_crawled(self, account_id: int) -> bool:
  162. """
  163. 更新爬虫账户的最后爬取时间
  164. Args:
  165. account_id: 账户ID
  166. Returns:
  167. bool: 是否更新成功
  168. """
  169. sql = "UPDATE crawler_account_info SET last_crawled_at = CURRENT_TIMESTAMP WHERE id = %s"
  170. affected_rows = await self.execute(sql, [account_id])
  171. return affected_rows > 0
  172. async def insert_crawler_account(self, account: CrawlerAccountInfo) -> Optional[int]:
  173. """
  174. 插入新的爬虫账户
  175. Args:
  176. account: 爬虫账户信息
  177. Returns:
  178. Optional[int]: 新插入记录的ID,失败返回None
  179. """
  180. sql = """
  181. INSERT INTO crawler_account_info
  182. (platform, platform_mode, priority)
  183. VALUES (%s, %s, %s)
  184. """
  185. try:
  186. # 注意:这里我们只插入指定的字段,其他字段使用默认值
  187. affected_rows = await self.execute(
  188. sql,
  189. [account.platform, account.platform_mode, account.priority]
  190. )
  191. if affected_rows > 0:
  192. # 获取插入的记录ID
  193. result = await self.fetch_one("SELECT LAST_INSERT_ID() as id")
  194. return result["id"] if result else None
  195. except Exception as e:
  196. self.logger.error(f"插入爬虫账户失败: {str(e)}")
  197. return None
  198. # 全局便捷访问函数(支持None参数)
  199. async def get_db_service(platform: Optional[str] = None, mode: Optional[str] = None) -> AsyncMysqlService:
  200. """获取数据库服务实例的便捷函数,支持platform/mode为None"""
  201. service = AsyncMysqlService(platform, mode)
  202. await service.__aenter__()
  203. return service
  204. # 示例用法
  205. async def demo_usage():
  206. # 方式一:platform和mode为None,使用默认值"system"
  207. async with AsyncMysqlService() as default_service:
  208. users = await default_service.get_user_list(21)
  209. print(f"系统配置用户数: {users}")
  210. async with AsyncMysqlService() as default_service:
  211. users = await default_service.get_xng_mid()
  212. print(f"小年糕用户数: {users}")
  213. # 方式二:显式传入None
  214. async with AsyncMysqlService(None, None) as system_service:
  215. rule = await system_service.get_rule_dict(18)
  216. print(f"自定义配置规则: {rule}")
  217. # 方式三:使用便捷函数
  218. service = await get_db_service("benshanzhufu", "recommend")
  219. try:
  220. count = await service.get_today_videos()
  221. print(f"默认配置今日视频数: {count}")
  222. finally:
  223. await service.__aexit__(None, None, None)
  224. if __name__ == '__main__':
  225. asyncio.run(demo_usage())