job_cover_method.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import re
  2. import time
  3. from concurrent.futures import ThreadPoolExecutor, wait
  4. from common.redis import get_pq_id
  5. from video_cover_method.cover_method import CoverMethod
  6. # Upper bound on the number of worker threads in the pool (10)
  7. max_workers = 10
  8. def get_video_id():
  9. video_ids = set() # 使用集合去重
  10. for i in range(10):
  11. video_id = get_pq_id()
  12. if video_id:
  13. video_ids.add(video_id)
  14. else:
  15. return list( video_ids )
  16. return list( video_ids )
  17. def process_video_cover(video_id):
  18. try:
  19. # video_id = int(video_id)
  20. video_id = str(video_id).strip( "b'" ) # 去掉前面的 b 和两边的引号
  21. CoverMethod.cover_method(str(video_id))
  22. except Exception as e:
  23. print( f"视频ID:{video_id}处理任务时出现异常:{e}")
  24. def video_cover_task_start():
  25. with ThreadPoolExecutor( max_workers=max_workers) as executor:
  26. while True:
  27. try:
  28. video_ids = get_video_id()
  29. if not video_ids:
  30. print("没有数据等待30秒")
  31. time.sleep(30)
  32. continue
  33. # 提交所有任务并等待完成
  34. futures = [executor.submit( process_video_cover, video_id ) for video_id in video_ids]
  35. wait( futures ) # 等待所有任务完成
  36. except Exception as e:
  37. print(f"异常信息{e}")
  38. time.sleep(3)
  39. continue
  40. if __name__ == '__main__':  # start the worker loop only when run as a script
  41. video_cover_task_start()