| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- import json
- from typing import Dict
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from app.infra.shared import run_tasks_with_asyncio_task_group
- from ._const import DecodeArticleConst
- from ._mapper import ArticlesDecodeTaskMapper
- from ._utils import AigcDecodeUtils
class ExtractDecodeTaskDetail(DecodeArticleConst):
    """Parse stored decode-task results and persist the extracted detail.

    Each task's extract status is moved INIT -> PROCESSING before any work is
    done; the PROCESSING transition acts as a claim so the same task is handled
    once. The task then ends in SUCCESS, or in FAILED on any error.
    """

    # Value of the "task" field in every log record emitted by this class.
    _LOG_TASK_NAME = "extract_decode_result_v2"

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = ArticlesDecodeTaskMapper(self.pool)
        self.tool = AigcDecodeUtils()

    async def _mark_failed(self, task_id, **log_fields) -> None:
        """Move a claimed task from PROCESSING to FAILED and log the reason.

        ``log_fields`` (e.g. ``message``, ``raw``, ``detail``) are appended to
        the log record after the common ``task``/``task_id``/``status`` keys.
        """
        await self.mapper.update_extract_status(
            task_id,
            self.ExtractStatus.PROCESSING,
            self.ExtractStatus.FAILED,
        )
        await self.log_service.log(
            contents={
                "task": self._LOG_TASK_NAME,
                "task_id": task_id,
                "status": "fail",
                **log_fields,
            }
        )

    @staticmethod
    def _parse_result(raw) -> Dict:
        """Decode a stored result payload into the inner decode result.

        New API result format: ``{"dataContent": "{...}", "html": "..."}``,
        where ``dataContent`` may be a JSON-encoded string or an already
        decoded value. A missing/empty ``dataContent`` yields ``{}``.

        Raises:
            TypeError: ``raw`` is not str/bytes (e.g. ``None``), or it does not
                decode to a JSON object.
            ValueError: ``raw`` or ``dataContent`` is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        raw_result = json.loads(raw)
        # Valid JSON that is not an object (list, null, number) has no .get();
        # reject it here so the caller's parse-error path handles it.
        if not isinstance(raw_result, dict):
            raise TypeError(
                f"expected JSON object, got {type(raw_result).__name__}"
            )
        data_content = raw_result.get("dataContent")
        if isinstance(data_content, str):
            return json.loads(data_content)
        return data_content or {}

    async def extract_single_result(self, task: Dict) -> None:
        """Claim one task, extract its decode detail and record the outcome.

        Args:
            task: Row with at least an ``"id"`` key and a ``"result"`` JSON
                payload (see ``_parse_result``).

        If the claim fails (``update_extract_status`` returns a falsy value —
        presumably because another worker already moved the task out of INIT),
        nothing is done. Every failure after a successful claim marks the task
        FAILED and logs it, so the task is never left in PROCESSING.
        """
        task_id = task["id"]
        acquire_lock = await self.mapper.update_extract_status(
            task_id, self.ExtractStatus.INIT, self.ExtractStatus.PROCESSING
        )
        if not acquire_lock:
            return

        try:
            inner_result = self._parse_result(task["result"])
        except (TypeError, KeyError, ValueError) as e:
            await self._mark_failed(
                task_id,
                message=f"parse decode result error: {e}",
                raw=task.get("result"),
            )
            return

        try:
            detail = self.tool.extract_decode_result(inner_result)
            if detail.get("error"):
                await self._mark_failed(task_id, message=detail["error"])
                return

            saved = await self.mapper.record_extract_detail(task_id, detail)
            if not saved:
                await self._mark_failed(
                    task_id,
                    message="insert long_articles_decode_task_detail failed",
                    detail=detail,
                )
                return

            await self.mapper.update_extract_status(
                task_id,
                self.ExtractStatus.PROCESSING,
                self.ExtractStatus.SUCCESS,
            )
        except Exception as e:  # per-task boundary: never leave the claim held
            await self._mark_failed(
                task_id,
                message=f"extract decode result error: {e!r}",
            )

    async def deal(self) -> None:
        """Fetch pending extract tasks and process each one.

        Each fetched task is handed to ``extract_single_result`` through
        ``run_tasks_with_asyncio_task_group``. When no tasks are returned, an
        informational log record is written and nothing else happens.
        """
        tasks = await self.mapper.fetch_extract_tasks()
        if not tasks:
            await self.log_service.log(
                contents={
                    "task": self._LOG_TASK_NAME,
                    "message": "No more tasks to extract",
                }
            )
            return
        await run_tasks_with_asyncio_task_group(
            task_list=tasks,
            handler=self.extract_single_result,
            description="批量解析解构结果",
            unit="task",
        )
# Public API of this module: only the extraction task class is exported.
__all__ = [
    "ExtractDecodeTaskDetail",
]
|