main.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import os
  2. import sys
  3. import threading
  4. import schedule
  5. import time
  6. sys.path.append(os.getcwd())
  7. from common.db import MysqlHelper
  8. from video_capture.douyin.douyin_author.douyin_author import douyinAuthor
  9. from video_stitching.video_stitching import VideoStitching
  10. from datetime import datetime, timedelta
  11. def get_count():
  12. current_time = datetime.now()
  13. previous_time = current_time - timedelta(minutes=10)
  14. previous_time_str = previous_time.strftime("%Y-%m-%d %H:%M")
  15. select_user_sql = f"""select video_id from video_url where time >="{previous_time_str}";"""
  16. count = MysqlHelper.get_values(select_user_sql, "prod")
  17. if count:
  18. return True
  19. else:
  20. return False
  21. def check_token():
  22. while True:
  23. count = get_count()
  24. if count:
  25. time.sleep(1200) # 每20分钟检查一次
  26. else:
  27. douyinAuthor.get_token()
  28. # 创建并启动线程
  29. token_thread = threading.Thread(target=check_token)
  30. token_thread.start()
  31. # 每天下午4点定时启动 video_stitching 方法
  32. schedule.every().day.at("16:00").do(VideoStitching.video_stitching())
  33. while True:
  34. schedule.run_pending()
  35. time.sleep(1)