detail.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. from utils.sync_mysql_help import mysql
  2. from utils.params import CapabilityEnum, SceneEnum
  3. from loguru import logger
  4. import sys
  5. import json
  6. from typing import Optional, Dict, Any
  7. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  8. # 任务状态常量
  9. STATUS_PENDING = 0 # 待处理
  10. STATUS_RUNNING = 1 # 执行中
  11. STATUS_SUCCESS = 2 # 成功
  12. STATUS_FAILED = 3 # 失败
  13. # 不需要查询结果的状态(失败状态需要查询 error_message,因此不在此集合中)
  14. STATUS_WITHOUT_RESULT = {STATUS_PENDING, STATUS_RUNNING}
  15. def _build_response(data: Dict[str, Any]) -> Dict[str, Any]:
  16. """构建统一响应格式"""
  17. response = {
  18. "code": 0,
  19. "msg": "ok",
  20. "data": data
  21. }
  22. return response
  23. def _parse_result_payload(payload: Optional[str]) -> Any:
  24. """解析结果负载(JSON字符串转对象)"""
  25. if not payload:
  26. return None
  27. try:
  28. return json.loads(payload)
  29. except (json.JSONDecodeError, TypeError):
  30. return payload
  31. def _parse_web_url(web_url: Optional[str]) -> Any:
  32. """解析结果表中的 web_url 字段(JSON 字符串 -> 对象)"""
  33. if not web_url:
  34. return None
  35. try:
  36. return json.loads(web_url)
  37. except (json.JSONDecodeError, TypeError):
  38. # 如果不是合法 JSON,则原样返回,避免接口直接报错
  39. return web_url
  40. def _build_script_point_url(result: Any) -> Dict[str, str]:
  41. """
  42. 从创作解构的 result 中构建链接字段:
  43. 1. 优先使用 result['script_elements_table'] 作为链接
  44. 2. 如果没有该字段,则任意取一个字段的值作为链接
  45. 3. 统一返回结构为 { "pointUrl": "<链接或空字符串>" }
  46. """
  47. point_url = ""
  48. # 优先从 script_elements_table 字段取值
  49. if isinstance(result, dict):
  50. value = result.get("script_elements_table")
  51. if isinstance(value, str) and value:
  52. point_url = value
  53. else:
  54. # 没有 script_elements_table 时,从任意字段中取一个非空字符串值
  55. for v in result.values():
  56. if isinstance(v, str) and v:
  57. point_url = v
  58. break
  59. elif isinstance(result, str):
  60. # 如果整体就是一个字符串,直接作为链接使用
  61. point_url = result
  62. return {"pointUrl": point_url}
  63. def _fetch_decode_result(task_id: str) -> Optional[Dict[str, Any]]:
  64. """获取选题解构任务结果"""
  65. sql = "SELECT result_payload, error_message, web_url FROM workflow_decode_task_result WHERE task_id = %s"
  66. result_record = mysql.fetchone(sql, (task_id,))
  67. if not result_record:
  68. return None
  69. return {
  70. "result": _parse_result_payload(result_record.get("result_payload")),
  71. "error_message": result_record.get("error_message"),
  72. "url": _parse_web_url(result_record.get("web_url"))
  73. }
  74. def _fetch_script_decode_result(task_id: str) -> Optional[Dict[str, Any]]:
  75. """获取创作解构任务结果"""
  76. sql = "SELECT result_payload, error_message, web_url FROM workflow_script_task_result WHERE task_id = %s"
  77. result_record = mysql.fetchone(sql, (task_id,))
  78. if not result_record:
  79. return None
  80. parsed_result = _parse_result_payload(result_record.get("result_payload"))
  81. url_result = _parse_result_payload(result_record.get("web_url"))
  82. # 创作解构的链接从 result 中提取,返回 { "pointUrl": "<链接>" } 结构
  83. url = _build_script_point_url(url_result)
  84. return {
  85. "result": parsed_result,
  86. "error_message": result_record.get("error_message"),
  87. "url": url,
  88. }
  89. def _fetch_pattern_result(task_id: str) -> Optional[Dict[str, Any]]:
  90. """获取聚类任务结果"""
  91. sql = "SELECT result_payload, error_message, web_url FROM workflow_pattern_task_result WHERE task_id = %s"
  92. result_record = mysql.fetchone(sql, (task_id,))
  93. if not result_record:
  94. return None
  95. return {
  96. "result": _parse_result_payload(result_record.get("result_payload")),
  97. "error_message": result_record.get("error_message"),
  98. "url": _parse_web_url(result_record.get("web_url"))
  99. }
  100. def _build_result_data(
  101. task_id: str,
  102. status: int,
  103. result: Any = None,
  104. reason: Optional[str] = None,
  105. url: Any = None
  106. ) -> Dict[str, Any]:
  107. """构建结果数据"""
  108. data: Dict[str, Any] = {
  109. "taskId": task_id,
  110. "status": status,
  111. "result": result,
  112. "reason": reason
  113. }
  114. # 对于有可视化页面的任务,增加 url 字段
  115. if url is not None:
  116. data["url"] = url
  117. return data
  118. def _handle_success_status(task_id: str, capability: int, scene: Optional[int] = None) -> Dict[str, Any]:
  119. """处理成功状态(status=2)"""
  120. # 解构任务
  121. if capability == CapabilityEnum.DECODE.value:
  122. # 按业务场景区分:0 选题 -> 选题解构表;1 创作 -> 创作解构表
  123. if scene is not None and scene == SceneEnum.CREATION.value:
  124. decode_result = _fetch_script_decode_result(task_id)
  125. else:
  126. decode_result = _fetch_decode_result(task_id)
  127. if not decode_result:
  128. return _build_response(_build_result_data(task_id, STATUS_SUCCESS))
  129. result_data = _build_result_data(
  130. task_id=task_id,
  131. status=STATUS_SUCCESS,
  132. result=decode_result.get("result"),
  133. reason=decode_result.get("error_message"),
  134. url=decode_result.get("url")
  135. )
  136. return _build_response(result_data)
  137. # 聚类任务
  138. if capability == CapabilityEnum.PATTERN.value:
  139. pattern_result = _fetch_pattern_result(task_id)
  140. if not pattern_result:
  141. return _build_response(_build_result_data(task_id, STATUS_SUCCESS))
  142. result_data = _build_result_data(
  143. task_id=task_id,
  144. status=STATUS_SUCCESS,
  145. result=pattern_result.get("result"),
  146. reason=pattern_result.get("error_message"),
  147. url=pattern_result.get("url")
  148. )
  149. return _build_response(result_data)
  150. # 其他能力:只返回基础任务信息
  151. return _build_response(_build_result_data(task_id, STATUS_SUCCESS))
  152. def get_decode_detail_by_task_id(task_id: str) -> Optional[Dict[str, Any]]:
  153. """获取任务详情"""
  154. # 查询任务基本信息
  155. sql = "SELECT task_id, status, capability, scene FROM workflow_task WHERE task_id = %s"
  156. task = mysql.fetchone(sql, (task_id,))
  157. if not task:
  158. logger.info(f"task_id = {task_id} , 任务不存在")
  159. return None
  160. task_id_value = task.get("task_id")
  161. status = task.get("status")
  162. capability = task.get("capability")
  163. scene = task.get("scene")
  164. # 不需要查询结果的状态,直接返回
  165. if status in STATUS_WITHOUT_RESULT:
  166. result_data = _build_result_data(task_id_value, status)
  167. return _build_response(result_data)
  168. # 成功状态,需要查询结果
  169. if status == STATUS_SUCCESS:
  170. return _handle_success_status(task_id_value, capability, scene)
  171. # 失败状态,需要返回 error_message 到 reason 字段,result 固定为 "[]"
  172. if status == STATUS_FAILED:
  173. error_message: Optional[str] = None
  174. # 解构任务失败原因
  175. if capability == CapabilityEnum.DECODE.value:
  176. # 按业务场景区分:0 选题 -> 选题解构表;1 创作 -> 创作解构表
  177. if scene is not None and scene == SceneEnum.CREATION.value:
  178. decode_result = _fetch_script_decode_result(task_id_value)
  179. else:
  180. decode_result = _fetch_decode_result(task_id_value)
  181. if decode_result:
  182. error_message = decode_result.get("error_message")
  183. # 聚类任务失败原因
  184. elif capability == CapabilityEnum.PATTERN.value:
  185. pattern_result = _fetch_pattern_result(task_id_value)
  186. if pattern_result:
  187. error_message = pattern_result.get("error_message")
  188. result_data = _build_result_data(
  189. task_id=task_id_value,
  190. status=STATUS_FAILED,
  191. result="[]",
  192. reason=error_message or ""
  193. )
  194. return _build_response(
  195. result_data
  196. )
  197. # 其他未知状态,返回基础数据
  198. result_data = _build_result_data(task_id_value, status)
  199. return _build_response(result_data)