# get_baidu_topsearch.py
  1. import asyncio
  2. import sys
  3. import time
  4. from pathlib import Path
  5. from readline import insert_text
  6. import requests
  7. import json
  8. from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
  9. from core.utils.feishu_data_async import FeishuDataAsync
  10. sys.path.insert(0, str(Path(__file__).parent.parent))
  11. class BaiduTopSearch:
  12. """
  13. 微信小程序域名信息获取类
  14. 一次性任务
  15. """
  16. def __init__(self):
  17. pass
  18. @retry(
  19. stop=stop_after_attempt(3),
  20. wait=wait_exponential(multiplier=1, min=4, max=10),
  21. retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError))
  22. )
  23. def get_access_token(self, appid, secret):
  24. """获取微信接口访问令牌"""
  25. url = "https://api.weixin.qq.com/cgi-bin/token"
  26. params = {
  27. "grant_type": "client_credential",
  28. "appid": appid,
  29. "secret": secret
  30. }
  31. try:
  32. response = requests.get(url, params=params)
  33. response.raise_for_status() # 检查HTTP错误
  34. print(response)
  35. return response.json().get("access_token")
  36. except requests.exceptions.RequestException as e:
  37. print(f"获取微信令牌失败: {str(e)}")
  38. raise # 重新抛出异常以触发重试
  39. except Exception as e:
  40. print(f"解析响应失败: {str(e)}")
  41. return None
  42. @retry(
  43. stop=stop_after_attempt(3),
  44. wait=wait_exponential(multiplier=1, min=4, max=10),
  45. retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError,Exception))
  46. )
  47. def get_top_search(self):
  48. """获取热搜数据"""
  49. url = f"http://crawapi.piaoquantv.com/crawler/bai_du/top_search"
  50. try:
  51. response = requests.post(url)
  52. response.raise_for_status() # 检查HTTP错误
  53. return response.json()
  54. except requests.exceptions.RequestException as e:
  55. print(f"请求失败: {str(e)}")
  56. raise # 重新抛出异常以触发重试
  57. except Exception as e:
  58. print(f"解析响应失败: {str(e)}")
  59. return None
  60. async def main():
  61. last_public_time = 0 # 记录最后一次处理的时间
  62. while True:
  63. resp = BaiduTopSearch().get_top_search()
  64. if resp and resp["code"] != 0:
  65. print(f"API请求失败: {resp.get('msg')}")
  66. await asyncio.sleep(60) # 请求失败时等待60秒再重试
  67. continue
  68. if not resp or not resp.get("data") or not resp["data"].get("data"):
  69. print("未获取到数据,等待60秒后重试")
  70. await asyncio.sleep(60)
  71. continue
  72. obj = resp.get("data").get("data")
  73. insert_datas = []
  74. for item in obj:
  75. if item.get("layout") != "hot_board_item":
  76. continue
  77. item_data = item.get("data")
  78. title = item_data.get("title")
  79. publictime = item_data.get("publicTime")
  80. last_public_time = publictime
  81. score = item_data.get("grext").get("score")
  82. position_type = item_data.get("position_type",0)
  83. if position_type == 1:
  84. position_name = "置顶内容"
  85. else:
  86. position_name = item_data.get("index")
  87. hot_tag_name = item_data.get("grext").get("hot_tag_name")
  88. insert_data = [title, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publictime)), score, position_name, hot_tag_name]
  89. insert_datas.append(insert_data)
  90. if insert_datas:
  91. try:
  92. async with FeishuDataAsync() as feishu_data:
  93. await feishu_data.insert_values("NktPwBtcviP8mwkC027cQc4JnXq", "07a356", "A2:E", insert_datas)
  94. print(f"已插入 {len(insert_datas)} 条数据")
  95. except Exception as e:
  96. print(f"插入数据失败: {str(e)}")
  97. else:
  98. print("本次没有有效数据")
  99. print("等待10分钟后继续执行")
  100. time.sleep(600)
  101. if __name__ == '__main__':
  102. asyncio.run(main())