import uvicorn import asyncio from fastapi import FastAPI from pydantic import BaseModel from google_ai.generativeai_video import main app = FastAPI() api_keys = [ {'key': 'AIzaSyB2kjF2-S2B5cJiosx_LpApd227w33CVvs', 'locked': False}, {'key': 'AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8', 'locked': False}, ] lock = asyncio.Lock() class VideoRequest(BaseModel): video_path: str prompt: str mark: str sample_data: str async def get_available_api_key(): """获取一个未锁定的 API key,如果没有可用的则等待 5 秒后重试""" while True: async with lock: for key_data in api_keys: if not key_data['locked']: key_data['locked'] = True # 锁定该 key return key_data['key'] print( "没有可用的 API key,等待 5 秒后重试..." ) await asyncio.sleep( 5 ) async def release_api_key(api_key): """释放已锁定的 API key,并将其放到列表末尾""" async with lock: for i, key_data in enumerate( api_keys ): if key_data['key'] == api_key: key_data['locked'] = False # 释放该 key # 将释放的 key 移动到列表末尾 api_keys.append( api_keys.pop( i ) ) break @app.post("/process_video/") async def process_video(request: VideoRequest): """处理视频请求""" global api_key_index # 引用全局索引 video_path = request.video_path prompt = request.prompt mark = request.mark sample_data = request.sample_data api_key = await get_available_api_key() try: print("来一个请求,使用 API key:", api_key) result = await main(video_path, api_key, prompt, sample_data) return { "code": 0, "message": "视频处理成功", "result": result, "mark": str(mark) } except Exception as e: print(f"处理失败: {str(e)}") return { "code": 1, "message": f"处理失败: {e}", "result": f"处理失败: {e}", "mark": f"处理失败: {e}" } finally: await release_api_key( api_key ) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8080)