import asyncio
import random
import traceback
from datetime import datetime, timedelta
from typing import List

import httpx
from tenacity import retry, stop_after_attempt, wait_fixed

from app.core.config import settings
from app.core.logger import logger
from app.core.database import AsyncSessionLocal
from app.core.aliyun_log import ali_log_client
from app.repository.index_repo import IndexRepository
from app.models.api_schema import ApiResponse, TrendItem


class CollectorService:
    """Fetch trend data for pending keywords and persist it via IndexRepository."""

    def __init__(self):
        # Shared async HTTP client (30s timeout); released by close().
        self.client = httpx.AsyncClient(timeout=30.0)

    @staticmethod
    async def _safe_put_log(**fields) -> None:
        """Best-effort write of one record to the Aliyun log.

        A failure here is logged locally and swallowed, so that it can neither
        mask the exception being reported nor abort the rest of the pipeline.
        """
        try:
            await ali_log_client.async_put_log(**fields)
        except Exception as log_err:
            logger.error(f"写入阿里云日志失败: {log_err}")

    # reraise=True: after the final attempt tenacity re-raises the original
    # exception instead of wrapping it in RetryError, so callers see and log
    # the real cause.
    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2), reraise=True)
    async def _fetch_api(self, keyword: str) -> List[TrendItem]:
        """Request trend data for ``keyword`` over the last ``settings.LIMIT_DAY`` days.

        Args:
            keyword: The keyword to query.

        Returns:
            The rows as ``TrendItem`` objects sorted by their ``ymd`` field, or
            an empty list when the API reports a non-success result.

        Raises:
            Any exception from the HTTP call, status check, JSON decoding or
            schema validation, re-raised after being logged. The decorator
            retries up to 3 attempts, 2 seconds apart, before giving up.
        """
        end_date = datetime.now()
        start_date = end_date - timedelta(days=settings.LIMIT_DAY)
        payload = {
            "keyword": keyword,
            "start_ymd": start_date.strftime("%Y%m%d"),
            "end_ymd": end_date.strftime("%Y%m%d"),
        }
        # Previously a bare print(); route through the logger instead.
        logger.debug(f"请求参数 [{keyword}]: {payload}")
        logger.info(f"采集数据: {keyword}")

        try:
            resp = await self.client.post(settings.API_URL, json=payload)
            logger.info(f"API Response [{keyword}]: {resp.text}")
            resp.raise_for_status()
            data = resp.json()
            validated = ApiResponse(**data)

            if not validated.is_success:
                logger.warning(f"API请求失败 [{keyword}]: {validated.msg}")
                await self._safe_put_log(
                    keyword=keyword,
                    status="fail",
                    msg="接口请求失败",
                    result=data,
                )
                return []

            # validated.data is read with .get(), so presumably a dict; treat a
            # null envelope or a null inner "data" list as "no rows" rather than
            # crashing on .get()/.sort().
            raw_list = (validated.data or {}).get("data") or []
            raw_list.sort(key=lambda row: row["ymd"])
            return [TrendItem(**row) for row in raw_list]
        except Exception as e:
            logger.error(f"API请求失败 [{keyword}]: {str(e)}")
            await self._safe_put_log(
                keyword=keyword,
                status="error",
                msg="请求异常",
                result=f"{e},{traceback.format_exc()}",
            )
            raise

    async def _collect_one(self, repo: IndexRepository, task) -> None:
        """Fetch one task's keyword and upsert the rows; raises on any failure.

        ``task`` is subscripted with "keyword" and "id" — presumably a mapping
        row returned by ``repo.get_pending_tasks()``; confirm against the repo.
        The task time is only updated when rows were returned.
        """
        kw = task["keyword"]
        items = await self._fetch_api(kw)
        if not items:
            logger.info(f"[{kw}] 无数据")
            return
        count = await repo.upsert_batch(task["id"], items)
        await repo.update_task_time(task["id"])
        logger.info(f"[{kw}] 成功保存 {count} 条记录")

    async def run_pipeline(self) -> None:
        """Collect and store data for every pending task, one keyword at a time.

        A failure on one keyword is logged (locally and to Aliyun), the DB
        session is rolled back, and processing continues with the next keyword.
        A random 2-4 second pause follows each keyword.
        """
        logger.info("开始执行采集管道")
        async with AsyncSessionLocal() as session:
            repo = IndexRepository(session)
            # Fetch tasks that are in the "pending" state.
            keywords = await repo.get_pending_tasks()
            if not keywords:
                logger.info("没有需要执行的关键词")
                return
            logger.info(f"获取到 {len(keywords)} 个活跃关键词")

            for task in keywords:
                kw = task["keyword"]
                try:
                    await self._collect_one(repo, task)
                except Exception as e:
                    # Capture the traceback before any further awaits.
                    tb = traceback.format_exc()
                    logger.error(f"[{kw}] 采集失败: {str(e)[:200]}...")
                    # A failed DB write can leave the session in a failed
                    # transaction; roll back so the next keyword can use it.
                    try:
                        await session.rollback()
                    except Exception as rb_err:
                        logger.error(f"[{kw}] 回滚失败: {rb_err}")
                    await self._safe_put_log(
                        keyword=kw,
                        status="error",
                        msg="抓取数据发生异常",
                        result=f"{str(e)},{tb}",
                    )
                # Randomised pause between keywords to avoid hammering the API.
                await asyncio.sleep(random.uniform(2, 4))

            logger.info("采集管道执行完成")

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()