import asyncio
import json
import sys
import time
from pathlib import Path
from readline import insert_text

import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# The directory two levels above this file must be on sys.path *before* the
# project-local import below, otherwise the insert cannot help resolve it.
sys.path.insert(0, str(Path(__file__).parent.parent))

from core.utils.feishu_data_async import FeishuDataAsync  # noqa: E402

# Upper bound (seconds) for a single HTTP call; without it `requests` can
# block forever on a stalled connection and the retry policy never fires.
REQUEST_TIMEOUT_SECONDS = 30
# Pause (seconds) after a failed or empty fetch before polling again.
RETRY_DELAY_SECONDS = 60
# Pause (seconds) between two successful polling rounds.
POLL_INTERVAL_SECONDS = 600


class BaiduTopSearch:
    """HTTP helpers: fetch a WeChat access token and the Baidu top-search list."""

    def __init__(self):
        pass

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError))
    )
    def get_access_token(self, appid, secret):
        """Request an access token from the WeChat ``cgi-bin/token`` endpoint.

        Args:
            appid: Value sent as the ``appid`` query parameter.
            secret: Value sent as the ``secret`` query parameter.

        Returns:
            The ``access_token`` field of the JSON response (``None`` when the
            field is absent), or ``None`` when handling the response fails
            with a non-request error.

        Raises:
            Request errors are re-raised so tenacity retries (3 attempts,
            exponential wait clamped to 4-10 s); once attempts are exhausted
            tenacity propagates the failure to the caller.
        """
        url = "https://api.weixin.qq.com/cgi-bin/token"
        params = {
            "grant_type": "client_credential",
            "appid": appid,
            "secret": secret
        }
        try:
            response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()  # surface HTTP 4xx/5xx as exceptions
            print(response)
            return response.json().get("access_token")
        except requests.exceptions.RequestException as e:
            print(f"获取微信令牌失败: {str(e)}")
            raise  # re-raise so the retry decorator can try again
        except Exception as e:
            print(f"解析响应失败: {str(e)}")
            return None

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        # Only request errors can leave the method body (everything else is
        # turned into a ``None`` return below), so they are the only
        # exceptions worth retrying on.
        retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError))
    )
    def get_top_search(self):
        """POST to the crawler's Baidu top-search endpoint and return its JSON.

        Returns:
            The decoded JSON body, or ``None`` when handling the response
            fails with a non-request error.

        Raises:
            Request errors are re-raised so tenacity retries (3 attempts,
            exponential wait clamped to 4-10 s); once attempts are exhausted
            tenacity propagates the failure to the caller.
        """
        url = "http://crawapi.piaoquantv.com/crawler/bai_du/top_search"
        try:
            response = requests.post(url, timeout=REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()  # surface HTTP 4xx/5xx as exceptions
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {str(e)}")
            raise  # re-raise so the retry decorator can try again
        except Exception as e:
            print(f"解析响应失败: {str(e)}")
            return None


def _extract_items(resp):
    """Return the list stored at ``resp["data"]["data"]``, or ``[]`` if absent."""
    if not isinstance(resp, dict):
        return []
    payload = resp.get("data")
    if not isinstance(payload, dict):
        return []
    return payload.get("data") or []


def _build_rows(items):
    """Turn ``hot_board_item`` entries into five-column rows for insertion.

    Each row is ``[title, formatted publicTime, score, position, hot_tag_name]``.
    Entries with another layout, or without a ``data`` payload, are skipped.
    """
    rows = []
    for item in items:
        if not isinstance(item, dict) or item.get("layout") != "hot_board_item":
            continue
        item_data = item.get("data")
        if not item_data:
            continue
        # "grext" may be missing; fall back to an empty mapping so the two
        # lookups below yield None instead of raising AttributeError.
        grext = item_data.get("grext") or {}
        if item_data.get("position_type", 0) == 1:
            position_name = "置顶内容"
        else:
            position_name = item_data.get("index")
        # time.localtime(None) uses the current time, so a missing
        # "publicTime" is rendered as "now" rather than failing.
        public_time = time.localtime(item_data.get("publicTime"))
        rows.append([
            item_data.get("title"),
            time.strftime("%Y-%m-%d %H:%M:%S", public_time),
            grext.get("score"),
            position_name,
            grext.get("hot_tag_name"),
        ])
    return rows


async def main():
    """Poll the top-search endpoint forever and append the rows via Feishu.

    A failed or empty fetch is retried after 60 seconds; after a processed
    round the loop waits 10 minutes before polling again.
    """
    searcher = BaiduTopSearch()
    while True:
        try:
            resp = searcher.get_top_search()
        except Exception as e:
            # The retry decorator gave up; keep the poller alive and back off
            # instead of letting a transient outage end the process.
            print(f"获取热搜数据失败: {str(e)}")
            await asyncio.sleep(RETRY_DELAY_SECONDS)
            continue

        if isinstance(resp, dict) and resp.get("code") != 0:
            print(f"API请求失败: {resp.get('msg')}")
            await asyncio.sleep(RETRY_DELAY_SECONDS)  # wait before retrying
            continue

        items = _extract_items(resp)
        if not items:
            print("未获取到数据,等待60秒后重试")
            await asyncio.sleep(RETRY_DELAY_SECONDS)
            continue

        insert_datas = _build_rows(items)
        if insert_datas:
            try:
                async with FeishuDataAsync() as feishu_data:
                    await feishu_data.insert_values("NktPwBtcviP8mwkC027cQc4JnXq", "07a356", "A2:E", insert_datas)
                    print(f"已插入 {len(insert_datas)} 条数据")
            except Exception as e:
                print(f"插入数据失败: {str(e)}")
        else:
            print("本次没有有效数据")

        print("等待10分钟后继续执行")
        # Non-blocking sleep: time.sleep() would stall the whole event loop.
        await asyncio.sleep(POLL_INTERVAL_SECONDS)


if __name__ == '__main__':
    asyncio.run(main())