| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import asyncio
- import sys
- import time
- from pathlib import Path
- from readline import insert_text
- import requests
- import json
- from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
- from core.utils.feishu_data_async import FeishuDataAsync
- sys.path.insert(0, str(Path(__file__).parent.parent))
class BaiduRecommend:
    """
    Client for the internal crawler API's Baidu "recommend" feed.

    One-off task: :meth:`get_top_search` fetches one page of the feed;
    the caller drives pagination via ``cursor``/``last_timestamp_ms``.
    """

    def __init__(self):
        pass

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        # Retry only network-level failures. The original predicate also
        # listed bare ``Exception``, which would retry programming errors
        # and mask real bugs; JSON parse failures are handled below and
        # deliberately NOT retried.
        retry=retry_if_exception_type(
            (requests.exceptions.RequestException, ConnectionError)
        ),
    )
    def get_top_search(self, cursor=0, last_timestamp_ms=""):
        """Fetch one page of the recommend feed.

        Args:
            cursor: page index to request (starts at 0).
            last_timestamp_ms: pagination token from the previous response
                (empty string for the first page).

        Returns:
            dict: the decoded JSON payload, or ``None`` when the response
            body is not valid JSON.

        Raises:
            requests.exceptions.RequestException: on HTTP/network failure;
                re-raised so the ``@retry`` decorator can retry the call.
        """
        url = "http://crawapi.piaoquantv.com/crawler/bai_du/recommend"
        body = {
            "task_type": "recommend",
            "cursor": cursor,
            "last_timestamp_ms": last_timestamp_ms,
        }
        print(body)
        try:
            # Explicit timeout so a dead/hung server cannot block forever
            # (requests has no default timeout).
            response = requests.post(url, json=body, timeout=30)
            response.raise_for_status()  # raise on HTTP 4xx/5xx
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {str(e)}")
            raise  # re-raise so tenacity retries the request
        except ValueError as e:
            # ``response.json()`` raises ValueError (json.JSONDecodeError)
            # on a malformed body; best-effort: report and return None.
            print(f"解析响应失败: {str(e)}")
            return None
async def main():
    """Page through the Baidu recommend feed and append rows to Feishu.

    Runs up to 100 fetch iterations. For each page, text-mode items are
    collected as ``[title, url, view_count, source, publish_time]`` rows
    and inserted into the Feishu sheet; failed/empty pages wait 60s and
    move on to the next iteration.
    """
    global last_timestamp_ms
    last_timestamp_ms = ""
    crawler = BaiduRecommend()  # hoist: reuse one client for all pages
    for page in range(100):
        resp = crawler.get_top_search(cursor=page, last_timestamp_ms=last_timestamp_ms)
        if resp and resp["code"] != 0:
            print(f"API请求失败: {resp.get('msg')}")
            await asyncio.sleep(60)  # 请求失败时等待60秒再重试
            continue

        if not resp or not resp.get("data") or not resp["data"].get("data"):
            print("未获取到数据,等待60秒后重试")
            await asyncio.sleep(60)
            continue

        data = resp["data"]
        # BUGFIX: ``next_cursor`` may be absent (e.g. on the last page);
        # the original chained ``.get(...).get(...)`` raised AttributeError
        # on None. Guard each level and fall back to an empty token.
        next_cursor = data.get("next_cursor") or {}
        last_timestamp_ms = next_cursor.get("last_timestamp_ms", "")
        print(last_timestamp_ms)

        insert_datas = []
        for item in data["data"]:
            item_data = item.get("data") or {}  # guard: item may lack "data"
            if item_data.get("mode") != "text":
                continue  # only plain-text items are ingested
            title = item_data.get("title")
            source = item_data.get("source")
            # NOTE(review): value comes from "comment_num" but is stored in
            # the view-count column — confirm this mapping is intentional.
            view_count = item_data.get("comment_num")
            publish_time = item_data.get("publish_time")
            nid = item.get("id")  # renamed from ``id`` (shadowed builtin)
            url = f'https://mbd.baidu.com/newspage/data/landingsuper?pageType=1&_refluxos=i0&context={{"nid":"{nid}","ssid":""}}'
            insert_datas.append([
                title,
                url,
                view_count,
                source,
                # Convert epoch-seconds to local "YYYY-MM-DD HH:MM:SS";
                # non-numeric/missing timestamps become None.
                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(publish_time)))
                if publish_time and str(publish_time).isdigit()
                else None,
            ])

        if insert_datas:
            try:
                async with FeishuDataAsync() as feishu_data:
                    await feishu_data.insert_values(
                        "NktPwBtcviP8mwkC027cQc4JnXq", "G7kfw0", "A2:E", insert_datas
                    )
                    print(f"已插入 {len(insert_datas)} 条数据")
            except Exception as e:
                # Best-effort sink: log the failure and keep paginating.
                print(f"插入数据失败: {str(e)}")
        else:
            print("本次没有有效数据")
# Script entry point: run the async pagination loop to completion.
if __name__ == '__main__':
    asyncio.run(main())
|