fetch_decode_results.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. import json
  2. from typing import Dict
  3. from app.core.database import DatabaseManager
  4. from app.core.observability import LogService
  5. from ._const import DecodeTaskConst
  6. from ._mapper import ArticlesDecodeTaskMapper
  7. from ._utils import DecodeTaskUtil
  8. class FetchDecodeResults(DecodeTaskConst):
  9. def __init__(self, pool: DatabaseManager, log_service: LogService):
  10. self.pool = pool
  11. self.log_service = log_service
  12. self.mapper = ArticlesDecodeTaskMapper(self.pool)
  13. self.tool = DecodeTaskUtil()
  14. async def fetch_single_task(self, task: Dict):
  15. task_id = task["task_id"]
  16. # acquire lock
  17. acquire_lock = await self.mapper.update_decode_task_status(
  18. task_id, self.TaskStatus.INIT, self.TaskStatus.PROCESSING
  19. )
  20. if not acquire_lock:
  21. return
  22. response = await self.tool.fetch_decode_result(task_id)
  23. if not response:
  24. await self.mapper.update_decode_task_status(
  25. task_id=task_id,
  26. ori_status=self.TaskStatus.PROCESSING,
  27. new_status=self.TaskStatus.INIT,
  28. remark="获取解构结果失败,服务异常,已回滚状态",
  29. )
  30. return
  31. # 请求成功
  32. response_code = response.get("code")
  33. if response_code != self.RequestDecode.SUCCESS:
  34. # 解构任务获取失败
  35. await self.mapper.update_decode_task_status(
  36. task_id=task_id,
  37. ori_status=self.TaskStatus.PROCESSING,
  38. new_status=self.TaskStatus.FAILED,
  39. remark=f"请求解构接口返回异常,标记为失败:{json.dumps(response, ensure_ascii=False)}",
  40. )
  41. return
  42. response_data = response.get("data", {})
  43. response_task_id = response_data.get("taskId") or response_data.get("task_id")
  44. if task_id != response_task_id:
  45. # 解构任务获取失败
  46. await self.mapper.update_decode_task_status(
  47. task_id=task_id,
  48. ori_status=self.TaskStatus.PROCESSING,
  49. new_status=self.TaskStatus.FAILED,
  50. remark=f"请求解构接口TaskId异常:{json.dumps(response, ensure_ascii=False)}",
  51. )
  52. return
  53. status = response_data.get("status")
  54. match status:
  55. case self.DecodeStatus.PENDING:
  56. await self.mapper.update_decode_task_status(
  57. task_id=task_id,
  58. ori_status=self.TaskStatus.PROCESSING,
  59. new_status=self.TaskStatus.INIT,
  60. remark=f"解构任务状态为PENDING,继续轮询",
  61. )
  62. case self.DecodeStatus.RUNNING:
  63. await self.mapper.update_decode_task_status(
  64. task_id=task_id,
  65. ori_status=self.TaskStatus.PROCESSING,
  66. new_status=self.TaskStatus.INIT,
  67. remark=f"解构任务状态为RUNNING,继续轮询",
  68. )
  69. case self.DecodeStatus.SUCCESS:
  70. await self.mapper.set_decode_result(
  71. task_id=task_id,
  72. result=json.dumps(response_data, ensure_ascii=False),
  73. )
  74. case self.DecodeStatus.FAILED:
  75. await self.mapper.update_decode_task_status(
  76. task_id=task_id,
  77. ori_status=self.TaskStatus.PROCESSING,
  78. new_status=self.TaskStatus.FAILED,
  79. remark=f"解构任务状态为FAILED,标记为失败",
  80. )
  81. case _:
  82. await self.mapper.update_decode_task_status(
  83. task_id=task_id,
  84. ori_status=self.TaskStatus.PROCESSING,
  85. new_status=self.TaskStatus.INIT,
  86. remark=f"解构任务状态未知(status={status}),回滚待重试:{json.dumps(response_data, ensure_ascii=False)}",
  87. )
  88. await self.log_service.log(
  89. contents={
  90. "task": "fetch_single_task",
  91. "task_id": task_id,
  92. "status": "unknown",
  93. "message": f"unexpected decode status: {status}",
  94. "data": response_data,
  95. }
  96. )
  97. async def fetch_results(self):
  98. decoding_tasks = await self.mapper.fetch_decoding_tasks()
  99. if not decoding_tasks:
  100. await self.log_service.log(
  101. contents={"task": "fetch_results", "message": "No more tasks to fetch"}
  102. )
  103. return
  104. for task in decoding_tasks:
  105. await self.fetch_single_task(task)
  106. async def deal(self):
  107. await self.fetch_results()
  # Names exported by ``from <module> import *``.
  __all__ = [
      "FetchDecodeResults",
  ]