Kaynağa Gözat

refactor(decode): 移除脚本任务相关代码并简化解码逻辑

删除不再使用的脚本任务处理代码,包括脚本任务状态处理器和相关导入
简化解码结果处理逻辑,统一使用decode_videos表查询结果
更新接口返回格式,移除脚本结果相关字段
max_liu 2 gün önce
ebeveyn
işleme
75a467ea95
4 değiştirilmiş dosya ile 10 ekleme ve 188 silme
  1. 5 10
      decode_task/decodeTask.py
  2. 0 170
      decode_task/scriptTask.py
  3. 5 7
      main.py
  4. 0 1
      task_schedule.py

+ 5 - 10
decode_task/decodeTask.py

@@ -1,17 +1,13 @@
-from ast import Dict
-import json
-import re
 from loguru import logger
 import sys
 import time
 from utils.sync_mysql_help import mysql
-from typing import List
-from datetime import datetime
+from typing import Dict, Any
+
 from utils.params import DecodeWorkflowParam
 from src.workflows.decode_workflow import DecodeWorkflow
 
 
-from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
 
 
 logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
@@ -21,7 +17,7 @@ logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
    
 
 
-def invoke_decode_workflow(task_params: Dict[DecodeWorkflowParam]):
+def invoke_decode_workflow(task_params: Dict[str, Any]):
     """主函数"""
 
     result = DecodeWorkflow(task_params)
@@ -33,12 +29,12 @@ def invoke_decode_workflow(task_params: Dict[DecodeWorkflowParam]):
 
 
 def get_decode_result_by_id(task_id:str):
-    sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 2 "
+    sql = "SELECT * FROM decode_videos WHERE task_id = %s AND task_status = 2 "
     tasks = mysql.fetchone(sql, (task_id,))
     if not tasks:
         logger.info(f"task_id = {task_id} , 任务不存在")
         return None
-    return tasks['decode_result'], tasks['task_status']
+    return tasks['result'], tasks['status']
 
 
 def  decode_task_status_handler():
@@ -85,4 +81,3 @@ def  decode_task_status_handler():
         logger.info(f"task_id = {task_id} ,任务异常")
         raise {"task_id": task_id, "error": '任务异常'}
 
-

+ 0 - 170
decode_task/scriptTask.py

@@ -1,170 +0,0 @@
-import json
-from loguru import logger
-import sys
-import time
-from typing import Dict, Any, List
-from datetime import datetime
-from utils.sync_mysql_help import mysql
-from examples.run_batch_script import build_script_input
-
-from src.workflows.script_workflow import ScriptWorkflow
-
-
-logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
-
-
-def invoke_script_workflow(decode_result: Dict[str, Any], script_result: Dict[str, Any]):
-    """主函数"""
-   
-
-    # 读取原始三点解构结果
-    raw_results: List[Dict[str, Any]] = decode_result.get("results", []) or []
-
-    # 读取已有的脚本理解输出,支持增量追加
-    output_data = script_result if isinstance(script_result, dict) else None
-    if not output_data:
-        output_data = {
-            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
-            "total": 0,
-            "success_count": 0,
-            "fail_count": 0,
-            "results": [],
-        }
-
-    existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
-    # 用 channel_content_id + video URL 去重,避免重复处理
-    processed_keys = {
-        f"{item.get('video_data', {}).get('channel_content_id','')}|"
-        f"{item.get('video_data', {}).get('video','')}"
-        for item in existing_results
-    }
-
-    workflow = ScriptWorkflow()
-
-    for item in raw_results:
-        video_data = item.get("video_data", {}) or {}
-        result = item.get("result", {}) or {}
-
-        key = f"{video_data.get('channel_content_id','')}|{video_data.get('video','')}"
-        if key in processed_keys:
-            logger.info(f"已处理过该视频,跳过: {key}")
-            continue
-
-        logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")
-
-        try:
-            script_input = build_script_input(video_data, result)
-            script_result = workflow.invoke(script_input)
-
-            record = {
-                "video_data": video_data,
-                "what_deconstruction_result": result,
-                "script_result": script_result,
-                "success": True,
-                "error": None,
-            }
-
-            output_data["success_count"] = output_data.get("success_count", 0) + 1
-
-        except Exception as e:
-            logger.error(f"脚本理解处理失败: {e}", exc_info=True)
-            record = {
-                "video_data": video_data,
-                "what_deconstruction_result": result,
-                "script_result": None,
-                "success": False,
-                "error": str(e),
-            }
-            output_data["fail_count"] = output_data.get("fail_count", 0) + 1
-
-        output_data["results"].append(record)
-        output_data["total"] = output_data.get("total", 0) + 1
-
-    # 返回序列化后的 JSON 字符串(供数据库存储)
-    return json.dumps(output_data, ensure_ascii=False)
-
-    
-
-
-def get_script_result_by_id(task_id:str):
-    sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 4 "
-    tasks = mysql.fetchone(sql, (task_id,))
-    if not tasks:
-        logger.info(f"task_id = {task_id} , 任务不存在")
-        return None
-    return tasks['decode_result'], tasks['script_result'], tasks['task_status']
-
-
-def  script_task_status_handler():
-    # 从数据库中获取任务,每次获取一个
-    sql = "SELECT * FROM decode_record WHERE task_status = 2 "
-    """json"""
-    tasks = mysql.fetchall(sql)
-    # tasks = mysql.fetchall(sql,())
-
-    if not tasks:
-        logger.info("script任务列表为空")
-        return
-    for task in tasks:
-        task_id = task['task_id']
-        # 解析 decode_result(可能为 None 或已是对象)
-        decode_result_raw = task.get('decode_result')
-        if isinstance(decode_result_raw, (dict, list)):
-            decode_result = decode_result_raw
-        elif decode_result_raw:
-            try:
-                decode_result = json.loads(decode_result_raw)
-            except Exception as e:
-                logger.error(f"task_id = {task_id} , decode_result 解析失败: {e}")
-                decode_result = {"results": []}
-        else:
-            decode_result = {"results": []}
-
-        # 解析 script_result(可能为 None 或已是对象)
-        script_result_raw = task.get('script_result')
-        if isinstance(script_result_raw, (dict, list)):
-            script_result = script_result_raw
-        elif script_result_raw:
-            try:
-                script_result = json.loads(script_result_raw)
-            except Exception as e:
-                logger.error(f"task_id = {task_id} , script_result 解析失败: {e}")
-                script_result = None
-        else:
-            script_result = None
-
-        logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
-        #如何任务超过30分钟,则认为任务超时,更新任务状态为3
-        task_create_timestamp = task['create_timestamp']
-        current_timestamp = int(time.time() * 1000)
-
-
-        try:
-            sql = "UPDATE decode_record SET task_status = 3,task_id = %s"
-            mysql.execute(sql, (task_id))
-        
-        except Exception as e:
-            logger.error(f"task_id = {task_id} , error = {e}")
-            sql = "UPDATE decode_record SET task_status = 3, script_result = '任务异常' WHERE task_id = %s"
-            mysql.execute(sql,  (task_id))
-            logger.info(f"task_id = {task_id} ,任务异常")
-            continue
-
-        try:
-            result = invoke_script_workflow(decode_result, script_result)
-            if result:
-                # 更新任务状态为2,任务完成
-                sql = "UPDATE decode_record SET task_status = 4, script_result = %s WHERE task_id = %s"
-                mysql.execute(sql, (result, task_id))
-                logger.info(f"task_id = {task_id} , script_result = {result}")
-            else:
-                if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
-                    sql = "UPDATE decode_record SET task_status = 3, script_result = %s WHERE task_id = %s"
-                    mysql.execute(sql, (result, task_id))
-                    logger.info(f"task_id = {task_id} ,任务状态异常")
-        except Exception as e:
-            logger.error(f"task_id = {task_id} , error = {e}")
-            sql = "UPDATE decode_record SET task_status = 3, script_result = '任务异常' WHERE task_id = %s"
-            mysql.execute(sql,  (task_id))
-            logger.info(f"task_id = {task_id} ,任务异常")
-            continue

+ 5 - 7
main.py

@@ -9,7 +9,6 @@ from dotenv import load_dotenv
 
 
 from decode_task.decodeTask import get_decode_result_by_id
-from decode_task.scriptTask import get_script_result_by_id
 
 
 
@@ -74,16 +73,15 @@ def decode_video(param:DecodeListParam):
 
 
 @app.post("/decode/result")
-def get_script_result(param: TaskStatusParam):
-    decode_result,script_result, task_status = get_script_result_by_id(param.task_id)
-    if script_result:
+def get_decode_result_by_id(param: TaskStatusParam):
+    result, status = get_decode_result_by_id(param.task_id)
+    if result:
         return {
             "code": 0,
             "message": "success",
             "data": {
-                "script_result": script_result,
-                "decode_result": decode_result,
-                "task_status": task_status
+                "decode_result": result,
+                "task_status": status
             }
         }
     else:

+ 0 - 1
task_schedule.py

@@ -1,7 +1,6 @@
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.interval import IntervalTrigger
 from decode_task.decodeTask import  decode_task_status_handler
-from decode_task.scriptTask import  script_task_status_handler
 from loguru import logger