import asyncio from typing import Optional, Dict import aiohttp from core.utils.log.logger_manager import LoggerManager class AsyncRequestClient: """ 请求失败重试,本地日志记录 请求返回code!=0重试,本地日志记录 重试达到最大次数后上报阿里云日志 """ def __init__(self, logger: Optional[LoggerManager.get_logger()] = None, aliyun_log: Optional[LoggerManager.get_aliyun_logger()] = None, max_retries=3, timeout=30): self.logger = logger self.aliyun_log = aliyun_log self.max_retries = max_retries self.timeout = timeout async def request(self, session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> Optional[Dict]: retries = 0 resp = None # 初始化resp变量 while retries < self.max_retries: try: if self.logger: self.logger.info(f"请求 {method} {url}, 请求参数{kwargs}") if self.aliyun_log: self.aliyun_log.logging( code="1012", message="初始化请求", data={"url": url, "method": method, "requestBody": kwargs} ) async with session.request(method, url, **kwargs) as response: response.raise_for_status() resp = await response.json() if resp.get('code') != 0: retries += 1 if self.logger: self.logger.info(f"{url} 响应 {resp}, 重试 {retries}/{self.max_retries}") if retries >= self.max_retries: error_msg = f"请求响应code非0且达到最大重试次数 {self.max_retries}" if self.logger: self.logger.error(error_msg) if self.aliyun_log: self.aliyun_log.logging( code="9006", message=error_msg, data={ "url": url, "method": method, "requestBody": kwargs, "response": resp } ) await asyncio.sleep(5) continue self.logger.info(f"{url} 响应: {resp}") return resp except Exception as e: retries += 1 if retries >= self.max_retries: if self.logger: self.logger.error(f"请求失败达到最大重试次数: {e}") if self.aliyun_log: self.aliyun_log.logging( code="9006", message="请求异常达到最大重试次数", data={ "url": url, "method": method, "requestBody": kwargs, "response": str(resp) if resp else str(e), "error_type": type(e).__name__ } ) return if self.logger: self.logger.warning(f"请求失败 {e}, 重试 {retries}/{self.max_retries}") await asyncio.sleep(5)