#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Minimal Gemini processor.

Exists to satisfy import requirements; the actual functionality can be
extended as needed.
"""

import os
import json
from typing import Any, Dict, List, Optional, Union

from dotenv import load_dotenv
import google.generativeai as genai


class GeminiProcessor:
    """Thin wrapper around the Gemini API.

    Attributes:
        api_key: Value of the ``GEMINI_API_KEY`` environment variable.
        model_name: Name of the Gemini model used by :meth:`process`.
    """

    # Model used when the caller does not choose one explicitly.
    DEFAULT_MODEL = 'gemini-2.5-flash'

    # Class-level fallback so ``process`` still works on instances whose
    # ``__init__`` was bypassed or overridden without setting a model.
    model_name: str = DEFAULT_MODEL

    def __init__(self, model_name: Optional[str] = None):
        """Read the API key from the environment and configure the client.

        Args:
            model_name: Gemini model to use. Defaults to ``DEFAULT_MODEL``
                when omitted or empty.

        Raises:
            ValueError: If ``GEMINI_API_KEY`` is unset or empty.
        """
        # Load environment variables (python-dotenv) before reading the key.
        load_dotenv()

        self.api_key = os.getenv('GEMINI_API_KEY')
        if not self.api_key:
            raise ValueError("未找到GEMINI_API_KEY环境变量")

        if model_name:
            self.model_name = model_name

        # Configure the google.generativeai client with the key.
        genai.configure(api_key=self.api_key)

    @staticmethod
    def _format_content(content: Any) -> Any:
        """Serialize dict input to a JSON string; pass anything else through."""
        if isinstance(content, dict):
            # ensure_ascii=False keeps non-ASCII text readable in the prompt.
            return json.dumps(content, ensure_ascii=False)
        return content

    def process(
        self, content: Any, system_prompt: str
    ) -> Union[str, Dict[str, Any]]:
        """Send ``content`` to Gemini with ``system_prompt`` as system instruction.

        Args:
            content: Input for the model. A dict is converted to a JSON
                string; any other value is passed to the API unchanged.
            system_prompt: System instruction for the model instance.

        Returns:
            The raw response text (``response.text``) on success. The text
            is NOT parsed as JSON. On any exception, a dict of the form
            ``{"error": <message>, "content": <original content>}``.
        """
        try:
            formatted_content = self._format_content(content)

            # A model instance is created per call because the system
            # instruction is bound at construction time.
            model_with_system = genai.GenerativeModel(
                self.model_name,
                system_instruction=system_prompt,
            )

            response = model_with_system.generate_content(
                contents=formatted_content
            )

            # Return the raw text; callers that expect JSON must parse it.
            return response.text
        except Exception as e:
            # Best-effort boundary: report the failure and hand back an
            # error payload instead of raising.
            print(f"Gemini API 调用失败: {e}")
            return {"error": str(e), "content": content}

    def batch_process(self, contents: list, system_prompt: str) -> list:
        """Run :meth:`process` on each item of ``contents`` in order.

        Args:
            contents: Inputs to process one by one.
            system_prompt: System instruction applied to every item.

        Returns:
            A list with one :meth:`process` result per input, in input
            order (response text on success, error dict on failure).
        """
        results: List[Union[str, Dict[str, Any]]] = [
            self.process(item, system_prompt) for item in contents
        ]
        return results