import json from typing import Dict from app.core.database import DatabaseManager from app.core.observability import LogService from ._const import DecodeTaskConst from ._mapper import ArticlesDecodeTaskMapper from ._utils import DecodeTaskUtil class FetchDecodeResults(DecodeTaskConst): def __init__(self, pool: DatabaseManager, log_service: LogService): self.pool = pool self.log_service = log_service self.mapper = ArticlesDecodeTaskMapper(self.pool) self.tool = DecodeTaskUtil() async def fetch_single_task(self, task: Dict): task_id = task["task_id"] # acquire lock acquire_lock = await self.mapper.update_decode_task_status( task_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING ) if not acquire_lock: return response = await self.tool.fetch_decode_result(task_id) if not response: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.TaskStatus.PROCESSING, new_status=self.TaskStatus.INIT, remark="获取解构结果失败,服务异常,已回滚状态", ) return # 请求成功 response_code = response.get("code") if response_code != self.RequestDecode.SUCCESS: # 解构任务获取失败 await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.TaskStatus.PROCESSING, new_status=self.TaskStatus.FAILED, remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}", ) return response_data = response.get("data", {}) response_task_id = response_data.get("taskId") or response_data.get("task_id") if task_id != response_task_id: # 解构任务获取失败 await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.TaskStatus.PROCESSING, new_status=self.TaskStatus.FAILED, remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}", ) return status = response_data.get("status") match status: case self.DecodeStatus.PENDING: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.TaskStatus.PROCESSING, new_status=self.TaskStatus.INIT, remark=f"解构任务状态为PENDING,继续轮询", ) case self.DecodeStatus.RUNNING: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.TaskStatus.PROCESSING, new_status=self.TaskStatus.INIT, remark=f"解构任务状态为RUNNING,继续轮询", ) case self.DecodeStatus.SUCCESS: await self.mapper.set_decode_result( task_id=task_id, result=json.dumps(response_data, ensure_ascii=False), ) case self.DecodeStatus.FAILED: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.TaskStatus.PROCESSING, new_status=self.TaskStatus.FAILED, remark=f"解构任务状态为FAILED,标记为失败", ) case _: await self.mapper.update_decode_task_status( task_id=task_id, ori_status=self.TaskStatus.PROCESSING, new_status=self.TaskStatus.INIT, remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}", ) await self.log_service.log( contents={ "task": "fetch_single_task", "task_id": task_id, "status": "unknown", "message": f"unexpected decode status: {status}", "data": response_data, } ) async def fetch_results(self): decoding_tasks = await self.mapper.fetch_decoding_tasks() if not decoding_tasks: await self.log_service.log( contents={"task": "fetch_results", "message": "No more tasks to fetch"} ) return for task in decoding_tasks: await self.fetch_single_task(task) async def deal(self): await self.fetch_results() __all__ = ["FetchDecodeResults"]