import json
from typing import Dict

from app.core.database import DatabaseManager
from app.core.observability import LogService
from app.infra.shared import run_tasks_with_asyncio_task_group

from ._const import DecodeArticleConst
from ._mapper import ArticlesDecodeTaskMapper
from ._utils import AigcDecodeUtils

# Value of the "task" field in every log record emitted by this module.
_TASK_NAME = "extract_decode_result_v2"


class ExtractDecodeTaskDetail(DecodeArticleConst):
    """Extract structured detail from decode tasks and persist it.

    Each task row is claimed by moving its extract status INIT -> PROCESSING
    (acting as a lock); it then ends in SUCCESS or FAILED via
    ``ArticlesDecodeTaskMapper.update_extract_status``.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service
        self.mapper = ArticlesDecodeTaskMapper(self.pool)
        self.tool = AigcDecodeUtils()

    @staticmethod
    def _parse_result(raw_result_text):
        """Decode a task's ``result`` value into the inner decode payload.

        New API result format: ``{"dataContent": "{...}", "html": "..."}``,
        where ``dataContent`` may be a JSON-encoded string or an
        already-decoded value. A missing/empty ``dataContent`` yields ``{}``.

        Raises:
            TypeError: ``raw_result_text`` is not str/bytes/bytearray (e.g.
                ``None``), or the outer JSON is not an object.
            json.JSONDecodeError: either JSON layer is malformed.
        """
        raw_result = json.loads(raw_result_text)
        if not isinstance(raw_result, dict):
            # Without this check a non-object payload fails with AttributeError
            # on .get(), which escapes the caller's parse-error handling.
            raise TypeError(
                "decode result must be a JSON object, "
                f"got {type(raw_result).__name__}"
            )
        data_content = raw_result.get("dataContent")
        if isinstance(data_content, str):
            return json.loads(data_content)
        return data_content or {}

    async def _mark_failed(self, task_id, message, **extra) -> None:
        """Move ``task_id`` from PROCESSING to FAILED and log ``message``.

        Any ``extra`` key/values (e.g. ``raw``, ``detail``) are appended to the
        log record after the standard fields.
        """
        await self.mapper.update_extract_status(
            task_id,
            self.ExtractStatus.PROCESSING,
            self.ExtractStatus.FAILED,
        )
        await self.log_service.log(
            contents={
                "task": _TASK_NAME,
                "task_id": task_id,
                "status": "fail",
                "message": message,
                **extra,
            }
        )

    async def extract_single_result(self, task: Dict) -> None:
        """Parse, extract and persist the decode result of one task.

        Returns silently if another worker already claimed the task. Every
        path after the claim leaves the row in SUCCESS or FAILED; unexpected
        errors from extraction/persistence are recorded as FAILED and then
        re-raised to the caller.
        """
        task_id = task["id"]
        acquire_lock = await self.mapper.update_extract_status(
            task_id, self.ExtractStatus.INIT, self.ExtractStatus.PROCESSING
        )
        if not acquire_lock:
            return

        try:
            inner_result = self._parse_result(task["result"])
        except (TypeError, KeyError, json.JSONDecodeError) as e:
            await self._mark_failed(
                task_id,
                f"parse decode result error: {e}",
                raw=task.get("result"),
            )
            return

        try:
            detail = self.tool.extract_decode_result(inner_result)
            error = detail.get("error")
            saved = False
            if not error:
                saved = await self.mapper.record_extract_detail(task_id, detail)
        except Exception as e:
            # Release the PROCESSING claim so the row is not stuck forever,
            # then propagate exactly as before.
            await self._mark_failed(task_id, f"extract decode result error: {e!r}")
            raise

        if error:
            await self._mark_failed(task_id, error)
            return

        if not saved:
            await self._mark_failed(
                task_id,
                "insert long_articles_decode_task_detail failed",
                detail=detail,
            )
            return

        await self.mapper.update_extract_status(
            task_id,
            self.ExtractStatus.PROCESSING,
            self.ExtractStatus.SUCCESS,
        )

    async def deal(self) -> None:
        """Fetch pending extract tasks and process them concurrently."""
        tasks = await self.mapper.fetch_extract_tasks()
        if not tasks:
            await self.log_service.log(
                contents={
                    "task": _TASK_NAME,
                    "message": "No more tasks to extract",
                }
            )
            return

        await run_tasks_with_asyncio_task_group(
            task_list=tasks,
            handler=self.extract_single_result,
            description="批量解析解构结果",
            unit="task",
        )


__all__ = ["ExtractDecodeTaskDetail"]