import asyncio import json from typing import Optional, Dict, Any import aiohttp from core.utils.log.logger_manager import LoggerManager class AsyncRequestClient: """ 异步HTTP请求客户端,支持重试机制和详细日志记录 特性: - 请求失败自动重试(网络错误和业务逻辑错误) - 指数退避重试策略 - 本地日志和阿里云日志双重记录 - 可配置的重试参数和超时设置 """ class BusinessLogicError(Exception): """业务逻辑错误异常""" pass def __init__( self, logger: Optional[Any] = None, aliyun_log: Optional[Any] = None, max_retries: int = 3, base_delay: float = 1.0, timeout: int = 30 ): """ 初始化请求客户端 Args: logger: 本地日志记录器实例 aliyun_log: 阿里云日志记录器实例 max_retries: 最大重试次数 base_delay: 基础等待时间(秒),用于指数退避计算 timeout: 请求超时时间(秒) """ self.logger = logger self.aliyun_log = aliyun_log self.max_retries = max_retries self.base_delay = base_delay self.timeout = timeout async def request( self, session: aiohttp.ClientSession, method: str, url: str, **kwargs ) -> Optional[Dict]: """ 发送HTTP请求,支持重试机制 Args: session: aiohttp会话对象 method: HTTP方法(GET, POST等) url: 请求URL **kwargs: 其他请求参数 Returns: Optional[Dict]: 响应数据(JSON格式),失败时返回None """ retries = 0 last_error = None # 记录请求开始 self._log_request_start(method, url, kwargs) while retries <= self.max_retries: try: # 计算等待时间(指数退避) if retries > 0: delay = self.base_delay * (2 ** (retries - 1)) self._log_retry_delay(retries, delay) await asyncio.sleep(delay) # 发送请求 async with session.request( method, url, timeout=aiohttp.ClientTimeout(total=self.timeout), **kwargs ) as response: # 检查HTTP状态 response.raise_for_status() # 解析响应 resp_data = await response.json() # 检查业务状态码 if resp_data.get('code') != 0: raise self.BusinessLogicError( f"业务逻辑错误: code={resp_data.get('code')}" ) # 记录成功响应 self._log_success_response(url, resp_data) return resp_data except (aiohttp.ClientError, asyncio.TimeoutError) as e: last_error = e retries += 1 self._log_network_error(e, retries, url) except self.BusinessLogicError as e: last_error = e retries += 1 self._log_business_error(e, retries, url) except Exception as e: last_error = e retries += 1 self._log_unexpected_error(e, retries, url) # 所有重试都失败 self._log_final_failure(method, url, kwargs, last_error) return None def _log_request_start(self, method: str, url: str, kwargs: Dict): """记录请求开始""" if self.logger: # 简化日志,避免记录过大请求体 # log_data = {k: v for k, v in kwargs.items() if k != 'json' and k != 'data'} # if 'json' in kwargs: # log_data['json'] = '[...]' # 简化JSON内容 self.logger.info(f"请求 {method} {url}, 参数: {kwargs}") if self.aliyun_log: self.aliyun_log.logging( code="1012", message="初始化请求", data={ "url": url, "method": method, "requestBody": {k: v for k, v in kwargs.items() if k != 'json'} } ) def _log_retry_delay(self, retries: int, delay: float): """记录重试等待""" if self.logger: self.logger.info(f"第 {retries} 次重试,等待 {delay:.2f} 秒") def _log_network_error(self, error: Exception, retries: int, url: str): """记录网络错误""" if self.logger: self.logger.warning( f"请求 {url} 网络错误({retries}/{self.max_retries}): {error}" ) def _log_business_error(self, error: Exception, retries: int, url: str): """记录业务逻辑错误""" if self.logger: self.logger.warning( f"请求 {url} 业务逻辑错误({retries}/{self.max_retries}): {error}" ) def _log_unexpected_error(self, error: Exception, retries: int, url: str): """记录未预期错误""" if self.logger: self.logger.error( f"请求 {url} 未预期错误({retries}/{self.max_retries}): {error}" ) def _log_success_response(self, url: str, response: Dict): """记录成功响应""" if self.logger: # 只记录关键信息,避免日志过大 self.logger.info(f"请求 {url} 成功: code={response.get('code')}") def _log_final_failure(self, method: str, url: str, kwargs: Dict, error: Exception): """记录最终失败""" error_msg = f"请求 {method} {url} 最终失败: {error}" if self.logger: self.logger.error(error_msg) if self.aliyun_log: self.aliyun_log.logging( code="9006", message=error_msg, data={ "url": url, "method": method, "requestBody": {k: v for k, v in kwargs.items() if k != 'json'}, "error_type": type(error).__name__, "error_message": str(error) } )