Explorar o código

refactor(workflow): 优化解码工作流参数和更新逻辑

将DecodeWorkflowParam的字段改为可选并添加task_id
在update_topic_result中增加字段回退逻辑
重构update_topic_result_by_id以支持动态更新字段
max_liu hai 1 día
pai
achega
e5e8b0a682
Modificáronse 3 ficheiros con 33 adicións e 12 borrados
  1. 23 5
      decode_task/topicTask.py
  2. 5 3
      main.py
  3. 5 4
      utils/params.py

+ 23 - 5
decode_task/topicTask.py

@@ -12,16 +12,34 @@ import utils.params as DecodeWorkflowParam
 
 logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
 
-def update_topic_result_by_id(param:DecodeWorkflowParam):
+def update_topic_result_by_id(param):
     sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
     tasks = mysql.fetchone(sql, (param.task_id,))
     if not tasks:
         logger.info(f"task_id = {param.task_id} , 任务不存在")
         return None
-    result, status,error_reason,video_url,title = tasks['result'], tasks['task_status'],tasks['error_reason'],tasks['video_url'],tasks['title']
-    sql = "UPDATE decode_workflow SET video_url = %s, task_status = %s, title = %s WHERE task_id = %s"
-    mysql.execute(sql, (param.video_url, param.status, param.title, param.task_id))
-    return result, status,error_reason,video_url,title
+    # 取旧值用于返回
+    result, status, error_reason, video_url, title = tasks['result'], tasks['task_status'], tasks['error_reason'], tasks['video_url'], tasks['title']
+    # 动态更新可传字段
+    fields = []
+    values = []
+    if getattr(param, 'video_id', None):
+        fields.append("video_id = %s")
+        values.append(param.video_id)
+    if getattr(param, 'video_url', None):
+        fields.append("video_url = %s")
+        values.append(param.video_url)
+    if getattr(param, 'title', None) is not None:
+        fields.append("title = %s")
+        values.append(param.title)
+    if getattr(param, 'task_status', None) is not None:
+        fields.append("task_status = %s")
+        values.append(param.task_status)
+    if fields:
+        sql = f"UPDATE decode_workflow SET {', '.join(fields)} WHERE task_id = %s"
+        values.append(param.task_id)
+        mysql.execute(sql, tuple(values))
+    return result, status, error_reason, video_url, title
 
 
 def get_topic_result_by_id(task_id:str):

+ 5 - 3
main.py

@@ -107,13 +107,15 @@ def update_topic_result(param: DecodeWorkflowParam):
     result, status,error_reason,video_url,title = db_res
     return {
         "code": 0,
-        "message": status == 2 and "success" or error_reason,
+        "message": status == 2 and "success" or (error_reason or "success"),
         "data": {
             "result": result,
             "status": status,
             "error":error_reason,
-            "video_url":video_url,
-            "title":title,
+            "video_url": param.video_url or video_url,
+            "title": param.title or title,
+            "task_status": param.task_status if param.task_status is not None else status,
+            "video_id": param.video_id,
         }   
     }
     

+ 5 - 4
utils/params.py

@@ -14,10 +14,11 @@ class DecodeParam(BaseModel):
     title: str
 
 class DecodeWorkflowParam(BaseModel):
-    video_id: str
-    video_url: str
-    title: str
-    task_status: Optional[int] = 0
+    task_id: Optional[str] = None
+    video_id: Optional[str] = None
+    video_url: Optional[str] = None
+    title: Optional[str] = None
+    task_status: Optional[int] = None
 
 
 class DecodeListParam(BaseModel):