""" 一个使用 google-genai SDK 向 Gemini API 发送请求的客户端。 该脚本提供了一个与Gemini模型交互的函数,支持文本提示和多种文件输入 (例如本地文件路径、文件字节流、网络URL)。 **依赖库:** - google-genai: Google官方最新的GenAI Python SDK。 安装命令: pip install google-genai - filetype: 用于从文件内容识别MIME类型。 安装命令: pip install filetype """ import os import sys import logging import urllib.request from typing import List, Union, Optional, Dict, Any from concurrent.futures import ThreadPoolExecutor, as_completed from google import genai from google.genai import types from google.genai.errors import APIError import filetype current_dir = os.path.dirname(os.path.abspath(__file__)) root_dir = os.path.dirname(current_dir) sys.path.insert(0, root_dir) from utils import llm_account_helper logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) def generate_text( prompt: str, model: str = 'gemini-2.5-flash', files_input: Optional[List[Union[str, bytes]]] = None ) -> str: """ 向Gemini API发送一个包含提示和可选文件(本地路径、URL或字节流)的请求。 Args: prompt (str): 必需的,给模型的文本提示。 model (str): 可选的,要使用的模型名称。默认为 'gemini-2.5-flash'。 files_input (Optional[List[Union[str, bytes]]]): 可选的,请求中要包含的文件列表。 列表中的每一项可以是: - 字符串(str): 本地文件路径,或者以 http://, https:// 开头的网络URL。 - 字节(bytes): 内存中的文件内容。 Returns: str: 从Gemini模型返回的文本响应。 """ api_key = llm_account_helper.get_api_key('gemini') client = genai.Client(api_key=api_key) uploaded_files_to_delete = [] try: contents = [] if files_input: logging.info(f"正在处理 {len(files_input)} 个文件输入...") for file_item in files_input: if isinstance(file_item, str): # --- 情况 A: 处理网络 URL --- if file_item.lower().startswith(('http://', 'https://')): logging.info(f"检测到 URL,正在下载: {file_item}") try: # 使用标准库 urllib 下载。添加 User-Agent 以避免部分服务器拒绝请求。 req = urllib.request.Request( file_item, headers={'User-Agent': 'Mozilla/5.0 (Compatible; GeminiClient/1.0)'} ) with urllib.request.urlopen(req) as response: file_data = response.read() # 下载成功后,复用下方的字节流处理逻辑 mime_type = filetype.guess_mime(file_data) logging.info(f"URL 下载完成 (MIME: {mime_type}),已添加到请求。") contents.append(types.Part.from_bytes(data=file_data, mime_type=mime_type)) except Exception as e: logging.error(f"下载 URL 失败: {e}") raise ValueError(f"无法处理 URL '{file_item}': {e}") continue # 处理完 URL 后跳过当前循环,避免进入本地文件判断 # --- 情况 B: 处理本地文件路径 --- if not os.path.exists(file_item): raise FileNotFoundError(f"本地文件 '{file_item}' 不存在。") logging.info(f"正在上传本地文件: {file_item}") # 使用 File API 上传本地文件 (适合大文件) uploaded_file = client.files.upload(file=file_item) contents.append(uploaded_file) uploaded_files_to_delete.append(uploaded_file) elif isinstance(file_item, bytes): # --- 情况 C: 处理内存字节流 --- mime_type = filetype.guess_mime(file_item) logging.info(f"正在处理内存字节流 (MIME: {mime_type})") # 直接将小文件数据内嵌到请求中 contents.append(types.Part.from_bytes(data=file_item, mime_type=mime_type)) else: raise ValueError( f"不支持的输入类型: {type(file_item)}。仅支持本地路径(str)、URL(str)或字节流(bytes)。" ) contents.append(prompt) logging.info(f"正在向模型 '{model}' 发送请求...") response = client.models.generate_content( model=model, contents=contents ) return response.text except APIError as e: logging.error(f"Gemini API 调用错误: {e}") raise except Exception as e: logging.error(f"执行过程中发生未知错误: {e}") raise finally: # 清理通过 File API 上传的文件 if uploaded_files_to_delete: logging.info(f"正在清理 {len(uploaded_files_to_delete)} 个已上传的临时文件...") for f in uploaded_files_to_delete: try: client.files.delete(name=f.name) except Exception as e: logging.warning(f"清理文件 {f.name} 失败: {e}") def concurrent_generate_text( prompt_file_pairs: List[Dict[str, Union[str, Optional[List[Union[str, bytes]]]]]], model: str = 'gemini-2.5-flash', max_workers: int = 10 ) -> List[Dict[str, Any]]: """ 并发执行多个Gemini请求,每个请求对应一个prompt和文件数组。 Args: prompt_file_pairs (List[Dict[str, Union[str, Optional[List[Union[str, bytes]]]]]]): 包含多个prompt和对应文件列表的字典对象列表,每个字典应包含'prompt'和'files'键 model (str): 要使用的模型名称。默认为 'gemini-2.5-flash' max_workers (int): 最大并发线程数。默认为 10 Returns: List[Dict[str, Any]]: 每个元素是包含成功失败状态、失败原因、返回数据的字典对象 结果数组中元素的位置与输入的prompt_file_pairs位置一一对应 """ results = [None] * len(prompt_file_pairs) # 预先分配结果列表,保持与输入顺序一致 def process_single_request(pair_idx: int, prompt: str, files: Optional[List[Union[str, bytes]]]) -> Dict[str, Any]: try: response_text = generate_text(prompt=prompt, model=model, files_input=files) return { "success": True, "data": response_text, "error_message": None } except Exception as e: return { "success": False, "data": None, "error_message": str(e) } with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交所有任务 future_to_index = { executor.submit(process_single_request, i, prompt_file_pairs[i]['prompt'], prompt_file_pairs[i].get('files')): i for i in range(len(prompt_file_pairs)) } # 收集结果并按原始顺序放置 for future in as_completed(future_to_index): idx = future_to_index[future] try: result = future.result() results[idx] = result except Exception as e: # 即使future执行出现异常,也确保在正确位置设置结果 results[idx] = { "success": False, "data": None, "error_message": str(e) } return results if __name__ == '__main__': print("--- Gemini 请求客户端演示 (已支持 URL) ---") # try: # # 示例: 使用网络图片 URL # print("\n--- 运行示例: 多模态提示 (使用网络 URL) ---") # image_url = "https://www.google.com/images/branding/googlelogo/2x/googlelogo_color_272x92dp.png" # prompt = "这张图片里的 logo 是什么公司的?" # # print(f"[提示]: {prompt}") # print(f"[输入 URL]: {image_url}") # # response = generate_text( # prompt=prompt, # files_input=[image_url] # ) # print(f"[Gemini 回复]:\n{response}") # # except Exception as e: # print(f"\n演示运行失败: {e}") # 示例: 使用并发请求 print("\n--- 运行示例: 并发请求 ---") prompt_file_pairs = [ { "prompt": "图片上是什么?", "files": ["https://www.google.com/images/branding/googlelogo/2x/googlelogo_color_272x92dp.png"] # 第一个请求不带文件 }, { "prompt": "图片中是谁?", "files": ["https://inews.gtimg.com/om_bt/O6k8mse8MT9ki8fba5c7RK1j1xLFqT-FFZ9RirryqjENkAA/641"] # 第二个请求不带文件 } ] try: concurrent_results = concurrent_generate_text( prompt_file_pairs=prompt_file_pairs, model='gemini-2.5-flash' ) for i, result in enumerate(concurrent_results): print(f"请求 {i+1}:") if result['success']: print(f" 成功: {result['data'][:50]}...") # 只显示前50个字符 else: print(f" 失败: {result['error_message']}") except Exception as e: print(f"\n并发请求演示失败: {e}")