123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- import asyncio
- from typing import Optional, Dict
- from pprint import pformat
- import aiohttp
- from core.utils.log.logger_manager import LoggerManager
- class AsyncRequestClient:
- """
- 请求失败重试,本地日志记录
- 请求返回code!=0重试,本地日志记录
- 重试达到最大次数后上报阿里云日志
- """
- def __init__(self, logger:Optional[LoggerManager.get_logger()] = None ,
- aliyun_log:Optional[LoggerManager.get_aliyun_logger()] = None,
- max_retries=3, timeout=30
- ):
- self.logger = logger
- self.aliyun_log = aliyun_log
- self.max_retries = max_retries
- self.timeout = timeout
- async def request(self, session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> Optional[Dict]:
- retries = 0
- while retries < self.max_retries:
- try:
- if self.logger:
- self.logger.info(f"请求 {method} {url}, 请求参数{kwargs}")
- if self.aliyun_log:
- self.aliyun_log.logging(
- code = "1012",
- message="初始化请求",
- data={"utl":url,
- "method":method,
- "requestBody":kwargs
- }
- )
- async with session.request(method, url, **kwargs) as response:
- response.raise_for_status()
- resp = await response.json()
- if resp.get('code') != 0:
- retries += 1
- if self.logger:
- self.logger.info(f"响应 {resp}, 重试 {retries}/{self.max_retries}")
- await asyncio.sleep(5)
- continue
- return resp
- except Exception as e:
- retries += 1
- if retries >= self.max_retries:
- if self.logger:
- self.logger.error(f"请求失败达到最大重试次数: {e}")
- if self.aliyun_log:
- self.aliyun_log.logging(
- code="9006",
- message=f"请求异常达到最大重试次数",
- data={"utl": url,
- "method": method,
- "requestBody": kwargs,
- "response": resp
- }
- )
- return
- if self.logger:
- self.logger.warning(f"请求失败 {e}, 重试 {retries}/{self.max_retries}")
- await asyncio.sleep(5)
|