Sfoglia il codice sorgente

feat(解码流程): 重构解码任务处理逻辑并简化状态管理

- 新增DecodeWorkflowParam模型用于工作流参数传递
- 简化DecodeRecord模型,移除冗余字段仅保留状态字段
- 重构decodeTask.py中的任务处理逻辑,使用新的工作流接口
- 更新任务状态处理逻辑,简化状态流转和错误处理
max_liu 4 giorni fa
parent
commit
7e9ffe5d6c
4 ha cambiato i file con 58 aggiunte e 123 eliminazioni
  1. 49 96
      decode_task/decodeTask.py
  2. 3 23
      main.py
  3. 1 4
      models/decode_record.py
  4. 5 0
      utils/params.py

+ 49 - 96
decode_task/decodeTask.py

@@ -1,12 +1,15 @@
+from ast import Dict
 import json
+import re
+from tkinter import E
 from loguru import logger
 import sys
 import time
 from utils.sync_mysql_help import mysql
 from typing import List
 from datetime import datetime
-from utils.params import DecodeParam
-from examples.run_batch import process_single_video
+from utils.params import DecodeWorkflowParam
+from src.workflows.decode_workflow import DecodeWorkflow
 
 
 from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
@@ -15,73 +18,19 @@ from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflo
 logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
 
 
-def save_decode_result(results, timestamp, total, success_count, fail_count):
-    output_data = {
-        "timestamp": timestamp,
-        "total": total,
-        "success_count": success_count,
-        "fail_count": fail_count,
-        "results": results
-    }
-    return output_data
-   
-
-
-def invoke_decode_workflow(video_list: List[DecodeParam]):
-    """主函数"""
-    print("=" * 80)
-    print("批量处理视频 - What 解构工作流(视频分析版本)")
-    print("=" * 80)
 
-    # 1. 读取视频列表
-    print(f"\n[1] 读取视频列表...{video_list}")
-
-    # 2. 初始化工作流
-    print("\n[2] 初始化工作流...")
-    try:
-        workflow = WhatDeconstructionWorkflow(
-            model_provider="google_genai",
-            max_depth=10
-        )
-        print(f"✅ 工作流初始化成功")
-    except Exception as e:
-        print(f"❌ 工作流初始化失败: {e}")
-        import traceback
-        traceback.print_exc()
-        return
-
-    # 3. 准备结果文件路径和时间戳
-    print("\n[3] 准备结果文件...")
    
 
-    # 4. 批量处理视频(每处理完一个立即保存)
-    print("\n[4] 开始批量处理视频(每处理完一个立即保存结果)...")
-    results = []
-    total = len(video_list)
-    success_count = 0
-    fail_count = 0
-
-    for index, video_data in enumerate[DecodeParam](video_list, 1):
-        # 处理单个视频
-        result = process_single_video(workflow, video_data, index, total)
-        results.append(result)
-
-        # 更新统计
-        if result["success"]:
-            success_count += 1
-        else:
-            fail_count += 1
 
-        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+def invoke_decode_workflow(task_params: Dict[DecodeWorkflowParam]):
+    """主函数"""
 
-        # 立即保存结果到文件
-        output_data = save_decode_result(results, timestamp, total, success_count, fail_count)
-        # print(f"   保存结果到文件... [{success_count}成功/{fail_count}失败/{total}总计]")
-        if output_data:
-            return output_data
-        else:
-            print(f"❌ 保存结果失败,但将继续处理")
-            return None
+    result = DecodeWorkflow(task_params)
+    if result:
+            return result
+    else:
+        print(f"❌ 保存结果失败,但将继续处理")
+        return None
 
 
 def get_decode_result_by_id(task_id:str):
@@ -95,42 +44,46 @@ def get_decode_result_by_id(task_id:str):
 
 def  decode_task_status_handler():
     # 从数据库中获取任务,每次获取一个
-    sql = "SELECT * FROM decode_record WHERE task_status = 1 "
+    sql = "SELECT * FROM decode_record WHERE task_status = 0 "
     """json"""
-    tasks = mysql.fetchall(sql)
-    """字典"""
-    # tasks = mysql.fetchall(sql,())
+    task = mysql.fetchone(sql)
+    task_id = task['task_id']
+    video_url = task['video_url']
 
-    if not tasks:
+    if not task:
         logger.info("任务列表为空")
         return
-    for task in tasks:
-        task_id = task['task_id']
-        task_params = json.loads(task['task_params'])
-        logger.info(f"task_id = {task_id} , task_params = {task_params}")
-        #如何任务超过30分钟,则认为任务超时,更新任务状态为3
-        task_create_timestamp = task['create_timestamp']
-        current_timestamp = int(time.time() * 1000)
- 
+    else:
+        sql = "UPDATE decode_record SET task_status = 1 WHERE task_id = %s"
+        mysql.execute(sql, (task_id,))     
         
         # 获取任务结果
-        try:
-            decode_result = invoke_decode_workflow(task_params['video_list'])
-
-            if decode_result:
-                # 更新任务状态为2,任务完成
-                sql = "UPDATE decode_record SET task_status = 2, decode_result = %s WHERE task_id = %s"
-                mysql.execute(sql, (json.dumps(decode_result, ensure_ascii=False), task_id))
-                logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
-            else:
-                if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
-                    sql = "UPDATE decode_record SET task_status = 5, decode_result = %s WHERE task_id = %s"
-                    mysql.execute(sql, (json.dumps(decode_result, ensure_ascii=False), task_id))
-                    logger.info(f"task_id = {task_id} ,任务状态异常")
-        except Exception as e:
-            logger.error(f"task_id = {task_id} , error = {e}")
-            sql = "UPDATE decode_record SET task_status = 5, decode_result = '任务异常' WHERE task_id = %s"
-            mysql.execute(sql,  (task_id))
-            logger.info(f"task_id = {task_id} ,任务异常")
-            continue
+    try:
+        video_id = task['video_id']
+        task_params = {'task_id':task_id, 'video_id':video_id, 'video_url':video_url}
+        logger.info(f"task_id = {task_id} , video_id = {video_id}")
+        task_create_timestamp = task['create_timestamp']
+        current_timestamp = int(time.time() * 1000)
+
+        decode_result = invoke_decode_workflow(task_params)
+
+        if decode_result:
+            # 更新任务状态为2,任务完成
+            sql = "UPDATE decode_record SET task_status = 2, WHERE task_id = %s"
+            mysql.execute(sql, (task_id))
+            logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
+
+           
+        else:
+            if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
+                sql = "UPDATE decode_record SET task_status = 3, WHERE task_id = %s"
+                mysql.execute(sql, (task_id))
+                logger.info(f"task_id = {task_id} ,任务状态异常")
+    except Exception as e:
+        logger.error(f"task_id = {task_id} , error = {e}")
+        sql = "UPDATE decode_record SET task_status = 3, WHERE task_id = %s"
+        mysql.execute(sql,  (task_id))
+        logger.info(f"task_id = {task_id} ,任务异常")
+        raise {"task_id": task_id, "error": '任务异常'}
+
 

+ 3 - 23
main.py

@@ -58,10 +58,7 @@ def decode_video(param:DecodeListParam):
             task_id=task_id,
             video_id=video_id,
             video_url=video_url,
-            task_params=json.dumps(param.model_dump()),
-            decode_result=None,
-            script_result=None,
-            task_status = 1
+            task_status = 0
         ).save()
         data_list.append({
             "task_id": task_id,
@@ -74,26 +71,9 @@ def decode_video(param:DecodeListParam):
     }
 
 
-@app.post("/decode/result")
-def get_decode_result(param: TaskStatusParam):
-    decode_result, task_status = get_decode_result_by_id(param.task_id)
-    if decode_result:
-        return {
-            "code": 0,
-            "message": "success",
-            "data": {
-                "decode_result": decode_result,
-                "task_status": task_status
-            }
-        }
-    else:
-        return {
-            "code": -1,
-            "message": "error",
-            "data": None
-        }
 
-@app.post("/script/result")
+
+@app.post("/decode/result")
 def get_script_result(param: TaskStatusParam):
     decode_result,script_result, task_status = get_script_result_by_id(param.task_id)
     if script_result:

+ 1 - 4
models/decode_record.py

@@ -12,10 +12,7 @@ class DecodeRecord(BaseModel):
     task_id:          Annotated[str, Field(description='任务ID')]
     video_id:            Annotated[str, Field(description='视频ID')]
     video_url:            Annotated[str, Field(description='视频地址')]
-    task_params:      Annotated[Optional[str], Field(description='任务参数', default=None)]
-    task_status:      Annotated[Optional[int], Field(description='任务状态', default=1)]  # 1: 进行中, 2: 已完成, 3: 失败
-    decode_result:      Annotated[Optional[str], Field(description='解构结果', default=None)]
-    script_result:      Annotated[Optional[str], Field(description='脚本结果', default=None)]
+    status:      Annotated[Optional[int], Field(description='任务状态', default=1)]  # 0:待执行  /   1:执行中  /  2:执行成功  3:执行失败
     create_timestamp: Annotated[Optional[int], Field(description='任务创建时间戳', default_factory=get_now_ts)]
 
     def save(self):

+ 5 - 0
utils/params.py

@@ -11,5 +11,10 @@ class DecodeParam(BaseModel):
     video: str
     title: str
 
+class DecodeWorkflowParam(BaseModel):
+    video_id: str
+    video_url: str
+    task_id: str
+
 class DecodeListParam(BaseModel):
     video_list: List[DecodeParam]