topicTask.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import json
  2. import os
  3. from loguru import logger
  4. import sys
  5. import time
  6. from utils.sync_mysql_help import mysql
  7. import utils.params as DecodeWorkflowParam
  8. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  9. def update_topic_result_by_id(param:DecodeWorkflowParam):
  10. sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
  11. tasks = mysql.fetchone(sql, (param.task_id,))
  12. if not tasks:
  13. logger.info(f"task_id = {param.task_id} , 任务不存在")
  14. return None
  15. result, status,error_reason,video_url,title = tasks['result'], tasks['task_status'],tasks['error_reason'],tasks['video_url'],tasks['title']
  16. sql = "UPDATE decode_workflow SET video_url = %s, task_status = %s, title = %s WHERE task_id = %s"
  17. mysql.execute(sql, (param.video_url, param.status, param.title, param.task_id))
  18. return result, status,error_reason,video_url,title
  19. def get_topic_result_by_id(task_id:str):
  20. sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
  21. task = mysql.fetchone(sql, (task_id,))
  22. if not task:
  23. logger.info(f"task_id = {task_id} , 任务不存在")
  24. return None
  25. return task
  26. def search_topic_list(param=None):
  27. base_sql = "SELECT * FROM decode_workflow"
  28. conditions = []
  29. values = []
  30. if isinstance(param, dict):
  31. v_id = param.get("video_id")
  32. v_url = param.get("video_url")
  33. title = param.get("title")
  34. status = param.get("task_status")
  35. if v_id:
  36. conditions.append("video_id = %s")
  37. values.append(v_id)
  38. if v_url:
  39. conditions.append("video_url = %s")
  40. values.append(v_url)
  41. if title:
  42. conditions.append("title = %s")
  43. values.append(title)
  44. if status is not None:
  45. conditions.append("task_status = %s")
  46. values.append(status)
  47. sql = base_sql if not conditions else f"{base_sql} WHERE " + " AND ".join(conditions)
  48. tasks = mysql.fetchall(sql, tuple(values))
  49. if not tasks:
  50. logger.info(f"任务不存在")
  51. return None
  52. return tasks
  53. # if __name__ == "__main__":
  54. # decode_task_status_handler()