import os import sys import threading import schedule import time import functools sys.path.append(os.getcwd()) from common.db import MysqlHelper from video_capture.douyin.douyin_author.douyin_author import douyinAuthor from video_stitching.video_stitching import VideoStitching from datetime import datetime, timedelta def get_count(): current_time = datetime.now() previous_time = current_time - timedelta(minutes=10) previous_time_str = previous_time.strftime("%Y-%m-%d %H:%M") select_user_sql = f"""select video_id from video_url where time >="{previous_time_str}";""" count = MysqlHelper.get_values(select_user_sql, "prod") if count: return True else: return False def check_token(): while True: count = get_count() if count: time.sleep(1200) # 每20分钟检查一次 else: douyinAuthor.get_token() # 创建并启动线程 token_thread = threading.Thread(target=check_token) token_thread.start() # 每天下午4点定时启动 video_stitching 方法 schedule.every().day.at("16:00").do(functools.partial(VideoStitching.video_stitching)) while True: schedule.run_pending() time.sleep(1)