| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- import json
- from typing import Dict
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from ._const import DecodeTaskConst
- from ._mapper import ArticlesDecodeTaskMapper
- from ._utils import DecodeTaskUtil
- class FetchDecodeResults(DecodeTaskConst):
- def __init__(self, pool: DatabaseManager, log_service: LogService):
- self.pool = pool
- self.log_service = log_service
- self.mapper = ArticlesDecodeTaskMapper(self.pool)
- self.tool = DecodeTaskUtil()
- async def fetch_single_task(self, task: Dict):
- task_id = task["task_id"]
- # acquire lock
- acquire_lock = await self.mapper.update_decode_task_status(
- task_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
- )
- if not acquire_lock:
- return
- response = await self.tool.fetch_decode_result(task_id)
- if not response:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.TaskStatus.PROCESSING,
- new_status=self.TaskStatus.INIT,
- remark="获取解构结果失败,服务异常,已回滚状态",
- )
- return
- # 请求成功
- response_code = response.get("code")
- if response_code != self.RequestDecode.SUCCESS:
- # 解构任务获取失败
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.TaskStatus.PROCESSING,
- new_status=self.TaskStatus.FAILED,
- remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
- )
- return
- response_data = response.get("data", {})
- response_task_id = response_data.get("taskId") or response_data.get("task_id")
- if task_id != response_task_id:
- # 解构任务获取失败
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.TaskStatus.PROCESSING,
- new_status=self.TaskStatus.FAILED,
- remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
- )
- return
- status = response_data.get("status")
- match status:
- case self.DecodeStatus.PENDING:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.TaskStatus.PROCESSING,
- new_status=self.TaskStatus.INIT,
- remark=f"解构任务状态为PENDING,继续轮询",
- )
- case self.DecodeStatus.RUNNING:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.TaskStatus.PROCESSING,
- new_status=self.TaskStatus.INIT,
- remark=f"解构任务状态为RUNNING,继续轮询",
- )
- case self.DecodeStatus.SUCCESS:
- await self.mapper.set_decode_result(
- task_id=task_id,
- result=json.dumps(response_data, ensure_ascii=False),
- )
- case self.DecodeStatus.FAILED:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.TaskStatus.PROCESSING,
- new_status=self.TaskStatus.FAILED,
- remark=f"解构任务状态为FAILED,标记为失败",
- )
- case _:
- await self.mapper.update_decode_task_status(
- task_id=task_id,
- ori_status=self.TaskStatus.PROCESSING,
- new_status=self.TaskStatus.INIT,
- remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
- )
- await self.log_service.log(
- contents={
- "task": "fetch_single_task",
- "task_id": task_id,
- "status": "unknown",
- "message": f"unexpected decode status: {status}",
- "data": response_data,
- }
- )
- async def fetch_results(self):
- decoding_tasks = await self.mapper.fetch_decoding_tasks()
- if not decoding_tasks:
- await self.log_service.log(
- contents={"task": "fetch_results", "message": "No more tasks to fetch"}
- )
- return
- for task in decoding_tasks:
- await self.fetch_single_task(task)
- async def deal(self):
- await self.fetch_results()
- __all__ = ["FetchDecodeResults"]
|