| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- from utils.sync_mysql_help import mysql
- from utils.params import CapabilityEnum, SceneEnum
- from loguru import logger
- import sys
- import json
- from typing import Optional, Dict, Any
- logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
# Task status codes, compared against the `status` column of workflow_task:
# 0 = pending, 1 = running, 2 = succeeded, 3 = failed.
STATUS_PENDING, STATUS_RUNNING, STATUS_SUCCESS, STATUS_FAILED = 0, 1, 2, 3

# Statuses for which no result row is looked up. STATUS_FAILED is deliberately
# excluded: a failed task still needs its error_message read from the result
# table.
STATUS_WITHOUT_RESULT = {STATUS_PENDING, STATUS_RUNNING}
- def _build_response(data: Dict[str, Any]) -> Dict[str, Any]:
- """构建统一响应格式"""
- response = {
- "code": 0,
- "msg": "ok",
- "data": data
- }
- return response
- def _parse_result_payload(payload: Optional[str]) -> Any:
- """解析结果负载(JSON字符串转对象)"""
- if not payload:
- return None
-
- try:
- return json.loads(payload)
- except (json.JSONDecodeError, TypeError):
- return payload
- def _parse_web_url(web_url: Optional[str]) -> Any:
- """解析结果表中的 web_url 字段(JSON 字符串 -> 对象)"""
- if not web_url:
- return None
- try:
- return json.loads(web_url)
- except (json.JSONDecodeError, TypeError):
- # 如果不是合法 JSON,则原样返回,避免接口直接报错
- return web_url
- def _build_script_point_url(result: Any) -> Dict[str, str]:
- """
- 从创作解构的 result 中构建链接字段:
- 1. 优先使用 result['script_elements_table'] 作为链接
- 2. 如果没有该字段,则任意取一个字段的值作为链接
- 3. 统一返回结构为 { "pointUrl": "<链接或空字符串>" }
- """
- point_url = ""
- # 优先从 script_elements_table 字段取值
- if isinstance(result, dict):
- value = result.get("script_elements_table")
- if isinstance(value, str) and value:
- point_url = value
- else:
- # 没有 script_elements_table 时,从任意字段中取一个非空字符串值
- for v in result.values():
- if isinstance(v, str) and v:
- point_url = v
- break
- elif isinstance(result, str):
- # 如果整体就是一个字符串,直接作为链接使用
- point_url = result
- return {"pointUrl": point_url}
def _fetch_decode_result(task_id: str) -> Optional[Dict[str, Any]]:
    """Load the stored result of a topic decode task.

    Reads ``result_payload``, ``error_message`` and ``web_url`` for
    ``task_id`` from ``workflow_decode_task_result``.

    Args:
        task_id: Identifier of the task whose result row is wanted.

    Returns:
        ``None`` when no row is found; otherwise a dict with ``"result"``
        (decoded payload), ``"error_message"`` (raw column value) and
        ``"url"`` (decoded ``web_url``).
    """
    # NOTE(review): assumes mysql.fetchone returns a dict-like row (it is read
    # with .get) or a falsy value when nothing matches - confirm against the
    # helper.
    row = mysql.fetchone(
        "SELECT result_payload, error_message, web_url FROM workflow_decode_task_result WHERE task_id = %s",
        (task_id,),
    )
    if not row:
        return None

    payload = _parse_result_payload(row.get("result_payload"))
    failure_reason = row.get("error_message")
    link = _parse_web_url(row.get("web_url"))
    return {"result": payload, "error_message": failure_reason, "url": link}
def _fetch_script_decode_result(task_id: str) -> Optional[Dict[str, Any]]:
    """Load the stored result of a creation (script) decode task.

    Reads ``result_payload``, ``error_message`` and ``web_url`` for
    ``task_id`` from ``workflow_script_task_result``.

    Args:
        task_id: Identifier of the task whose result row is wanted.

    Returns:
        ``None`` when no row is found; otherwise a dict with ``"result"``
        (decoded payload), ``"error_message"`` (raw column value) and
        ``"url"``, which for this task type is always shaped as
        ``{"pointUrl": "<link or empty string>"}``.
    """
    sql = "SELECT result_payload, error_message, web_url FROM workflow_script_task_result WHERE task_id = %s"
    result_record = mysql.fetchone(sql, (task_id,))
    if not result_record:
        return None

    parsed_result = _parse_result_payload(result_record.get("result_payload"))
    # Decode web_url with the dedicated web_url parser, as the other result
    # fetchers in this module do.
    parsed_web_url = _parse_web_url(result_record.get("web_url"))
    return {
        "result": parsed_result,
        "error_message": result_record.get("error_message"),
        # The link is derived from the decoded web_url column, not from
        # result_payload.
        "url": _build_script_point_url(parsed_web_url),
    }
def _fetch_pattern_result(task_id: str) -> Optional[Dict[str, Any]]:
    """Load the stored result of a pattern (clustering) task.

    Reads ``result_payload``, ``error_message`` and ``web_url`` for
    ``task_id`` from ``workflow_pattern_task_result``.

    Args:
        task_id: Identifier of the task whose result row is wanted.

    Returns:
        ``None`` when no row is found; otherwise a dict with ``"result"``
        (decoded payload), ``"error_message"`` (raw column value) and
        ``"url"`` (decoded ``web_url``).
    """
    query = "SELECT result_payload, error_message, web_url FROM workflow_pattern_task_result WHERE task_id = %s"
    record = mysql.fetchone(query, (task_id,))
    if record:
        decoded_payload = _parse_result_payload(record.get("result_payload"))
        failure_reason = record.get("error_message")
        decoded_url = _parse_web_url(record.get("web_url"))
        return {
            "result": decoded_payload,
            "error_message": failure_reason,
            "url": decoded_url,
        }
    return None
- def _build_result_data(
- task_id: str,
- status: int,
- result: Any = None,
- reason: Optional[str] = None,
- url: Any = None
- ) -> Dict[str, Any]:
- """构建结果数据"""
- data: Dict[str, Any] = {
- "taskId": task_id,
- "status": status,
- "result": result,
- "reason": reason
- }
- # 对于有可视化页面的任务,增加 url 字段
- if url is not None:
- data["url"] = url
- return data
def _handle_success_status(task_id: str, capability: int, scene: Optional[int] = None) -> Dict[str, Any]:
    """Build the response for a task whose status is ``STATUS_SUCCESS``.

    The result row is looked up according to ``capability``:
      * decode tasks read the script decode table when ``scene`` equals
        ``SceneEnum.CREATION.value``, and the topic decode table otherwise;
      * pattern tasks read the pattern result table;
      * any other capability has no result lookup.

    Args:
        task_id: Identifier of the task.
        capability: Capability code of the task, compared against
            ``CapabilityEnum`` values.
        scene: Business scene code of the task, if any.

    Returns:
        The uniform response envelope. When no result row is available, the
        data section carries only the basic task information (``result`` and
        ``reason`` are ``None`` and there is no ``url``).
    """
    if capability == CapabilityEnum.DECODE.value:
        # Decode tasks are stored per scene: creation -> script decode table,
        # everything else -> topic decode table.
        if scene is not None and scene == SceneEnum.CREATION.value:
            fetched = _fetch_script_decode_result(task_id)
        else:
            fetched = _fetch_decode_result(task_id)
    elif capability == CapabilityEnum.PATTERN.value:
        fetched = _fetch_pattern_result(task_id)
    else:
        # Other capabilities: nothing to look up.
        fetched = None

    if not fetched:
        return _build_response(_build_result_data(task_id, STATUS_SUCCESS))

    data = _build_result_data(
        task_id=task_id,
        status=STATUS_SUCCESS,
        result=fetched.get("result"),
        reason=fetched.get("error_message"),
        url=fetched.get("url"),
    )
    return _build_response(data)
def get_decode_detail_by_task_id(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the detail response for a task, or ``None`` if it does not exist.

    The task row is read from ``workflow_task`` and handled by status:
      * pending / running: basic task information only;
      * success: delegated to ``_handle_success_status``;
      * failed: ``result`` is the fixed string ``"[]"`` and ``reason`` is the
        stored ``error_message`` (empty string when none is available);
      * any other status: basic task information only.

    Args:
        task_id: Identifier of the task to look up.

    Returns:
        The uniform response envelope, or ``None`` when no task row matches.
    """
    row = mysql.fetchone(
        "SELECT task_id, status, capability, scene FROM workflow_task WHERE task_id = %s",
        (task_id,),
    )
    if not row:
        logger.info(f"task_id = {task_id} , 任务不存在")
        return None

    stored_task_id = row.get("task_id")
    status = row.get("status")
    capability = row.get("capability")
    scene = row.get("scene")

    # Pending / running tasks have no result row worth reading yet.
    if status in STATUS_WITHOUT_RESULT:
        return _build_response(_build_result_data(stored_task_id, status))

    if status == STATUS_SUCCESS:
        return _handle_success_status(stored_task_id, capability, scene)

    # Unknown status: fall back to the basic task information.
    if status != STATUS_FAILED:
        return _build_response(_build_result_data(stored_task_id, status))

    # Failed task: the failure reason lives in the capability-specific result
    # table.
    if capability == CapabilityEnum.DECODE.value:
        # creation scene -> script decode table, otherwise topic decode table
        if scene is not None and scene == SceneEnum.CREATION.value:
            failed_record = _fetch_script_decode_result(stored_task_id)
        else:
            failed_record = _fetch_decode_result(stored_task_id)
    elif capability == CapabilityEnum.PATTERN.value:
        failed_record = _fetch_pattern_result(stored_task_id)
    else:
        failed_record = None

    failure_reason: Optional[str] = (
        failed_record.get("error_message") if failed_record else None
    )
    failed_data = _build_result_data(
        task_id=stored_task_id,
        status=STATUS_FAILED,
        result="[]",
        reason=failure_reason or "",
    )
    return _build_response(failed_data)
|