http_client.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import asyncio
  2. import logging
  3. import aiohttp
  4. from typing import Optional, Union, Dict, Any
  5. logger = logging.getLogger(__name__)
  6. class AsyncHttpClient:
  7. def __init__(
  8. self,
  9. timeout: int = 10,
  10. max_connections: int = 100,
  11. default_headers: Optional[Dict[str, str]] = None,
  12. ):
  13. """
  14. 简化版异步 HTTP 客户端
  15. :param timeout: 请求超时时间(秒)
  16. :param max_connections: 连接池最大连接数
  17. :param default_headers: 默认请求头
  18. """
  19. self.timeout = aiohttp.ClientTimeout(total=timeout)
  20. self.connector = aiohttp.TCPConnector(limit=max_connections)
  21. self.default_headers = default_headers or {}
  22. self.session = None
  23. async def __aenter__(self):
  24. self.session = aiohttp.ClientSession(
  25. connector=self.connector, timeout=self.timeout, headers=self.default_headers
  26. )
  27. return self
  28. async def __aexit__(self, exc_type, exc_val, exc_tb):
  29. await self.session.close()
  30. async def request(
  31. self,
  32. method: str,
  33. url: str,
  34. params: Optional[Dict[str, Any]] = None,
  35. data: Optional[Union[Dict[str, Any], str, bytes]] = None,
  36. json: Optional[Dict[str, Any]] = None,
  37. headers: Optional[Dict[str, str]] = None,
  38. ) -> Union[Dict[str, Any], str]:
  39. """核心请求方法"""
  40. request_headers = {**self.default_headers, **(headers or {})}
  41. try:
  42. async with self.session.request(
  43. method,
  44. url,
  45. params=params,
  46. data=data,
  47. json=json,
  48. headers=request_headers,
  49. ) as response:
  50. response.raise_for_status()
  51. content_type = response.headers.get("Content-Type", "")
  52. if "application/json" in content_type:
  53. return await response.json()
  54. return await response.text()
  55. except aiohttp.ClientResponseError as e:
  56. logger.error(f"HTTP error: {e.status} {e.message} url={url}")
  57. raise
  58. except aiohttp.ClientError as e:
  59. logger.error(f"Network error: {e} url={url}")
  60. raise
  61. async def get(
  62. self,
  63. url: str,
  64. params: Optional[Dict[str, Any]] = None,
  65. headers: Optional[Dict[str, str]] = None,
  66. ) -> Union[Dict[str, Any], str]:
  67. """GET 请求"""
  68. return await self.request("GET", url, params=params, headers=headers)
  69. async def post(
  70. self,
  71. url: str,
  72. data: Optional[Union[Dict[str, Any], str, bytes]] = None,
  73. json: Optional[Dict[str, Any]] = None,
  74. headers: Optional[Dict[str, str]] = None,
  75. ) -> Union[Dict[str, Any], str]:
  76. """POST 请求"""
  77. return await self.request("POST", url, data=data, json=json, headers=headers)
  78. async def put(
  79. self,
  80. url: str,
  81. data: Optional[Union[Dict[str, Any], str, bytes]] = None,
  82. json: Optional[Dict[str, Any]] = None,
  83. headers: Optional[Dict[str, str]] = None,
  84. ) -> Union[Dict[str, Any], str]:
  85. """
  86. PUT 请求
  87. 通常用于更新资源,可以发送表单数据或 JSON 数据
  88. """
  89. return await self.request("PUT", url, data=data, json=json, headers=headers)