Procházet zdrojové kódy

Merge branch 'dev-1209'

max_liu před 1 dnem
rodič
revize
ac307d2190
5 změnil soubory, kde provedl 148 přidání a 14 odebrání
  1. 1 1
      decode_task/decodeTask.py
  2. 89 0
      decode_task/evalueteTask.py
  3. 38 3
      main.py
  4. 10 9
      models/evaluate_record.py
  5. 10 1
      utils/params.py

+ 1 - 1
decode_task/decodeTask.py

@@ -34,7 +34,7 @@ def get_decode_result_by_id(task_id:str):
     if not tasks:
         logger.info(f"task_id = {task_id} , 任务不存在")
         return None
-    return tasks['result'], tasks['status'],tasks['error_reason']
+    return tasks['result'], tasks['status'],tasks['error_reason'],tasks['search_keyword']
 
 
 def  decode_task_status_handler():

+ 89 - 0
decode_task/evalueteTask.py

@@ -0,0 +1,89 @@
+import json
+from loguru import logger
+import sys
+import time
+from utils.sync_mysql_help import mysql
+from typing import Dict, Any
+
+from utils.params import DecodeWorkflowParam
+from src.workflows.decode_workflow import DecodeWorkflow
+
+
+
+
+logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
+
+
+
+   
+
+
+def invoke_evaluate_workflow(evaluate_params: Dict[str, Any]):
+    """主函数"""
+    workflow = DecodeWorkflow()
+    result = workflow.invoke(evaluate_params)
+    if result:
+        return result
+    else:
+        print(f"❌ 保存结果失败,但将继续处理")
+        return None
+
+
+def get_evaluate_result_by_id(evaluate_id:str):
+    sql = "SELECT * FROM evaluate_record WHERE evaluate_id = %s"
+    tasks = mysql.fetchone(sql, (evaluate_id,))
+    if not tasks:
+        logger.info(f"evaluate_id = {evaluate_id} , 任务不存在")    
+        return None
+    return tasks['evaluate_result'], tasks['status'],tasks['error_reason']
+
+
+def  evaluate_task_status_handler():
+    # 从数据库中获取任务,每次获取一个
+    sql = "SELECT * FROM evaluate_record WHERE status = 0 ORDER BY create_timestamp ASC LIMIT 1"
+    task = mysql.fetchone(sql)
+   
+
+    if not task:
+        logger.info("任务列表为空")
+        return
+    else:
+        task_id = task['evaluate_id']
+        sql = "UPDATE evaluate_record SET status = 1 WHERE evaluate_id = %s AND status = 0"
+        mysql.execute(sql, (task_id,))
+        
+        # 获取任务结果
+    try:
+        evaluate_id = task['evaluate_id']
+        video_url = task['video_url']
+        video_id = task['video_id']
+        evaluate_params = {'evaluate_id':evaluate_id, 'video_id':video_id, 'video_url':video_url}
+        logger.info(f"evaluate_id = {evaluate_id} , video_id = {video_id}")     
+        task_create_timestamp = task['created_at']
+        current_timestamp = int(time.time() * 1000)
+
+        evaluate_result = invoke_evaluate_workflow(evaluate_params)
+
+        if evaluate_result:
+            # 更新任务状态为2,任务完成
+            sql = "UPDATE evaluate_record SET status = 2, evaluate_result = %s WHERE evaluate_id = %s"
+            mysql.execute(sql, (json.dumps(evaluate_result),evaluate_id))
+            logger.info(f"evaluate_id = {evaluate_id} , evaluate_result = {evaluate_result}")
+
+           
+        else:
+            if current_timestamp - task_create_timestamp > 1000 * 60 * 60:
+                sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s"
+                mysql.execute(sql, (json.dumps(evaluate_result),evaluate_id))
+                logger.info(f"evaluate_id = {evaluate_id} ,任务状态异常")   
+    except Exception as e:
+        logger.error(f"evaluate_id = {evaluate_id} , error = {e}")
+        sql = "UPDATE evaluate_record SET status = 3, error_reason = %s WHERE evaluate_id = %s"
+        mysql.execute(sql, (json.dumps(str(e)),evaluate_id))
+        logger.info(f"evaluate_id = {evaluate_id} ,任务异常")
+        raise RuntimeError(f"evaluate_id={evaluate_id} 任务异常: {e}")      
+
+
+# if __name__ == "__main__":
+#     decode_task_status_handler()
+   

+ 38 - 3
main.py

@@ -4,7 +4,7 @@ import uuid
 from fastapi import FastAPI, HTTPException, Request
 from fastapi.responses import JSONResponse
 from pydantic import BaseModel
-from utils.params import TaskStatusParam, DecodeListParam
+from utils.params import TaskStatusParam, DecodeListParam,EvaluateListParam
 from dotenv import load_dotenv
 
 
@@ -14,6 +14,8 @@ from decode_task.decodeTask import get_decode_result_by_id as get_decode_result_
 
 from typing import List
 from models.decode_record import DecodeRecord
+from models.evaluate_record import EvaluateRecord
+
 from task_schedule import TaskScheduler
 
 from loguru import logger
@@ -81,7 +83,7 @@ def get_decode_result(param: TaskStatusParam):
             "message": "error",
             "data": None
         }
-    result, status,error_reason = db_res
+    result, status,error_reason,search_keyword = db_res
     if result:
         return {
             "code": 0,
@@ -89,7 +91,8 @@ def get_decode_result(param: TaskStatusParam):
             "data": {
                 "result": result,
                 "status": status,
-                "error":error_reason
+                "error":error_reason,
+                "searchKeyword":search_keyword
             }
         }
     else:
@@ -99,3 +102,35 @@ def get_decode_result(param: TaskStatusParam):
             "data": None
         }
 
+@app.post("/evaluate/create")
+def evaluate_video(param:EvaluateListParam):
+
+    evaluate_list = param.evaluate_list
+
+    data_list = []
+    for evaluate in evaluate_list:
+        evaluate_id = str(uuid.uuid4())
+        task_id = evaluate.task_id
+        channel_content_id = evaluate.channel_content_id
+        EvaluateRecord(
+            evaluate_id=evaluate_id,
+            task_id=task_id,
+            search_result= json.dumps(evaluate.search_result),
+            evaluate_result=None,   
+            error_reason=None,
+            status = 0
+        ).save()
+        data_list.append({
+            "task_id": task_id,
+            "evaluate_id": evaluate_id,
+            "channel_content_id":channel_content_id
+        })
+    return {
+        "code": 0,
+        "message": "success",
+        "data": data_list
+    }
+
+
+
+

+ 10 - 9
models/decode_result_record.py → models/evaluate_record.py

@@ -6,15 +6,16 @@ from typing_extensions import Annotated
 from utils.general import get_now_ts
 from utils.sync_mysql_help import mysql
 
-class DecodeResultRecord(BaseModel):
-    table_name:       Annotated[Optional[str], Field(description='表名', exclude=True)] = 'decode_result_record'
-
-    task_id:          Annotated[str, Field(description='任务ID')]
-    model:            Annotated[str, Field(description='模型名称')]
-    task_params:      Annotated[Optional[str], Field(description='任务参数', default=None)]
-    task_status:      Annotated[Optional[int], Field(description='任务状态', default=1)]  # 1: 进行中, 2: 已完成, 3: 失败
-    task_result:      Annotated[Optional[str], Field(description='任务结果', default=None)]
-    create_timestamp: Annotated[Optional[int], Field(description='任务创建时间戳', default_factory=get_now_ts)]
+class EvaluateRecord(BaseModel):
+    table_name:       Annotated[Optional[str], Field(description='表名', exclude=True)] = 'evaluate_record'
+    
+    evaluate_id:          Annotated[str, Field(description='任务ID', default=None)]
+    task_id:          Annotated[Optional[str], Field(description='任务ID', default=None)]
+    status:      Annotated[Optional[int], Field(description='任务状态', default=1)]  # 1: 进行中, 2: 已完成, 3: 失败
+    search_result:      Annotated[Optional[str], Field(description='关键词结果', default=None)]
+    evaluate_result:      Annotated[Optional[str], Field(description='评估结果', default=None)]
+    error_reason:      Annotated[Optional[str], Field(description='失败原因', default=None)]
+    created_at: Annotated[Optional[int], Field(description='任务创建时间戳', default_factory=get_now_ts)]
 
     def save(self):
 

+ 10 - 1
utils/params.py

@@ -17,4 +17,13 @@ class DecodeWorkflowParam(BaseModel):
     task_id: str
 
 class DecodeListParam(BaseModel):
-    video_list: List[DecodeParam]
+    video_list: List[DecodeParam]
+
+class EvaluateWorkflowParam(BaseModel):
+    video_id: str
+    video_url: str
+    task_id: str
+    status: int
+    channel_video_url: str
+class EvaluateListParam(BaseModel):
+    evaluate_list: List[any]