# gzh_spider.py

from __future__ import annotations

import re
import json

import requests
from fake_useragent import FakeUserAgent
from tenacity import retry

from applications.api import log
from applications.utils import request_retry
from applications.utils import AsyncHttpClient

retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
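# retry_desc above expands into tenacity retry kwargs: up to 3 attempts with a
# delay between 2 and 30 seconds (the exact backoff shape is defined by
# applications.utils.request_retry).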
# URL from aigc
base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
headers = {"Content-Type": "application/json"}

@retry(**retry_desc)
async def get_article_detail(
    article_link: str, is_count: bool = False, is_cache: bool = True
) -> dict | None:
    """
    Get official article detail.
    """
    target_url = f"{base_url}/detail"
    payload = json.dumps(
        {
            "content_link": article_link,
            "is_count": is_count,
            "is_ad": False,
            "is_cache": is_cache,
        }
    )
    async with AsyncHttpClient(timeout=10) as http_client:
        response = await http_client.post(target_url, headers=headers, data=payload)
        return response
    # Legacy synchronous implementation, kept for reference:
    # try:
    #     response = requests.post(
    #         url=target_url, headers=headers, data=payload, timeout=120
    #     )
    #     response.raise_for_status()
    #     return response.json()
    # except requests.exceptions.RequestException as e:
    #     log(
    #         task="get_official_article_detail",
    #         function="get_official_article_detail",
    #         message=f"API request failed: {e}",
    #         data={"link": article_link},
    #     )
    # except json.JSONDecodeError as e:
    #     log(
    #         task="get_official_article_detail",
    #         function="get_official_article_detail",
    #         message=f"Response parsing failed: {e}",
    #         data={"link": article_link},
    #     )
    # return None
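
# Usage sketch for get_article_detail (the link below is a placeholder, and the
# shape of the returned payload depends on what the crawler service sends back):
#
#     import asyncio
#     detail = asyncio.run(get_article_detail("https://mp.weixin.qq.com/s/..."))
#     if detail:
#         print(detail)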

@retry(**retry_desc)
async def get_article_list_from_account(account_id: str, index=None) -> dict | None:
    """
    Get one page of an official account's article list;
    `index` is passed to the service as the pagination cursor.
    """
    target_url = f"{base_url}/blogger"
    payload = json.dumps({"account_id": account_id, "cursor": index})
    async with AsyncHttpClient(timeout=120) as http_client:
        response = await http_client.post(target_url, headers=headers, data=payload)
        return response
    # Legacy synchronous implementation, kept for reference:
    # try:
    #     response = requests.post(
    #         url=target_url, headers=headers, data=payload, timeout=120
    #     )
    #     response.raise_for_status()
    #     return response.json()
    # except requests.exceptions.RequestException as e:
    #     log(
    #         task="get_official_account_article_list",
    #         function="get_official_account_article_list",
    #         message=f"API request failed: {e}",
    #         data={"gh_id": account_id},
    #     )
    # except json.JSONDecodeError as e:
    #     log(
    #         task="get_official_account_article_list",
    #         function="get_official_account_article_list",
    #         message=f"Response parsing failed: {e}",
    #         data={"gh_id": account_id},
    #     )
    # return None
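
# Pagination sketch (assumes the service echoes a next-page cursor in its
# response; the account id and the `next_cursor` key are hypothetical):
#
#     cursor = None
#     while True:
#         page = await get_article_list_from_account("gh_xxxxxxxx", index=cursor)
#         if not page:
#             break
#         ...  # consume the page of articles here
#         cursor = page.get("next_cursor")
#         if not cursor:
#             break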

@retry(**retry_desc)
def get_source_account_from_article(article_link: str) -> dict | None:
    """
    Get account info from an official article page.
    :param article_link: URL of the article
    :return: {"name": ..., "gh_id": ...} on success, {} if not found, None on error
    """
    try:
        response = requests.get(
            url=article_link,
            headers={"User-Agent": FakeUserAgent().random},
            timeout=120,
        )
        response.raise_for_status()
        html_text = response.text
        # The page embeds the source account in inline JavaScript variables
        regex_nickname = r"hit_nickname:\s*'([^']+)'"
        regex_username = r"hit_username:\s*'([^']+)'"
        nickname = re.search(regex_nickname, html_text)
        username = re.search(regex_username, html_text)
        # Return the extracted result
        if nickname and username:
            return {"name": nickname.group(1), "gh_id": username.group(1)}
        else:
            return {}
    except requests.exceptions.RequestException as e:
        log(
            task="get_source_account_from_article",
            function="get_source_account_from_article",
            message=f"API request failed: {e}",
            data={"link": article_link},
        )
    return None
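
# Usage sketch (placeholder URL; this function is synchronous, so no event
# loop is needed):
#
#     info = get_source_account_from_article("https://mp.weixin.qq.com/s/...")
#     if info:
#         print(info["name"], info["gh_id"])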

@retry(**retry_desc)
async def weixin_search(keyword: str, page="1") -> dict | None:
    """
    Search official-account articles by keyword; `page` is the result-page cursor.
    """
    url = f"{base_url}/keyword"
    payload = json.dumps({"keyword": keyword, "cursor": page})
    # Legacy synchronous call, kept for reference:
    # response = requests.request("POST", url, headers=headers, data=payload, timeout=120)
    async with AsyncHttpClient(timeout=120) as http_client:
        response = await http_client.post(url=url, headers=headers, data=payload)
        return response
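
# Minimal end-to-end sketch (assumes the crawler service at base_url is
# reachable and returns JSON-decoded dicts; the keyword is a placeholder):
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        results = await weixin_search("example keyword")
        print(results)

    asyncio.run(_demo())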