123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- import asyncio
- import json
- from typing import Optional, Dict, Any
- import aiohttp
- from core.utils.log.logger_manager import LoggerManager
- class AsyncRequestClient:
- """
- 异步HTTP请求客户端,支持重试机制和详细日志记录
- 特性:
- - 请求失败自动重试(网络错误和业务逻辑错误)
- - 指数退避重试策略
- - 本地日志和阿里云日志双重记录
- - 可配置的重试参数和超时设置
- """
- class BusinessLogicError(Exception):
- """业务逻辑错误异常"""
- pass
- def __init__(
- self,
- logger: Optional[Any] = None,
- aliyun_log: Optional[Any] = None,
- max_retries: int = 3,
- base_delay: float = 1.0,
- timeout: int = 30
- ):
- """
- 初始化请求客户端
- Args:
- logger: 本地日志记录器实例
- aliyun_log: 阿里云日志记录器实例
- max_retries: 最大重试次数
- base_delay: 基础等待时间(秒),用于指数退避计算
- timeout: 请求超时时间(秒)
- """
- self.logger = logger
- self.aliyun_log = aliyun_log
- self.max_retries = max_retries
- self.base_delay = base_delay
- self.timeout = timeout
- async def request(
- self,
- session: aiohttp.ClientSession,
- method: str,
- url: str,
- **kwargs
- ) -> Optional[Dict]:
- """
- 发送HTTP请求,支持重试机制
- Args:
- session: aiohttp会话对象
- method: HTTP方法(GET, POST等)
- url: 请求URL
- **kwargs: 其他请求参数
- Returns:
- Optional[Dict]: 响应数据(JSON格式),失败时返回None
- """
- retries = 0
- last_error = None
- # 记录请求开始
- self._log_request_start(method, url, kwargs)
- while retries <= self.max_retries:
- try:
- # 计算等待时间(指数退避)
- if retries > 0:
- delay = self.base_delay * (2 ** (retries - 1))
- self._log_retry_delay(retries, delay)
- await asyncio.sleep(delay)
- # 发送请求
- async with session.request(
- method, url,
- timeout=aiohttp.ClientTimeout(total=self.timeout),
- **kwargs
- ) as response:
- # 检查HTTP状态
- response.raise_for_status()
- # 解析响应
- resp_data = await response.json()
- # 检查业务状态码
- if resp_data.get('code') != 0:
- raise self.BusinessLogicError(
- f"业务逻辑错误: code={resp_data.get('code')}"
- )
- # 记录成功响应
- self._log_success_response(url, resp_data)
- return resp_data
- except (aiohttp.ClientError, asyncio.TimeoutError) as e:
- last_error = e
- retries += 1
- self._log_network_error(e, retries, url)
- except self.BusinessLogicError as e:
- last_error = e
- retries += 1
- self._log_business_error(e, retries, url)
- except Exception as e:
- last_error = e
- retries += 1
- self._log_unexpected_error(e, retries, url)
- # 所有重试都失败
- self._log_final_failure(method, url, kwargs, last_error)
- return None
- def _log_request_start(self, method: str, url: str, kwargs: Dict):
- """记录请求开始"""
- if self.logger:
- # 简化日志,避免记录过大请求体
- # log_data = {k: v for k, v in kwargs.items() if k != 'json' and k != 'data'}
- # if 'json' in kwargs:
- # log_data['json'] = '[...]' # 简化JSON内容
- self.logger.info(f"请求 {method} {url}, 参数: {kwargs}")
- if self.aliyun_log:
- self.aliyun_log.logging(
- code="1012",
- message="初始化请求",
- data={
- "url": url,
- "method": method,
- "requestBody": {k: v for k, v in kwargs.items() if k == 'json'}
- }
- )
- def _log_retry_delay(self, retries: int, delay: float):
- """记录重试等待"""
- if self.logger:
- self.logger.info(f"第 {retries} 次重试,等待 {delay:.2f} 秒")
- def _log_network_error(self, error: Exception, retries: int, url: str):
- """记录网络错误"""
- if self.logger:
- self.logger.warning(
- f"请求 {url} 网络错误({retries}/{self.max_retries}): {error}"
- )
- def _log_business_error(self, error: Exception, retries: int, url: str):
- """记录业务逻辑错误"""
- if self.logger:
- self.logger.warning(
- f"请求 {url} 业务逻辑错误({retries}/{self.max_retries}): {error}"
- )
- def _log_unexpected_error(self, error: Exception, retries: int, url: str):
- """记录未预期错误"""
- if self.logger:
- self.logger.error(
- f"请求 {url} 未预期错误({retries}/{self.max_retries}): {error}"
- )
- def _log_success_response(self, url: str, response: Dict):
- """记录成功响应"""
- if self.logger:
- # 只记录关键信息,避免日志过大
- self.logger.info(f"请求 {url} 成功: code={response.get('code')}")
- def _log_final_failure(self, method: str, url: str, kwargs: Dict, error: Exception):
- """记录最终失败"""
- error_msg = f"请求 {method} {url} 最终失败: {error}"
- if self.logger:
- self.logger.error(error_msg)
- if self.aliyun_log:
- self.aliyun_log.logging(
- code="9006",
- message=error_msg,
- data={
- "url": url,
- "method": method,
- "requestBody": {k: v for k, v in kwargs.items() if k == 'json'},
- "error_type": type(error).__name__,
- "error_message": str(error)
- }
- )
|