import asyncio
import sys
import time
from pathlib import Path

import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# Make the repo root importable before pulling in project-local modules.
sys.path.insert(0, str(Path(__file__).parent.parent))

from core.utils.feishu_data_async import FeishuDataAsync


class BaiduRecommend:
    """
    Fetches the Baidu recommend feed.
    One-off task.
    """

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        # Retry only on network-level failures. The blanket `Exception` is
        # intentionally omitted: parse errors are handled below and return None.
        retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError)),
    )
    def get_top_search(self, cursor=0, last_timestamp_ms=""):
        """Fetch one page of recommend-feed data."""
        url = "http://crawapi.piaoquantv.com/crawler/bai_du/recommend"
        body = {
            "task_type": "recommend",
            "cursor": cursor,
            "last_timestamp_ms": last_timestamp_ms,
        }
        print(body)
        try:
            response = requests.post(url, json=body)
            response.raise_for_status()  # raise on HTTP error status codes
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            raise  # re-raise so tenacity retries
        except Exception as e:
            print(f"Failed to parse response: {e}")
            return None


async def main():
    crawler = BaiduRecommend()
    last_timestamp_ms = ""
    for i in range(100):
        resp = crawler.get_top_search(cursor=i, last_timestamp_ms=last_timestamp_ms)

        if resp and resp.get("code") != 0:
            print(f"API request failed: {resp.get('msg')}")
            await asyncio.sleep(60)  # wait 60 seconds before retrying on failure
            continue
        if not resp or not resp.get("data") or not resp["data"].get("data"):
            print("No data returned; waiting 60 seconds before retrying")
            await asyncio.sleep(60)
            continue

        # Advance the pagination cursor for the next request.
        next_cursor = resp["data"].get("next_cursor") or {}
        last_timestamp_ms = next_cursor.get("last_timestamp_ms", "")
        print(last_timestamp_ms)

        insert_datas = []
        for item in resp["data"]["data"]:
            item_data = item.get("data") or {}
            if item_data.get("mode") != "text":
                continue  # keep only plain-text feed items
            title = item_data.get("title")
            source = item_data.get("source")
            view_count = item_data.get("comment_num")  # note: the API field is comment_num
            publish_time = item_data.get("publish_time")
            nid = item.get("id")  # renamed from `id` to avoid shadowing the builtin
            url = (
                "https://mbd.baidu.com/newspage/data/landingsuper"
                f'?pageType=1&_refluxos=i0&context={{"nid":"{nid}","ssid":""}}'
            )
            if publish_time and str(publish_time).isdigit():
                publish_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(publish_time)))
            else:
                publish_str = None
            insert_datas.append([title, url, view_count, source, publish_str])

        if insert_datas:
            try:
                async with FeishuDataAsync() as feishu_data:
                    await feishu_data.insert_values(
                        "NktPwBtcviP8mwkC027cQc4JnXq", "G7kfw0", "A2:E", insert_datas
                    )
                print(f"Inserted {len(insert_datas)} rows")
            except Exception as e:
                print(f"Failed to insert data: {e}")
        else:
            print("No valid data this round")


if __name__ == "__main__":
    asyncio.run(main())
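
# For reference, a minimal sketch of the response shape this script assumes.
# It is inferred from the field accesses above, not from API documentation,
# so the real payload may carry additional fields.
#
# {
#     "code": 0,
#     "msg": "...",
#     "data": {
#         "next_cursor": {"last_timestamp_ms": "1700000000000"},
#         "data": [
#             {
#                 "id": "<nid used to build the landing-page URL>",
#                 "data": {
#                     "mode": "text",               # only "text" items are kept
#                     "title": "...",
#                     "source": "...",
#                     "comment_num": 123,           # written to the view-count column
#                     "publish_time": "1700000000"  # unix seconds, as a digit string
#                 }
#             }
#         ]
#     }
# }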