# Closed-loop image-generation workflow: a Gemini agent iteratively rewrites
# its prompt and calls call_banana.py until the output approaches a target image.
- import sys
- import os
- import asyncio
- import json
- import base64
- import re
# Add the project root to sys.path so that internal packages can be imported.
- sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../')))
- from agent.tools.builtin.toolhub import toolhub_call
- from agent.llm.gemini import create_gemini_llm_call
- from dotenv import load_dotenv
- load_dotenv()
# Build the Gemini call wrapper once at import time. A ValueError from the
# factory is reported to the user and the process exits with status 1.
try:
    gemini_llm_call = create_gemini_llm_call()
except ValueError as e:
    print(f"初始化 Gemini 失败: {e},请检查 .env。")
    raise SystemExit(1)
- from agent.tools.builtin.search import search_posts
- # -----------------
- # Utility Functions
- # -----------------
def encode_image(image_path: str) -> str:
    """Return the contents of the file at *image_path* as a base64 string.

    Args:
        image_path: Path of the file to read in binary mode.

    Returns:
        The base64 encoding of the raw bytes, decoded as UTF-8 text.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
def get_base64_url(image_path: str) -> str:
    """Return a ``data:image/...;base64,...`` URL for the image at *image_path*.

    The MIME subtype is taken from the file extension (lower-cased, with
    ``jpg`` mapped to ``jpeg``).

    Args:
        image_path: Path of the image file to embed.

    Returns:
        A data URL containing the base64-encoded file contents.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    b64_data = encode_image(image_path)
    # os.path.splitext only inspects the final path component, so a dotted
    # directory name (e.g. "out.v2/image") no longer leaks into the MIME type.
    ext = os.path.splitext(image_path)[1].lstrip('.').lower()
    if ext == 'jpg':
        ext = 'jpeg'
    elif not ext:
        # No extension at all: fall back to PNG rather than emitting the whole
        # path as the subtype. NOTE(review): PNG is an assumed default.
        ext = 'png'
    return f"data:image/{ext};base64,{b64_data}"
- # -----------------
- # Tools definitions
- # -----------------
async def call_banana_tool(prompt: str, aspect_ratio: str = None, reference_image: str = None, is_final: bool = True) -> str:
    """Run ``call_banana.py`` in a subprocess and return the saved image path.

    The script located next to this file is launched with the current Python
    interpreter. Its stdout (followed by stderr, when non-empty) is scanned
    for the marker line that announces where the image was saved.

    Args:
        prompt: Generation prompt, passed as ``-p``.
        aspect_ratio: Optional aspect ratio, passed as ``-a`` when truthy.
        reference_image: Optional reference image path, passed as ``-i`` when
            truthy.
        is_final: Only used in the log line printed here; it is not forwarded
            to the script.

    Returns:
        The image path reported by the script, or a string starting with
        ``"Tool Execution Failed."`` containing the captured output when no
        marker line was found.
    """
    print(f"\n[Tool] ✨ 正在调用 call_banana 生成图片 (is_final={is_final}), Prompt: {prompt[:50]}...")

    command = [
        sys.executable,
        os.path.join(os.path.dirname(__file__), "call_banana.py"),
        "-p",
        prompt,
    ]
    # Optional flags are appended only when a truthy value was supplied.
    for flag, value in (("-a", aspect_ratio), ("-i", reference_image)):
        if value:
            command += [flag, value]

    # Force UTF-8 I/O in the child so the marker line decodes reliably.
    child_env = dict(os.environ, PYTHONIOENCODING="utf-8")

    proc = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=child_env,
    )
    raw_out, raw_err = await proc.communicate()

    output = raw_out.decode('utf-8', errors='replace')
    err_output = raw_err.decode('utf-8', errors='replace')
    if err_output:
        output = output + "\n" + err_output

    match = re.search(r"已保存到本地 -> (.+)", output)
    if match is None:
        print(f"[Tool] ❌ call_banana 执行失败:\n{output}")
        return f"Tool Execution Failed. output:\n{output}"

    path = match.group(1).strip()
    print(f"[Tool] ✅ call_banana 返回图片路径: {path}")
    return path
async def search_tool(keyword: str) -> str:
    """Search Xiaohongshu posts for *keyword* and return the result text.

    Args:
        keyword: Search keyword forwarded to ``search_posts``.

    Returns:
        The ``output`` attribute of the search result, or a string starting
        with ``"查询失败: "`` describing the exception when anything fails.
    """
    print(f"\n[Tool] 🔍 启动小红书调研, 关键词: {keyword}")
    try:
        # Up to three posts from the "xhs" channel are requested.
        posts = await search_posts(keyword=keyword, channel="xhs", max_count=3)
        summary = posts.output
    except Exception as e:
        # Best-effort tool: the failure is handed back to the agent as text.
        return f"查询失败: {e}"
    return summary
def get_agent_tools():
    """Return the function-calling schemas for the agent's two tools.

    A fresh list is built on every call. It describes ``search_tool``
    (required ``keyword``) and ``call_banana_tool`` (required ``prompt``;
    optional ``aspect_ratio``, ``reference_image`` and ``is_final``).
    """

    def _function_tool(name, description, properties, required):
        # Wrap one tool definition in the {"type": "function", ...} envelope.
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required,
                },
            },
        }

    search_properties = {
        "keyword": {
            "type": "string",
            "description": "搜索关键词",
        },
    }

    banana_properties = {
        "prompt": {
            "type": "string",
            "description": "英语或中文详细的生图提示词",
        },
        "aspect_ratio": {
            "type": "string",
            "description": "(可选)你期望生成的图片宽高比,例如 3:4, 16:9, 1:1,请根据目标参考图的比例传入该参数",
        },
        "reference_image": {
            "type": "string",
            "description": "(动作控制底图)如果你在这一步设 is_final=true,请将你在上一阶段生成的【辅助骨架素材(is_final=false)】产生的本地路径填入此处。绝对禁止传入原始目标照片!",
        },
        "is_final": {
            "type": "boolean",
            "description": "指示本次生成是否是本轮次的最终产物。如果你需要先生成一张『白底火柴人/3D骨架』作为辅助垫图素材,请设为 false;拿到素材后,你必须继续将它的本地路径填给 `reference_image` 并使用最终 Prompt 和 is_final=true 完成最后合成。",
        },
    }

    search_schema = _function_tool(
        "search_tool",
        "如果需要了解某个风格如何写 Prompt(例如“写实风格提示词”),调用此工具进行小红书全网搜索,返回总结经验以更新你的参数。",
        search_properties,
        ["keyword"],
    )
    banana_schema = _function_tool(
        "call_banana_tool",
        "使用此工具通过给定的详细提示词生成图片。工具将返回生成图片的本地保存路径。",
        banana_properties,
        ["prompt"],
    )
    return [search_schema, banana_schema]
- # -----------------
- # Main Workflow Loop
- # -----------------
def get_base64_url(image_path: str) -> str:
    """Return a ``data:image/...;base64,...`` URL for the image at *image_path*.

    The MIME subtype is taken from the file extension (lower-cased, with
    ``jpg`` mapped to ``jpeg``).

    Args:
        image_path: Path of the image file to embed.

    Returns:
        A data URL containing the base64-encoded file contents.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(image_path, "rb") as image_file:
        b64_data = base64.b64encode(image_file.read()).decode('utf-8')
    # os.path.splitext only inspects the final path component, so a dotted
    # directory name (e.g. "out.v2/image") no longer leaks into the MIME type.
    ext = os.path.splitext(image_path)[1].lstrip('.').lower()
    if ext == 'jpg':
        ext = 'jpeg'
    elif not ext:
        # No extension at all: fall back to PNG rather than emitting the whole
        # path as the subtype. NOTE(review): PNG is an assumed default.
        ext = 'png'
    return f"data:image/{ext};base64,{b64_data}"
# Model used for both the prompt-writing agent and the final report.
_MODEL_NAME = "gemini-3.1-pro-preview"
# Upper bound on LLM turns inside one optimisation round, so that an agent
# which keeps searching or keeps failing cannot loop forever.
_MAX_AGENT_TURNS_PER_ROUND = 12
# Consecutive empty replies tolerated before a round is abandoned.
_MAX_EMPTY_REPLIES = 3
# Consecutive rounds without a final image tolerated before the run stops.
_MAX_STALLED_ROUNDS = 3


def _text_part(text):
    """Build a text content part for a multimodal message."""
    return {"type": "text", "text": text}


def _image_part(image_path):
    """Build an image content part embedding the file as a data URL."""
    return {"type": "image_url", "image_url": {"url": get_base64_url(image_path)}}


def _tool_reply(tool_call_id, content):
    """Build the tool-role message answering one tool call."""
    return {"role": "tool", "tool_call_id": tool_call_id, "content": content}


def _first_image_path(record):
    """Return the first image path of a history record, or None if absent.

    Supports both the multi-image ``image_paths`` list and the single
    ``image_path`` key; an empty or missing list falls back to the latter.
    """
    paths = record.get("image_paths") or [record.get("image_path")]
    return paths[0]


def _parse_tool_arguments(raw_arguments):
    """Decode tool-call arguments into a dict; raise ValueError if malformed."""
    parsed = json.loads(raw_arguments) if isinstance(raw_arguments, str) else raw_arguments
    if not isinstance(parsed, dict):
        raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
    return parsed


def _build_system_prompt(pose_image):
    """Assemble the system prompt, adding the pose shortcut when a pose image exists."""
    sys_content = (
        "你是一个高度自治的闭环生图优化 AI 架构师。你的目标是:生成一张与【目标参考图】在主角姿势、整体结构上无限接近的图片。\n"
        "你拥有极强的视觉反思能力和 Prompt 编写能力。\n\n"
        "【核心工作流与防坑指南】:\n"
        "- 你会看到你的【目标参考图】和你的【往期历史尝试与生成结果】。\n"
        "- 请你先利用你的**多模态火眼金睛**,无情地对自己上一轮生成的图片进行找茬。绝不允许说客套话!重点对比人物骨架、姿势和构图的偏离程度。\n"
        "- 紧接着,请在反思的基础上,直接重构或调整你的 Prompt,并在一次回复中调用 `call_banana_tool` 下发生图指令!\n"
        "- 【防作弊铁律】:你**绝对禁止**直接将【目标参考图】的路径传进 `reference_image` 来作弊!如果你想用图生图垫出完美动作,必须使用【中间素材战法】亲手画一张骨架出来垫。\n"
        "- 【中间素材战法】:如果原图姿态过于刁钻复杂,**要求你必须**分两步走:\n"
        " 第一步:设置 `is_final=false` 并写一段专门用于抽出单一维度的动作骨架/白模 Prompt(如: \"a generic white 3d mannequin jumping in mid-air, clean white background, high contrast skeleton\"),专门用于抽出干净的辅助骨架。\n"
        " 第二步:拿到这只纯净骨架的本地路径后,在同回合的下一次调用中,把这只骨架当做 `reference_image` 垫进去,配合你华丽的最终描述(如: \"a neon cyberpunk assassin jumping\"),设置 `is_final=true` 完成高阶对齐兼防污染! \n\n"
    )
    if pose_image and os.path.exists(pose_image):
        sys_content += f"【🔥终极开挂特权】:\n天啊!用户居然为你额外提供了一张极致完美的【姿势参考图】!既然有了这张现成的动作骨架底图,你**立刻抛弃**两步走去抽骨架的方法。你应当直接使用特权,将这张姿势参考图的绝对物理路径 `{os.path.abspath(pose_image)}` 作为 `reference_image` 无脑传给引擎,配合你的终极词汇,并在第一回合内设置 `is_final=true` 完成终极绝杀生成!\n\n"
    sys_content += "流程要求:\n1. 仔细分析差异,在你的纯文本回复段落写出【犀利的反思和执行步骤】。\n2. 反思结束后,使用工具发号施令。\n3. 当调用 `is_final=true` 时,视为你的本轮彻底结束。"
    return sys_content


def _load_history(history_file):
    """Load the saved prompt history; return an empty list on any problem."""
    try:
        with open(history_file, "r", encoding="utf-8") as f:
            loaded = json.load(f)
        if not isinstance(loaded, list):
            raise ValueError("history.json does not contain a list")
    except (OSError, ValueError) as e:
        print(f"⚠️ [状态恢复失败] 读取历史记录报错: {e},将重新开始第一轮。")
        return []
    # Ignore entries that are not records so later lookups cannot crash.
    return [record for record in loaded if isinstance(record, dict)]


def _save_history(history_file, prompt_history):
    """Persist the prompt history; a failure is reported but not fatal."""
    try:
        with open(history_file, "w", encoding="utf-8") as f:
            json.dump(prompt_history, f, ensure_ascii=False, indent=2)
    except (OSError, TypeError, ValueError) as e:
        print(f"[警告] 历史记录保存失败: {e}")


def _build_first_round_message(target_image, pose_image):
    """Build the user message for the very first round (no history yet)."""
    try:
        parts = [
            _text_part("【首轮启动】\n这是你需要逼近的【目标参考图】。现在请你仔细观察它,提炼出一份初步生图 Prompt。\n因为是第一轮,请直接凭借直觉观察,并使用 call_banana_tool 生成原型。"),
            _image_part(target_image),
        ]
        if pose_image and os.path.exists(pose_image):
            parts.append(_text_part("并且,下面是用户良心为你提供的【开挂级·姿势参考图】!你可以直接在接下来的提示词工具调用中将此图拿去垫图!"))
            parts.append(_image_part(pose_image))
        return {"role": "user", "content": parts}
    except OSError as e:
        # The target image is unreadable: let the agent start blind.
        return {
            "role": "user",
            "content": f"目标图片读取失败({e}),请盲猜一个初始 Prompt 用 call_banana_tool 生成。",
        }


def _build_followup_message(target_image, pose_image, prompt_history):
    """Build the user message for later rounds, replaying the full history."""
    try:
        parts = [
            _text_part("【持续干预闭环】\n这是不可动摇的【目标参考图】,它是一切评判的唯一基准:"),
            _image_part(target_image),
        ]
        if pose_image and os.path.exists(pose_image):
            parts.append(_text_part("【外挂辅助】\n这是不可动摇的【姿势参考图】,请毫不犹豫地拿它去填进 reference_image 控制动作:"))
            parts.append(_image_part(pose_image))
    except OSError as e:
        return {"role": "user", "content": f"上下文读取失败 ({e})。请重试用 call_banana_tool 生成。"}

    parts.append(_text_part("\n==== 【你的历史试错轨迹】 ====\n为了防止你在这场试错过程中来回打转(所谓的废卡反复抽卡),我为你列出了你*从古至今*所有的失败作品和对应的提示词!请认真观察下面每一张你过去的废片:\n"))

    last_index = len(prompt_history) - 1
    for i, record in enumerate(prompt_history):
        parts.append(_text_part(f"-- 第 {i+1} 轮 --\n[上次使用的 Prompt]:\n{record.get('prompt', '')}\n[此轮的废片结果]:"))

        # To save tokens only the first and the most recent images are shown;
        # the rounds in between keep just their textual reflection.
        if i == 0 or i == last_index:
            img_path = _first_image_path(record)
            if img_path:
                try:
                    parts.append(_image_part(img_path))
                except OSError as e:
                    print(f"[警告] 历史图片读取失败,已跳过: {e}")
        else:
            parts.append(_text_part("*(由于历史过于久远,中间轮次图片已省去展示,请聚焦于下面你对它的纯文本反思)*"))

        if record.get("feedback"):
            parts.append(_text_part(f"[你在本轮结束后的反思]:\n{record['feedback']}\n"))

    parts.append(_text_part("====================\n\n现在,结合上述轨迹与那张【目标参考图】,请在回复中写出最新的【极度苛刻自我反思】,然后立马调用工具生成这轮新的 Prompt!"))
    return {"role": "user", "content": parts}


async def _run_generation_round(messages, round_number):
    """Drive Agent 1 through one round; return its history record or None.

    ``messages`` is extended in place with the assistant and tool messages.
    None is returned when the round ends without a final image (no tool use,
    repeated empty replies, or the turn limit was reached).
    """
    import shutil
    import uuid

    empty_replies = 0
    for _ in range(_MAX_AGENT_TURNS_PER_ROUND):
        print(f"---\n💬 正在请求 Agent 1 (Prompt 师)...")
        response = await gemini_llm_call(
            messages=messages,
            model=_MODEL_NAME,
            tools=get_agent_tools(),
        )
        content = response.get("content", "")
        tool_calls = response.get("tool_calls")

        if content:
            print(f"\n[Agent 1 思考]:\n{content}")

        if not tool_calls and not content:
            # Empty reply: ask again instead of ending the round immediately.
            empty_replies += 1
            if empty_replies >= _MAX_EMPTY_REPLIES:
                print("Agent 连续多次无有意义输出,强制跳出本轮。")
                return None
            continue
        empty_replies = 0

        # Keep the assistant turn in context so tool replies can refer to it.
        assistant_reply = {"role": "assistant"}
        if content:
            assistant_reply["content"] = content
        if tool_calls:
            assistant_reply["tool_calls"] = tool_calls
        if "raw_gemini_parts" in response:
            assistant_reply["raw_gemini_parts"] = response["raw_gemini_parts"]
        messages.append(assistant_reply)

        if not tool_calls:
            print("\n[控制中心] Agent 1 没有继续使用任何工具。结束其周期。")
            return None

        for tc in tool_calls:
            func_name = tc["function"]["name"]
            tc_id = tc["id"]
            try:
                args_dict = _parse_tool_arguments(tc["function"]["arguments"])
            except (ValueError, TypeError) as e:
                messages.append(_tool_reply(tc_id, f"工具参数解析失败: {e}"))
                continue

            if func_name == "search_tool":
                res = await search_tool(**args_dict)
                messages.append(_tool_reply(tc_id, str(res)))

            elif func_name == "call_banana_tool":
                is_final = args_dict.get("is_final", True)
                print(f"\n⚙️ 节点发起了生图请求 (是否为终极图: {is_final})!")
                gen_path = await call_banana_tool(**args_dict)

                if not os.path.exists(gen_path):
                    # On failure the tool returns its error text, not a path.
                    # Report it honestly so the agent can retry, and do not
                    # record a bogus history entry.
                    messages.append(_tool_reply(tc_id, f"生成失败,请根据错误信息调整后重试:\n{gen_path}"))
                    continue

                suffix = os.path.splitext(gen_path)[1]
                if is_final:
                    new_gen_path = f"gen_loop_{round_number}{suffix}"
                else:
                    # Intermediate materials get a random tag so several can
                    # coexist within the same round.
                    new_gen_path = f"gen_loop_{round_number}_material_{str(uuid.uuid4())[:8]}{suffix}"
                shutil.move(gen_path, new_gen_path)
                gen_path = new_gen_path
                print(f"[文件管理] 生图结果已重命名并保存为: {new_gen_path}")

                messages.append(_tool_reply(tc_id, f"已成功生成,图片路径: {os.path.abspath(gen_path)}"))

                if is_final:
                    return {
                        "prompt": args_dict.get("prompt", ""),
                        "image_path": gen_path,
                        "feedback": content if content else "无反思内容",
                    }
                print(f"[战术回馈] 这是辅助素材,已将路径返回给 Agent1 继续思考。")

            else:
                messages.append(_tool_reply(tc_id, f"未知工具: {func_name}"))

    print(f"[控制中心] Agent 1 在 {_MAX_AGENT_TURNS_PER_ROUND} 次交互内未产出最终图片,结束本轮。")
    return None


async def _print_final_report(target_image, prompt_history):
    """Ask the model for a retrospective report comparing first and last images."""
    print("\n" + "="*50)
    print("🏆 正在生成【专家最终多维度反馈报告】...")
    print("="*50)

    # Supports both the old single-image and the new multi-image records.
    first_gen = _first_image_path(prompt_history[0])
    last_gen = _first_image_path(prompt_history[-1])
    if not (first_gen and last_gen and os.path.exists(first_gen) and os.path.exists(last_gen)):
        return

    try:
        full_history_text = "【历次 Prompt 与专家反馈的演进轨迹】\n" + "".join(
            f"-- 第 {i+1} 轮 --\n[Prompt]: {record.get('prompt', '')}\n[反馈]: {record.get('feedback', '')}\n\n"
            for i, record in enumerate(prompt_history)
        )
        final_messages = [
            {
                "role": "system",
                "content": "你是首席AI打样架构师。目前的生图迭代优化工作流已拉下帷幕。你不需要拘泥于打分,而是要通过回顾整个演进历程,总结出‘最好用的 Prompt 模板’和‘最精准的评估反馈维度模板’。",
            },
            {
                "role": "user",
                "content": [
                    _text_part("【目标参考图(原图)】:"),
                    _image_part(target_image),
                    _text_part("这是最初第1轮盲试的生成图:"),
                    _image_part(first_gen),
                    _text_part("这是经过迭代后的【最终生成图】:"),
                    _image_part(last_gen),
                    _text_part(f"下面是 {len(prompt_history)} 轮迭代中,Prompt 和专家反馈的完整变迁记录:\n\n{full_history_text}\n\n请结合首尾图片的巨大差异以及中间的踩坑过程,深度复盘:\n1. 在构建生图 Prompt 时,哪些描述方式、句型或结构最能有效命中模型?请提炼出一个【最终版高转化率 Prompt 语法模板】。\n2. 在进行视觉反馈时,哪些维度的批评和建议对 Prompt 师是最具指导意义的?请提炼出一个【最终版高维度视觉评估反馈模板】。\n这两个模版需要具备极强的通用性和实战复用价值!"),
                ],
            },
        ]
        response = await gemini_llm_call(messages=final_messages, model=_MODEL_NAME)
        print(f"\n[Agent 2] 📋 【最终多维度评估报告】:\n{response['content']}\n")
    except Exception as e:
        # Top-level boundary: the report is optional, so any failure from file
        # access or the LLM call is reported instead of aborting the run.
        print(f"最终报告生成失败: {e}")


async def main():
    """Run the closed-loop prompt optimisation workflow from the command line.

    Command-line options: ``-t/--target`` (target image, defaults to
    ``input/img_1.png`` next to this file), ``-p/--pose`` (optional pose
    reference image), ``-m/--max_loops`` (number of completed rounds to aim
    for, default 15) and ``-r/--resume`` (continue from ``history.json``).

    Each round rebuilds the conversation from the system prompt plus one user
    message, lets the agent call tools until it produces a final image, then
    appends the result to the history file. A final retrospective report is
    requested when at least one round completed and the target image exists.
    """
    import argparse

    script_dir = os.path.dirname(os.path.abspath(__file__))
    default_target = os.path.join(script_dir, "input", "img_1.png")
    parser = argparse.ArgumentParser(description="多智能体画图自动优化 Workflow")
    parser.add_argument("-t", "--target", default=default_target, help="你想逼近的目标参考图本地路径")
    parser.add_argument("-p", "--pose", default=None, help="你提供的姿势参考图(如果有的话,给 Agent 用来走捷径垫底)")
    parser.add_argument("-m", "--max_loops", type=int, default=15, help="优化的最大迭代论调")
    parser.add_argument("-r", "--resume", action="store_true", help="是否从上次的 history.json 继续运行")
    args = parser.parse_args()

    target_image = args.target
    pose_image = args.pose
    max_loops = args.max_loops

    print("\n" + "="*50)
    print("🤖 启动双 Agent 生图闭环工作流 (纯 Vision-Language 架构)")
    print("="*50)

    if not os.path.exists(target_image):
        print(f"⚠️ 找不到目标图片: {target_image}")
        print("提示: 系统依然会运行寻找文件,但 Agent 2 将无法给出评估。可随便放一个图片来模拟。")

    system_msg = {"role": "system", "content": _build_system_prompt(pose_image)}

    # Full prompt trajectory, replayed to the agent so it does not go in circles.
    prompt_history = []
    history_file = os.path.join(script_dir, "history.json")
    if args.resume and os.path.exists(history_file):
        prompt_history = _load_history(history_file)
        if prompt_history:
            print(f"✅ [状态恢复] 已成功从 history.json 加载 {len(prompt_history)} 轮历史,即将开始第 {len(prompt_history) + 1} 轮...")

    completed_rounds = len(prompt_history)
    stalled_rounds = 0
    while completed_rounds < max_loops:
        print(f"\n" + "="*40)
        print(f"🔄 优化循环: 第 {completed_rounds + 1}/{max_loops} 轮")
        print("="*40)

        # The context is reset every round: system prompt plus one user
        # message that carries the target image and the accumulated history.
        if prompt_history:
            user_msg = _build_followup_message(target_image, pose_image, prompt_history)
        else:
            user_msg = _build_first_round_message(target_image, pose_image)
        messages = [system_msg, user_msg]

        record = await _run_generation_round(messages, completed_rounds + 1)
        if record is None:
            # A round without a final image does not advance the counter, so
            # repeated stalls must be bounded or the loop would never end.
            stalled_rounds += 1
            if stalled_rounds >= _MAX_STALLED_ROUNDS:
                print(f"[控制中心] 连续 {stalled_rounds} 轮未能产出最终图片,提前终止工作流。")
                break
            continue

        stalled_rounds = 0
        completed_rounds += 1
        prompt_history.append(record)
        _save_history(history_file, prompt_history)

    print("\n🎉 工作流闭环成功完成或达到了最大迭代次数。")

    if prompt_history and os.path.exists(target_image):
        await _print_final_report(target_image, prompt_history)
- if __name__ == "__main__":
- asyncio.run(main())
|