timed_task.py 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. import threading
  2. import schedule
  3. import time
  4. from common.db import MysqlHelper
  5. from video_capture.douyin.douyin_author.douyin_author import douyinAuthor
  6. from video_stitching.video_stitching import VideoStitching
  7. from datetime import datetime, timedelta
  8. def get_count():
  9. current_time = datetime.now()
  10. previous_time = current_time - timedelta(minutes=5)
  11. previous_time_str = previous_time.strftime("%Y-%m-%d %H:%M")
  12. select_user_sql = f"""select video_id from video_url where time >="{previous_time_str}";"""
  13. count = MysqlHelper.get_values(select_user_sql, "prod")
  14. if count:
  15. return True
  16. else:
  17. return False
  18. def check_token():
  19. while True:
  20. count = get_count()
  21. if count:
  22. time.sleep(600) # 每10分钟检查一次
  23. else:
  24. douyinAuthor.get_token()
  25. def start_video_stitching():
  26. VideoStitching.video_stitching()
  27. # 创建并启动线程
  28. token_thread = threading.Thread(target=check_token)
  29. token_thread.start()
  30. # 每天下午4点定时启动 video_stitching 方法
  31. schedule.every().day.at("16:00").do(start_video_stitching)
  32. while True:
  33. schedule.run_pending()
  34. time.sleep(1)