detail.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. from utils.sync_mysql_help import mysql
  2. from utils.params import CapabilityEnum
  3. from loguru import logger
  4. import sys
  5. import json
  6. from typing import Optional, Dict, Any
  7. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  8. # 任务状态常量
  9. STATUS_PENDING = 0 # 待处理
  10. STATUS_RUNNING = 1 # 执行中
  11. STATUS_SUCCESS = 2 # 成功
  12. STATUS_FAILED = 3 # 失败
  13. # 不需要查询结果的状态(失败状态需要查询 error_message,因此不在此集合中)
  14. STATUS_WITHOUT_RESULT = {STATUS_PENDING, STATUS_RUNNING}
  15. def _build_response(data: Dict[str, Any]) -> Dict[str, Any]:
  16. """构建统一响应格式"""
  17. response = {
  18. "code": 0,
  19. "msg": "ok",
  20. "data": data
  21. }
  22. return response
  23. def _parse_result_payload(payload: Optional[str]) -> Any:
  24. """解析结果负载(JSON字符串转对象)"""
  25. if not payload:
  26. return None
  27. try:
  28. return json.loads(payload)
  29. except (json.JSONDecodeError, TypeError):
  30. return payload
  31. def _parse_web_url(web_url: Optional[str]) -> Optional[Dict[str, str]]:
  32. """解析结果表中的 web_url 字段,拆分出 pointUrl 和 weightUrl"""
  33. if not web_url:
  34. return None
  35. segments = [segment.strip() for segment in web_url.split(",") if segment.strip()]
  36. if not segments:
  37. return None
  38. point_url: Optional[str] = None
  39. weight_url: Optional[str] = None
  40. for segment in segments:
  41. if "weight_visualization" in segment:
  42. weight_url = segment
  43. else:
  44. point_url = segment
  45. if not point_url and not weight_url:
  46. return None
  47. return {
  48. "pointUrl": point_url or "",
  49. "weightUrl": weight_url or ""
  50. }
  51. def _fetch_decode_result(task_id: str) -> Optional[Dict[str, Any]]:
  52. """获取解构任务结果"""
  53. sql = "SELECT result_payload, error_message, web_url FROM workflow_decode_task_result WHERE task_id = %s"
  54. result_record = mysql.fetchone(sql, (task_id,))
  55. if not result_record:
  56. return None
  57. return {
  58. "result": _parse_result_payload(result_record.get("result_payload")),
  59. "error_message": result_record.get("error_message"),
  60. "url": _parse_web_url(result_record.get("web_url"))
  61. }
  62. def _build_result_data(
  63. task_id: str,
  64. status: int,
  65. result: Any = None,
  66. reason: Optional[str] = None,
  67. url: Optional[Dict[str, str]] = None
  68. ) -> Dict[str, Any]:
  69. """构建结果数据"""
  70. data: Dict[str, Any] = {
  71. "taskId": task_id,
  72. "status": status,
  73. "result": result,
  74. "reason": reason
  75. }
  76. # 对于解构任务,增加 url 字段(data.url.pointUrl / data.url.weightUrl)
  77. if url is not None:
  78. data["url"] = url
  79. return data
  80. def _handle_success_status(task_id: str, capability: int) -> Dict[str, Any]:
  81. """处理成功状态(status=2)"""
  82. # 只有解构任务需要查询结果表
  83. if capability != CapabilityEnum.DECODE.value:
  84. return _build_response(_build_result_data(task_id, STATUS_SUCCESS))
  85. # 查询解构结果
  86. decode_result = _fetch_decode_result(task_id)
  87. if not decode_result:
  88. return _build_response(_build_result_data(task_id, STATUS_SUCCESS))
  89. result_data = _build_result_data(
  90. task_id=task_id,
  91. status=STATUS_SUCCESS,
  92. result=decode_result.get("result"),
  93. reason=decode_result.get("error_message"),
  94. url=decode_result.get("url")
  95. )
  96. return _build_response(
  97. result_data
  98. )
  99. def get_decode_detail_by_task_id(task_id: str) -> Optional[Dict[str, Any]]:
  100. """获取任务详情"""
  101. # 查询任务基本信息
  102. sql = "SELECT task_id, status, capability FROM workflow_task WHERE task_id = %s"
  103. task = mysql.fetchone(sql, (task_id,))
  104. if not task:
  105. logger.info(f"task_id = {task_id} , 任务不存在")
  106. return None
  107. task_id_value = task.get("task_id")
  108. status = task.get("status")
  109. capability = task.get("capability")
  110. # 不需要查询结果的状态,直接返回
  111. if status in STATUS_WITHOUT_RESULT:
  112. result_data = _build_result_data(task_id_value, status)
  113. return _build_response(result_data)
  114. # 成功状态,需要查询结果
  115. if status == STATUS_SUCCESS:
  116. return _handle_success_status(task_id_value, capability)
  117. # 失败状态,需要返回 error_message 到 reason 字段,result 固定为 "[]"
  118. if status == STATUS_FAILED:
  119. error_message: Optional[str] = None
  120. # 仅对解构任务从结果表中查询失败原因
  121. if capability == CapabilityEnum.DECODE.value:
  122. decode_result = _fetch_decode_result(task_id_value)
  123. if decode_result:
  124. error_message = decode_result.get("error_message")
  125. result_data = _build_result_data(
  126. task_id=task_id_value,
  127. status=STATUS_FAILED,
  128. result="[]",
  129. reason=error_message or ""
  130. )
  131. return _build_response(
  132. result_data
  133. )
  134. # 其他未知状态,返回基础数据
  135. result_data = _build_result_data(task_id_value, status)
  136. return _build_response(result_data)