fetch_decode_results.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import json
  2. from typing import List, Dict
  3. from app.core.database import DatabaseManager
  4. from app.core.observability import LogService
  5. from app.infra.shared import run_tasks_with_asyncio_task_group
  6. from ._const import DecodeMaterialConst
  7. from ._mapper import MaterialDecodeTaskMapper
  8. from ._utils import MaterialDecodeUtils
  9. class FetchMaterialDecodeResults(DecodeMaterialConst):
  10. def __init__(self, pool: DatabaseManager, log_service: LogService):
  11. self.pool = pool
  12. self.log_service = log_service
  13. self.mapper = MaterialDecodeTaskMapper(self.pool)
  14. self.tool = MaterialDecodeUtils()
  15. async def _process_batch(self, tasks: List[Dict]):
  16. source_ids = [t["source_id"] for t in tasks]
  17. results = await self.tool.query_decode_results_batch(source_ids)
  18. for task in tasks:
  19. source_id = task["source_id"]
  20. result = results.get(source_id)
  21. if not result:
  22. await self.mapper.update_task_status_by_source_id(
  23. source_id=source_id,
  24. new_status=self.TaskStatus.FAILED,
  25. remark="素材解构任务在结果查询中未返回",
  26. )
  27. await self.log_service.log(
  28. contents={
  29. "task": "fetch_material_decode_results",
  30. "source_id": source_id,
  31. "status": "fail",
  32. "message": "source_id not in query response",
  33. }
  34. )
  35. continue
  36. status = result.get("status")
  37. if status == "API_ERROR":
  38. continue
  39. elif status == self.QueryStatus.SUCCESS:
  40. data_content = result.get("dataContent") or "{}"
  41. html = result.get("html")
  42. await self.mapper.set_decode_result(
  43. source_id=source_id,
  44. result=json.dumps(
  45. {"dataContent": data_content, "html": html},
  46. ensure_ascii=False,
  47. ),
  48. remark="素材解构结果获取成功",
  49. )
  50. elif status in (self.QueryStatus.PENDING, self.QueryStatus.RUNNING):
  51. pass
  52. elif status == self.QueryStatus.FAILED:
  53. await self.mapper.update_task_status_by_source_id(
  54. source_id=source_id,
  55. new_status=self.TaskStatus.FAILED,
  56. remark=f"素材解构任务失败: {result.get('errorMessage', '')}",
  57. )
  58. else:
  59. await self.log_service.log(
  60. contents={
  61. "task": "fetch_material_decode_results",
  62. "source_id": source_id,
  63. "status": "unknown",
  64. "message": f"unexpected query status: {status}",
  65. "data": result,
  66. }
  67. )
  68. async def deal(self):
  69. pending_tasks = await self.mapper.fetch_pending_tasks()
  70. if not pending_tasks:
  71. await self.log_service.log(
  72. contents={
  73. "task": "fetch_material_decode_results",
  74. "message": "No more material tasks to fetch",
  75. }
  76. )
  77. return
  78. batches = [
  79. pending_tasks[i : i + self.SUBMIT_BATCH]
  80. for i in range(0, len(pending_tasks), self.SUBMIT_BATCH)
  81. ]
  82. await run_tasks_with_asyncio_task_group(
  83. task_list=batches,
  84. handler=self._process_batch,
  85. description="批量查询素材解构结果",
  86. unit="batch",
  87. )
  88. await self.log_service.log(
  89. contents={
  90. "task": "fetch_material_decode_results",
  91. "message": f"Processed {len(pending_tasks)} pending material tasks in {len(batches)} batches",
  92. }
  93. )
# Names exported by a star-import of this module.
__all__ = ["FetchMaterialDecodeResults"]