| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- import asyncio
- import sys
- import time
- from pathlib import Path
- from readline import insert_text
- import requests
- import json
- from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
- from core.utils.feishu_data_async import FeishuDataAsync
- sys.path.insert(0, str(Path(__file__).parent.parent))
class BaiduTopSearch:
    """HTTP helpers used by the Baidu top-search polling script.

    Both public methods are wrapped with ``tenacity.retry``.  A
    ``requests.exceptions.RequestException`` is printed and re-raised so that
    tenacity retries the call: at most three attempts, with an exponential
    wait clamped to the 4-10 second range.  ``reraise`` is not set on the
    decorators, so exhausting the attempts surfaces to the caller as
    ``tenacity.RetryError``.  Any other error raised while handling the
    response is printed and converted into a ``None`` return value.
    """

    # Default per-request timeout in seconds.  Without a timeout ``requests``
    # may block indefinitely, in which case the retry policy never fires.
    DEFAULT_TIMEOUT = 10

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type(
            (requests.exceptions.RequestException, ConnectionError)
        ),
    )
    def get_access_token(self, appid, secret, timeout=DEFAULT_TIMEOUT):
        """Request an access token from the WeChat ``cgi-bin/token`` endpoint.

        Args:
            appid: Value sent as the ``appid`` query parameter.
            secret: Value sent as the ``secret`` query parameter.
            timeout: Seconds to wait for the HTTP request before failing.

        Returns:
            The ``access_token`` field of the JSON response, or ``None`` when
            the field is absent or the response could not be interpreted.

        Raises:
            requests.exceptions.RequestException: Re-raised on HTTP/network
                failure so the retry decorator can try again.
        """
        url = "https://api.weixin.qq.com/cgi-bin/token"
        params = {
            "grant_type": "client_credential",
            "appid": appid,
            "secret": secret,
        }
        try:
            response = requests.get(url, params=params, timeout=timeout)
            response.raise_for_status()  # turn 4xx/5xx into an exception
            return response.json().get("access_token")
        except requests.exceptions.RequestException as exc:
            print(f"获取微信令牌失败: {exc}")
            raise  # propagate so tenacity retries
        except Exception as exc:
            # Unexpected payload shape (e.g. JSON that is not an object).
            print(f"解析响应失败: {exc}")
            return None

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        # Only RequestException ever leaves the method body (everything else
        # is swallowed below), so listing ``Exception`` here was redundant.
        retry=retry_if_exception_type(
            (requests.exceptions.RequestException, ConnectionError)
        ),
    )
    def get_top_search(self, timeout=DEFAULT_TIMEOUT):
        """Fetch the top-search payload from the crawler endpoint.

        Sends a body-less POST request.

        Args:
            timeout: Seconds to wait for the HTTP request before failing.

        Returns:
            The decoded JSON response, or ``None`` when the response could
            not be interpreted.

        Raises:
            requests.exceptions.RequestException: Re-raised on HTTP/network
                failure so the retry decorator can try again.
        """
        url = "http://crawapi.piaoquantv.com/crawler/bai_du/top_search"
        try:
            response = requests.post(url, timeout=timeout)
            response.raise_for_status()  # turn 4xx/5xx into an exception
            return response.json()
        except requests.exceptions.RequestException as exc:
            print(f"请求失败: {exc}")
            raise  # propagate so tenacity retries
        except Exception as exc:
            print(f"解析响应失败: {exc}")
            return None
# Seconds to wait before polling again after a failed or empty fetch.
_RETRY_DELAY_SECONDS = 60
# Seconds to wait between two regular polling rounds.
_POLL_INTERVAL_SECONDS = 600


def _format_public_time(timestamp):
    """Format an epoch timestamp as local ``YYYY-MM-DD HH:MM:SS``, or ``""``."""
    # ``time.localtime(None)`` means "now"; a missing publicTime must not be
    # silently recorded as the current time.
    if timestamp is None:
        return ""
    try:
        # NOTE(review): assumes the value is in seconds -- confirm against
        # the crawler API, whose schema is not visible here.
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
    except (TypeError, ValueError, OverflowError, OSError):
        return ""


def _build_rows(items):
    """Turn raw board items into ``[title, time, score, position, tag]`` rows."""
    rows = []
    for item in items:
        # Only entries laid out as "hot_board_item" are recorded.
        if not isinstance(item, dict) or item.get("layout") != "hot_board_item":
            continue
        item_data = item.get("data")
        if not isinstance(item_data, dict):
            continue

        grext = item_data.get("grext")
        if not isinstance(grext, dict):
            grext = {}

        # position_type == 1 gets a fixed label; otherwise the item's index
        # is recorded.
        if item_data.get("position_type", 0) == 1:
            position_name = "置顶内容"
        else:
            position_name = item_data.get("index")

        rows.append([
            item_data.get("title"),
            _format_public_time(item_data.get("publicTime")),
            grext.get("score"),
            position_name,
            grext.get("hot_tag_name"),
        ])
    return rows


async def main():
    """Poll the top-search endpoint forever and append the rows to Feishu.

    Each round fetches the payload, converts the ``hot_board_item`` entries
    into rows and inserts them through ``FeishuDataAsync.insert_values``.
    A failed or empty fetch is retried after ``_RETRY_DELAY_SECONDS``; a
    completed round is followed by a ``_POLL_INTERVAL_SECONDS`` pause.
    The coroutine never returns.
    """
    client = BaiduTopSearch()

    while True:
        try:
            # NOTE: this is a blocking HTTP call; acceptable while this is
            # the only task running on the event loop.
            resp = client.get_top_search()
        except Exception as exc:
            # Includes the error raised once the client's retries are
            # exhausted; one bad round must not stop the polling loop.
            print(f"获取热搜数据失败: {exc}")
            await asyncio.sleep(_RETRY_DELAY_SECONDS)
            continue

        if isinstance(resp, dict) and resp.get("code") != 0:
            print(f"API请求失败: {resp.get('msg')}")
            await asyncio.sleep(_RETRY_DELAY_SECONDS)
            continue

        payload = resp.get("data") if isinstance(resp, dict) else None
        items = payload.get("data") if isinstance(payload, dict) else None
        if not items:
            print("未获取到数据,等待60秒后重试")
            await asyncio.sleep(_RETRY_DELAY_SECONDS)
            continue

        insert_datas = _build_rows(items)
        if insert_datas:
            try:
                async with FeishuDataAsync() as feishu_data:
                    await feishu_data.insert_values(
                        "NktPwBtcviP8mwkC027cQc4JnXq",
                        "07a356",
                        "A2:E",
                        insert_datas,
                    )
                print(f"已插入 {len(insert_datas)} 条数据")
            except Exception as exc:
                # Best effort: report the failure and try again next round.
                print(f"插入数据失败: {exc}")
        else:
            print("本次没有有效数据")

        print("等待10分钟后继续执行")
        # asyncio.sleep yields to the event loop; time.sleep would block it.
        await asyncio.sleep(_POLL_INTERVAL_SECONDS)
# Script entry point: run the polling coroutine on a fresh event loop when
# the file is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())
|