topicTask.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import json
  2. import os
  3. from loguru import logger
  4. import sys
  5. import time
  6. from utils.sync_mysql_help import mysql
  7. from datetime import datetime
  8. logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
  9. def update_topic_result_by_id(param):
  10. sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
  11. tasks = mysql.fetchone(sql, (param.task_id,))
  12. if not tasks:
  13. logger.info(f"task_id = {param.task_id} , 任务不存在")
  14. return None
  15. # 取旧值用于返回
  16. status, video_id, video_url, title = tasks['task_status'], tasks['video_id'], tasks['video_url'], tasks['title']
  17. # 动态更新可传字段
  18. fields = []
  19. values = []
  20. if getattr(param, 'video_id', None):
  21. fields.append("video_id = %s")
  22. values.append(param.video_id)
  23. if getattr(param, 'video_url', None):
  24. fields.append("video_url = %s")
  25. values.append(param.video_url)
  26. if getattr(param, 'title', None) is not None:
  27. fields.append("title = %s")
  28. values.append(param.title)
  29. if getattr(param, 'task_status', None) is not None:
  30. fields.append("task_status = %s")
  31. values.append(param.task_status)
  32. if fields:
  33. sql = f"UPDATE decode_workflow SET {', '.join(fields)} WHERE task_id = %s"
  34. values.append(param.task_id)
  35. mysql.execute(sql, tuple(values))
  36. return video_id, status, video_url, title
  37. def get_user_count(name: str, day: str = None):
  38. if day:
  39. dt = datetime.strptime(day, "%Y-%m-%d")
  40. start_ts = int(dt.timestamp())
  41. else:
  42. now = int(time.time())
  43. lt = time.localtime(now)
  44. start_ts = int(time.mktime((lt.tm_year, lt.tm_mon, lt.tm_mday, 0, 0, 0, lt.tm_wday, lt.tm_yday, lt.tm_isdst)))
  45. end_ts = start_ts + 86400
  46. sql1 = (
  47. "SELECT COUNT(*) AS cnt FROM decode_workflow "
  48. "WHERE account = %s AND created_at IS NOT NULL AND created_at >= %s AND created_at < %s"
  49. )
  50. row1 = mysql.fetchone(sql1, (name, start_ts * 1000, end_ts * 1000))
  51. cnt1 = int(row1.get('cnt', 0)) if row1 else 0
  52. sql_has_create = (
  53. "SELECT 1 AS ok FROM information_schema.COLUMNS "
  54. "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'decode_workflow' AND COLUMN_NAME = 'create'"
  55. )
  56. has_create = mysql.fetchone(sql_has_create)
  57. cnt2 = 0
  58. if has_create:
  59. sql2 = (
  60. "SELECT COUNT(*) AS cnt FROM decode_workflow "
  61. "WHERE account = %s AND `create` IS NOT NULL AND `create` >= %s AND `create` < %s"
  62. )
  63. row2 = mysql.fetchone(sql2, (name, start_ts, end_ts))
  64. cnt2 = int(row2.get('cnt', 0)) if row2 else 0
  65. cnt = cnt1 + cnt2
  66. logger.info(f"account = {name} , 任务数量 = {cnt}")
  67. return cnt
  68. def get_topic_result_by_id(task_id:str):
  69. sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
  70. task = mysql.fetchone(sql, (task_id,))
  71. if not task:
  72. logger.info(f"task_id = {task_id} , 任务不存在")
  73. return None
  74. return task
  75. def search_topic_list(param=None):
  76. base_sql = "SELECT * FROM decode_workflow"
  77. conditions = []
  78. values = []
  79. if isinstance(param, dict):
  80. v_id = param.get("video_id")
  81. v_url = param.get("video_url")
  82. title = param.get("title")
  83. status = param.get("task_status")
  84. if v_id:
  85. conditions.append("video_id = %s")
  86. values.append(v_id)
  87. if v_url:
  88. conditions.append("video_url = %s")
  89. values.append(v_url)
  90. if title:
  91. conditions.append("title = %s")
  92. values.append(title)
  93. if status is not None:
  94. conditions.append("task_status = %s")
  95. values.append(status)
  96. sql = base_sql if not conditions else f"{base_sql} WHERE " + " AND ".join(conditions)
  97. tasks = mysql.fetchall(sql, tuple(values))
  98. if not tasks:
  99. logger.info(f"任务不存在")
  100. return None
  101. return tasks
  102. if __name__ == "__main__":
  103. get_user_count("liubin")