Переглянути джерело

feat(解码工作流): 添加解码工作流相关功能及接口

新增解码工作流模型、任务处理逻辑和API接口
移除旧的定时任务配置
添加TopicListParam和DecodeWorkflowParam参数模型
实现工作流任务的创建、更新和结果查询功能
max_liu 1 місяць тому
батько
коміт
dfd889bf41
5 змінених файлів з 175 додано та 18 видалено
  1. 38 0
      decode_task/topicTask.py
  2. 85 6
      main.py
  3. 34 0
      models/decode_workflow.py
  4. 9 9
      task_schedule.py
  5. 9 3
      utils/params.py

+ 38 - 0
decode_task/topicTask.py

@@ -0,0 +1,38 @@
+import json
+import os
+from loguru import logger
+import sys
+import time
+from utils.sync_mysql_help import mysql
+import utils.params as DecodeWorkflowParam
+
+
+
+
+
+logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
+
+def update_topic_result_by_id(param:DecodeWorkflowParam):
+    sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
+    tasks = mysql.fetchone(sql, (param.task_id,))
+    if not tasks:
+        logger.info(f"task_id = {param.task_id} , 任务不存在")
+        return None
+    result, status,error_reason,video_url,title = tasks['result'], tasks['task_status'],tasks['error_reason'],tasks['video_url'],tasks['title']
+    sql = "UPDATE decode_workflow SET video_url = %s, task_status = %s, title = %s WHERE task_id = %s"
+    mysql.execute(sql, (param.video_url, param.status, param.title, param.task_id))
+    return result, status,error_reason,video_url,title
+
+
+def get_topic_result_by_id(task_id:str):
+    sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
+    tasks = mysql.fetchone(sql, (task_id,))
+    if not tasks:
+        logger.info(f"task_id = {task_id} , 任务不存在")
+        return None
+    return tasks['result'], tasks['task_status'],tasks['error_reason']
+
+
+# if __name__ == "__main__":
+#     decode_task_status_handler()
+   

+ 85 - 6
main.py

@@ -4,18 +4,21 @@ import uuid
 from fastapi import FastAPI, HTTPException, Request
 from fastapi.responses import JSONResponse
 from pydantic import BaseModel
-from utils.params import TaskStatusParam, DecodeListParam,EvaluateListParam,EvaluateStatusParam
+from utils.params import TaskStatusParam, DecodeListParam,TopicListParam,EvaluateListParam,EvaluateStatusParam,DecodeWorkflowParam
 from dotenv import load_dotenv, find_dotenv
 
 
 from decode_task.decodeTask import get_decode_result_by_id as get_decode_result_by_id_db
 from decode_task.evaluateTask import get_evaluate_result_by_id as get_evaluate_result_by_id_db
+from decode_task.topicTask import get_topic_result_by_id as get_topic_result_by_id_db
+from decode_task.topicTask import update_topic_result_by_id as update_topic_result_by_id_db
 
 
 
 from typing import List
 from models.decode_record import DecodeRecord
 from models.evaluate_record import EvaluateRecord
+from models.decode_workflow import DecodeWorkflow
 
 from task_schedule import TaskScheduler
 
@@ -43,8 +46,88 @@ def startup_event():
 
 
 
+@app.post("/decodeWorkflow/create")
+def decode_topic(param:TopicListParam):
+    video_list = param.video_list
+    logger.info(f"数据池数据 = {video_list}")
+    data_list = []
+    for video in video_list:
+        video_id = video.channel_content_id
+        video_url = video.video
+        video_title = video.title
+        task_id = str(uuid.uuid4())
+        DecodeWorkflow(
+            task_id=task_id,
+            video_id=video_id,
+            video_url=video_url,
+            title=video_title,
+            type=param.type,
+            result=None,
+            task_status = 0
+        ).save()
+        data_list.append({
+            "task_id": task_id,
+            "video_id": video_id,
+            "video_url": video_url,
+            "title": video_title,
+            
+        })
+    return {
+        "code": 0,
+        "message": "success",
+        "data":{
+            "type": param.type,
+            "tasks": data_list
+        } 
+    }
+
+@app.post("/decodeWorkflow/update")
+def update_topic_result(param: DecodeWorkflowParam):
+    db_res = update_topic_result_by_id_db(param)
+    logger.info(f"\n查询结构结果的task_id = {param.task_id}")
+
+    if not db_res:
+        return {
+            "code": -1,
+            "message": '任务不存在',
+            "data": None
+        }
+    result, status,error_reason,video_url,title = db_res
+    return {
+        "code": 0,
+        "message": status == 2 and "success" or error_reason,
+        "data": {
+            "result": result,
+            "status": status,
+            "error":error_reason,
+            "video_url":video_url,
+            "title":title,
+        }   
+    }
+    
+@app.post("/decodeWorkflow/result")
+def get_topic_result(param: TaskStatusParam):
+    db_res = get_topic_result_by_id_db(param.task_id)
+    logger.info(f"\n查询结构结果的task_id = {param.task_id}")
+
+    if not db_res:
+        return {
+            "code": -1,
+            "message": '任务不存在',
+            "data": None
+        }
+    result, status,error_reason = db_res
+    return {
+        "code": 0,
+        "message": status == 2 and "success" or error_reason,
+        "data": {
+            "result": result,
+            "status": status,
+            "error":error_reason,
+        } 
+    }
+    
 
-  
 
 
 @app.post("/decodeVideo/create")
@@ -73,9 +156,6 @@ def decode_video(param:DecodeListParam):
         "data": data_list
     }
 
-
-
-
 @app.post("/decode/result")
 def get_decode_result(param: TaskStatusParam):
     db_res = get_decode_result_by_id_db(param.task_id)
@@ -99,7 +179,6 @@ def get_decode_result(param: TaskStatusParam):
         } 
     }
     
-
 @app.post("/evaluate/create")
 def evaluate_video(param:EvaluateListParam):
 

+ 34 - 0
models/decode_workflow.py

@@ -0,0 +1,34 @@
+from typing import Optional
+
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated
+
+from utils.general import get_now_ts
+from utils.sync_mysql_help import mysql
+
+class DecodeWorkflow(BaseModel):
+    table_name:       Annotated[Optional[str], Field(description='表名', exclude=True)] = 'decode_workflow'
+
+    task_id:          Annotated[str, Field(description='任务ID')]
+    video_id:         Annotated[Optional[str], Field(description='视频ID')]
+    video_url:        Annotated[str, Field(description='视频地址')]
+    title:            Annotated[Optional[str], Field(description='视频标题')]
+    task_status:      Annotated[Optional[int], Field(description='任务状态', default=0)]  # 0:待执行  /   1:执行中  /  2:执行成功  3:执行失败
+    result:           Annotated[Optional[str], Field(description='任务结果')]
+    create_at:        Annotated[Optional[int], Field(description='任务创建时间戳', default_factory=get_now_ts)]
+    error_reason:     Annotated[Optional[str], Field(description='错误原因')]
+    type:             Annotated[Optional[int], Field(description='任务类型', default=1)]  # 0:解码任务  /  1:评估任务
+
+    def save(self):
+
+        record = self.model_dump(exclude={'table_name'})
+        keys = record.keys()
+        sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
+        mysql.execute(sql, tuple([record[key] for key in keys]))
+
+    async def async_save(self):
+    
+        record = self.model_dump(exclude={'table_name'})
+        keys = record.keys()
+        sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
+        await mysql.execute(sql, tuple([record[key] for key in keys]))

+ 9 - 9
task_schedule.py

@@ -1,6 +1,6 @@
 from apscheduler.schedulers.background import BackgroundScheduler
-from apscheduler.triggers.interval import IntervalTrigger
-from decode_task.decodeTask import  decode_task_status_handler
+# from apscheduler.triggers.interval import IntervalTrigger
+# from decode_task.decodeTask import  decode_task_status_handler
 from loguru import logger
 
 
@@ -12,13 +12,13 @@ class TaskScheduler:
     def setup_jobs(self):
         """设置定时任务"""
 
-        self.scheduler.add_job(
-            func=decode_task_status_handler,
-            trigger=IntervalTrigger(seconds=30),
-            id='decode_task_handler',
-            name='decode_task任务刷新',
-            max_instances=1  # 防止重复执行
-        )
+        # self.scheduler.add_job(
+        #     func=decode_task_status_handler,
+        #     trigger=IntervalTrigger(seconds=30),
+        #     id='decode_task_handler',
+        #     name='decode_task任务刷新',
+        #     max_instances=1  # 防止重复执行
+        # )
 
         # self.scheduler.add_job(
         #     func=script_task_status_handler,

+ 9 - 3
utils/params.py

@@ -1,5 +1,5 @@
 from pydantic import BaseModel
-from typing import List
+from typing import List, Optional
 
 
 
@@ -16,11 +16,17 @@ class DecodeParam(BaseModel):
 class DecodeWorkflowParam(BaseModel):
     video_id: str
     video_url: str
-    task_id: str
+    title: str
+    task_status: Optional[int] = 0
+
 
 class DecodeListParam(BaseModel):
     video_list: List[DecodeParam]
 
+class TopicListParam(BaseModel):
+    video_list: List[DecodeWorkflowParam]
+    type: Optional[int] = 1
+
 class EvaluateWorkflowParam(BaseModel):
     video_id: str
     channel_content_id: str
@@ -28,4 +34,4 @@ class EvaluateWorkflowParam(BaseModel):
     status: int
     result: str
 class EvaluateListParam(BaseModel):
-    evaluate_list: List[EvaluateWorkflowParam]
+    evaluate_list: List[EvaluateWorkflowParam]