detail.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. from utils.sync_mysql_help import mysql
  2. from utils.params import CapabilityEnum
  3. from loguru import logger
  4. import sys
  5. import json
  6. from typing import Optional, Dict, Any
  7. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  8. # 任务状态常量
  9. STATUS_PENDING = 0 # 待处理
  10. STATUS_RUNNING = 1 # 执行中
  11. STATUS_SUCCESS = 2 # 成功
  12. STATUS_FAILED = 3 # 失败
  13. # 不需要查询结果的状态
  14. STATUS_WITHOUT_RESULT = {STATUS_PENDING, STATUS_RUNNING, STATUS_FAILED}
  15. def _build_response(data: Dict[str, Any], reason: Optional[str] = None) -> Dict[str, Any]:
  16. """构建统一响应格式"""
  17. response = {
  18. "code": 0,
  19. "msg": "ok",
  20. "data": data
  21. }
  22. if reason:
  23. response["reason"] = reason
  24. return response
  25. def _parse_result_payload(payload: Optional[str]) -> Any:
  26. """解析结果负载(JSON字符串转对象)"""
  27. if not payload:
  28. return None
  29. try:
  30. return json.loads(payload)
  31. except (json.JSONDecodeError, TypeError):
  32. return payload
  33. def _fetch_decode_result(task_id: str) -> Optional[Dict[str, Any]]:
  34. """获取解构任务结果"""
  35. sql = "SELECT result_payload, error_message FROM workflow_decode_task_result WHERE task_id = %s"
  36. result_record = mysql.fetchone(sql, (task_id,))
  37. if not result_record:
  38. return None
  39. return {
  40. "result": _parse_result_payload(result_record.get("result_payload")),
  41. "error_message": result_record.get("error_message")
  42. }
  43. def _build_result_data(task_id: str, status: int, result: Any = None) -> Dict[str, Any]:
  44. """构建结果数据"""
  45. return {
  46. "taskId": task_id,
  47. "status": status,
  48. "result": result
  49. }
  50. def _handle_success_status(task_id: str, capability: int) -> Dict[str, Any]:
  51. """处理成功状态(status=2)"""
  52. # 只有解构任务需要查询结果表
  53. if capability != CapabilityEnum.DECODE.value:
  54. return _build_response(_build_result_data(task_id, STATUS_SUCCESS))
  55. # 查询解构结果
  56. decode_result = _fetch_decode_result(task_id)
  57. if not decode_result:
  58. return _build_response(_build_result_data(task_id, STATUS_SUCCESS))
  59. result_data = _build_result_data(
  60. task_id=task_id,
  61. status=STATUS_SUCCESS,
  62. result=decode_result.get("result")
  63. )
  64. return _build_response(
  65. data=result_data,
  66. reason=decode_result.get("error_message")
  67. )
  68. def get_decode_detail_by_task_id(task_id: str) -> Optional[Dict[str, Any]]:
  69. """获取任务详情"""
  70. # 查询任务基本信息
  71. sql = "SELECT task_id, status, capability FROM workflow_task WHERE task_id = %s"
  72. task = mysql.fetchone(sql, (task_id,))
  73. if not task:
  74. logger.info(f"task_id = {task_id} , 任务不存在")
  75. return None
  76. task_id_value = task.get("task_id")
  77. status = task.get("status")
  78. capability = task.get("capability")
  79. # 不需要查询结果的状态,直接返回
  80. if status in STATUS_WITHOUT_RESULT:
  81. result_data = _build_result_data(task_id_value, status)
  82. return _build_response(result_data)
  83. # 成功状态,需要查询结果
  84. if status == STATUS_SUCCESS:
  85. return _handle_success_status(task_id_value, capability)
  86. # 其他未知状态,返回基础数据
  87. result_data = _build_result_data(task_id_value, status)
  88. return _build_response(result_data)