@@ -370,8 +370,12 @@ RUNNING_LOCK = asyncio.Lock()


 def process_single_item(args):
     """处理单个数据项的函数,用于多进程 (模块级,便于pickle)"""
-    idx, item, request_id = args
+    idx, item, request_id, api_key = args
     try:
+        # 临时设置环境变量以使用指定的API密钥
+        original_api_key = os.getenv('GEMINI_API_KEY')
+        os.environ['GEMINI_API_KEY'] = api_key
+
         crawl_data = item.get('crawl_data') or {}
         content_id = item.get('content_id') or ''
         task_id = item.get('task_id') or ''
@@ -461,6 +465,13 @@ def process_single_item(args):
             "status": 3,
             "success": False
         }
+    finally:
+        # 恢复原始API密钥
+        if 'original_api_key' in locals():
+            if original_api_key is not None:
+                os.environ['GEMINI_API_KEY'] = original_api_key
+            else:
+                os.environ.pop('GEMINI_API_KEY', None)


 def create_langgraph_workflow():
@@ -503,10 +514,24 @@ def create_langgraph_workflow():
             state["status"] = "completed"
             return state

-        # 准备多进程参数
-        process_args = [(idx, item, state["request_id"]) for idx, item in enumerate(items, start=1)]
+        # 获取7个不同的GEMINI API密钥
+        api_keys = []
+        for i in range(1, 8):  # GEMINI_API_KEY_1 到 GEMINI_API_KEY_7
+            api_key = os.getenv(f'GEMINI_API_KEY_{i}')
+            if api_key:
+                api_keys.append(api_key)
+            else:
+                logger.warning(f"未找到 GEMINI_API_KEY_{i},使用默认 GEMINI_API_KEY")
+                api_keys.append(os.getenv('GEMINI_API_KEY'))
+
+        # 准备多进程参数,为每个任务分配API密钥
+        process_args = []
+        for idx, item in enumerate(items, start=1):
+            # 循环使用7个API密钥
+            api_key = api_keys[(idx - 1) % 7]
+            process_args.append((idx, item, state["request_id"], api_key))

-        # 使用3个进程并行处理,添加多进程保护
+        # 使用7个进程并行处理,添加多进程保护
         if __name__ == '__main__' or multiprocessing.current_process().name == 'MainProcess':
             # 设置多进程启动方法为 'spawn' 以避免 gRPC fork 问题
             original_start_method = multiprocessing.get_start_method()
@@ -515,8 +540,8 @@ def create_langgraph_workflow():
         except RuntimeError:
             pass  # 如果已经设置过,忽略错误

-        with multiprocessing.Pool(processes=2) as pool:
-            logger.info(f"开始多进程处理: 数量={len(process_args)}")
+        with multiprocessing.Pool(processes=7) as pool:
+            logger.info(f"开始多进程处理: 数量={len(process_args)}, 使用7个进程")
             results = pool.map(process_single_item, process_args)

         # 恢复原始启动方法