Преглед на файлове

feat(解码工作流): 添加搜索主题列表功能

新增search_topic_list函数用于根据条件查询解码工作流任务
在main.py中添加/decodeWorkflow/list接口调用该功能
max_liu преди 1 ден
родител
ревизия
81dfd3686a
променени са 2 файла, в които са добавени 67 реда и са изтрити 1 реда
  1. 10 0
      decode_task/topicTask.py
  2. 57 1
      main.py

+ 10 - 0
decode_task/topicTask.py

@@ -33,6 +33,16 @@ def get_topic_result_by_id(task_id:str):
     return tasks['result'], tasks['task_status'],tasks['error_reason']
 
 
+
+def search_topic_list(param:DecodeWorkflowParam):
+    sql = "SELECT * FROM decode_workflow WHERE video_id = %s AND video_url = %s AND title = %s AND task_status = %s"
+    tasks = mysql.fetchall(sql, (param.video_id,param.video_url,param.title,param.task_status))
+    if not tasks:
+        logger.info(f"任务不存在")
+        return None
+    return tasks  
+
+
 # if __name__ == "__main__":
 #     decode_task_status_handler()
    

+ 57 - 1
main.py

@@ -12,7 +12,9 @@ from dotenv import load_dotenv, find_dotenv
 from decode_task.decodeTask import get_decode_result_by_id as get_decode_result_by_id_db
 from decode_task.evaluateTask import get_evaluate_result_by_id as get_evaluate_result_by_id_db
 from decode_task.topicTask import get_topic_result_by_id as get_topic_result_by_id_db
+from decode_task.topicTask import get_topic_list as get_topic_list_db
 from decode_task.topicTask import update_topic_result_by_id as update_topic_result_by_id_db
+from decode_task.topicTask import search_topic_list as search_topic_list_db
 
 
 
@@ -54,6 +56,44 @@ def startup_event():
 
 
 
+@app.post("/decodeWorkflow/create")
+def decode_list():
+    logger.info(f"数据池数据 = {video_list}")
+    data_list = []
+
+
+    for video in video_list:
+        video_id = video.video_id
+        video_url = video.video_url
+        video_title = video.title
+        task_id = str(uuid.uuid4())
+        DecodeWorkflow(
+            task_id=task_id,
+            video_id=video_id,
+            video_url=video_url,
+            title=video_title,
+            type=param.type,
+            result=None,
+        ).save()
+        data_list.append({
+            "task_id": task_id,
+            "video_id": video_id,
+            "video_url": video_url,
+            "title": video_title,
+            "task_status": 0,
+            "type": param.type,
+        })
+    return {
+        "code": 0,
+        "message": "success",
+        "data":{
+            "type": param.type,
+            "tasks": data_list
+        } 
+    }
+
+
+
 @app.post("/decodeWorkflow/create")
 def decode_topic(param:TopicListParam):
     video_list = param.video_list
@@ -136,7 +176,23 @@ def get_topic_result(param: TaskStatusParam):
             "error":error_reason,
         } 
     }
-    
+  
+
+
+@app.post("/decodeWorkflow/list")
+def search_topic_list(param:DecodeWorkflowParam):
+    tasks = search_topic_list_db(param)
+    if not tasks:
+        return {
+            "code": -1,
+            "message": "任务不存在",
+            "data": []
+        }
+    return {
+        "code": 0,
+        "message": "success",
+        "data": tasks
+    }