import asyncio import random import time from readline import insert_text import requests import json import sys import os from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type sys.path.insert(0, os.path.abspath("/root/AutoScraperX")) from core.utils.feishu_data_async import FeishuDataAsync class WechatDomainFetcher: """ 微信小程序域名信息获取类 一次性任务 """ def __init__(self): pass @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError)) ) def get_domain_info(self, keyword, start_ymd): headers = { 'Content-Type': 'application/json', } json_data = { 'keyword': keyword, 'start_ymd': '20260423', 'end_ymd': '20260423', } try: response = requests.post('http://crawapi.piaoquantv.com/crawler/wei_xin/wxindex', headers=headers, json=json_data) response.raise_for_status() # 检查HTTP错误 return response.json() except requests.exceptions.RequestException as e: print(f"获取域名信息失败: {str(e)}") raise # 重新抛出异常以触发重试 except Exception as e: print(f"解析响应失败: {str(e)}") return None async def main(): while True: try: async with FeishuDataAsync() as feishu_data: config = await feishu_data.get_values_v3("TWeZsuGW4hURHatWnaec12blnAe", "jvQdJL",range_str="A2:B") # 初始化微信域名信息获取器 domain_fetcher = WechatDomainFetcher() for row in config[1:]: if len(row) < 2: continue keyword = row[1] start_ymd = row[0] if not keyword or not start_ymd: continue domain_info = domain_fetcher.get_domain_info(keyword, start_ymd) if not domain_info: continue wx_index_datas = domain_info.get('data', {}).get("data", []) if not wx_index_datas: continue for data in wx_index_datas: if data.get("ymd") == str(start_ymd): channel_score = data.get("channel_score", {}) insert_row = [ keyword, start_ymd, str(channel_score.get("total_score", 0)), str(channel_score.get("mpdoc_score", 0)), str(channel_score.get("finder_score", 0)), str(channel_score.get("query_score", 0)), str(channel_score.get("live_score", 0)), str(channel_score.get("miniapp_score", 0)) ] print(f"准备追加数据: {insert_row}") async with FeishuDataAsync() as feishu_data: await feishu_data.append_values( "JvUOwyowjir2wSkueEBcIxaSn4f", "Nv3PJg", "A2:H", [insert_row] ) print(f"成功追加: {keyword} - {start_ymd}") break time.sleep(random.randint(60, 120)) except Exception as e: print(f"执行过程中出现错误: {str(e)}") import traceback traceback.print_exc() if __name__ == '__main__': asyncio.run(main())