"""Look up a workflow task by id and build the unified API response for it."""

import json
import sys
from typing import Any, Dict, Optional

from loguru import logger

from utils.params import CapabilityEnum, SceneEnum
from utils.sync_mysql_help import mysql

# NOTE(review): loguru normally ships with a default stderr handler, so this
# extra sink may emit ERROR records twice — confirm this is intended.
logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)

# Task status constants.
STATUS_PENDING = 0  # waiting to be processed
STATUS_RUNNING = 1  # currently executing
STATUS_SUCCESS = 2  # finished successfully
STATUS_FAILED = 3  # finished with an error

# Statuses for which no result row is looked up. STATUS_FAILED is deliberately
# absent: a failed task still needs its error_message read from a result table.
STATUS_WITHOUT_RESULT = {STATUS_PENDING, STATUS_RUNNING}


def _build_response(data: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap ``data`` in the unified ``{"code", "msg", "data"}`` envelope."""
    return {
        "code": 0,
        "msg": "ok",
        "data": data,
    }


def _parse_result_payload(payload: Optional[str]) -> Any:
    """Decode a JSON string into a Python object.

    Returns ``None`` for an empty/``None`` payload. If the payload is not
    valid JSON it is returned unchanged instead of raising.
    """
    if not payload:
        return None
    try:
        return json.loads(payload)
    except (json.JSONDecodeError, TypeError):
        return payload


def _parse_web_url(web_url: Optional[str]) -> Any:
    """Decode the ``web_url`` column of a result table (JSON string -> object).

    Uses the same rules as ``_parse_result_payload``: a value that is not
    valid JSON is returned as-is so the caller never fails on it.
    """
    return _parse_result_payload(web_url)


def _build_script_point_url(result: Any) -> Dict[str, str]:
    """Build the ``{"pointUrl": ...}`` link structure for a script-decode task.

    Selection rules:
      1. If ``result`` is a dict, prefer a non-empty string stored under
         ``"script_elements_table"``.
      2. Otherwise use the first non-empty string value found in the dict.
      3. If ``result`` itself is a string, use it directly.
      4. In every other case the link is the empty string.
    """
    point_url = ""
    if isinstance(result, dict):
        preferred = result.get("script_elements_table")
        if isinstance(preferred, str) and preferred:
            point_url = preferred
        else:
            # No usable script_elements_table: fall back to any non-empty
            # string value, in dict iteration order.
            point_url = next(
                (value for value in result.values() if isinstance(value, str) and value),
                "",
            )
    elif isinstance(result, str):
        point_url = result
    return {"pointUrl": point_url}


def _fetch_parsed_result(sql: str, task_id: str) -> Optional[Dict[str, Any]]:
    """Run a single-row result query and parse its JSON columns.

    ``sql`` must select ``result_payload``, ``error_message`` and ``web_url``
    and take ``task_id`` as its only bound parameter.

    Returns ``None`` when no row is found, otherwise a dict with the keys
    ``result``, ``error_message`` and ``url``.
    """
    # Assumes mysql.fetchone returns a mapping (it is accessed via .get()).
    record = mysql.fetchone(sql, (task_id,))
    if not record:
        return None
    return {
        "result": _parse_result_payload(record.get("result_payload")),
        "error_message": record.get("error_message"),
        "url": _parse_web_url(record.get("web_url")),
    }


def _fetch_decode_result(task_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the stored result of a topic-decode task, or ``None`` if absent."""
    sql = "SELECT result_payload, error_message, web_url FROM workflow_decode_task_result WHERE task_id = %s"
    return _fetch_parsed_result(sql, task_id)


def _fetch_script_decode_result(task_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the stored result of a script (creation) decode task.

    Same shape as the other fetchers, except that ``url`` is always a
    ``{"pointUrl": "<link>"}`` dict derived from the parsed ``web_url`` column.
    """
    sql = "SELECT result_payload, error_message, web_url FROM workflow_script_task_result WHERE task_id = %s"
    parsed = _fetch_parsed_result(sql, task_id)
    if parsed is None:
        return None
    # The link is extracted from the parsed web_url value, not from the
    # result payload.
    parsed["url"] = _build_script_point_url(parsed["url"])
    return parsed


def _fetch_pattern_result(task_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the stored result of a pattern (clustering) task, or ``None``."""
    sql = "SELECT result_payload, error_message, web_url FROM workflow_pattern_task_result WHERE task_id = %s"
    return _fetch_parsed_result(sql, task_id)


def _fetch_task_result(
    task_id: str,
    capability: int,
    scene: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
    """Fetch the result row that matches the task's capability and scene.

    Decode tasks read the script-decode table when ``scene`` is the creation
    scene and the topic-decode table otherwise; pattern tasks read the pattern
    table. Any other capability has no result table and yields ``None``.
    """
    if capability == CapabilityEnum.DECODE.value:
        if scene == SceneEnum.CREATION.value:
            return _fetch_script_decode_result(task_id)
        return _fetch_decode_result(task_id)
    if capability == CapabilityEnum.PATTERN.value:
        return _fetch_pattern_result(task_id)
    return None


def _build_result_data(
    task_id: str,
    status: int,
    result: Any = None,
    reason: Optional[str] = None,
    url: Any = None
) -> Dict[str, Any]:
    """Assemble the ``data`` section of a task-detail response.

    The ``url`` key is only present when ``url`` is not ``None``, i.e. for
    tasks that have a visualisation link.
    """
    data: Dict[str, Any] = {
        "taskId": task_id,
        "status": status,
        "result": result,
        "reason": reason,
    }
    if url is not None:
        data["url"] = url
    return data


def _handle_success_status(task_id: str, capability: int, scene: Optional[int] = None) -> Dict[str, Any]:
    """Build the response for a task whose status is ``STATUS_SUCCESS``.

    When no result row exists (or the capability has no result table) only
    the basic task information is returned.
    """
    task_result = _fetch_task_result(task_id, capability, scene)
    if not task_result:
        return _build_response(_build_result_data(task_id, STATUS_SUCCESS))
    return _build_response(
        _build_result_data(
            task_id=task_id,
            status=STATUS_SUCCESS,
            result=task_result.get("result"),
            reason=task_result.get("error_message"),
            url=task_result.get("url"),
        )
    )


def _handle_failed_status(task_id: str, capability: int, scene: Optional[int] = None) -> Dict[str, Any]:
    """Build the response for a task whose status is ``STATUS_FAILED``.

    ``result`` is always the literal string ``"[]"`` and ``reason`` carries
    the stored error message, or ``""`` when none is available.
    """
    task_result = _fetch_task_result(task_id, capability, scene)
    error_message: Optional[str] = task_result.get("error_message") if task_result else None
    return _build_response(
        _build_result_data(
            task_id=task_id,
            status=STATUS_FAILED,
            result="[]",
            reason=error_message or "",
        )
    )


def get_decode_detail_by_task_id(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the detail response for ``task_id``.

    Returns ``None`` when the task does not exist in ``workflow_task``;
    otherwise returns the unified response envelope whose ``data`` depends on
    the task status:

    * success  -> result, reason and (when available) url from the result table
    * failed   -> ``result="[]"`` plus the stored error message as ``reason``
    * anything else (pending, running, unknown) -> basic task info only
    """
    sql = "SELECT task_id, status, capability, scene FROM workflow_task WHERE task_id = %s"
    task = mysql.fetchone(sql, (task_id,))
    if not task:
        logger.info(f"task_id = {task_id} , 任务不存在")
        return None

    task_id_value = task.get("task_id")
    status = task.get("status")
    capability = task.get("capability")
    scene = task.get("scene")

    if status == STATUS_SUCCESS:
        return _handle_success_status(task_id_value, capability, scene)
    if status == STATUS_FAILED:
        return _handle_failed_status(task_id_value, capability, scene)

    # Pending/running tasks (STATUS_WITHOUT_RESULT) and any unrecognised
    # status have no result to look up: return the basic task data only.
    return _build_response(_build_result_data(task_id_value, status))