# topicTask.py
  1. import json
  2. import os
  3. from loguru import logger
  4. import sys
  5. import time
  6. from utils.sync_mysql_help import mysql
  7. import utils.params as DecodeWorkflowParam
  8. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  9. def update_topic_result_by_id(param):
  10. sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
  11. tasks = mysql.fetchone(sql, (param.task_id,))
  12. if not tasks:
  13. logger.info(f"task_id = {param.task_id} , 任务不存在")
  14. return None
  15. # 取旧值用于返回
  16. result, status, error_reason, video_url, title = tasks['result'], tasks['task_status'], tasks['error_reason'], tasks['video_url'], tasks['title']
  17. # 动态更新可传字段
  18. fields = []
  19. values = []
  20. if getattr(param, 'video_id', None):
  21. fields.append("video_id = %s")
  22. values.append(param.video_id)
  23. if getattr(param, 'video_url', None):
  24. fields.append("video_url = %s")
  25. values.append(param.video_url)
  26. if getattr(param, 'title', None) is not None:
  27. fields.append("title = %s")
  28. values.append(param.title)
  29. if getattr(param, 'task_status', None) is not None:
  30. fields.append("task_status = %s")
  31. values.append(param.task_status)
  32. if fields:
  33. sql = f"UPDATE decode_workflow SET {', '.join(fields)} WHERE task_id = %s"
  34. values.append(param.task_id)
  35. mysql.execute(sql, tuple(values))
  36. return result, status, error_reason, video_url, title
  37. def get_topic_result_by_id(task_id:str):
  38. sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
  39. task = mysql.fetchone(sql, (task_id,))
  40. if not task:
  41. logger.info(f"task_id = {task_id} , 任务不存在")
  42. return None
  43. return task
  44. def search_topic_list(param=None):
  45. base_sql = "SELECT * FROM decode_workflow"
  46. conditions = []
  47. values = []
  48. if isinstance(param, dict):
  49. v_id = param.get("video_id")
  50. v_url = param.get("video_url")
  51. title = param.get("title")
  52. status = param.get("task_status")
  53. if v_id:
  54. conditions.append("video_id = %s")
  55. values.append(v_id)
  56. if v_url:
  57. conditions.append("video_url = %s")
  58. values.append(v_url)
  59. if title:
  60. conditions.append("title = %s")
  61. values.append(title)
  62. if status is not None:
  63. conditions.append("task_status = %s")
  64. values.append(status)
  65. sql = base_sql if not conditions else f"{base_sql} WHERE " + " AND ".join(conditions)
  66. tasks = mysql.fetchall(sql, tuple(values))
  67. if not tasks:
  68. logger.info(f"任务不存在")
  69. return None
  70. return tasks
  71. # if __name__ == "__main__":
  72. # decode_task_status_handler()