# Source file: get_baidu_recommend.py (3.4 KB)
  1. import asyncio
  2. import sys
  3. import time
  4. from pathlib import Path
  5. from readline import insert_text
  6. import requests
  7. import json
  8. from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
  9. from core.utils.feishu_data_async import FeishuDataAsync
  10. sys.path.insert(0, str(Path(__file__).parent.parent))
  11. class BaiduRecommend:
  12. """
  13. 微信小程序域名信息获取类
  14. 一次性任务
  15. """
  16. def __init__(self):
  17. pass
  18. @retry(
  19. stop=stop_after_attempt(3),
  20. wait=wait_exponential(multiplier=1, min=4, max=10),
  21. retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError,Exception))
  22. )
  23. def get_top_search(self,cursor=0,last_timestamp_ms=""):
  24. """获取热搜数据"""
  25. url = f"http://crawapi.piaoquantv.com/crawler/bai_du/recommend"
  26. body = {
  27. "task_type": "recommend",
  28. "cursor": cursor,
  29. "last_timestamp_ms": last_timestamp_ms
  30. }
  31. print( body)
  32. try:
  33. response = requests.post(url,json= body)
  34. response.raise_for_status() # 检查HTTP错误
  35. return response.json()
  36. except requests.exceptions.RequestException as e:
  37. print(f"请求失败: {str(e)}")
  38. raise # 重新抛出异常以触发重试
  39. except Exception as e:
  40. print(f"解析响应失败: {str(e)}")
  41. return None
  42. async def main():
  43. global last_timestamp_ms
  44. last_timestamp_ms = ""
  45. for i in range(100):
  46. resp = BaiduRecommend().get_top_search(cursor=i,last_timestamp_ms=last_timestamp_ms)
  47. if resp and resp["code"] != 0:
  48. print(f"API请求失败: {resp.get('msg')}")
  49. await asyncio.sleep(60) # 请求失败时等待60秒再重试
  50. continue
  51. if not resp or not resp.get("data") or not resp["data"].get("data"):
  52. print("未获取到数据,等待60秒后重试")
  53. await asyncio.sleep(60)
  54. continue
  55. last_timestamp_ms = resp.get("data").get("next_cursor").get("last_timestamp_ms")
  56. print(last_timestamp_ms)
  57. obj = resp.get("data").get("data")
  58. insert_datas = []
  59. for item in obj:
  60. item_data = item.get("data")
  61. if item_data.get("mode") != "text":
  62. continue
  63. title = item_data.get("title")
  64. source = item_data.get("source")
  65. view_count = item_data.get("comment_num")
  66. publish_time = item_data.get("publish_time")
  67. id = item.get("id")
  68. url = f'https://mbd.baidu.com/newspage/data/landingsuper?pageType=1&_refluxos=i0&context={{"nid":"{id}","ssid":""}}'
  69. insert_data = [title,url,view_count,source,time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(publish_time))) if publish_time and str(publish_time).isdigit() else None]
  70. insert_datas.append(insert_data)
  71. if insert_datas:
  72. try:
  73. async with FeishuDataAsync() as feishu_data:
  74. await feishu_data.insert_values("NktPwBtcviP8mwkC027cQc4JnXq", "G7kfw0", "A2:E", insert_datas)
  75. print(f"已插入 {len(insert_datas)} 条数据")
  76. except Exception as e:
  77. print(f"插入数据失败: {str(e)}")
  78. else:
  79. print("本次没有有效数据")
  80. if __name__ == '__main__':
  81. asyncio.run(main())