| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 |
- import json
- import os
- from loguru import logger
- import sys
- import time
- from utils.sync_mysql_help import mysql
- import utils.params as DecodeWorkflowParam
- logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
def update_topic_result_by_id(param: DecodeWorkflowParam):
    """Overwrite a task's video URL, status and title; return its prior values.

    The ``decode_workflow`` row whose ``task_id`` equals ``param.task_id`` is
    fetched first. If no row comes back, a message is logged and nothing is
    written. Otherwise the row's ``video_url``, ``task_status`` and ``title``
    columns are set from ``param.video_url``, ``param.status`` and
    ``param.title``.

    Args:
        param: Object exposing ``task_id``, ``video_url``, ``status`` and
            ``title`` attributes.

    Returns:
        ``None`` when the task is not found; otherwise the tuple
        ``(result, task_status, error_reason, video_url, title)`` as read
        *before* the UPDATE was issued.
    """
    select_sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
    row = mysql.fetchone(select_sql, (param.task_id,))
    if not row:
        logger.info(f"task_id = {param.task_id} , 任务不存在")
        return None

    # Snapshot the stored values first: the caller receives the pre-update
    # state, not the values written below.
    previous = (
        row['result'],
        row['task_status'],
        row['error_reason'],
        row['video_url'],
        row['title'],
    )

    # NOTE(review): the new status is read from ``param.status`` here, while
    # search_topic_list reads ``param.task_status`` -- confirm which attribute
    # the parameter object actually provides.
    update_sql = "UPDATE decode_workflow SET video_url = %s, task_status = %s, title = %s WHERE task_id = %s"
    mysql.execute(update_sql, (param.video_url, param.status, param.title, param.task_id))
    return previous
def get_topic_result_by_id(task_id: str):
    """Look up one ``decode_workflow`` row by task id.

    Args:
        task_id: Value matched against the ``task_id`` column.

    Returns:
        The tuple ``(result, task_status, error_reason)`` taken from the
        fetched row, or ``None`` (after logging a message) when the lookup
        yields nothing.
    """
    query = "SELECT * FROM decode_workflow WHERE task_id = %s"
    row = mysql.fetchone(query, (task_id,))
    if row:
        return row['result'], row['task_status'], row['error_reason']

    logger.info(f"task_id = {task_id} , 任务不存在")
    return None
def search_topic_list(param: DecodeWorkflowParam):
    """Fetch every ``decode_workflow`` row matching all four filter fields.

    A row matches only when its ``video_id``, ``video_url``, ``title`` and
    ``task_status`` columns all equal the corresponding attributes of
    ``param`` (the conditions are combined with AND).

    Args:
        param: Object exposing ``video_id``, ``video_url``, ``title`` and
            ``task_status`` attributes.

    Returns:
        Whatever ``mysql.fetchall`` returned when that value is truthy;
        otherwise ``None`` after logging a message.
    """
    query = "SELECT * FROM decode_workflow WHERE video_id = %s AND video_url = %s AND title = %s AND task_status = %s"
    # Bind values in the same order as the placeholders in the query.
    filters = (param.video_id, param.video_url, param.title, param.task_status)
    rows = mysql.fetchall(query, filters)
    if rows:
        return rows

    logger.info(f"任务不存在")
    return None
- # if __name__ == "__main__":
- # decode_task_status_handler()
-
|