# gzh_spider.py

from __future__ import annotations

import re
import json

import requests
from fake_useragent import FakeUserAgent
from tenacity import retry

from applications.api import log
from applications.utils import request_retry, AsyncHttpClient

retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)

# Base URL of the crawler service (AIGC).
base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
headers = {"Content-Type": "application/json"}
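
# NOTE (assumption): request_retry is expected to return the keyword arguments
# for tenacity.retry (stop/wait policies built from retry_times and the delay
# bounds above). Because the functions below log and swallow their exceptions
# and return None, tenacity's default exception-based retry never fires here;
# retries only happen if that policy also retries on a None result
# (e.g. via retry_if_result).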
@retry(**retry_desc)
def get_article_detail(
    article_link: str, is_count: bool = False, is_cache: bool = True
) -> dict | None:
    """
    Get the detail of an official-account article.
    """
    target_url = f"{base_url}/detail"
    payload = json.dumps(
        {
            "content_link": article_link,
            "is_count": is_count,
            "is_ad": False,
            "is_cache": is_cache,
        }
    )
    try:
        response = requests.post(
            url=target_url, headers=headers, data=payload, timeout=120
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        log(
            task="get_official_article_detail",
            function="get_official_article_detail",
            message=f"API request failed: {e}",
            data={"link": article_link},
        )
    except json.JSONDecodeError as e:
        log(
            task="get_official_article_detail",
            function="get_official_article_detail",
            message=f"Failed to decode response JSON: {e}",
            data={"link": article_link},
        )
    return None

@retry(**retry_desc)
def get_article_list_from_account(account_id: str, index=None) -> dict | None:
    """
    Get a page of the article list published by an official account.
    """
    target_url = f"{base_url}/blogger"
    payload = json.dumps({"account_id": account_id, "cursor": index})
    try:
        response = requests.post(
            url=target_url, headers=headers, data=payload, timeout=120
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        log(
            task="get_official_account_article_list",
            function="get_official_account_article_list",
            message=f"API request failed: {e}",
            data={"gh_id": account_id},
        )
    except json.JSONDecodeError as e:
        log(
            task="get_official_account_article_list",
            function="get_official_account_article_list",
            message=f"Failed to decode response JSON: {e}",
            data={"gh_id": account_id},
        )
    return None

@retry(**retry_desc)
def get_source_account_from_article(article_link: str) -> dict | None:
    """
    Get the source account info (nickname and gh_id) from an
    official-account article page.
    :param article_link: URL of the article.
    :return: {"name": ..., "gh_id": ...}, {} if not found, or None on error.
    """
    try:
        response = requests.get(
            url=article_link,
            headers={"User-Agent": FakeUserAgent().random},
            timeout=120,
        )
        response.raise_for_status()
        html_text = response.text
        # The page embeds the source account in inline JS variables;
        # pull them out with regexes.
        regex_nickname = r"hit_nickname:\s*'([^']+)'"
        regex_username = r"hit_username:\s*'([^']+)'"
        nickname = re.search(regex_nickname, html_text)
        username = re.search(regex_username, html_text)
        # Return the extracted result.
        if nickname and username:
            return {"name": nickname.group(1), "gh_id": username.group(1)}
        else:
            return {}
    except requests.exceptions.RequestException as e:
        log(
            task="get_source_account_from_article",
            function="get_source_account_from_article",
            message=f"Request failed: {e}",
            data={"link": article_link},
        )
    return None

@retry(**retry_desc)
async def weixin_search(keyword: str, page="1") -> dict | None:
    """
    Search official-account articles by keyword.
    """
    url = f"{base_url}/keyword"
    payload = json.dumps({"keyword": keyword, "cursor": page})
    async with AsyncHttpClient(timeout=120) as http_client:
        response = await http_client.post(url=url, headers=headers, data=payload)
        return response
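

# Minimal usage sketch (assumption: the link and keyword below are
# illustrative placeholders, and the shape of each response depends on
# the crawler-cn.aiddit.com service).
if __name__ == "__main__":
    import asyncio

    demo_link = "https://mp.weixin.qq.com/s/demo"  # hypothetical article URL

    print(get_article_detail(demo_link))
    print(get_source_account_from_article(demo_link))

    # weixin_search is async, so drive it with an event loop.
    print(asyncio.run(weixin_search("demo keyword")))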