"""Helpers for reading and updating rows of the ``decode_workflow`` table."""

import json
import os
import sys
import time

from loguru import logger

from utils.sync_mysql_help import mysql
# NOTE(review): this alias is bound to the *module* ``utils.params``, yet it is
# used below as a type annotation. Presumably a class defined inside that
# module was intended -- confirm before relying on the annotation.
import utils.params as DecodeWorkflowParam

# Register an additional stderr sink that reports ERROR-level records with
# backtrace and diagnose output enabled.
logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)


def update_topic_result_by_id(param: DecodeWorkflowParam):
    """Update a task's video URL, status and title; return its previous values.

    The row is read first so that the values returned are the ones stored
    *before* the UPDATE is applied.

    Args:
        param: Object exposing ``task_id``, ``video_url``, ``status`` and
            ``title`` attributes.

    Returns:
        A tuple ``(result, task_status, error_reason, video_url, title)``
        taken from the row as it was before the update, or ``None`` when no
        row matches ``param.task_id`` (in which case nothing is updated).
    """
    # Shares the lookup and the "not found" logging with the plain getter.
    task = get_topic_result_by_id(param.task_id)
    if not task:
        return None

    # Capture the pre-update values before the row is overwritten.
    previous = (
        task['result'],
        task['task_status'],
        task['error_reason'],
        task['video_url'],
        task['title'],
    )

    sql = "UPDATE decode_workflow SET video_url = %s, task_status = %s, title = %s WHERE task_id = %s"
    mysql.execute(sql, (param.video_url, param.status, param.title, param.task_id))
    return previous


def get_topic_result_by_id(task_id: str):
    """Fetch the ``decode_workflow`` row for ``task_id``.

    Args:
        task_id: Value matched against the ``task_id`` column.

    Returns:
        The row returned by ``mysql.fetchone``, or ``None`` (after logging an
        info message) when the lookup yields nothing.
    """
    sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
    task = mysql.fetchone(sql, (task_id,))
    if not task:
        logger.info(f"task_id = {task_id} , 任务不存在")
        return None
    return task


def search_topic_list(param=None):
    """List ``decode_workflow`` rows, optionally filtered by exact match.

    Args:
        param: Optional dict of filters. Recognised keys are ``video_id``,
            ``video_url``, ``title`` and ``task_status``. The first three are
            applied only when truthy; ``task_status`` is applied whenever it
            is not ``None``, so a falsy status such as ``0`` still filters.
            Any non-dict value is ignored and every row is selected.

    Returns:
        The rows returned by ``mysql.fetchall``, or ``None`` (after logging an
        info message) when the query yields nothing.
    """
    base_sql = "SELECT * FROM decode_workflow"
    conditions = []
    values = []

    if isinstance(param, dict):
        v_id = param.get("video_id")
        v_url = param.get("video_url")
        title = param.get("title")
        status = param.get("task_status")

        if v_id:
            conditions.append("video_id = %s")
            values.append(v_id)
        if v_url:
            conditions.append("video_url = %s")
            values.append(v_url)
        if title:
            conditions.append("title = %s")
            values.append(title)
        if status is not None:
            conditions.append("task_status = %s")
            values.append(status)

    # Values always travel as bound parameters; only fixed column clauses are
    # joined into the SQL text.
    sql = base_sql if not conditions else f"{base_sql} WHERE " + " AND ".join(conditions)
    tasks = mysql.fetchall(sql, tuple(values))
    if not tasks:
        logger.info("任务不存在")
        return None
    return tasks


# if __name__ == "__main__":
#     decode_task_status_handler()