| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import json
- from typing import List, Dict
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from app.infra.shared import run_tasks_with_asyncio_task_group
- from ._const import DecodeArticleConst
- from ._mapper import ArticlesDecodeTaskMapper
- from ._utils import AigcDecodeUtils
class FetchDecodeResults(DecodeArticleConst):
    """Fetch decode results for pending article-decode tasks and record them.

    Pending tasks come from ``ArticlesDecodeTaskMapper.fetch_pending_tasks``.
    They are split into batches of ``SUBMIT_BATCH`` and each batch is queried
    via ``AigcDecodeUtils.query_decode_results_batch``. Each task's row is then
    updated according to the query status returned for its ``source_id``.
    """

    # Value of the "task" field on every log entry emitted by this class.
    _LOG_TASK_NAME = "fetch_decode_results_v2"

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = ArticlesDecodeTaskMapper(self.pool)
        self.tool = AigcDecodeUtils()

    async def _log(self, **fields) -> None:
        """Emit one log entry; ``"task"`` comes first, then *fields* in order."""
        await self.log_service.log(
            contents={"task": self._LOG_TASK_NAME, **fields}
        )

    async def _process_batch(self, tasks: List[Dict]) -> None:
        """Query decode results for one batch of tasks and apply each result.

        Args:
            tasks: Task rows; each is read via its ``"source_id"`` key.
        """
        source_ids = [task["source_id"] for task in tasks]
        # NOTE(review): the return value is used as a mapping keyed by
        # source_id. Any exception raised here propagates to
        # run_tasks_with_asyncio_task_group, whose handling is not visible here.
        results = await self.tool.query_decode_results_batch(source_ids)
        for task in tasks:
            source_id = task["source_id"]
            await self._apply_result(source_id, results.get(source_id))

    async def _apply_result(self, source_id, result) -> None:
        """Update one task's row according to its query result.

        Args:
            source_id: The task's source id.
            result: The entry the batch query returned for ``source_id``, or a
                falsy value (e.g. ``None``) when the response had no entry.
        """
        if not result:
            # The query response did not include this id (it may not exist):
            # move the task from INIT to FAILED and log it.
            await self.mapper.update_task_status_by_source_id(
                source_id=source_id,
                ori_status=self.TaskStatus.INIT,
                new_status=self.TaskStatus.FAILED,
                remark="解构任务在结果查询中未返回,可能不存在",
            )
            await self._log(
                source_id=source_id,
                status="fail",
                message="source_id not in query response",
            )
            return

        status = result.get("status")
        if status == "API_ERROR":
            # The query API call itself failed; keep the task in INIT so it
            # is retried on a later run.
            return

        if status == self.QueryStatus.SUCCESS:
            data_content = result.get("dataContent") or "{}"
            html = result.get("html")
            await self.mapper.set_decode_result(
                source_id=source_id,
                result=json.dumps(
                    {"dataContent": data_content, "html": html},
                    ensure_ascii=False,
                ),
                remark="解构结果获取成功",
            )
        elif status in (self.QueryStatus.PENDING, self.QueryStatus.RUNNING):
            # Still in progress; nothing to record yet.
            return
        elif status == self.QueryStatus.FAILED:
            # dict.get(key, "") still yields None when the key is present with
            # a null value, which would render as "None" in the remark.
            error_message = result.get("errorMessage")
            if error_message is None:
                error_message = ""
            await self.mapper.update_task_status_by_source_id(
                source_id=source_id,
                ori_status=self.TaskStatus.INIT,
                new_status=self.TaskStatus.FAILED,
                remark=f"解构任务失败: {error_message}",
            )
        else:
            # Unrecognised status: log it and leave the row untouched.
            await self._log(
                source_id=source_id,
                status="unknown",
                message=f"unexpected query status: {status}",
                data=result,
            )

    async def deal(self) -> None:
        """Fetch all pending tasks and process their decode results in batches.

        Logs and returns early when there are no pending tasks. Otherwise logs
        a summary with the task and batch counts after all batches are handled.
        """
        pending_tasks = await self.mapper.fetch_pending_tasks()
        if not pending_tasks:
            await self._log(message="No more tasks to fetch")
            return

        # Split into SUBMIT_BATCH-sized batches that are queried concurrently
        # via run_tasks_with_asyncio_task_group.
        batch_size = self.SUBMIT_BATCH
        batches = [
            pending_tasks[start : start + batch_size]
            for start in range(0, len(pending_tasks), batch_size)
        ]
        await run_tasks_with_asyncio_task_group(
            task_list=batches,
            handler=self._process_batch,
            description="批量查询解构结果",
            unit="batch",
        )
        await self._log(
            message=f"Processed {len(pending_tasks)} pending tasks in {len(batches)} batches",
        )


__all__ = ["FetchDecodeResults"]
|