| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- """
- 一个使用 google-genai SDK 向 Gemini API 发送请求的客户端。
- 该脚本提供了一个与Gemini模型交互的函数,支持文本提示和多种文件输入
- (例如本地文件路径、文件字节流、网络URL)。
- **依赖库:**
- - google-genai: Google官方最新的GenAI Python SDK。
- 安装命令: pip install google-genai
- - filetype: 用于从文件内容识别MIME类型。
- 安装命令: pip install filetype
- """
- import os
- import sys
- import logging
- import urllib.request
- from typing import List, Union, Optional, Dict, Any
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from google import genai
- from google.genai import types
- from google.genai.errors import APIError
- import filetype
- current_dir = os.path.dirname(os.path.abspath(__file__))
- root_dir = os.path.dirname(current_dir)
- sys.path.insert(0, root_dir)
- from utils import llm_account_helper
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s'
- )
- def generate_text(
- prompt: str,
- model: str = 'gemini-2.5-flash',
- files_input: Optional[List[Union[str, bytes]]] = None
- ) -> str:
- """
- 向Gemini API发送一个包含提示和可选文件(本地路径、URL或字节流)的请求。
- Args:
- prompt (str): 必需的,给模型的文本提示。
- model (str): 可选的,要使用的模型名称。默认为 'gemini-2.5-flash'。
- files_input (Optional[List[Union[str, bytes]]]): 可选的,请求中要包含的文件列表。
- 列表中的每一项可以是:
- - 字符串(str): 本地文件路径,或者以 http://, https:// 开头的网络URL。
- - 字节(bytes): 内存中的文件内容。
- Returns:
- str: 从Gemini模型返回的文本响应。
- """
- api_key = llm_account_helper.get_api_key('gemini')
- client = genai.Client(api_key=api_key)
- uploaded_files_to_delete = []
- try:
- contents = []
- if files_input:
- logging.info(f"正在处理 {len(files_input)} 个文件输入...")
- for file_item in files_input:
- if isinstance(file_item, str):
- # --- 情况 A: 处理网络 URL ---
- if file_item.lower().startswith(('http://', 'https://')):
- logging.info(f"检测到 URL,正在下载: {file_item}")
- try:
- # 使用标准库 urllib 下载。添加 User-Agent 以避免部分服务器拒绝请求。
- req = urllib.request.Request(
- file_item,
- headers={'User-Agent': 'Mozilla/5.0 (Compatible; GeminiClient/1.0)'}
- )
- with urllib.request.urlopen(req) as response:
- file_data = response.read()
- # 下载成功后,复用下方的字节流处理逻辑
- mime_type = filetype.guess_mime(file_data)
- logging.info(f"URL 下载完成 (MIME: {mime_type}),已添加到请求。")
- contents.append(types.Part.from_bytes(data=file_data, mime_type=mime_type))
- except Exception as e:
- logging.error(f"下载 URL 失败: {e}")
- raise ValueError(f"无法处理 URL '{file_item}': {e}")
- continue # 处理完 URL 后跳过当前循环,避免进入本地文件判断
- # --- 情况 B: 处理本地文件路径 ---
- if not os.path.exists(file_item):
- raise FileNotFoundError(f"本地文件 '{file_item}' 不存在。")
- logging.info(f"正在上传本地文件: {file_item}")
- # 使用 File API 上传本地文件 (适合大文件)
- uploaded_file = client.files.upload(file=file_item)
- contents.append(uploaded_file)
- uploaded_files_to_delete.append(uploaded_file)
- elif isinstance(file_item, bytes):
- # --- 情况 C: 处理内存字节流 ---
- mime_type = filetype.guess_mime(file_item)
- logging.info(f"正在处理内存字节流 (MIME: {mime_type})")
- # 直接将小文件数据内嵌到请求中
- contents.append(types.Part.from_bytes(data=file_item, mime_type=mime_type))
- else:
- raise ValueError(
- f"不支持的输入类型: {type(file_item)}。仅支持本地路径(str)、URL(str)或字节流(bytes)。"
- )
- contents.append(prompt)
- logging.info(f"正在向模型 '{model}' 发送请求...")
- response = client.models.generate_content(
- model=model,
- contents=contents
- )
- return response.text
- except APIError as e:
- logging.error(f"Gemini API 调用错误: {e}")
- raise
- except Exception as e:
- logging.error(f"执行过程中发生未知错误: {e}")
- raise
- finally:
- # 清理通过 File API 上传的文件
- if uploaded_files_to_delete:
- logging.info(f"正在清理 {len(uploaded_files_to_delete)} 个已上传的临时文件...")
- for f in uploaded_files_to_delete:
- try:
- client.files.delete(name=f.name)
- except Exception as e:
- logging.warning(f"清理文件 {f.name} 失败: {e}")
- def concurrent_generate_text(
- prompt_file_pairs: List[Dict[str, Union[str, Optional[List[Union[str, bytes]]]]]],
- model: str = 'gemini-2.5-flash',
- max_workers: int = 10
- ) -> List[Dict[str, Any]]:
- """
- 并发执行多个Gemini请求,每个请求对应一个prompt和文件数组。
- Args:
- prompt_file_pairs (List[Dict[str, Union[str, Optional[List[Union[str, bytes]]]]]]):
- 包含多个prompt和对应文件列表的字典对象列表,每个字典应包含'prompt'和'files'键
- model (str): 要使用的模型名称。默认为 'gemini-2.5-flash'
- max_workers (int): 最大并发线程数。默认为 10
- Returns:
- List[Dict[str, Any]]: 每个元素是包含成功失败状态、失败原因、返回数据的字典对象
- 结果数组中元素的位置与输入的prompt_file_pairs位置一一对应
- """
- results = [None] * len(prompt_file_pairs) # 预先分配结果列表,保持与输入顺序一致
- def process_single_request(pair_idx: int, prompt: str, files: Optional[List[Union[str, bytes]]]) -> Dict[str, Any]:
- try:
- response_text = generate_text(prompt=prompt, model=model, files_input=files)
- return {
- "success": True,
- "data": response_text,
- "error_message": None
- }
- except Exception as e:
- return {
- "success": False,
- "data": None,
- "error_message": str(e)
- }
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # 提交所有任务
- future_to_index = {
- executor.submit(process_single_request, i, prompt_file_pairs[i]['prompt'], prompt_file_pairs[i].get('files')): i
- for i in range(len(prompt_file_pairs))
- }
- # 收集结果并按原始顺序放置
- for future in as_completed(future_to_index):
- idx = future_to_index[future]
- try:
- result = future.result()
- results[idx] = result
- except Exception as e:
- # 即使future执行出现异常,也确保在正确位置设置结果
- results[idx] = {
- "success": False,
- "data": None,
- "error_message": str(e)
- }
- return results
- if __name__ == '__main__':
- print("--- Gemini 请求客户端演示 (已支持 URL) ---")
- # try:
- # # 示例: 使用网络图片 URL
- # print("\n--- 运行示例: 多模态提示 (使用网络 URL) ---")
- # image_url = "https://www.google.com/images/branding/googlelogo/2x/googlelogo_color_272x92dp.png"
- # prompt = "这张图片里的 logo 是什么公司的?"
- #
- # print(f"[提示]: {prompt}")
- # print(f"[输入 URL]: {image_url}")
- #
- # response = generate_text(
- # prompt=prompt,
- # files_input=[image_url]
- # )
- # print(f"[Gemini 回复]:\n{response}")
- #
- # except Exception as e:
- # print(f"\n演示运行失败: {e}")
- # 示例: 使用并发请求
- print("\n--- 运行示例: 并发请求 ---")
- prompt_file_pairs = [
- {
- "prompt": "图片上是什么?",
- "files": ["https://www.google.com/images/branding/googlelogo/2x/googlelogo_color_272x92dp.png"] # 第一个请求不带文件
- },
- {
- "prompt": "图片中是谁?",
- "files": ["https://inews.gtimg.com/om_bt/O6k8mse8MT9ki8fba5c7RK1j1xLFqT-FFZ9RirryqjENkAA/641"] # 第二个请求不带文件
- }
- ]
- try:
- concurrent_results = concurrent_generate_text(
- prompt_file_pairs=prompt_file_pairs,
- model='gemini-2.5-flash'
- )
- for i, result in enumerate(concurrent_results):
- print(f"请求 {i+1}:")
- if result['success']:
- print(f" 成功: {result['data'][:50]}...") # 只显示前50个字符
- else:
- print(f" 失败: {result['error_message']}")
- except Exception as e:
- print(f"\n并发请求演示失败: {e}")
|