| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- import asyncio
- import traceback
- from typing import List
- import httpx
- import random
- from datetime import datetime, timedelta
- from tenacity import retry, stop_after_attempt, wait_fixed
- from app.core.config import settings
- from app.core.logger import logger
- from app.core.database import AsyncSessionLocal
- from app.core.aliyun_log import ali_log_client
- from app.repository.index_repo import IndexRepository
- from app.models.api_schema import ApiResponse, TrendItem
class CollectorService:
    """Collect keyword trend data from the remote API and persist it.

    The service owns one shared ``httpx.AsyncClient``. ``run_pipeline`` loads
    the pending keyword tasks from the database, fetches each keyword's data
    through ``_fetch_api`` and stores the result via ``IndexRepository``.
    Call ``close`` when the service is no longer needed to release the client.
    """

    def __init__(self) -> None:
        # One async HTTP client (30 s timeout) reused for every request.
        self.client = httpx.AsyncClient(timeout=30.0)

    async def _report(self, keyword: str, status: str, msg: str, result) -> None:
        """Send one record to the Aliyun log client on a best-effort basis.

        A failure of the log call itself is written to the local logger and
        swallowed so that it can neither replace the exception being reported
        nor stop the processing of the remaining keywords.
        """
        try:
            await ali_log_client.async_put_log(
                keyword=keyword,
                status=status,
                msg=msg,
                result=result,
            )
        except Exception as log_err:
            logger.error(f"[{keyword}] 日志上报失败: {log_err}")

    # reraise=True: once the attempts are exhausted the last real exception is
    # raised instead of tenacity's RetryError wrapper, so callers log the
    # actual cause of the failure.
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2), reraise=True)
    async def _fetch_api(self, keyword: str) -> List[TrendItem]:
        """Request the trend data of one keyword from the API.

        The queried window ends today and starts ``settings.LIMIT_DAY`` days
        earlier; both bounds are sent as ``YYYYMMDD`` strings. The call is
        attempted up to 3 times with a fixed 2-second wait between attempts.

        Args:
            keyword: The keyword to query.

        Returns:
            The returned entries as ``TrendItem`` objects ordered by their
            ``ymd`` field, or an empty list when the API reports a failure or
            returns no entries.

        Raises:
            Exception: The error of the last attempt (HTTP error, JSON
                decoding error, validation error, ...).
        """
        end_date = datetime.now()
        start_date = end_date - timedelta(days=settings.LIMIT_DAY)
        payload = {
            "keyword": keyword,
            "start_ymd": start_date.strftime("%Y%m%d"),
            "end_ymd": end_date.strftime("%Y%m%d"),
        }
        logger.debug(f"请求参数 [{keyword}]: {payload}")
        logger.info(f"采集数据: {keyword}")
        try:
            resp = await self.client.post(settings.API_URL, json=payload)
            logger.info(f"API Response [{keyword}]: {resp.text}")
            resp.raise_for_status()
            data = resp.json()
            validated = ApiResponse(**data)
            if not validated.is_success:
                # Business-level failure: reported, but not retried.
                logger.warning(f"API请求失败 [{keyword}]: {validated.msg}")
                await self._report(keyword, "fail", "接口请求失败", data)
                return []
            # NOTE(review): assumes ``validated.data`` is a dict (or None) whose
            # "data" entry is a list of dicts carrying a 'ymd' key - confirm
            # against the ApiResponse schema. Missing/null values become [].
            raw_list = (validated.data or {}).get("data") or []
            # sorted() leaves the parsed response untouched.
            ordered = sorted(raw_list, key=lambda x: x['ymd'])
            return [TrendItem(**i) for i in ordered]
        except Exception as e:
            logger.error(f"API请求失败 [{keyword}]: {str(e)}")
            await self._report(
                keyword,
                "error",
                "请求异常",
                f"{e},{traceback.format_exc()}",
            )
            # Re-raise so the retry decorator can schedule the next attempt.
            raise

    async def run_pipeline(self) -> None:
        """Fetch and store the data of every pending keyword task.

        Tasks are processed one after another inside a single database
        session. A failure of one keyword is logged and reported and does not
        stop the remaining keywords. After each keyword the coroutine sleeps
        for a random 2-4 seconds before moving on.
        """
        logger.info("开始执行采集管道")
        async with AsyncSessionLocal() as session:
            repo = IndexRepository(session)
            # Load the tasks that are waiting to be processed.
            keywords = await repo.get_pending_tasks()
            if not keywords:
                logger.info("没有需要执行的关键词")
                return
            logger.info(f"获取到 {len(keywords)} 个活跃关键词")
            for task in keywords:
                kw = task['keyword']
                try:
                    items = await self._fetch_api(kw)
                    if items:
                        count = await repo.upsert_batch(task["id"], items)
                        # The task time is only updated when data was saved.
                        await repo.update_task_time(task["id"])
                        logger.info(f"[{kw}] 成功保存 {count} 条记录")
                    else:
                        logger.info(f"[{kw}] 无数据")
                except Exception as e:
                    logger.error(f"[{kw}] 采集失败: {str(e)[:200]}...")
                    await self._report(
                        kw,
                        "error",
                        "抓取数据发生异常",
                        f"{str(e)},{traceback.format_exc()}",
                    )
                # Random pause between keywords to throttle the request rate.
                await asyncio.sleep(random.uniform(2, 4))
        logger.info("采集管道执行完成")

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self.client.aclose()
|