12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- import os
- import sys
- import threading
- import schedule
- import time
- import functools
- sys.path.append(os.getcwd())
- from common.db import MysqlHelper
- from video_capture.douyin.douyin_author.douyin_author import douyinAuthor
- from video_stitching.video_stitching import VideoStitching
- from datetime import datetime, timedelta
def get_count():
    """Report whether the ``video_url`` table has rows from the last 10 minutes.

    Computes a cutoff of "now minus 10 minutes" from the naive
    ``datetime.now()`` value, formatted to minute precision, and selects the
    ``video_id`` of every row whose ``time`` column is at or after that
    cutoff. The query is run through ``MysqlHelper.get_values`` with
    ``"prod"`` as its second argument (presumably an environment selector;
    confirm against ``common.db``).

    Returns:
        bool: True when the query result is truthy, False otherwise.
    """
    window = timedelta(minutes=10)
    previous_time_str = (datetime.now() - window).strftime("%Y-%m-%d %H:%M")
    query = f"""select video_id from video_url where time >="{previous_time_str}";"""
    rows = MysqlHelper.get_values(query, "prod")
    # Only the truthiness of the result matters to callers.
    return bool(rows)
def check_token():
    """Poll ``get_count()`` forever and call ``douyinAuthor.get_token()`` when it is falsy.

    On each pass:
      * if ``get_count()`` is truthy, sleep for 1200 seconds and check again;
      * otherwise call ``douyinAuthor.get_token()`` and re-check immediately.

    This function never returns; it is meant to be run in its own thread.
    """
    while True:
        if not get_count():
            # NOTE(review): there is no delay on this branch, so get_token()
            # is retried back-to-back for as long as get_count() stays falsy.
            # Confirm that this is intended.
            douyinAuthor.get_token()
            continue
        time.sleep(1200)  # check once every 20 minutes
# Create and start the background thread that runs check_token().
token_thread = threading.Thread(target=check_token)
token_thread.start()

# Schedule VideoStitching.video_stitching to run every day at 16:00.
# The callable is passed directly: an argument-less functools.partial wrapper
# binds nothing and only hides the function's name from the scheduler's job
# metadata.
schedule.every().day.at("16:00").do(VideoStitching.video_stitching)

# Drive the scheduler: run any jobs that are due, polling once per second.
while True:
    schedule.run_pending()
    time.sleep(1)
|