|  | @@ -1,30 +1,44 @@
 | 
	
		
			
				|  |  | -import asyncio
 | 
	
		
			
				|  |  |  import time
 | 
	
		
			
				|  |  | +from concurrent.futures import ThreadPoolExecutor, as_completed
 | 
	
		
			
				|  |  |  from common.redis import get_pq_id
 | 
	
		
			
				|  |  |  from video_cover_method.cover_method import CoverMethod
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -semaphore = asyncio.Semaphore(10)  # 限制并发为 10 个
 | 
	
		
			
				|  |  | +# 限制最大线程数为 10
 | 
	
		
			
				|  |  | +max_workers = 10
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -async def get_video_id():
 | 
	
		
			
				|  |  | -    while True:
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def get_video_id():
 | 
	
		
			
				|  |  | +    video_ids = []
 | 
	
		
			
				|  |  | +    for i in range(10):
 | 
	
		
			
				|  |  |          video_id = get_pq_id()
 | 
	
		
			
				|  |  |          if video_id:
 | 
	
		
			
				|  |  | -            return video_id
 | 
	
		
			
				|  |  | +            video_ids.append( video_id )
 | 
	
		
			
				|  |  |          else:
 | 
	
		
			
				|  |  | -            print("没有获取待更改封面的视频ID,等待5秒")
 | 
	
		
			
				|  |  | -            await asyncio.sleep(5)
 | 
	
		
			
				|  |  | +           return video_ids
 | 
	
		
			
				|  |  | +    return video_ids
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def process_video_cover(video_id):
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +        CoverMethod.cover_method( int( video_id ) )
 | 
	
		
			
				|  |  | +    except Exception as e:
 | 
	
		
			
				|  |  | +        print( "处理任务时出现异常:", e)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -async def process_video_cover(video_id):
 | 
	
		
			
				|  |  | -    async with semaphore:  # 限制并发
 | 
	
		
			
				|  |  | -        try:
 | 
	
		
			
				|  |  | -            await CoverMethod.cover_method(int(video_id))
 | 
	
		
			
				|  |  | -        except Exception as e:
 | 
	
		
			
				|  |  | -            print("处理任务时出现异常:", e)
 | 
	
		
			
				|  |  | +def video_cover_task_start():
 | 
	
		
			
				|  |  | +    with ThreadPoolExecutor( max_workers=max_workers) as executor:
 | 
	
		
			
				|  |  | +        futures = []
 | 
	
		
			
				|  |  | +        while True:
 | 
	
		
			
				|  |  | +            video_ids = get_video_id()
 | 
	
		
			
				|  |  | +            if not video_ids:
 | 
	
		
			
				|  |  | +                print("没有数据等待30秒")
 | 
	
		
			
				|  |  | +                time.sleep(30)
 | 
	
		
			
				|  |  | +                continue
 | 
	
		
			
				|  |  | +            for video_id in video_ids:
 | 
	
		
			
				|  |  | +                futures.append( executor.submit( process_video_cover, video_id ) )
 | 
	
		
			
				|  |  | +                # 等待完成的任务并清理
 | 
	
		
			
				|  |  | +            for future in as_completed( futures ):
 | 
	
		
			
				|  |  | +                futures.remove( future )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -async def video_cover_task_start():
 | 
	
		
			
				|  |  | -    while True:
 | 
	
		
			
				|  |  | -        video_id = await get_video_id()
 | 
	
		
			
				|  |  | -        asyncio.create_task(process_video_cover(video_id))  # 创建异步任务
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  if __name__ == '__main__':
 | 
	
		
			
				|  |  | -    asyncio.run(video_cover_task_start())
 | 
	
		
			
				|  |  | +    video_cover_task_start()
 |