gzh_spider.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. from __future__ import annotations
  2. import re
  3. import json
  4. import requests
  5. from fake_useragent import FakeUserAgent
  6. from tenacity import retry
  7. from applications.api import log
  8. from applications.utils import request_retry
  9. from applications.utils import AsyncHttpClient
  10. retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=16)
  11. # url from aigc
  12. base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
  13. headers = {"Content-Type": "application/json"}
  14. @retry(**retry_desc)
  15. async def get_article_detail(
  16. article_link: str, is_count: bool = False, is_cache: bool = True
  17. ) -> dict | None:
  18. """
  19. get official article detail
  20. """
  21. target_url = f"{base_url}/detail"
  22. payload = json.dumps(
  23. {
  24. "content_link": article_link,
  25. "is_count": is_count,
  26. "is_ad": False,
  27. "is_cache": is_cache,
  28. }
  29. )
  30. try:
  31. async with AsyncHttpClient(timeout=10) as http_client:
  32. response = await http_client.post(target_url, headers=headers, data=payload)
  33. except Exception as e:
  34. log(
  35. task="get_article_detail",
  36. function="get_article_detail",
  37. message=f"API请求失败: {e}",
  38. data={"link": article_link},
  39. )
  40. return None
  41. return response
  42. @retry(**retry_desc)
  43. async def get_article_list_from_account(account_id: str, index=None, is_cache=True) -> dict | None:
  44. target_url = f"{base_url}/blogger"
  45. payload = json.dumps(
  46. {
  47. "account_id": account_id,
  48. "cursor": index,
  49. "token": "1fa4c0ad5c66e43ebd525611f3869f53",
  50. "is_cache": is_cache,
  51. }
  52. )
  53. try:
  54. async with AsyncHttpClient(timeout=120) as http_client:
  55. response = await http_client.post(target_url, headers=headers, data=payload)
  56. except Exception as e:
  57. log(
  58. task="get_article_list_from_account",
  59. function="get_article_list_from_account",
  60. message=f"API请求失败: {e}",
  61. data={"account_id": account_id, "index": index},
  62. )
  63. return None
  64. return response
  65. @retry(**retry_desc)
  66. def get_source_account_from_article(article_link) -> dict | None:
  67. """
  68. get account info from official article
  69. :param article_link:
  70. :return:
  71. """
  72. try:
  73. response = requests.get(
  74. url=article_link,
  75. headers={"User-Agent": FakeUserAgent().random},
  76. timeout=120,
  77. )
  78. response.raise_for_status()
  79. html_text = response.text
  80. regex_nickname = r"hit_nickname:\s*'([^']+)'"
  81. regex_username = r"hit_username:\s*'([^']+)'"
  82. nickname = re.search(regex_nickname, html_text)
  83. username = re.search(regex_username, html_text)
  84. # 输出提取的结果
  85. if nickname and username:
  86. return {"name": nickname.group(1), "gh_id": username.group(1)}
  87. else:
  88. return {}
  89. except requests.exceptions.RequestException as e:
  90. log(
  91. task="get_source_account_from_article",
  92. function="get_source_account_from_article",
  93. message=f"API请求失败: {e}",
  94. data={"link": article_link},
  95. )
  96. except json.JSONDecodeError as e:
  97. log(
  98. task="get_source_account_from_article",
  99. function="get_source_account_from_article",
  100. message=f"响应解析失败: {e}",
  101. data={"link": article_link},
  102. )
  103. return None
  104. @retry(**retry_desc)
  105. async def weixin_search(keyword: str, page="1") -> dict | None:
  106. url = "{}/keyword".format(base_url)
  107. payload = json.dumps({"keyword": keyword, "cursor": page})
  108. try:
  109. async with AsyncHttpClient(timeout=120) as http_client:
  110. response = await http_client.post(url=url, headers=headers, data=payload)
  111. except Exception as e:
  112. log(
  113. task="weixin_search",
  114. function="weixin_search",
  115. message=f"API请求失败: {e}",
  116. data={"keyword": keyword, "page": page},
  117. )
  118. return None
  119. return response