gzh_spider.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. from __future__ import annotations
  2. import re
  3. import json
  4. import requests
  5. from fake_useragent import FakeUserAgent
  6. from tenacity import retry
  7. from applications.api import log
  8. from applications.utils import request_retry
  9. from applications.utils import AsyncHttpClient
  10. retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
  11. # url from aigc
  12. base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
  13. headers = {"Content-Type": "application/json"}
  14. @retry(**retry_desc)
  15. async def get_article_detail(
  16. article_link: str, is_count: bool = False, is_cache: bool = True
  17. ) -> dict | None:
  18. """
  19. get official article detail
  20. """
  21. target_url = f"{base_url}/detail"
  22. payload = json.dumps(
  23. {
  24. "content_link": article_link,
  25. "is_count": is_count,
  26. "is_ad": False,
  27. "is_cache": is_cache,
  28. }
  29. )
  30. async with AsyncHttpClient(timeout=10) as http_client:
  31. response = await http_client.post(target_url, headers=headers, data=payload)
  32. return response
  33. @retry(**retry_desc)
  34. async def get_article_list_from_account(account_id: str, index=None) -> dict | None:
  35. target_url = f"{base_url}/blogger"
  36. payload = json.dumps({"account_id": account_id, "cursor": index})
  37. async with AsyncHttpClient(timeout=120) as http_client:
  38. response = await http_client.post(target_url, headers=headers, data=payload)
  39. return response
  40. @retry(**retry_desc)
  41. def get_source_account_from_article(article_link) -> dict | None:
  42. """
  43. get account info from official article
  44. :param article_link:
  45. :return:
  46. """
  47. try:
  48. response = requests.get(
  49. url=article_link,
  50. headers={"User-Agent": FakeUserAgent().random},
  51. timeout=120,
  52. )
  53. response.raise_for_status()
  54. html_text = response.text
  55. regex_nickname = r"hit_nickname:\s*'([^']+)'"
  56. regex_username = r"hit_username:\s*'([^']+)'"
  57. nickname = re.search(regex_nickname, html_text)
  58. username = re.search(regex_username, html_text)
  59. # 输出提取的结果
  60. if nickname and username:
  61. return {"name": nickname.group(1), "gh_id": username.group(1)}
  62. else:
  63. return {}
  64. except requests.exceptions.RequestException as e:
  65. log(
  66. task="get_source_account_from_article",
  67. function="get_source_account_from_article",
  68. message=f"API请求失败: {e}",
  69. data={"link": article_link},
  70. )
  71. except json.JSONDecodeError as e:
  72. log(
  73. task="get_source_account_from_article",
  74. function="get_source_account_from_article",
  75. message=f"响应解析失败: {e}",
  76. data={"link": article_link},
  77. )
  78. return None
  79. @retry(**retry_desc)
  80. async def weixin_search(keyword: str, page="1") -> dict | None:
  81. url = "{}/keyword".format(base_url)
  82. payload = json.dumps({"keyword": keyword, "cursor": page})
  83. async with AsyncHttpClient(timeout=120) as http_client:
  84. response = await http_client.post(url=url, headers=headers, data=payload)
  85. return response