| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- import asyncio
- import random
- import time
- from readline import insert_text
- import requests
- import json
- import sys
- import os
- from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
- sys.path.insert(0, os.path.abspath("/root/AutoScraperX"))
- from core.utils.feishu_data_async import FeishuDataAsync
class WechatDomainFetcher:
    """Fetcher for WeChat mini-program domain information.

    Queries the crawler service's ``wxindex`` endpoint for a keyword.
    Intended as a one-off task.
    """

    def __init__(self):
        pass

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type((requests.exceptions.RequestException, ConnectionError)),
    )
    def get_domain_info(self, keyword, start_ymd):
        """Request the index data of ``keyword`` for the single day ``start_ymd``.

        Args:
            keyword: Search keyword sent to the crawler API.
            start_ymd: Day to query; it is stringified and used as both the
                start and the end of the requested range.
                (presumably formatted ``YYYYMMDD`` -- verify against caller)

        Returns:
            The decoded JSON body of the response, or ``None`` when an
            exception that is not a ``requests`` exception occurs while
            handling the response.

        Raises:
            requests.exceptions.RequestException: re-raised on every failed
                attempt so the ``retry`` decorator can schedule another try
                (up to three attempts in total).
        """
        headers = {
            'Content-Type': 'application/json',
        }
        # Query exactly the requested day instead of a fixed date, so the
        # response can contain the ``ymd`` entry the caller looks for.
        ymd = str(start_ymd)
        json_data = {
            'keyword': keyword,
            'start_ymd': ymd,
            'end_ymd': ymd,
        }
        try:
            response = requests.post(
                'http://crawapi.piaoquantv.com/crawler/wei_xin/wxindex',
                headers=headers,
                json=json_data,
                # Without a timeout a stalled connection would block forever
                # and the retry policy would never get a chance to run.
                timeout=30,
            )
            response.raise_for_status()  # raise on HTTP error status codes
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"获取域名信息失败: {str(e)}")
            raise  # re-raise so the retry decorator fires
        except Exception as e:
            print(f"解析响应失败: {str(e)}")
            return None
async def main():
    """Read (date, keyword) rows from a Feishu sheet and append their scores.

    Each pass of the endless loop:

    1. Reads range ``A2:B`` of the configuration sheet and skips the first
       returned row.
    2. For every remaining row with a non-empty date (column A) and keyword
       (column B), asks ``WechatDomainFetcher`` for that keyword's index data.
    3. Appends the channel scores of the first entry whose ``ymd`` equals the
       row's date to the result sheet.

    Any exception aborts the current pass, is printed with its traceback, and
    the loop starts over after a pause.
    """
    while True:
        try:
            # The read client is only needed for the configuration fetch.
            async with FeishuDataAsync() as feishu_data:
                config = await feishu_data.get_values_v3(
                    "TWeZsuGW4hURHatWnaec12blnAe", "jvQdJL", range_str="A2:B"
                )

            # Initialise the WeChat domain-info fetcher.
            domain_fetcher = WechatDomainFetcher()

            for row in config[1:]:
                if len(row) < 2:
                    continue

                keyword = row[1]
                start_ymd = row[0]
                if not keyword or not start_ymd:
                    continue

                domain_info = domain_fetcher.get_domain_info(keyword, start_ymd)
                if not domain_info:
                    continue

                wx_index_datas = domain_info.get('data', {}).get("data", [])
                if not wx_index_datas:
                    continue

                for data in wx_index_datas:
                    if data.get("ymd") == str(start_ymd):
                        channel_score = data.get("channel_score", {})
                        insert_row = [
                            keyword,
                            start_ymd,
                            str(channel_score.get("total_score", 0)),
                            str(channel_score.get("mpdoc_score", 0)),
                            str(channel_score.get("finder_score", 0)),
                            str(channel_score.get("query_score", 0)),
                            str(channel_score.get("live_score", 0)),
                            str(channel_score.get("miniapp_score", 0)),
                        ]
                        print(f"准备追加数据: {insert_row}")
                        # Separate, distinctly named client for the write so
                        # it does not shadow the one used for reading.
                        async with FeishuDataAsync() as feishu_writer:
                            await feishu_writer.append_values(
                                "JvUOwyowjir2wSkueEBcIxaSn4f",
                                "Nv3PJg",
                                "A2:H",
                                [insert_row],
                            )
                        print(f"成功追加: {keyword} - {start_ymd}")
                        # Only the first matching day is written per row.
                        break

                # Random pause between processed rows. Awaiting asyncio.sleep
                # yields to the event loop instead of blocking it.
                # NOTE(review): pause is applied once per processed row --
                # confirm this is the intended placement.
                await asyncio.sleep(random.randint(60, 120))
        except Exception as e:
            print(f"执行过程中出现错误: {str(e)}")
            import traceback
            traceback.print_exc()
            # Pause before the next pass so a persistent failure does not
            # turn the outer loop into a tight retry loop.
            await asyncio.sleep(random.randint(60, 120))
# Script entry point: drive the async workflow on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
|