# gzh_spider.py

  1. from __future__ import annotations
  2. import re
  3. import json
  4. import requests
  5. from fake_useragent import FakeUserAgent
  6. from tenacity import retry
  7. from app.infra.external import log
  8. from app.infra.shared.tools import request_retry
  9. from app.infra.shared import AsyncHttpClient
# Shared retry policy for every spider call below: up to 3 attempts with a
# backoff between 2 and 16 seconds (exact semantics defined by request_retry).
retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=16)
# Base endpoint of the internal "aigc" crawler service that proxies
# WeChat official-account requests for us.
base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
# Every crawler-service endpoint accepts a JSON request body.
headers = {"Content-Type": "application/json"}
  14. @retry(**retry_desc)
  15. async def get_article_detail(
  16. article_link: str, is_count: bool = False, is_cache: bool = True
  17. ) -> dict | None:
  18. """
  19. get official article detail
  20. """
  21. target_url = f"{base_url}/detail"
  22. payload = json.dumps(
  23. {
  24. "content_link": article_link,
  25. "is_count": is_count,
  26. "is_ad": False,
  27. "is_cache": is_cache,
  28. }
  29. )
  30. try:
  31. async with AsyncHttpClient(timeout=10) as http_client:
  32. response = await http_client.post(target_url, headers=headers, data=payload)
  33. except Exception as e:
  34. log(
  35. task="get_article_detail",
  36. function="get_article_detail",
  37. message=f"API请求失败: {e}",
  38. data={"link": article_link},
  39. )
  40. return None
  41. return response
  42. @retry(**retry_desc)
  43. async def get_article_list_from_account(
  44. account_id: str, index=None, is_cache=True
  45. ) -> dict | None:
  46. target_url = f"{base_url}/blogger"
  47. payload = json.dumps(
  48. {
  49. "account_id": account_id,
  50. "cursor": index,
  51. "token": "1fa4c0ad5c66e43ebd525611f3869f53",
  52. "is_cache": is_cache,
  53. }
  54. )
  55. try:
  56. async with AsyncHttpClient(timeout=120) as http_client:
  57. response = await http_client.post(target_url, headers=headers, data=payload)
  58. except Exception as e:
  59. log(
  60. task="get_article_list_from_account",
  61. function="get_article_list_from_account",
  62. message=f"API请求失败: {e}",
  63. data={"account_id": account_id, "index": index},
  64. )
  65. return None
  66. return response
  67. @retry(**retry_desc)
  68. def get_source_account_from_article(article_link) -> dict | None:
  69. """
  70. get account info from official article
  71. :param article_link:
  72. :return:
  73. """
  74. try:
  75. response = requests.get(
  76. url=article_link,
  77. headers={"User-Agent": FakeUserAgent().random},
  78. timeout=120,
  79. )
  80. response.raise_for_status()
  81. html_text = response.text
  82. regex_nickname = r"hit_nickname:\s*'([^']+)'"
  83. regex_username = r"hit_username:\s*'([^']+)'"
  84. nickname = re.search(regex_nickname, html_text)
  85. username = re.search(regex_username, html_text)
  86. # 输出提取的结果
  87. if nickname and username:
  88. return {"name": nickname.group(1), "gh_id": username.group(1)}
  89. else:
  90. return {}
  91. except requests.exceptions.RequestException as e:
  92. log(
  93. task="get_source_account_from_article",
  94. function="get_source_account_from_article",
  95. message=f"API请求失败: {e}",
  96. data={"link": article_link},
  97. )
  98. except json.JSONDecodeError as e:
  99. log(
  100. task="get_source_account_from_article",
  101. function="get_source_account_from_article",
  102. message=f"响应解析失败: {e}",
  103. data={"link": article_link},
  104. )
  105. return None
  106. @retry(**retry_desc)
  107. async def weixin_search(keyword: str, page="1") -> dict | None:
  108. url = "{}/keyword".format(base_url)
  109. payload = json.dumps({"keyword": keyword, "cursor": page})
  110. try:
  111. async with AsyncHttpClient(timeout=120) as http_client:
  112. response = await http_client.post(url=url, headers=headers, data=payload)
  113. except Exception as e:
  114. log(
  115. task="weixin_search",
  116. function="weixin_search",
  117. message=f"API请求失败: {e}",
  118. data={"keyword": keyword, "page": page},
  119. )
  120. return None
  121. return response