import asyncio import uvicorn from fastapi import FastAPI from pydantic import BaseModel from google_ai.generativeai_video import main app = FastAPI() api_keys = [ {'key': 'AIzaSyDs4FWRuwrEnQzu1M_Skio6NII6Mp4whAw', 'locked': False}, {'key': 'AIzaSyC-IxUvJhbiKWA7uN1RTtVbOjt3pj_-Apc', 'locked': False}, {'key': 'AIzaSyB43Pvugb-CMkEXJUSubXEAxKxYflCPPNU', 'locked': False}, {'key': 'AIzaSyAYgOeHS2NUlVuVe14rgNDaxXRNRt4smuY', 'locked': False}, {'key': 'AIzaSyAa4H7YzIOiqA0QIDuO6kNErFBoWT-BEtc', 'locked': False}, {'key': 'AIzaSyAUvBSpjFcm7b8FsgRUTG6anzoalDp9gYg', 'locked': False}, {'key': 'AIzaSyB43Pvugb-CMkEXJUSubXEAxKxYflCPPNU', 'locked': False} ] lock = asyncio.Lock() class VideoRequest(BaseModel): video_path: str prompt: str mark: str async def get_available_api_key(): """获取一个未锁定的 API key,如果没有可用的则等待 60 秒后重试""" while True: async with lock: for key_data in api_keys: if not key_data['locked']: key_data['locked'] = True # 锁定该 key return key_data['key'] print("没有可用的 API key,等待 60 秒后重试...") await asyncio.sleep(60) async def release_api_key(api_key): """释放已锁定的 API key,并将其放到列表末尾""" async with lock: for i, key_data in enumerate(api_keys): if key_data['key'] == api_key: key_data['locked'] = False # 释放该 key # 将释放的 key 移动到列表末尾 api_keys.append(api_keys.pop(i)) break @app.post("/process_video/") async def process_video(request: VideoRequest): """处理视频请求""" video_path = request.video_path prompt = request.prompt mark = request.mark # 获取一个可用的 API key # api_key = await get_available_api_key() api_key = "AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8" try: print("来一个请求,使用 API key:", api_key) result, mark = await main(video_path, api_key, prompt, mark) return { "code": 0, "message": "视频处理成功", "result": result, "mark": mark } except Exception as e: print(f"视频处理失败: {str(e)}") return { "code": 1, "message": f"视频处理失败: {e}", "result": f"视频处理失败: {e}", "mark": f"视频处理失败: {e}" } # finally: # # 释放 API key,确保后续请求可以使用 # await release_api_key(api_key) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8080)