async_request_client.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. import asyncio
  2. import json
  3. from typing import Optional, Dict, Any
  4. import aiohttp
  5. from core.utils.log.logger_manager import LoggerManager
  6. class AsyncRequestClient:
  7. """
  8. 异步HTTP请求客户端,支持重试机制和详细日志记录
  9. 特性:
  10. - 请求失败自动重试(网络错误和业务逻辑错误)
  11. - 指数退避重试策略
  12. - 本地日志和阿里云日志双重记录
  13. - 可配置的重试参数和超时设置
  14. """
  15. class BusinessLogicError(Exception):
  16. """业务逻辑错误异常"""
  17. pass
  18. def __init__(
  19. self,
  20. logger: Optional[Any] = None,
  21. aliyun_log: Optional[Any] = None,
  22. max_retries: int = 3,
  23. base_delay: float = 1.0,
  24. timeout: int = 30
  25. ):
  26. """
  27. 初始化请求客户端
  28. Args:
  29. logger: 本地日志记录器实例
  30. aliyun_log: 阿里云日志记录器实例
  31. max_retries: 最大重试次数
  32. base_delay: 基础等待时间(秒),用于指数退避计算
  33. timeout: 请求超时时间(秒)
  34. """
  35. self.logger = logger
  36. self.aliyun_log = aliyun_log
  37. self.max_retries = max_retries
  38. self.base_delay = base_delay
  39. self.timeout = timeout
  40. async def request(
  41. self,
  42. session: aiohttp.ClientSession,
  43. method: str,
  44. url: str,
  45. **kwargs
  46. ) -> Optional[Dict]:
  47. """
  48. 发送HTTP请求,支持重试机制
  49. Args:
  50. session: aiohttp会话对象
  51. method: HTTP方法(GET, POST等)
  52. url: 请求URL
  53. **kwargs: 其他请求参数
  54. Returns:
  55. Optional[Dict]: 响应数据(JSON格式),失败时返回None
  56. """
  57. retries = 0
  58. last_error = None
  59. # 记录请求开始
  60. self._log_request_start(method, url, kwargs)
  61. while retries <= self.max_retries:
  62. try:
  63. # 计算等待时间(指数退避)
  64. if retries > 0:
  65. delay = self.base_delay * (2 ** (retries - 1))
  66. self._log_retry_delay(retries, delay)
  67. await asyncio.sleep(delay)
  68. # 发送请求
  69. async with session.request(
  70. method, url,
  71. timeout=aiohttp.ClientTimeout(total=self.timeout),
  72. **kwargs
  73. ) as response:
  74. # 检查HTTP状态
  75. response.raise_for_status()
  76. # 解析响应
  77. resp_data = await response.json()
  78. # 检查业务状态码
  79. if resp_data.get('code') != 0:
  80. raise self.BusinessLogicError(
  81. f"业务逻辑错误: code={resp_data.get('code')}"
  82. )
  83. # 记录成功响应
  84. self._log_success_response(url, resp_data)
  85. return resp_data
  86. except (aiohttp.ClientError, asyncio.TimeoutError) as e:
  87. last_error = e
  88. retries += 1
  89. self._log_network_error(e, retries, url)
  90. except self.BusinessLogicError as e:
  91. last_error = e
  92. retries += 1
  93. self._log_business_error(e, retries, url)
  94. except Exception as e:
  95. last_error = e
  96. retries += 1
  97. self._log_unexpected_error(e, retries, url)
  98. # 所有重试都失败
  99. self._log_final_failure(method, url, kwargs, last_error)
  100. return None
  101. def _log_request_start(self, method: str, url: str, kwargs: Dict):
  102. """记录请求开始"""
  103. if self.logger:
  104. # 简化日志,避免记录过大请求体
  105. # log_data = {k: v for k, v in kwargs.items() if k != 'json' and k != 'data'}
  106. # if 'json' in kwargs:
  107. # log_data['json'] = '[...]' # 简化JSON内容
  108. self.logger.info(f"请求 {method} {url}, 参数: {kwargs}")
  109. if self.aliyun_log:
  110. self.aliyun_log.logging(
  111. code="1012",
  112. message="初始化请求",
  113. data={
  114. "url": url,
  115. "method": method,
  116. "requestBody": {k: v for k, v in kwargs.items() if k != 'json'}
  117. }
  118. )
  119. def _log_retry_delay(self, retries: int, delay: float):
  120. """记录重试等待"""
  121. if self.logger:
  122. self.logger.info(f"第 {retries} 次重试,等待 {delay:.2f} 秒")
  123. def _log_network_error(self, error: Exception, retries: int, url: str):
  124. """记录网络错误"""
  125. if self.logger:
  126. self.logger.warning(
  127. f"请求 {url} 网络错误({retries}/{self.max_retries}): {error}"
  128. )
  129. def _log_business_error(self, error: Exception, retries: int, url: str):
  130. """记录业务逻辑错误"""
  131. if self.logger:
  132. self.logger.warning(
  133. f"请求 {url} 业务逻辑错误({retries}/{self.max_retries}): {error}"
  134. )
  135. def _log_unexpected_error(self, error: Exception, retries: int, url: str):
  136. """记录未预期错误"""
  137. if self.logger:
  138. self.logger.error(
  139. f"请求 {url} 未预期错误({retries}/{self.max_retries}): {error}"
  140. )
  141. def _log_success_response(self, url: str, response: Dict):
  142. """记录成功响应"""
  143. if self.logger:
  144. # 只记录关键信息,避免日志过大
  145. self.logger.info(f"请求 {url} 成功: code={response.get('code')}")
  146. def _log_final_failure(self, method: str, url: str, kwargs: Dict, error: Exception):
  147. """记录最终失败"""
  148. error_msg = f"请求 {method} {url} 最终失败: {error}"
  149. if self.logger:
  150. self.logger.error(error_msg)
  151. if self.aliyun_log:
  152. self.aliyun_log.logging(
  153. code="9006",
  154. message=error_msg,
  155. data={
  156. "url": url,
  157. "method": method,
  158. "requestBody": {k: v for k, v in kwargs.items() if k != 'json'},
  159. "error_type": type(error).__name__,
  160. "error_message": str(error)
  161. }
  162. )