collector.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import asyncio
  2. import traceback
  3. from typing import List
  4. import httpx
  5. import random
  6. from datetime import datetime, timedelta
  7. from tenacity import retry, stop_after_attempt, wait_fixed
  8. from app.core.config import settings
  9. from app.core.logger import logger
  10. from app.core.database import AsyncSessionLocal
  11. from app.core.aliyun_log import ali_log_client
  12. from app.repository.index_repo import IndexRepository
  13. from app.models.api_schema import ApiResponse, TrendItem
  14. class CollectorService:
  15. def __init__(self):
  16. # 使用 httpx 异步客户端
  17. self.client = httpx.AsyncClient(timeout=30.0)
  18. @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
  19. async def _fetch_api(self, keyword: str) -> List[TrendItem]:
  20. """请求接口"""
  21. end_date = datetime.now()
  22. start_date = end_date - timedelta(days=settings.LIMIT_DAY)
  23. payload = {
  24. "keyword": keyword,
  25. "start_ymd": str(start_date.strftime("%Y%m%d")),
  26. "end_ymd": str(end_date.strftime("%Y%m%d"))
  27. }
  28. print(payload)
  29. logger.info(f"采集数据: {keyword}")
  30. try:
  31. resp = await self.client.post(settings.API_URL, json=payload)
  32. logger.info(f"API Response [{keyword}]: {resp.text}")
  33. resp.raise_for_status()
  34. data = resp.json()
  35. validated = ApiResponse(**data)
  36. if not validated.is_success:
  37. logger.warning(f"API请求失败 [{keyword}]: {validated.msg}")
  38. await ali_log_client.async_put_log(
  39. keyword=keyword,
  40. status="fail",
  41. msg="接口请求失败",
  42. result=data
  43. )
  44. return []
  45. raw_list = validated.data.get("data", [])
  46. raw_list.sort(key=lambda x: x['ymd'])
  47. return [TrendItem(**i) for i in raw_list]
  48. except Exception as e:
  49. logger.error(f"API请求失败 [{keyword}]: {str(e)}")
  50. await ali_log_client.async_put_log(
  51. keyword=keyword,
  52. status="error",
  53. msg=f"请求异常",
  54. result=f"{e},{traceback.format_exc()}"
  55. )
  56. raise
  57. async def run_pipeline(self):
  58. logger.info("开始执行采集管道")
  59. async with AsyncSessionLocal() as session:
  60. repo = IndexRepository(session)
  61. # 获取“待处理”任务
  62. keywords = await repo.get_pending_tasks()
  63. if not keywords:
  64. logger.info("没有需要执行的关键词")
  65. return
  66. logger.info(f"获取到 {len(keywords)} 个活跃关键词")
  67. for task in keywords:
  68. kw = task['keyword']
  69. try:
  70. items = await self._fetch_api(kw)
  71. if items:
  72. count = await repo.upsert_batch(task["id"], items)
  73. await repo.update_task_time(task["id"])
  74. logger.info(f"[{kw}] 成功保存 {count} 条记录")
  75. else:
  76. logger.info(f"[{kw}] 无数据")
  77. except Exception as e:
  78. logger.error(f"[{kw}] 采集失败: {str(e)[:200]}...")
  79. await ali_log_client.async_put_log(
  80. keyword=kw,
  81. status="error",
  82. msg="抓取数据发生异常",
  83. result=f"{str(e)},{traceback.format_exc()}"
  84. )
  85. await asyncio.sleep(random.uniform(2, 4))
  86. logger.info("采集管道执行完成")
  87. async def close(self):
  88. await self.client.aclose()