# fetch_decode_results.py
import json
from collections import defaultdict
from typing import List, Dict

from app.core.database import DatabaseManager
from app.core.observability import LogService
from app.infra.shared import run_tasks_with_asyncio_task_group

from ._const import DecodeCardConst
from ._mapper import CardDecodeTaskMapper
from ._utils import CardDecodeUtils
  10. class FetchCardDecodeResults(DecodeCardConst):
  11. def __init__(self, pool: DatabaseManager, log_service: LogService):
  12. self.pool = pool
  13. self.log_service = log_service
  14. self.mapper = CardDecodeTaskMapper(self.pool)
  15. self.tool = CardDecodeUtils()
  16. @staticmethod
  17. def _group_tasks_by_config(tasks: List[Dict]) -> Dict[int, List[Dict]]:
  18. grouped = defaultdict(list)
  19. for task in tasks:
  20. grouped[task["config_id"]].append(task)
  21. return dict(grouped)
  22. async def _process_batch(self, tasks: List[Dict], config_id: int):
  23. source_ids = [t["source_id"] for t in tasks]
  24. results = await self.tool.query_decode_results_batch(
  25. source_ids, config_id=config_id
  26. )
  27. for task in tasks:
  28. source_id = task["source_id"]
  29. result = results.get(source_id)
  30. if not result:
  31. await self.mapper.update_task_status_by_source_id(
  32. source_id=source_id,
  33. config_id=config_id,
  34. new_status=self.TaskStatus.FAILED,
  35. remark="卡片解构任务在结果查询中未返回",
  36. )
  37. await self.log_service.log(
  38. contents={
  39. "task": "fetch_card_decode_results",
  40. "source_id": source_id,
  41. "config_id": config_id,
  42. "status": "fail",
  43. "message": "source_id not in query response",
  44. }
  45. )
  46. continue
  47. status = result.get("status")
  48. if status == "API_ERROR":
  49. continue
  50. elif status == self.QueryStatus.SUCCESS:
  51. data_content = result.get("dataContent") or "{}"
  52. html = result.get("html")
  53. await self.mapper.set_decode_result(
  54. source_id=source_id,
  55. config_id=config_id,
  56. result=json.dumps(
  57. {"dataContent": data_content, "html": html},
  58. ensure_ascii=False,
  59. ),
  60. remark="卡片解构结果获取成功",
  61. )
  62. elif status in (self.QueryStatus.PENDING, self.QueryStatus.RUNNING):
  63. pass
  64. elif status == self.QueryStatus.FAILED:
  65. await self.mapper.update_task_status_by_source_id(
  66. source_id=source_id,
  67. config_id=config_id,
  68. new_status=self.TaskStatus.FAILED,
  69. remark=f"卡片解构任务失败: {result.get('errorMessage', '')}",
  70. )
  71. else:
  72. await self.log_service.log(
  73. contents={
  74. "task": "fetch_card_decode_results",
  75. "source_id": source_id,
  76. "config_id": config_id,
  77. "status": "unknown",
  78. "message": f"unexpected query status: {status}",
  79. "data": result,
  80. }
  81. )
  82. async def deal(self):
  83. pending_tasks = await self.mapper.fetch_pending_tasks()
  84. if not pending_tasks:
  85. await self.log_service.log(
  86. contents={
  87. "task": "fetch_card_decode_results",
  88. "message": "No more card tasks to fetch",
  89. }
  90. )
  91. return
  92. grouped = self._group_tasks_by_config(pending_tasks)
  93. for config_id, tasks in grouped.items():
  94. batches = [
  95. tasks[i : i + self.SUBMIT_BATCH]
  96. for i in range(0, len(tasks), self.SUBMIT_BATCH)
  97. ]
  98. await run_tasks_with_asyncio_task_group(
  99. task_list=[
  100. {"batch": b, "config_id": config_id} for b in batches
  101. ],
  102. handler=lambda item, cid=config_id: self._process_batch(
  103. item["batch"], cid
  104. ),
  105. description="批量查询卡片解构结果",
  106. unit="batch",
  107. )
  108. await self.log_service.log(
  109. contents={
  110. "task": "fetch_card_decode_results",
  111. "message": f"Processed {len(pending_tasks)} pending card tasks across {len(grouped)} configs",
  112. }
  113. )
  114. __all__ = ["FetchCardDecodeResults"]