# extract_decode_task_detail.py (3.8 KB)

import json
from typing import Any, Dict

from app.core.database import DatabaseManager
from app.core.observability import LogService
from app.infra.shared import run_tasks_with_asyncio_task_group

from ._const import DecodeArticleConst
from ._mapper import ArticlesDecodeTaskMapper
from ._utils import AigcDecodeUtils
  9. class ExtractDecodeTaskDetail(DecodeArticleConst):
  10. def __init__(self, pool: DatabaseManager, log_service: LogService):
  11. self.pool = pool
  12. self.log_service = log_service
  13. self.mapper = ArticlesDecodeTaskMapper(self.pool)
  14. self.tool = AigcDecodeUtils()
  15. async def extract_single_result(self, task: Dict):
  16. task_id = task["id"]
  17. acquire_lock = await self.mapper.update_extract_status(
  18. task_id, self.ExtractStatus.INIT, self.ExtractStatus.PROCESSING
  19. )
  20. if not acquire_lock:
  21. return
  22. try:
  23. raw_result = json.loads(task["result"])
  24. # 新 API 结果格式: {"dataContent": "{...}", "html": "..."}
  25. data_content = raw_result.get("dataContent")
  26. if isinstance(data_content, str):
  27. inner_result = json.loads(data_content)
  28. else:
  29. inner_result = data_content or {}
  30. except (TypeError, KeyError, json.JSONDecodeError) as e:
  31. await self.mapper.update_extract_status(
  32. task_id,
  33. self.ExtractStatus.PROCESSING,
  34. self.ExtractStatus.FAILED,
  35. )
  36. await self.log_service.log(
  37. contents={
  38. "task": "extract_decode_result_v2",
  39. "task_id": task_id,
  40. "status": "fail",
  41. "message": f"parse decode result error: {e}",
  42. "raw": task.get("result"),
  43. }
  44. )
  45. return
  46. detail = self.tool.extract_decode_result(inner_result)
  47. if detail.get("error"):
  48. await self.mapper.update_extract_status(
  49. task_id,
  50. self.ExtractStatus.PROCESSING,
  51. self.ExtractStatus.FAILED,
  52. )
  53. await self.log_service.log(
  54. contents={
  55. "task": "extract_decode_result_v2",
  56. "task_id": task_id,
  57. "status": "fail",
  58. "message": detail["error"],
  59. }
  60. )
  61. return
  62. saved = await self.mapper.record_extract_detail(task_id, detail)
  63. if not saved:
  64. await self.mapper.update_extract_status(
  65. task_id,
  66. self.ExtractStatus.PROCESSING,
  67. self.ExtractStatus.FAILED,
  68. )
  69. await self.log_service.log(
  70. contents={
  71. "task": "extract_decode_result_v2",
  72. "task_id": task_id,
  73. "status": "fail",
  74. "message": "insert long_articles_decode_task_detail failed",
  75. "detail": detail,
  76. }
  77. )
  78. return
  79. await self.mapper.update_extract_status(
  80. task_id,
  81. self.ExtractStatus.PROCESSING,
  82. self.ExtractStatus.SUCCESS,
  83. )
  84. async def deal(self):
  85. tasks = await self.mapper.fetch_extract_tasks()
  86. if not tasks:
  87. await self.log_service.log(
  88. contents={
  89. "task": "extract_decode_result_v2",
  90. "message": "No more tasks to extract",
  91. }
  92. )
  93. return
  94. await run_tasks_with_asyncio_task_group(
  95. task_list=tasks,
  96. handler=self.extract_single_result,
  97. description="批量解析解构结果",
  98. unit="task",
  99. )
  100. __all__ = ["ExtractDecodeTaskDetail"]