|
@@ -1,9 +1,8 @@
|
|
|
"""
|
|
"""
|
|
|
-NanoBanana Tool - 图像特征提取与图像生成
|
|
|
|
|
|
|
+NanoBanana Tool - 图像生成
|
|
|
|
|
|
|
|
-该工具可以提取图片中的特征,也可以根据描述生成图片。
|
|
|
|
|
-支持通过 OpenRouter 调用多模态模型,提取结构化的图像特征并保存为 JSON,
|
|
|
|
|
-或基于输入图像生成新的图像。
|
|
|
|
|
|
|
+通用图像生成工具,可以接受自然语言描述和/或图像输入,生成新的图像。
|
|
|
|
|
+支持通过 OpenRouter 调用 Gemini 2.5 Flash Image 模型。
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
import base64
|
|
import base64
|
|
@@ -22,23 +21,10 @@ from agent.tools import tool, ToolResult
|
|
|
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
|
|
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
|
|
|
DEFAULT_TIMEOUT = 120.0
|
|
DEFAULT_TIMEOUT = 120.0
|
|
|
|
|
|
|
|
-DEFAULT_EXTRACTION_PROMPT = (
|
|
|
|
|
- "请从这张图像中提取跨场景相对稳定、可复用的视觉不变特征。"
|
|
|
|
|
- "输出严格 JSON,字段包含:identity_features、pose_features、appearance_features、"
|
|
|
|
|
- "material_features、style_features、uncertainty、notes。"
|
|
|
|
|
- "每个字段给出简洁要点,避免臆测。"
|
|
|
|
|
-)
|
|
|
|
|
-
|
|
|
|
|
-DEFAULT_IMAGE_PROMPT = (
|
|
|
|
|
- "基于输入图像生成一张保留主体身份与关键视觉特征的新图像。"
|
|
|
|
|
- "保持人物核心特征一致,同时提升清晰度与可用性。"
|
|
|
|
|
-)
|
|
|
|
|
|
|
+DEFAULT_IMAGE_PROMPT = "根据输入生成图像。"
|
|
|
|
|
|
|
|
DEFAULT_IMAGE_MODEL_CANDIDATES = [
|
|
DEFAULT_IMAGE_MODEL_CANDIDATES = [
|
|
|
"google/gemini-2.5-flash-image",
|
|
"google/gemini-2.5-flash-image",
|
|
|
- # "google/gemini-3-pro-image-preview",
|
|
|
|
|
- # "black-forest-labs/flux.2-flex",
|
|
|
|
|
- # "black-forest-labs/flux.2-pro",
|
|
|
|
|
]
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
@@ -214,84 +200,76 @@ def _normalize_model_id(model_id: str) -> str:
|
|
|
return m
|
|
return m
|
|
|
|
|
|
|
|
|
|
|
|
|
-@tool(description="可以提取图片中的特征,也可以根据描述生成图片")
|
|
|
|
|
|
|
+@tool(description="通用图像生成工具,可以接受自然语言描述和/或图像输入,生成新的图像")
|
|
|
async def nanobanana(
|
|
async def nanobanana(
|
|
|
image_path: str = "",
|
|
image_path: str = "",
|
|
|
image_paths: Optional[List[str]] = None,
|
|
image_paths: Optional[List[str]] = None,
|
|
|
- output_file: Optional[str] = None,
|
|
|
|
|
prompt: Optional[str] = None,
|
|
prompt: Optional[str] = None,
|
|
|
model: Optional[str] = None,
|
|
model: Optional[str] = None,
|
|
|
max_tokens: int = 1200,
|
|
max_tokens: int = 1200,
|
|
|
- generate_image: bool = False,
|
|
|
|
|
image_output_path: Optional[str] = None,
|
|
image_output_path: Optional[str] = None,
|
|
|
) -> ToolResult:
|
|
) -> ToolResult:
|
|
|
"""
|
|
"""
|
|
|
- 可以提取图片中的特征,也可以根据描述生成图片。
|
|
|
|
|
|
|
+ 通用图像生成工具,可以接受自然语言描述和/或图像输入,生成新的图像。
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
|
image_path: 输入图片路径(单图模式,可选)
|
|
image_path: 输入图片路径(单图模式,可选)
|
|
|
- image_paths: 输入图片路径列表(多图整体模式,可选)
|
|
|
|
|
- output_file: 输出 JSON 文件路径(可选,用于特征提取模式)
|
|
|
|
|
- prompt: 自定义提取指令或生成描述(可选)
|
|
|
|
|
- model: OpenRouter 模型名(可选,默认读取 NANOBANANA_MODEL 或使用 Gemini 视觉模型)
|
|
|
|
|
|
|
+ image_paths: 输入图片路径列表(多图模式,可选)
|
|
|
|
|
+ prompt: 自定义生成描述(可选,默认使用通用prompt)
|
|
|
|
|
+ model: OpenRouter 模型名(可选,默认使用 gemini-2.5-flash-image)
|
|
|
max_tokens: 最大输出 token
|
|
max_tokens: 最大输出 token
|
|
|
- generate_image: 是否生成图片(False=提取特征,True=生成图片)
|
|
|
|
|
- image_output_path: 生成图片保存路径(generate_image=True 时可选)
|
|
|
|
|
|
|
+ image_output_path: 生成图片保存路径(可选)
|
|
|
|
|
|
|
|
Returns:
|
|
Returns:
|
|
|
- ToolResult: 包含结构化特征和输出文件路径,或生成的图片路径
|
|
|
|
|
|
|
+ ToolResult: 包含生成的图片路径
|
|
|
"""
|
|
"""
|
|
|
raw_paths: List[str] = []
|
|
raw_paths: List[str] = []
|
|
|
if image_paths:
|
|
if image_paths:
|
|
|
raw_paths.extend(image_paths)
|
|
raw_paths.extend(image_paths)
|
|
|
if image_path:
|
|
if image_path:
|
|
|
raw_paths.append(image_path)
|
|
raw_paths.append(image_path)
|
|
|
- if not raw_paths:
|
|
|
|
|
- return ToolResult(
|
|
|
|
|
- title="NanoBanana 提取失败",
|
|
|
|
|
- output="",
|
|
|
|
|
- error="未提供输入图片,请传入 image_path 或 image_paths",
|
|
|
|
|
- )
|
|
|
|
|
|
|
|
|
|
- # 去重并检查路径
|
|
|
|
|
- unique_raw: List[str] = []
|
|
|
|
|
- seen = set()
|
|
|
|
|
- for p in raw_paths:
|
|
|
|
|
- if p and p not in seen:
|
|
|
|
|
- unique_raw.append(p)
|
|
|
|
|
- seen.add(p)
|
|
|
|
|
-
|
|
|
|
|
- input_paths: List[Path] = [Path(p) for p in unique_raw]
|
|
|
|
|
- invalid = [str(p) for p in input_paths if (not p.exists() or not p.is_file())]
|
|
|
|
|
- if invalid:
|
|
|
|
|
- return ToolResult(
|
|
|
|
|
- title="NanoBanana 提取失败",
|
|
|
|
|
- output="",
|
|
|
|
|
- error=f"以下图片不存在或不可读: {invalid}",
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ # 图像输入是可选的,但如果提供了就需要验证
|
|
|
|
|
+ input_paths: List[Path] = []
|
|
|
|
|
+ if raw_paths:
|
|
|
|
|
+ # 去重并检查路径
|
|
|
|
|
+ unique_raw: List[str] = []
|
|
|
|
|
+ seen = set()
|
|
|
|
|
+ for p in raw_paths:
|
|
|
|
|
+ if p and p not in seen:
|
|
|
|
|
+ unique_raw.append(p)
|
|
|
|
|
+ seen.add(p)
|
|
|
|
|
+
|
|
|
|
|
+ input_paths = [Path(p) for p in unique_raw]
|
|
|
|
|
+ invalid = [str(p) for p in input_paths if (not p.exists() or not p.is_file())]
|
|
|
|
|
+ if invalid:
|
|
|
|
|
+ return ToolResult(
|
|
|
|
|
+ title="NanoBanana 生成失败",
|
|
|
|
|
+ output="",
|
|
|
|
|
+ error=f"以下图片不存在或不可读: {invalid}",
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
api_key = _resolve_api_key()
|
|
api_key = _resolve_api_key()
|
|
|
if not api_key:
|
|
if not api_key:
|
|
|
return ToolResult(
|
|
return ToolResult(
|
|
|
- title="NanoBanana 提取失败",
|
|
|
|
|
|
|
+ title="NanoBanana 生成失败",
|
|
|
output="",
|
|
output="",
|
|
|
error="未找到 OpenRouter API Key,请设置 OPENROUTER_API_KEY 或 OPEN_ROUTER_API_KEY",
|
|
error="未找到 OpenRouter API Key,请设置 OPENROUTER_API_KEY 或 OPEN_ROUTER_API_KEY",
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- if generate_image:
|
|
|
|
|
- user_prompt = prompt or DEFAULT_IMAGE_PROMPT
|
|
|
|
|
- else:
|
|
|
|
|
- chosen_model = model or os.getenv("NANOBANANA_MODEL") or "google/gemini-2.5-flash"
|
|
|
|
|
- user_prompt = prompt or DEFAULT_EXTRACTION_PROMPT
|
|
|
|
|
|
|
+ user_prompt = prompt or DEFAULT_IMAGE_PROMPT
|
|
|
|
|
|
|
|
- try:
|
|
|
|
|
- image_data_urls = [_image_to_data_url(p) for p in input_paths]
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- return ToolResult(
|
|
|
|
|
- title="NanoBanana 提取失败",
|
|
|
|
|
- output="",
|
|
|
|
|
- error=f"图片编码失败: {e}",
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ # 编码图像(如果有)
|
|
|
|
|
+ image_data_urls = []
|
|
|
|
|
+ if input_paths:
|
|
|
|
|
+ try:
|
|
|
|
|
+ image_data_urls = [_image_to_data_url(p) for p in input_paths]
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ return ToolResult(
|
|
|
|
|
+ title="NanoBanana 生成失败",
|
|
|
|
|
+ output="",
|
|
|
|
|
+ error=f"图片编码失败: {e}",
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
user_content: List[Dict[str, Any]] = [{"type": "text", "text": user_prompt}]
|
|
user_content: List[Dict[str, Any]] = [{"type": "text", "text": user_prompt}]
|
|
|
for u in image_data_urls:
|
|
for u in image_data_urls:
|
|
@@ -301,11 +279,7 @@ async def nanobanana(
|
|
|
"messages": [
|
|
"messages": [
|
|
|
{
|
|
{
|
|
|
"role": "system",
|
|
"role": "system",
|
|
|
- "content": (
|
|
|
|
|
- "你是视觉助手。"
|
|
|
|
|
- "当任务为特征提取时输出 JSON 对象,不要输出 markdown。"
|
|
|
|
|
- "当任务为图像生成时请返回图像。"
|
|
|
|
|
- ),
|
|
|
|
|
|
|
+ "content": "你是图像生成助手。请根据用户的描述和/或输入图像生成新的图像。",
|
|
|
},
|
|
},
|
|
|
{
|
|
{
|
|
|
"role": "user",
|
|
"role": "user",
|
|
@@ -314,9 +288,8 @@ async def nanobanana(
|
|
|
],
|
|
],
|
|
|
"temperature": 0.2,
|
|
"temperature": 0.2,
|
|
|
"max_tokens": max_tokens,
|
|
"max_tokens": max_tokens,
|
|
|
|
|
+ "modalities": ["image", "text"],
|
|
|
}
|
|
}
|
|
|
- if generate_image:
|
|
|
|
|
- payload["modalities"] = ["image", "text"]
|
|
|
|
|
|
|
|
|
|
headers = {
|
|
headers = {
|
|
|
"Authorization": f"Bearer {api_key}",
|
|
"Authorization": f"Bearer {api_key}",
|
|
@@ -327,33 +300,28 @@ async def nanobanana(
|
|
|
|
|
|
|
|
endpoint = f"{OPENROUTER_BASE_URL}/chat/completions"
|
|
endpoint = f"{OPENROUTER_BASE_URL}/chat/completions"
|
|
|
|
|
|
|
|
- # 图像生成模式:自动尝试多个可用模型,减少 404/invalid model 影响
|
|
|
|
|
- if generate_image:
|
|
|
|
|
- candidates: List[str] = []
|
|
|
|
|
- if model:
|
|
|
|
|
- candidates.append(_normalize_model_id(model))
|
|
|
|
|
- if env_model := os.getenv("NANOBANANA_IMAGE_MODEL"):
|
|
|
|
|
- candidates.append(_normalize_model_id(env_model))
|
|
|
|
|
- candidates.extend([_normalize_model_id(x) for x in DEFAULT_IMAGE_MODEL_CANDIDATES])
|
|
|
|
|
- # 去重并保持顺序
|
|
|
|
|
- dedup: List[str] = []
|
|
|
|
|
- seen = set()
|
|
|
|
|
- for m in candidates:
|
|
|
|
|
- if m and m not in seen:
|
|
|
|
|
- dedup.append(m)
|
|
|
|
|
- seen.add(m)
|
|
|
|
|
- candidates = dedup
|
|
|
|
|
- else:
|
|
|
|
|
- candidates = [chosen_model]
|
|
|
|
|
|
|
+ # 自动尝试多个可用模型,减少 404/invalid model 影响
|
|
|
|
|
+ candidates: List[str] = []
|
|
|
|
|
+ if model:
|
|
|
|
|
+ candidates.append(_normalize_model_id(model))
|
|
|
|
|
+ if env_model := os.getenv("NANOBANANA_IMAGE_MODEL"):
|
|
|
|
|
+ candidates.append(_normalize_model_id(env_model))
|
|
|
|
|
+ candidates.extend([_normalize_model_id(x) for x in DEFAULT_IMAGE_MODEL_CANDIDATES])
|
|
|
|
|
+ # 去重并保持顺序
|
|
|
|
|
+ dedup: List[str] = []
|
|
|
|
|
+ seen = set()
|
|
|
|
|
+ for m in candidates:
|
|
|
|
|
+ if m and m not in seen:
|
|
|
|
|
+ dedup.append(m)
|
|
|
|
|
+ seen.add(m)
|
|
|
|
|
+ candidates = dedup
|
|
|
|
|
|
|
|
data: Optional[Dict[str, Any]] = None
|
|
data: Optional[Dict[str, Any]] = None
|
|
|
used_model: Optional[str] = None
|
|
used_model: Optional[str] = None
|
|
|
errors: List[Dict[str, Any]] = []
|
|
errors: List[Dict[str, Any]] = []
|
|
|
|
|
|
|
|
for cand in candidates:
|
|
for cand in candidates:
|
|
|
- modality_attempts: List[Optional[List[str]]] = [None]
|
|
|
|
|
- if generate_image:
|
|
|
|
|
- modality_attempts = [["image", "text"], ["image"], None]
|
|
|
|
|
|
|
+ modality_attempts: List[Optional[List[str]]] = [["image", "text"], ["image"], None]
|
|
|
|
|
|
|
|
for mods in modality_attempts:
|
|
for mods in modality_attempts:
|
|
|
trial_payload = dict(payload)
|
|
trial_payload = dict(payload)
|
|
@@ -392,9 +360,8 @@ async def nanobanana(
|
|
|
break
|
|
break
|
|
|
|
|
|
|
|
if data is None:
|
|
if data is None:
|
|
|
- title = "NanoBanana 生成失败" if generate_image else "NanoBanana 提取失败"
|
|
|
|
|
return ToolResult(
|
|
return ToolResult(
|
|
|
- title=title,
|
|
|
|
|
|
|
+ title="NanoBanana 生成失败",
|
|
|
output=json.dumps({"attempted_models": candidates, "errors": errors}, ensure_ascii=False, indent=2),
|
|
output=json.dumps({"attempted_models": candidates, "errors": errors}, ensure_ascii=False, indent=2),
|
|
|
long_term_memory="All candidate models failed for this request",
|
|
long_term_memory="All candidate models failed for this request",
|
|
|
metadata={"attempted_models": candidates, "errors": errors},
|
|
metadata={"attempted_models": candidates, "errors": errors},
|
|
@@ -405,168 +372,115 @@ async def nanobanana(
|
|
|
choices = data.get("choices") or []
|
|
choices = data.get("choices") or []
|
|
|
message = choices[0].get("message", {}) if choices else {}
|
|
message = choices[0].get("message", {}) if choices else {}
|
|
|
|
|
|
|
|
- # 图像生成分支
|
|
|
|
|
- if generate_image:
|
|
|
|
|
- refs = _extract_image_refs(choices[0] if choices else {}, message)
|
|
|
|
|
- if not refs:
|
|
|
|
|
- content = message.get("content")
|
|
|
|
|
- preview = ""
|
|
|
|
|
- if isinstance(content, str):
|
|
|
|
|
- preview = content[:500]
|
|
|
|
|
- elif isinstance(content, list):
|
|
|
|
|
- preview = json.dumps(content[:3], ensure_ascii=False)[:500]
|
|
|
|
|
|
|
+ # 提取生成的图像
|
|
|
|
|
+ refs = _extract_image_refs(choices[0] if choices else {}, message)
|
|
|
|
|
+ if not refs:
|
|
|
|
|
+ content = message.get("content")
|
|
|
|
|
+ preview = ""
|
|
|
|
|
+ if isinstance(content, str):
|
|
|
|
|
+ preview = content[:500]
|
|
|
|
|
+ elif isinstance(content, list):
|
|
|
|
|
+ preview = json.dumps(content[:3], ensure_ascii=False)[:500]
|
|
|
|
|
|
|
|
- return ToolResult(
|
|
|
|
|
- title="NanoBanana 生成失败",
|
|
|
|
|
- output=json.dumps(data, ensure_ascii=False, indent=2),
|
|
|
|
|
- error="模型未返回可解析图片(未在 message.images/choice.images/content 中发现图片)",
|
|
|
|
|
- metadata={
|
|
|
|
|
- "model": chosen_model,
|
|
|
|
|
- "choice_keys": list((choices[0] if choices else {}).keys()),
|
|
|
|
|
- "message_keys": list(message.keys()) if isinstance(message, dict) else [],
|
|
|
|
|
- "content_preview": preview,
|
|
|
|
|
- },
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ return ToolResult(
|
|
|
|
|
+ title="NanoBanana 生成失败",
|
|
|
|
|
+ output=json.dumps(data, ensure_ascii=False, indent=2),
|
|
|
|
|
+ error="模型未返回可解析图片(未在 message.images/choice.images/content 中发现图片)",
|
|
|
|
|
+ metadata={
|
|
|
|
|
+ "model": chosen_model,
|
|
|
|
|
+ "choice_keys": list((choices[0] if choices else {}).keys()),
|
|
|
|
|
+ "message_keys": list(message.keys()) if isinstance(message, dict) else [],
|
|
|
|
|
+ "content_preview": preview,
|
|
|
|
|
+ },
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
- output_paths: List[str] = []
|
|
|
|
|
- if image_output_path:
|
|
|
|
|
- base_path = Path(image_output_path)
|
|
|
|
|
|
|
+ output_paths: List[str] = []
|
|
|
|
|
+ if image_output_path:
|
|
|
|
|
+ base_path = Path(image_output_path)
|
|
|
|
|
+ else:
|
|
|
|
|
+ if len(input_paths) > 1:
|
|
|
|
|
+ base_path = input_paths[0].parent / "set_generated.png"
|
|
|
else:
|
|
else:
|
|
|
- if len(input_paths) > 1:
|
|
|
|
|
- base_path = input_paths[0].parent / "set_generated.png"
|
|
|
|
|
- else:
|
|
|
|
|
- base_path = input_paths[0].parent / f"{input_paths[0].stem}_generated.png"
|
|
|
|
|
- base_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
-
|
|
|
|
|
- for idx, ref in enumerate(refs):
|
|
|
|
|
- kind = ref.get("kind", "")
|
|
|
|
|
- mime_type = "image/png"
|
|
|
|
|
- raw_bytes: Optional[bytes] = None
|
|
|
|
|
-
|
|
|
|
|
- if kind == "data_url":
|
|
|
|
|
- m = re.match(r"^data:([^;]+);base64,(.+)$", ref.get("value", ""), flags=re.DOTALL)
|
|
|
|
|
- if not m:
|
|
|
|
|
- continue
|
|
|
|
|
- mime_type = m.group(1)
|
|
|
|
|
- raw_bytes = base64.b64decode(m.group(2))
|
|
|
|
|
- elif kind == "base64":
|
|
|
|
|
- mime_type = ref.get("mime_type", "image/png")
|
|
|
|
|
- raw_bytes = base64.b64decode(ref.get("value", ""))
|
|
|
|
|
- elif kind == "url":
|
|
|
|
|
- url = ref.get("value", "")
|
|
|
|
|
- try:
|
|
|
|
|
- with httpx.Client(timeout=DEFAULT_TIMEOUT) as client:
|
|
|
|
|
- r = client.get(url)
|
|
|
|
|
- r.raise_for_status()
|
|
|
|
|
- raw_bytes = r.content
|
|
|
|
|
- mime_type = r.headers.get("content-type", "image/png").split(";")[0]
|
|
|
|
|
- except Exception:
|
|
|
|
|
- continue
|
|
|
|
|
- else:
|
|
|
|
|
- continue
|
|
|
|
|
|
|
+ base_path = input_paths[0].parent / f"{input_paths[0].stem}_generated.png"
|
|
|
|
|
+ base_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
|
|
- if not raw_bytes:
|
|
|
|
|
- continue
|
|
|
|
|
|
|
+ for idx, ref in enumerate(refs):
|
|
|
|
|
+ kind = ref.get("kind", "")
|
|
|
|
|
+ mime_type = "image/png"
|
|
|
|
|
+ raw_bytes: Optional[bytes] = None
|
|
|
|
|
|
|
|
- ext = _mime_to_ext(mime_type)
|
|
|
|
|
- if len(refs) == 1:
|
|
|
|
|
- target = base_path
|
|
|
|
|
- if target.suffix.lower() not in [".png", ".jpg", ".jpeg", ".webp"]:
|
|
|
|
|
- target = target.with_suffix(ext)
|
|
|
|
|
- else:
|
|
|
|
|
- stem = base_path.stem
|
|
|
|
|
- target = base_path.with_name(f"{stem}_{idx+1}{ext}")
|
|
|
|
|
|
|
+ if kind == "data_url":
|
|
|
|
|
+ m = re.match(r"^data:([^;]+);base64,(.+)$", ref.get("value", ""), flags=re.DOTALL)
|
|
|
|
|
+ if not m:
|
|
|
|
|
+ continue
|
|
|
|
|
+ mime_type = m.group(1)
|
|
|
|
|
+ raw_bytes = base64.b64decode(m.group(2))
|
|
|
|
|
+ elif kind == "base64":
|
|
|
|
|
+ mime_type = ref.get("mime_type", "image/png")
|
|
|
|
|
+ raw_bytes = base64.b64decode(ref.get("value", ""))
|
|
|
|
|
+ elif kind == "url":
|
|
|
|
|
+ url = ref.get("value", "")
|
|
|
try:
|
|
try:
|
|
|
- target.write_bytes(raw_bytes)
|
|
|
|
|
- output_paths.append(str(target))
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- return ToolResult(
|
|
|
|
|
- title="NanoBanana 生成失败",
|
|
|
|
|
- output="",
|
|
|
|
|
- error=f"写入生成图片失败: {e}",
|
|
|
|
|
- metadata={"model": chosen_model},
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- if not output_paths:
|
|
|
|
|
|
|
+ with httpx.Client(timeout=DEFAULT_TIMEOUT) as client:
|
|
|
|
|
+ r = client.get(url)
|
|
|
|
|
+ r.raise_for_status()
|
|
|
|
|
+ raw_bytes = r.content
|
|
|
|
|
+ mime_type = r.headers.get("content-type", "image/png").split(";")[0]
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ continue
|
|
|
|
|
+ else:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ if not raw_bytes:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ ext = _mime_to_ext(mime_type)
|
|
|
|
|
+ if len(refs) == 1:
|
|
|
|
|
+ target = base_path
|
|
|
|
|
+ if target.suffix.lower() not in [".png", ".jpg", ".jpeg", ".webp"]:
|
|
|
|
|
+ target = target.with_suffix(ext)
|
|
|
|
|
+ else:
|
|
|
|
|
+ stem = base_path.stem
|
|
|
|
|
+ target = base_path.with_name(f"{stem}_{idx+1}{ext}")
|
|
|
|
|
+ try:
|
|
|
|
|
+ target.write_bytes(raw_bytes)
|
|
|
|
|
+ output_paths.append(str(target))
|
|
|
|
|
+ except Exception as e:
|
|
|
return ToolResult(
|
|
return ToolResult(
|
|
|
title="NanoBanana 生成失败",
|
|
title="NanoBanana 生成失败",
|
|
|
- output=json.dumps(data, ensure_ascii=False, indent=2),
|
|
|
|
|
- error="检测到图片引用但写入失败(可能是无效 base64 或 URL 不可访问)",
|
|
|
|
|
- metadata={"model": chosen_model, "ref_count": len(refs)},
|
|
|
|
|
|
|
+ output="",
|
|
|
|
|
+ error=f"写入生成图片失败: {e}",
|
|
|
|
|
+ metadata={"model": chosen_model},
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- usage = data.get("usage", {})
|
|
|
|
|
- prompt_tokens = usage.get("prompt_tokens") or usage.get("input_tokens", 0)
|
|
|
|
|
- completion_tokens = usage.get("completion_tokens") or usage.get("output_tokens", 0)
|
|
|
|
|
- summary = {
|
|
|
|
|
- "model": chosen_model,
|
|
|
|
|
- "input_images": [str(p) for p in input_paths],
|
|
|
|
|
- "input_count": len(input_paths),
|
|
|
|
|
- "generated_images": output_paths,
|
|
|
|
|
- "prompt_tokens": prompt_tokens,
|
|
|
|
|
- "completion_tokens": completion_tokens,
|
|
|
|
|
- }
|
|
|
|
|
- return ToolResult(
|
|
|
|
|
- title="NanoBanana 图片生成完成",
|
|
|
|
|
- output=json.dumps({"summary": summary}, ensure_ascii=False, indent=2),
|
|
|
|
|
- long_term_memory=f"Generated {len(output_paths)} image(s) from {len(input_paths)} input image(s) using {chosen_model}",
|
|
|
|
|
- attachments=output_paths,
|
|
|
|
|
- metadata=summary,
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- content = message.get("content") or ""
|
|
|
|
|
- if not content:
|
|
|
|
|
|
|
+ if not output_paths:
|
|
|
return ToolResult(
|
|
return ToolResult(
|
|
|
- title="NanoBanana 提取失败",
|
|
|
|
|
|
|
+ title="NanoBanana 生成失败",
|
|
|
output=json.dumps(data, ensure_ascii=False, indent=2),
|
|
output=json.dumps(data, ensure_ascii=False, indent=2),
|
|
|
- error="模型未返回内容",
|
|
|
|
|
|
|
+ error="检测到图片引用但写入失败(可能是无效 base64 或 URL 不可访问)",
|
|
|
|
|
+ metadata={"model": chosen_model, "ref_count": len(refs)},
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- try:
|
|
|
|
|
- parsed = _safe_json_parse(content)
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- return ToolResult(
|
|
|
|
|
- title="NanoBanana 提取失败",
|
|
|
|
|
- output=content,
|
|
|
|
|
- error=f"模型返回非 JSON 内容,解析失败: {e}",
|
|
|
|
|
- metadata={"model": chosen_model},
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- if output_file:
|
|
|
|
|
- out_path = Path(output_file)
|
|
|
|
|
- else:
|
|
|
|
|
- if len(input_paths) > 1:
|
|
|
|
|
- out_path = input_paths[0].parent / "set_invariant_features.json"
|
|
|
|
|
- else:
|
|
|
|
|
- out_path = input_paths[0].parent / f"{input_paths[0].stem}_invariant_features.json"
|
|
|
|
|
-
|
|
|
|
|
- out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
- out_path.write_text(json.dumps(parsed, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
|
-
|
|
|
|
|
usage = data.get("usage", {})
|
|
usage = data.get("usage", {})
|
|
|
prompt_tokens = usage.get("prompt_tokens") or usage.get("input_tokens", 0)
|
|
prompt_tokens = usage.get("prompt_tokens") or usage.get("input_tokens", 0)
|
|
|
completion_tokens = usage.get("completion_tokens") or usage.get("output_tokens", 0)
|
|
completion_tokens = usage.get("completion_tokens") or usage.get("output_tokens", 0)
|
|
|
-
|
|
|
|
|
summary = {
|
|
summary = {
|
|
|
"model": chosen_model,
|
|
"model": chosen_model,
|
|
|
"input_images": [str(p) for p in input_paths],
|
|
"input_images": [str(p) for p in input_paths],
|
|
|
"input_count": len(input_paths),
|
|
"input_count": len(input_paths),
|
|
|
- "output_file": str(out_path),
|
|
|
|
|
|
|
+ "generated_images": output_paths,
|
|
|
"prompt_tokens": prompt_tokens,
|
|
"prompt_tokens": prompt_tokens,
|
|
|
"completion_tokens": completion_tokens,
|
|
"completion_tokens": completion_tokens,
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
return ToolResult(
|
|
return ToolResult(
|
|
|
- title="NanoBanana 不变特征提取完成",
|
|
|
|
|
- output=json.dumps(
|
|
|
|
|
- {
|
|
|
|
|
- "summary": summary,
|
|
|
|
|
- "features": parsed,
|
|
|
|
|
- },
|
|
|
|
|
- ensure_ascii=False,
|
|
|
|
|
- indent=2,
|
|
|
|
|
- ),
|
|
|
|
|
- long_term_memory=f"Extracted invariant features from {len(input_paths)} input image(s) using {chosen_model}",
|
|
|
|
|
- attachments=[str(out_path)],
|
|
|
|
|
|
|
+ title="NanoBanana 图片生成完成",
|
|
|
|
|
+ output=json.dumps({"summary": summary}, ensure_ascii=False, indent=2),
|
|
|
|
|
+ long_term_memory=f"Generated {len(output_paths)} image(s) from {len(input_paths)} input image(s) using {chosen_model}",
|
|
|
|
|
+ attachments=output_paths,
|
|
|
metadata=summary,
|
|
metadata=summary,
|
|
|
|
|
+ tool_usage={
|
|
|
|
|
+ "model": chosen_model,
|
|
|
|
|
+ "prompt_tokens": prompt_tokens,
|
|
|
|
|
+ "completion_tokens": completion_tokens,
|
|
|
|
|
+ }
|
|
|
)
|
|
)
|