topicTask.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import json
  2. import os
  3. from loguru import logger
  4. import sys
  5. import time
  6. from utils.sync_mysql_help import mysql
  7. import utils.params as DecodeWorkflowParam
  8. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  9. def update_topic_result_by_id(param):
  10. sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
  11. tasks = mysql.fetchone(sql, (param.task_id,))
  12. if not tasks:
  13. logger.info(f"task_id = {param.task_id} , 任务不存在")
  14. return None
  15. # 取旧值用于返回
  16. status, video_id, video_url, title = tasks['task_status'], tasks['video_id'], tasks['video_url'], tasks['title']
  17. # 动态更新可传字段
  18. fields = []
  19. values = []
  20. if getattr(param, 'video_id', None):
  21. fields.append("video_id = %s")
  22. values.append(param.video_id)
  23. if getattr(param, 'video_url', None):
  24. fields.append("video_url = %s")
  25. values.append(param.video_url)
  26. if getattr(param, 'title', None) is not None:
  27. fields.append("title = %s")
  28. values.append(param.title)
  29. if getattr(param, 'task_status', None) is not None:
  30. fields.append("task_status = %s")
  31. values.append(param.task_status)
  32. if fields:
  33. sql = f"UPDATE decode_workflow SET {', '.join(fields)} WHERE task_id = %s"
  34. values.append(param.task_id)
  35. mysql.execute(sql, tuple(values))
  36. return video_id, status, video_url, title
  37. def get_user_count(name: str):
  38. now = int(time.time())
  39. lt = time.localtime(now)
  40. start_of_day = time.mktime((lt.tm_year, lt.tm_mon, lt.tm_mday, 0, 0, 0, lt.tm_wday, lt.tm_yday, lt.tm_isdst))
  41. start_ts = int(start_of_day)
  42. end_ts = start_ts + 86400
  43. sql = "SELECT COUNT(*) FROM decode_workflow WHERE account = %s AND created_at >= %s AND created_at < %s"
  44. count = mysql.fetchall(sql, (name, start_ts, end_ts))
  45. if not count:
  46. logger.info(f"account = {name} , 任务不存在")
  47. return None
  48. return count[0][0]
  49. def get_topic_result_by_id(task_id:str):
  50. sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
  51. task = mysql.fetchone(sql, (task_id,))
  52. if not task:
  53. logger.info(f"task_id = {task_id} , 任务不存在")
  54. return None
  55. return task
  56. def search_topic_list(param=None):
  57. base_sql = "SELECT * FROM decode_workflow"
  58. conditions = []
  59. values = []
  60. if isinstance(param, dict):
  61. v_id = param.get("video_id")
  62. v_url = param.get("video_url")
  63. title = param.get("title")
  64. status = param.get("task_status")
  65. if v_id:
  66. conditions.append("video_id = %s")
  67. values.append(v_id)
  68. if v_url:
  69. conditions.append("video_url = %s")
  70. values.append(v_url)
  71. if title:
  72. conditions.append("title = %s")
  73. values.append(title)
  74. if status is not None:
  75. conditions.append("task_status = %s")
  76. values.append(status)
  77. sql = base_sql if not conditions else f"{base_sql} WHERE " + " AND ".join(conditions)
  78. tasks = mysql.fetchall(sql, tuple(values))
  79. if not tasks:
  80. logger.info(f"任务不存在")
  81. return None
  82. return tasks
  83. # if __name__ == "__main__":
  84. # decode_task_status_handler()