import json
import os
import sys
import time
from datetime import datetime, timedelta
from typing import Optional

from loguru import logger

from utils.sync_mysql_help import mysql

logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)


def update_topic_result_by_id(param):
    """Update a ``decode_workflow`` row and return its previous values.

    The row is looked up by ``param.task_id``. Only the attributes that are
    present on ``param`` are written: ``video_id`` and ``video_url`` must be
    truthy to be written, while ``title`` and ``task_status`` are written
    whenever they are not ``None`` (so an empty title or a ``0`` status is
    still stored).

    Args:
        param: Object exposing ``task_id`` and, optionally, ``video_id``,
            ``video_url``, ``title`` and ``task_status`` attributes.

    Returns:
        ``None`` when no row matches ``task_id``; otherwise the tuple
        ``(video_id, task_status, video_url, title)`` holding the values the
        row had *before* this update.
    """
    sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
    task = mysql.fetchone(sql, (param.task_id,))
    if not task:
        logger.info(f"task_id = {param.task_id} , 任务不存在")
        return None

    # Capture the old values first; they are what the caller gets back.
    status = task['task_status']
    video_id = task['video_id']
    video_url = task['video_url']
    title = task['title']

    # Build the SET clause from whichever updatable fields were supplied.
    fields = []
    values = []

    new_video_id = getattr(param, 'video_id', None)
    if new_video_id:
        fields.append("video_id = %s")
        values.append(new_video_id)

    new_video_url = getattr(param, 'video_url', None)
    if new_video_url:
        fields.append("video_url = %s")
        values.append(new_video_url)

    new_title = getattr(param, 'title', None)
    if new_title is not None:
        fields.append("title = %s")
        values.append(new_title)

    new_status = getattr(param, 'task_status', None)
    if new_status is not None:
        fields.append("task_status = %s")
        values.append(new_status)

    if fields:
        # Column names come from the fixed list above; only values are bound.
        sql = f"UPDATE decode_workflow SET {', '.join(fields)} WHERE task_id = %s"
        values.append(param.task_id)
        mysql.execute(sql, tuple(values))

    return video_id, status, video_url, title


def _fetch_count(sql, params):
    """Run a ``COUNT(*) AS cnt`` query and return the count as an int.

    A missing row, a missing ``cnt`` key or a ``None`` value all yield 0.
    """
    row = mysql.fetchone(sql, params)
    return int(row.get('cnt') or 0) if row else 0


def get_user_count(name: str, day: Optional[str] = None):
    """Count the ``decode_workflow`` rows created by an account on one day.

    The day window is ``[local midnight, next local midnight)``. Rows are
    counted through ``created_at`` (compared against the window bounds
    multiplied by 1000) and, when the table has a ``create`` column, also
    through ``create`` (compared against the bounds in seconds); the two
    counts are added together.

    Args:
        name: Value matched against the ``account`` column.
        day: Day to inspect as ``"YYYY-MM-DD"``. Defaults to the current
            local day when omitted or empty.

    Returns:
        The total number of matching rows.

    Raises:
        ValueError: If ``day`` is given but does not match ``"%Y-%m-%d"``.
    """
    if day:
        start = datetime.strptime(day, "%Y-%m-%d")
    else:
        # Let the platform decide whether DST applies at midnight instead of
        # reusing the DST flag of the current moment.
        start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    # Next calendar midnight rather than +86400 s, so days that are 23 or 25
    # hours long because of a DST change are still covered exactly.
    end = start + timedelta(days=1)
    start_ts = int(start.timestamp())
    end_ts = int(end.timestamp())

    sql1 = (
        "SELECT COUNT(*) AS cnt FROM decode_workflow "
        "WHERE account = %s AND created_at IS NOT NULL AND created_at >= %s AND created_at < %s"
    )
    # NOTE(review): created_at is presumably stored in epoch milliseconds.
    cnt1 = _fetch_count(sql1, (name, start_ts * 1000, end_ts * 1000))

    # The `create` column may not exist; check the schema before using it.
    sql_has_create = (
        "SELECT 1 AS ok FROM information_schema.COLUMNS "
        "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'decode_workflow' AND COLUMN_NAME = 'create'"
    )
    has_create = mysql.fetchone(sql_has_create)

    cnt2 = 0
    if has_create:
        sql2 = (
            "SELECT COUNT(*) AS cnt FROM decode_workflow "
            "WHERE account = %s AND `create` IS NOT NULL AND `create` >= %s AND `create` < %s"
        )
        cnt2 = _fetch_count(sql2, (name, start_ts, end_ts))

    cnt = cnt1 + cnt2
    logger.info(f"account = {name} , 任务数量 = {cnt}")
    return cnt


def get_topic_result_by_id(task_id: str):
    """Fetch the ``decode_workflow`` row for ``task_id``.

    Args:
        task_id: Value matched against the ``task_id`` column.

    Returns:
        The row as returned by ``mysql.fetchone``, or ``None`` when no row
        matches.
    """
    sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
    task = mysql.fetchone(sql, (task_id,))
    if not task:
        logger.info(f"task_id = {task_id} , 任务不存在")
        return None
    return task


def search_topic_list(param=None):
    """Search ``decode_workflow`` rows by optional equality filters.

    Args:
        param: Optional dict. The keys ``video_id``, ``video_url`` and
            ``title`` are used as filters when truthy; ``task_status`` is
            used whenever it is not ``None``. Any non-dict value means
            "no filters", which selects every row.

    Returns:
        The rows as returned by ``mysql.fetchall``, or ``None`` when nothing
        matches.
    """
    base_sql = "SELECT * FROM decode_workflow"
    conditions = []
    values = []

    if isinstance(param, dict):
        v_id = param.get("video_id")
        v_url = param.get("video_url")
        title = param.get("title")
        status = param.get("task_status")

        if v_id:
            conditions.append("video_id = %s")
            values.append(v_id)
        if v_url:
            conditions.append("video_url = %s")
            values.append(v_url)
        if title:
            conditions.append("title = %s")
            values.append(title)
        if status is not None:
            conditions.append("task_status = %s")
            values.append(status)

    if conditions:
        sql = f"{base_sql} WHERE " + " AND ".join(conditions)
    else:
        sql = base_sql

    tasks = mysql.fetchall(sql, tuple(values))
    if not tasks:
        logger.info("任务不存在")
        return None
    return tasks


if __name__ == "__main__":
    get_user_count("liubin")