import asyncio from fastapi import FastAPI from pydantic import BaseModel from google_ai.generativeai_video import main app = FastAPI() api_keys = [ {'key': 'AIzaSyAa4H7YzIOiqA0QIDuO6kNErFBoWT-BEtc', 'locked': False}, {'key': 'AIzaSyC-IxUvJhbiKWA7uN1RTtVbOjt3pj_-Apc', 'locked': False}, {'key': 'AIzaSyB43Pvugb-CMkEXJUSubXEAxKxYflCPPNU', 'locked': False} ] lock = asyncio.Lock() class VideoRequest( BaseModel ): video_path: str async def get_available_api_key(): """获取一个未锁定的 API key,如果没有可用的则等待 60 秒后重试""" while True: # 无限循环,直到找到可用的 key async with lock: for key_data in api_keys: if not key_data['locked']: key_data['locked'] = True # 锁定该 key return key_data['key'] # 如果没有找到可用的 key,等待 60 秒后重试 print( "没有可用的 API key,等待 60 秒后重试..." ) await asyncio.sleep( 60 ) async def release_api_key(api_key): """释放已锁定的 API key""" async with lock: for key_data in api_keys: if key_data['key'] == api_key: key_data['locked'] = False # 释放该 key break @app.post( "/process_video/" ) async def process_video(request: VideoRequest): """处理视频请求""" video_path = request.video_path # 获取一个可用的 API key api_key = await get_available_api_key() try: print( "来一个请求,使用 API key:", api_key ) result, mark = await main( video_path, api_key ) return { "code": 0, "message": "视频处理成功", "result": result, "mark": mark } except Exception as e: print( f"视频处理失败: {str( e )}" ) return { "code": 1, "message": f"视频处理失败: {e}", "result": f"视频处理失败: {e}", "mark": f"视频处理失败: {e}" } finally: # 释放 API key,确保后续请求可以使用 await release_api_key( api_key )