async_http_client.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import aiohttp
  2. from typing import Optional, Union, Dict, Any
  3. class AsyncHttPClient:
  4. def __init__(
  5. self,
  6. timeout: int = 10,
  7. max_connections: int = 100,
  8. default_headers: Optional[Dict[str, str]] = None,
  9. ):
  10. """
  11. 简化版异步 HTTP 客户端
  12. :param timeout: 请求超时时间(秒)
  13. :param max_connections: 连接池最大连接数
  14. :param default_headers: 默认请求头
  15. """
  16. self.timeout = aiohttp.ClientTimeout(total=timeout)
  17. self.connector = aiohttp.TCPConnector(limit=max_connections)
  18. self.default_headers = default_headers or {}
  19. self.session = None
  20. async def __aenter__(self):
  21. self.session = aiohttp.ClientSession(
  22. connector=self.connector, timeout=self.timeout, headers=self.default_headers
  23. )
  24. return self
  25. async def __aexit__(self, exc_type, exc_val, exc_tb):
  26. await self.session.close()
  27. async def request(
  28. self,
  29. method: str,
  30. url: str,
  31. params: Optional[Dict[str, Any]] = None,
  32. data: Optional[Union[Dict[str, Any], str, bytes]] = None,
  33. json: Optional[Dict[str, Any]] = None,
  34. headers: Optional[Dict[str, str]] = None,
  35. ) -> Union[Dict[str, Any], str]:
  36. """核心请求方法"""
  37. request_headers = {**self.default_headers, **(headers or {})}
  38. try:
  39. async with self.session.request(
  40. method,
  41. url,
  42. params=params,
  43. data=data,
  44. json=json,
  45. headers=request_headers,
  46. ) as response:
  47. response.raise_for_status()
  48. content_type = response.headers.get("Content-Type", "")
  49. if "application/json" in content_type:
  50. return await response.json()
  51. return await response.text()
  52. except aiohttp.ClientResponseError as e:
  53. print(f"HTTP error: {e.status} {e.message}")
  54. raise
  55. except aiohttp.ClientError as e:
  56. print(f"Network error: {str(e)}")
  57. raise
  58. async def get(
  59. self,
  60. url: str,
  61. params: Optional[Dict[str, Any]] = None,
  62. headers: Optional[Dict[str, str]] = None,
  63. ) -> Union[Dict[str, Any], str]:
  64. """GET 请求"""
  65. return await self.request("GET", url, params=params, headers=headers)
  66. async def post(
  67. self,
  68. url: str,
  69. data: Optional[Union[Dict[str, Any], str, bytes]] = None,
  70. json: Optional[Dict[str, Any]] = None,
  71. headers: Optional[Dict[str, str]] = None,
  72. ) -> Union[Dict[str, Any], str]:
  73. """POST 请求"""
  74. return await self.request("POST", url, data=data, json=json, headers=headers)
  75. async def put(
  76. self,
  77. url: str,
  78. data: Optional[Union[Dict[str, Any], str, bytes]] = None,
  79. json: Optional[Dict[str, Any]] = None,
  80. headers: Optional[Dict[str, str]] = None
  81. ) -> Union[Dict[str, Any], str]:
  82. """
  83. PUT 请求
  84. 通常用于更新资源,可以发送表单数据或 JSON 数据
  85. """
  86. return await self.request("PUT", url, data=data, json=json, headers=headers)