import asyncio import uvicorn from datetime import datetime, timedelta from fastapi import FastAPI from pydantic import BaseModel import random from google_ai.generativeai_video import main app = FastAPI() # 初始化 API key 列表,每个 key 包含 '错误计数' 和 '锁定直到' 时间 api_keys = [ {'key': 'AIzaSyDs4FWRuwrEnQzu1M_Skio6NII6Mp4whAw', 'locked': False, 'locked_until': None, 'error_count': 0}, {'key': 'AIzaSyC-IxUvJhbiKWA7uN1RTtVbOjt3pj_-Apc', 'locked': False, 'locked_until': None, 'error_count': 0}, {'key': 'AIzaSyB43Pvugb-CMkEXJUSubXEAxKxYflCPPNU', 'locked': False, 'locked_until': None, 'error_count': 0}, {'key': 'AIzaSyAYgOeHS2NUlVuVe14rgNDaxXRNRt4smuY', 'locked': False, 'locked_until': None, 'error_count': 0}, {'key': 'AIzaSyAa4H7YzIOiqA0QIDuO6kNErFBoWT-BEtc', 'locked': False, 'locked_until': None, 'error_count': 0}, {'key': 'AIzaSyAUvBSpjFcm7b8FsgRUTG6anzoalDp9gYg', 'locked': False, 'locked_until': None, 'error_count': 0}, {'key': 'AIzaSyB43Pvugb-CMkEXJUSubXEAxKxYflCPPNU', 'locked': False, 'locked_until': None, 'error_count': 0} ] lock = asyncio.Lock() class VideoRequest(BaseModel): video_path: str async def get_available_api_key(): """获取一个未锁定且没有过期的 API key,如果没有可用的则随机解锁一个""" while True: async with lock: now = datetime.now() available_key = None for key_data in api_keys: if not key_data['locked'] and (key_data['locked_until'] is None or key_data['locked_until'] <= now): available_key = key_data['key'] key_data['locked'] = True break if available_key: return available_key else: # 所有 key 都被锁定,随机解锁一个,并重置错误计数 print("所有 API key 都被锁定,随机解锁一个并重置错误计数...") random_key_data = random.choice(api_keys) random_key_data['locked'] = False random_key_data['locked_until'] = None # 解除时间锁定 random_key_data['error_count'] = 0 # 清空错误计数 random_key_data['locked'] = True return random_key_data['key'] async def release_api_key(api_key, lock_until=None, reset_error_count=False): """释放已锁定的 API key,支持设置锁定时间并重置错误计数""" async with lock: for key_data in api_keys: if key_data['key'] == api_key: key_data['locked'] = False key_data['locked_until'] = lock_until # 设置锁定时间 if reset_error_count: key_data['error_count'] = 0 # 重置错误计数 break def get_next_midnight(): """计算下一天的 0 点""" now = datetime.now() next_midnight = (now + timedelta(days=1)).replace(hour=1, minute=0, second=0, microsecond=0) return next_midnight @app.post("/process_video/") async def process_video(request: VideoRequest): """处理视频请求""" video_path = request.video_path # 获取一个可用的 API key api_key = await get_available_api_key() try: print(f"来一个请求,使用 API key: {api_key}") result, mark = await main(video_path, api_key) if "too many values to unpack" in result: async with lock: for key_data in api_keys: if key_data['key'] == api_key: key_data['error_count'] += 1 print( f"API key {api_key} 的错误计数增加到 {key_data['error_count']}" ) # 如果错误计数达到 5 次,锁定该 key 到第二天 00:00 if key_data['error_count'] >= 5: lock_until = get_next_midnight() print( f"API key {api_key} 连续返回错误 5 次,锁定至 {lock_until}" ) await release_api_key( api_key, lock_until ) else: await release_api_key( api_key ) break return { "code": 0, "message": "视频处理失败: too many values to unpack", "result": result, "mark": mark } else: # 视频处理成功,重置错误计数 await release_api_key( api_key, reset_error_count=True ) return { "code": 0, "message": "视频处理成功", "result": result, "mark": mark } except ValueError as e: error_message = str(e) if "too many values to unpack" in error_message: # 增加该 key 的错误计数 async with lock: for key_data in api_keys: if key_data['key'] == api_key: key_data['error_count'] += 1 print(f"API key {api_key} 的错误计数增加到 {key_data['error_count']}") # 如果错误计数达到 10 次,锁定该 key 到第二天 0 点 if key_data['error_count'] >= 10: lock_until = get_next_midnight() print(f"API key {api_key} 连续返回错误 5 次,锁定至 {lock_until}") await release_api_key(api_key, lock_until) else: await release_api_key(api_key) break else: await release_api_key(api_key) return { "code": 1, "message": f"视频处理失败: {e}", "result": f"视频处理失败: {e}", "mark": f"视频处理失败: {e}" } except Exception as e: print(f"视频处理失败: {str(e)}") await release_api_key(api_key) return { "code": 1, "message": f"视频处理失败: {e}", "result": f"视频处理失败: {e}", "mark": f"视频处理失败: {e}" } finally: await release_api_key(api_key) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8080)