| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import json
- import os
- from loguru import logger
- import sys
- import time
- from utils.sync_mysql_help import mysql
- import utils.params as DecodeWorkflowParam
# Attach an extra stderr sink that only emits ERROR and above, with extended
# tracebacks (backtrace) and variable-value annotations (diagnose) enabled.
logger.add(sys.stderr, level="ERROR", backtrace=True, diagnose=True)
def update_topic_result_by_id(param):
    """Update selected columns of one ``decode_workflow`` row and return its old values.

    The row is looked up by ``param.task_id``. Only attributes actually
    supplied on ``param`` are written:

    * ``video_id`` / ``video_url`` are written when they are truthy.
    * ``title`` / ``task_status`` are written when they are not ``None``
      (so an empty title or a status of ``0`` is still persisted).

    Args:
        param: object exposing ``task_id`` and, optionally, ``video_id``,
            ``video_url``, ``title`` and ``task_status`` attributes.

    Returns:
        ``(video_id, task_status, video_url, title)`` as stored *before* the
        update, or ``None`` when no row matches ``param.task_id``.
    """
    existing = mysql.fetchone(
        "SELECT * FROM decode_workflow WHERE task_id = %s",
        (param.task_id,),
    )
    if not existing:
        logger.info(f"task_id = {param.task_id} , 任务不存在")
        return None

    # Snapshot the stored values first; they are what the caller gets back.
    old_status = existing['task_status']
    old_video_id = existing['video_id']
    old_video_url = existing['video_url']
    old_title = existing['title']

    # (column, skip_when_falsy): the first two columns ignore any falsy value,
    # the last two only ignore None.
    column_rules = (
        ("video_id", True),
        ("video_url", True),
        ("title", False),
        ("task_status", False),
    )
    assignments = []
    for column, skip_when_falsy in column_rules:
        candidate = getattr(param, column, None)
        supplied = bool(candidate) if skip_when_falsy else candidate is not None
        if supplied:
            assignments.append((f"{column} = %s", candidate))

    if assignments:
        set_clause = ", ".join(clause for clause, _ in assignments)
        bound = tuple(value for _, value in assignments) + (param.task_id,)
        mysql.execute(
            f"UPDATE decode_workflow SET {set_clause} WHERE task_id = %s",
            bound,
        )

    return old_video_id, old_status, old_video_url, old_title
def get_user_count(name: str):
    """Count the ``decode_workflow`` rows created today for one account.

    "Today" is the half-open window ``[local midnight, local midnight + 86400 s)``
    expressed as integer Unix timestamps and compared against ``created_at``.
    NOTE(review): this presumes ``created_at`` holds epoch seconds - confirm
    against the table schema.

    Args:
        name: value matched against the ``account`` column.

    Returns:
        The ``COUNT(*)`` value of the query, or ``None`` when the query
        returns no rows at all.
    """
    lt = time.localtime()
    # Pass -1 as the DST flag so mktime() decides whether DST was in effect at
    # midnight. Reusing the *current* flag moves the boundary by an hour on
    # days where the clock changes between midnight and now.
    midnight = (lt.tm_year, lt.tm_mon, lt.tm_mday, 0, 0, 0, lt.tm_wday, lt.tm_yday, -1)
    start_ts = int(time.mktime(midnight))
    end_ts = start_ts + 86400

    sql = "SELECT COUNT(*) FROM decode_workflow WHERE account = %s AND created_at >= %s AND created_at < %s"
    rows = mysql.fetchall(sql, (name, start_ts, end_ts))
    if not rows:
        logger.info(f"account = {name} , 任务不存在")
        return None

    first_row = rows[0]
    # Other queries in this module read rows by column name, so the helper may
    # hand back dict rows; positional indexing would raise KeyError on those.
    # Accept both dict rows and sequence rows.
    if isinstance(first_row, dict):
        return next(iter(first_row.values()))
    return first_row[0]
def get_topic_result_by_id(task_id: str):
    """Fetch the ``decode_workflow`` row whose ``task_id`` matches.

    Args:
        task_id: identifier bound to the ``task_id`` column.

    Returns:
        The row as returned by ``mysql.fetchone``, or ``None`` (after logging
        an info message) when nothing matches.
    """
    row = mysql.fetchone(
        "SELECT * FROM decode_workflow WHERE task_id = %s",
        (task_id,),
    )
    if row:
        return row
    logger.info(f"task_id = {task_id} , 任务不存在")
    return None
def search_topic_list(param=None):
    """List ``decode_workflow`` rows, optionally filtered by exact-match columns.

    Filters are only read when ``param`` is a ``dict``; any other value
    (including ``None``) selects every row. Recognised keys:

    * ``video_id``, ``video_url``, ``title`` - applied when truthy.
    * ``task_status`` - applied when not ``None`` (so ``0`` is a valid filter).

    All active filters are combined with ``AND``.

    Args:
        param: optional dict of filter values.

    Returns:
        The rows returned by ``mysql.fetchall``, or ``None`` (after logging an
        info message) when the result is empty.
    """
    # Each entry pairs a WHERE fragment with the value bound to its placeholder.
    filters = []
    if isinstance(param, dict):
        for column in ("video_id", "video_url", "title"):
            wanted = param.get(column)
            if wanted:
                filters.append((f"{column} = %s", wanted))
        wanted_status = param.get("task_status")
        if wanted_status is not None:
            filters.append(("task_status = %s", wanted_status))

    query = "SELECT * FROM decode_workflow"
    if filters:
        query = f"{query} WHERE " + " AND ".join(clause for clause, _ in filters)
    bound = tuple(value for _, value in filters)

    rows = mysql.fetchall(query, bound)
    if rows:
        return rows
    logger.info(f"任务不存在")
    return None
- # if __name__ == "__main__":
- # decode_task_status_handler()
-
|