| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import json
- import os
- from loguru import logger
- import sys
- import time
- from utils.sync_mysql_help import mysql
- from datetime import datetime
- logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
- def update_topic_result_by_id(param):
- sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
- tasks = mysql.fetchone(sql, (param.task_id,))
- if not tasks:
- logger.info(f"task_id = {param.task_id} , 任务不存在")
- return None
- # 取旧值用于返回
- status, video_id, video_url, title = tasks['task_status'], tasks['video_id'], tasks['video_url'], tasks['title']
- # 动态更新可传字段
- fields = []
- values = []
- if getattr(param, 'video_id', None):
- fields.append("video_id = %s")
- values.append(param.video_id)
- if getattr(param, 'video_url', None):
- fields.append("video_url = %s")
- values.append(param.video_url)
- if getattr(param, 'title', None) is not None:
- fields.append("title = %s")
- values.append(param.title)
- if getattr(param, 'task_status', None) is not None:
- fields.append("task_status = %s")
- values.append(param.task_status)
- if fields:
- sql = f"UPDATE decode_workflow SET {', '.join(fields)} WHERE task_id = %s"
- values.append(param.task_id)
- mysql.execute(sql, tuple(values))
- return video_id, status, video_url, title
- def get_user_count(name: str, day: str = None):
- if day:
- dt = datetime.strptime(day, "%Y-%m-%d")
- start_ts = int(dt.timestamp())
- else:
- now = int(time.time())
- lt = time.localtime(now)
- start_ts = int(time.mktime((lt.tm_year, lt.tm_mon, lt.tm_mday, 0, 0, 0, lt.tm_wday, lt.tm_yday, lt.tm_isdst)))
- end_ts = start_ts + 86400
- sql1 = (
- "SELECT COUNT(*) AS cnt FROM decode_workflow "
- "WHERE account = %s AND created_at IS NOT NULL AND created_at >= %s AND created_at < %s"
- )
- row1 = mysql.fetchone(sql1, (name, start_ts * 1000, end_ts * 1000))
- cnt1 = int(row1.get('cnt', 0)) if row1 else 0
- sql_has_create = (
- "SELECT 1 AS ok FROM information_schema.COLUMNS "
- "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'decode_workflow' AND COLUMN_NAME = 'create'"
- )
- has_create = mysql.fetchone(sql_has_create)
- cnt2 = 0
- if has_create:
- sql2 = (
- "SELECT COUNT(*) AS cnt FROM decode_workflow "
- "WHERE account = %s AND `create` IS NOT NULL AND `create` >= %s AND `create` < %s"
- )
- row2 = mysql.fetchone(sql2, (name, start_ts, end_ts))
- cnt2 = int(row2.get('cnt', 0)) if row2 else 0
- cnt = cnt1 + cnt2
- logger.info(f"account = {name} , 任务数量 = {cnt}")
- return cnt
- def get_topic_result_by_id(task_id:str):
- sql = "SELECT * FROM decode_workflow WHERE task_id = %s"
- task = mysql.fetchone(sql, (task_id,))
- if not task:
- logger.info(f"task_id = {task_id} , 任务不存在")
- return None
- return task
- def search_topic_list(param=None):
- base_sql = "SELECT * FROM decode_workflow"
- conditions = []
- values = []
- if isinstance(param, dict):
- v_id = param.get("video_id")
- v_url = param.get("video_url")
- title = param.get("title")
- status = param.get("task_status")
- if v_id:
- conditions.append("video_id = %s")
- values.append(v_id)
- if v_url:
- conditions.append("video_url = %s")
- values.append(v_url)
- if title:
- conditions.append("title = %s")
- values.append(title)
- if status is not None:
- conditions.append("task_status = %s")
- values.append(status)
- sql = base_sql if not conditions else f"{base_sql} WHERE " + " AND ".join(conditions)
- tasks = mysql.fetchall(sql, tuple(values))
- if not tasks:
- logger.info(f"任务不存在")
- return None
- return tasks
- if __name__ == "__main__":
- get_user_count("liubin")
-
|