# workflow_loop.py
import sys
import os
import asyncio
import json
import base64
import re
# Put the project root (two directories above this file) on sys.path so the
# internal ``agent`` packages imported below can be resolved.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../')))
from agent.tools.builtin.toolhub import toolhub_call
from agent.llm.gemini import create_gemini_llm_call
from dotenv import load_dotenv
# Runs before the Gemini client is created; presumably supplies the settings
# that create_gemini_llm_call() reads - confirm against agent.llm.gemini.
load_dotenv()
try:
    gemini_llm_call = create_gemini_llm_call()
except ValueError as e:
    # Without a working LLM client nothing below can run, so exit immediately.
    print(f"初始化 Gemini 失败: {e},请检查 .env。")
    sys.exit(1)
from agent.tools.builtin.search import search_posts
# -----------------
# Utility Functions
# -----------------
  22. def encode_image(image_path: str) -> str:
  23. with open(image_path, "rb") as image_file:
  24. return base64.b64encode(image_file.read()).decode('utf-8')
  25. def get_base64_url(image_path: str) -> str:
  26. b64_data = encode_image(image_path)
  27. ext = image_path.split('.')[-1].lower()
  28. if ext == 'jpg': ext = 'jpeg'
  29. return f"data:image/{ext};base64,{b64_data}"
# -----------------
# Tools definitions
# -----------------
  33. async def call_banana_tool(prompt: str, aspect_ratio: str = None, reference_image: str = None, is_final: bool = True) -> str:
  34. """包装 call_banana.py 生成图片,返回一张图的路径"""
  35. print(f"\n[Tool] ✨ 正在调用 call_banana 生成图片 (is_final={is_final}), Prompt: {prompt[:50]}...")
  36. script_path = os.path.join(os.path.dirname(__file__), "call_banana.py")
  37. env = os.environ.copy()
  38. env["PYTHONIOENCODING"] = "utf-8"
  39. cmd_args = [sys.executable, script_path, "-p", prompt]
  40. if aspect_ratio:
  41. cmd_args.extend(["-a", aspect_ratio])
  42. if reference_image:
  43. cmd_args.extend(["-i", reference_image])
  44. process = await asyncio.create_subprocess_exec(
  45. *cmd_args,
  46. stdout=asyncio.subprocess.PIPE,
  47. stderr=asyncio.subprocess.PIPE,
  48. env=env
  49. )
  50. stdout, stderr = await process.communicate()
  51. output = stdout.decode('utf-8', errors='replace')
  52. err_output = stderr.decode('utf-8', errors='replace')
  53. if err_output:
  54. output += "\n" + err_output
  55. match = re.search(r"已保存到本地 -> (.+)", output)
  56. if match:
  57. path = match.group(1).strip()
  58. print(f"[Tool] ✅ call_banana 返回图片路径: {path}")
  59. return path
  60. else:
  61. print(f"[Tool] ❌ call_banana 执行失败:\n{output}")
  62. return f"Tool Execution Failed. output:\n{output}"
  63. async def search_tool(keyword: str) -> str:
  64. print(f"\n[Tool] 🔍 启动小红书调研, 关键词: {keyword}")
  65. try:
  66. result = await search_posts(keyword=keyword, channel="xhs", max_count=3)
  67. return result.output
  68. except Exception as e:
  69. return f"查询失败: {e}"
  70. def get_agent_tools():
  71. return [
  72. {
  73. "type": "function",
  74. "function": {
  75. "name": "search_tool",
  76. "description": "如果需要了解某个风格如何写 Prompt(例如“写实风格提示词”),调用此工具进行小红书全网搜索,返回总结经验以更新你的参数。",
  77. "parameters": {
  78. "type": "object",
  79. "properties": {
  80. "keyword": {
  81. "type": "string",
  82. "description": "搜索关键词"
  83. }
  84. },
  85. "required": ["keyword"]
  86. }
  87. }
  88. },
  89. {
  90. "type": "function",
  91. "function": {
  92. "name": "call_banana_tool",
  93. "description": "使用此工具通过给定的详细提示词生成图片。工具将返回生成图片的本地保存路径。",
  94. "parameters": {
  95. "type": "object",
  96. "properties": {
  97. "prompt": {
  98. "type": "string",
  99. "description": "英语或中文详细的生图提示词"
  100. },
  101. "aspect_ratio": {
  102. "type": "string",
  103. "description": "(可选)你期望生成的图片宽高比,例如 3:4, 16:9, 1:1,请根据目标参考图的比例传入该参数"
  104. },
  105. "reference_image": {
  106. "type": "string",
  107. "description": "(动作控制底图)如果你在这一步设 is_final=true,请将你在上一阶段生成的【辅助骨架素材(is_final=false)】产生的本地路径填入此处。绝对禁止传入原始目标照片!"
  108. },
  109. "is_final": {
  110. "type": "boolean",
  111. "description": "指示本次生成是否是本轮次的最终产物。如果你需要先生成一张『白底火柴人/3D骨架』作为辅助垫图素材,请设为 false;拿到素材后,你必须继续将它的本地路径填给 `reference_image` 并使用最终 Prompt 和 is_final=true 完成最后合成。"
  112. }
  113. },
  114. "required": ["prompt"]
  115. }
  116. }
  117. }
  118. ]
# -----------------
# Main Workflow Loop
# -----------------
  122. def get_base64_url(image_path: str) -> str:
  123. with open(image_path, "rb") as image_file:
  124. b64_data = base64.b64encode(image_file.read()).decode('utf-8')
  125. ext = image_path.split('.')[-1].lower()
  126. if ext == 'jpg': ext = 'jpeg'
  127. return f"data:image/{ext};base64,{b64_data}"
  128. async def main():
  129. import argparse
  130. import os
  131. import json
  132. default_target = os.path.join(os.path.dirname(os.path.abspath(__file__)), "input", "img_1.png")
  133. parser = argparse.ArgumentParser(description="多智能体画图自动优化 Workflow")
  134. parser.add_argument("-t", "--target", default=default_target, help="你想逼近的目标参考图本地路径")
  135. parser.add_argument("-p", "--pose", default=None, help="你提供的姿势参考图(如果有的话,给 Agent 用来走捷径垫底)")
  136. parser.add_argument("-m", "--max_loops", type=int, default=15, help="优化的最大迭代论调")
  137. parser.add_argument("-r", "--resume", action="store_true", help="是否从上次的 history.json 继续运行")
  138. args = parser.parse_args()
  139. target_image = args.target
  140. pose_image = args.pose
  141. print("\n" + "="*50)
  142. print("🤖 启动双 Agent 生图闭环工作流 (纯 Vision-Language 架构)")
  143. print("="*50)
  144. if not os.path.exists(target_image):
  145. print(f"⚠️ 找不到目标图片: {target_image}")
  146. print("提示: 系统依然会运行寻找文件,但 Agent 2 将无法给出评估。可随便放一个图片来模拟。")
  147. sys_content = f"你是一个高度自治的闭环生图优化 AI 架构师。你的目标是:生成一张与【目标参考图】在主角姿势、整体结构上无限接近的图片。\n你拥有极强的视觉反思能力和 Prompt 编写能力。\n\n【核心工作流与防坑指南】:\n- 你会看到你的【目标参考图】和你的【往期历史尝试与生成结果】。\n- 请你先利用你的**多模态火眼金睛**,无情地对自己上一轮生成的图片进行找茬。绝不允许说客套话!重点对比人物骨架、姿势和构图的偏离程度。\n- 紧接着,请在反思的基础上,直接重构或调整你的 Prompt,并在一次回复中调用 `call_banana_tool` 下发生图指令!\n- 【防作弊铁律】:你**绝对禁止**直接将【目标参考图】的路径传进 `reference_image` 来作弊!如果你想用图生图垫出完美动作,必须使用【中间素材战法】亲手画一张骨架出来垫。\n- 【中间素材战法】:如果原图姿态过于刁钻复杂,**要求你必须**分两步走:\n 第一步:设置 `is_final=false` 并写一段专门用于抽出单一维度的动作骨架/白模 Prompt(如: \"a generic white 3d mannequin jumping in mid-air, clean white background, high contrast skeleton\"),专门用于抽出干净的辅助骨架。\n 第二步:拿到这只纯净骨架的本地路径后,在同回合的下一次调用中,把这只骨架当做 `reference_image` 垫进去,配合你华丽的最终描述(如: \"a neon cyberpunk assassin jumping\"),设置 `is_final=true` 完成高阶对齐兼防污染! \n\n"
  148. if pose_image and os.path.exists(pose_image):
  149. sys_content += f"【🔥终极开挂特权】:\n天啊!用户居然为你额外提供了一张极致完美的【姿势参考图】!既然有了这张现成的动作骨架底图,你**立刻抛弃**两步走去抽骨架的方法。你应当直接使用特权,将这张姿势参考图的绝对物理路径 `{os.path.abspath(pose_image)}` 作为 `reference_image` 无脑传给引擎,配合你的终极词汇,并在第一回合内设置 `is_final=true` 完成终极绝杀生成!\n\n"
  150. sys_content += "流程要求:\n1. 仔细分析差异,在你的纯文本回复段落写出【犀利的反思和执行步骤】。\n2. 反思结束后,使用工具发号施令。\n3. 当调用 `is_final=true` 时,视为你的本轮彻底结束。"
  151. system_msg = {
  152. "role": "system",
  153. "content": sys_content
  154. }
  155. max_loops = args.max_loops
  156. current_generation_loop_count = 0
  157. last_gen_info = None
  158. prompt_history = [] # 记录完整的历史 Prompt 轨迹,防止反复抽卡
  159. history_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "history.json")
  160. if args.resume and os.path.exists(history_file):
  161. try:
  162. with open(history_file, "r", encoding="utf-8") as f:
  163. prompt_history = json.load(f)
  164. if prompt_history:
  165. current_generation_loop_count = len(prompt_history)
  166. last_gen_info = prompt_history[-1]
  167. print(f"✅ [状态恢复] 已成功从 history.json 加载 {current_generation_loop_count} 轮历史,即将开始第 {current_generation_loop_count + 1} 轮...")
  168. except Exception as e:
  169. print(f"⚠️ [状态恢复失败] 读取历史记录报错: {e},将重新开始第一轮。")
  170. prompt_history = []
  171. while current_generation_loop_count < max_loops:
  172. print(f"\n" + "="*40)
  173. print(f"🔄 优化循环: 第 {current_generation_loop_count + 1}/{max_loops} 轮")
  174. print("="*40)
  175. # 每轮重置上下文,只保留 system message 和含有"上次结果"的 initial user message
  176. messages = [system_msg]
  177. if last_gen_info is None:
  178. try:
  179. target_b64_url = get_base64_url(target_image)
  180. content_list = [
  181. {"type": "text", "text": "【首轮启动】\n这是你需要逼近的【目标参考图】。现在请你仔细观察它,提炼出一份初步生图 Prompt。\n因为是第一轮,请直接凭借直觉观察,并使用 call_banana_tool 生成原型。"},
  182. {"type": "image_url", "image_url": {"url": target_b64_url}}
  183. ]
  184. if pose_image and os.path.exists(pose_image):
  185. content_list.append({"type": "text", "text": "并且,下面是用户良心为你提供的【开挂级·姿势参考图】!你可以直接在接下来的提示词工具调用中将此图拿去垫图!"})
  186. content_list.append({"type": "image_url", "image_url": {"url": get_base64_url(pose_image)}})
  187. messages.append({
  188. "role": "user",
  189. "content": content_list
  190. })
  191. except Exception as e:
  192. messages.append({
  193. "role": "user",
  194. "content": f"目标图片读取失败({e}),请盲猜一个初始 Prompt 用 call_banana_tool 生成。"
  195. })
  196. else:
  197. try:
  198. target_b64_url = get_base64_url(target_image)
  199. user_content = [
  200. {"type": "text", "text": "【持续干预闭环】\n这是不可动摇的【目标参考图】,它是一切评判的唯一基准:"},
  201. {"type": "image_url", "image_url": {"url": target_b64_url}}
  202. ]
  203. if pose_image and os.path.exists(pose_image):
  204. user_content.append({"type": "text", "text": "【外挂辅助】\n这是不可动摇的【姿势参考图】,请毫不犹豫地拿它去填进 reference_image 控制动作:"})
  205. user_content.append({"type": "image_url", "image_url": {"url": get_base64_url(pose_image)}})
  206. user_content.append({"type": "text", "text": "\n==== 【你的历史试错轨迹】 ====\n为了防止你在这场试错过程中来回打转(所谓的废卡反复抽卡),我为你列出了你*从古至今*所有的失败作品和对应的提示词!请认真观察下面每一张你过去的废片:\n"})
  207. for i, record in enumerate(prompt_history):
  208. user_content.append({"type": "text", "text": f"-- 第 {i+1} 轮 --\n[上次使用的 Prompt]:\n{record['prompt']}\n[此轮的废片结果]:"})
  209. try:
  210. img_path = record.get("image_paths", [record.get("image_path")])[0]
  211. # 节约上下文 Token 和视觉注意力:只渲染第一张(由于打底盲测)和最近一次的历史原图,中间的全部折叠仅保留反思文本
  212. if i == 0 or i == len(prompt_history) - 1:
  213. user_content.append({"type": "image_url", "image_url": {"url": get_base64_url(img_path)}})
  214. else:
  215. user_content.append({"type": "text", "text": "*(由于历史过于久远,中间轮次图片已省去展示,请聚焦于下面你对它的纯文本反思)*"})
  216. except:
  217. pass
  218. if record.get("feedback"):
  219. user_content.append({"type": "text", "text": f"[你在本轮结束后的反思]:\n{record['feedback']}\n"})
  220. user_content.append({"type": "text", "text": "====================\n\n现在,结合上述轨迹与那张【目标参考图】,请在回复中写出最新的【极度苛刻自我反思】,然后立马调用工具生成这轮新的 Prompt!"})
  221. messages.append({"role": "user", "content": user_content})
  222. except Exception as e:
  223. messages.append({"role": "user", "content": f"上下文读取失败 ({e})。请重试用 call_banana_tool 生成。"})
  224. # Agent 1 内部工具调研微循环 (Agent 1 minor logic loop)
  225. agent1_finished_generation = False
  226. consecutive_empty = 0
  227. while not agent1_finished_generation:
  228. print(f"---\n💬 正在请求 Agent 1 (Prompt 师)...")
  229. # 这里 Agent 1 也换成 qwen-vl-max,这样它才能看到传给它的上一轮图片
  230. response = await gemini_llm_call(
  231. messages=messages,
  232. model="gemini-3.1-pro-preview",
  233. tools=get_agent_tools()
  234. )
  235. content = response.get("content", "")
  236. tool_calls = response.get("tool_calls")
  237. if content:
  238. print(f"\n[Agent 1 思考]:\n{content}")
  239. if not tool_calls and not content:
  240. consecutive_empty += 1
  241. if consecutive_empty >= 3:
  242. print("Agent 连续多次无有意义输出,强制跳出本轮。")
  243. break
  244. else:
  245. consecutive_empty = 0
  246. # 保持上下文
  247. assistant_reply = {"role": "assistant"}
  248. if content: assistant_reply["content"] = content
  249. if tool_calls: assistant_reply["tool_calls"] = tool_calls
  250. if "raw_gemini_parts" in response: assistant_reply["raw_gemini_parts"] = response["raw_gemini_parts"]
  251. messages.append(assistant_reply)
  252. if tool_calls:
  253. for tc in tool_calls:
  254. func_name = tc["function"]["name"]
  255. args_dict = json.loads(tc["function"]["arguments"])
  256. tc_id = tc["id"]
  257. if func_name == "search_tool":
  258. res = await search_tool(**args_dict)
  259. messages.append({
  260. "role": "tool",
  261. "tool_call_id": tc_id,
  262. "content": str(res)
  263. })
  264. elif func_name == "call_banana_tool":
  265. is_final = args_dict.get("is_final", True)
  266. print(f"\n⚙️ 节点发起了生图请求 (是否为终极图: {is_final})!")
  267. gen_path = await call_banana_tool(**args_dict)
  268. if os.path.exists(gen_path):
  269. ext = gen_path.split('.')[-1]
  270. import shutil
  271. if is_final:
  272. new_gen_path = f"gen_loop_{current_generation_loop_count + 1}.{ext}"
  273. else:
  274. import uuid
  275. new_gen_path = f"gen_loop_{current_generation_loop_count + 1}_material_{str(uuid.uuid4())[:8]}.{ext}"
  276. shutil.move(gen_path, new_gen_path)
  277. gen_path = new_gen_path
  278. print(f"[文件管理] 生图结果已重命名并保存为: {new_gen_path}")
  279. prompt_used = args_dict.get("prompt", "")
  280. messages.append({
  281. "role": "tool",
  282. "tool_call_id": tc_id,
  283. "content": f"已成功生成,图片路径: {os.path.abspath(gen_path)}"
  284. })
  285. if is_final:
  286. agent1_finished_generation = True
  287. current_generation_loop_count += 1
  288. last_gen_info = {
  289. "prompt": prompt_used,
  290. "image_path": gen_path,
  291. "feedback": content if content else "无反思内容"
  292. }
  293. prompt_history.append(last_gen_info)
  294. try:
  295. with open(history_file, "w", encoding="utf-8") as f:
  296. json.dump(prompt_history, f, ensure_ascii=False, indent=2)
  297. except Exception as e:
  298. print(f"[警告] 历史记录保存失败: {e}")
  299. break # 跳出 tool_calls for loop 并进入下一大轮
  300. else:
  301. print(f"[战术回馈] 这是辅助素材,已将路径返回给 Agent1 继续思考。")
  302. else:
  303. # 没调工具
  304. print("\n[控制中心] Agent 1 没有继续使用任何工具。结束其周期。")
  305. agent1_finished_generation = True
  306. break
  307. print("\n🎉 工作流闭环成功完成或达到了最大迭代次数。")
  308. # 最后由评估专家出具一份最完善的多维度最终报告
  309. if len(prompt_history) > 0 and os.path.exists(target_image):
  310. print("\n" + "="*50)
  311. print("🏆 正在生成【专家最终多维度反馈报告】...")
  312. print("="*50)
  313. first_gen_record = prompt_history[0]
  314. last_gen_record = prompt_history[-1]
  315. # 兼容旧版本的单图记录和新版本的多图记录
  316. first_gen = first_gen_record.get("image_paths", [first_gen_record.get("image_path")])[0]
  317. last_gen = last_gen_record.get("image_paths", [last_gen_record.get("image_path")])[0]
  318. if first_gen and last_gen and os.path.exists(first_gen) and os.path.exists(last_gen):
  319. try:
  320. target_b64 = encode_image(target_image)
  321. first_b64 = encode_image(first_gen)
  322. last_b64 = encode_image(last_gen)
  323. target_ext = target_image.split('.')[-1].lower()
  324. first_ext = first_gen.split('.')[-1].lower()
  325. last_ext = last_gen.split('.')[-1].lower()
  326. # 构建供最终分析的文字轨迹
  327. full_history_text = "【历次 Prompt 与专家反馈的演进轨迹】\n"
  328. for i, record in enumerate(prompt_history):
  329. full_history_text += f"-- 第 {i+1} 轮 --\n[Prompt]: {record['prompt']}\n[反馈]: {record['feedback']}\n\n"
  330. final_messages = [
  331. {
  332. "role": "system",
  333. "content": "你是首席AI打样架构师。目前的生图迭代优化工作流已拉下帷幕。你不需要拘泥于打分,而是要通过回顾整个演进历程,总结出‘最好用的 Prompt 模板’和‘最精准的评估反馈维度模板’。"
  334. },
  335. {
  336. "role": "user",
  337. "content": [
  338. {"type": "text", "text": "【目标参考图(原图)】:"},
  339. {"type": "image_url", "image_url": {"url": f"data:image/{target_ext if target_ext != 'jpg' else 'jpeg'};base64,{target_b64}"}},
  340. {"type": "text", "text": "这是最初第1轮盲试的生成图:"},
  341. {"type": "image_url", "image_url": {"url": f"data:image/{first_ext if first_ext != 'jpg' else 'jpeg'};base64,{first_b64}"}},
  342. {"type": "text", "text": f"这是经过迭代后的【最终生成图】:"},
  343. {"type": "image_url", "image_url": {"url": f"data:image/{last_ext if last_ext != 'jpg' else 'jpeg'};base64,{last_b64}"}},
  344. {"type": "text", "text": f"下面是 {len(prompt_history)} 轮迭代中,Prompt 和专家反馈的完整变迁记录:\n\n{full_history_text}\n\n请结合首尾图片的巨大差异以及中间的踩坑过程,深度复盘:\n1. 在构建生图 Prompt 时,哪些描述方式、句型或结构最能有效命中模型?请提炼出一个【最终版高转化率 Prompt 语法模板】。\n2. 在进行视觉反馈时,哪些维度的批评和建议对 Prompt 师是最具指导意义的?请提炼出一个【最终版高维度视觉评估反馈模板】。\n这两个模版需要具备极强的通用性和实战复用价值!"}
  345. ]
  346. }
  347. ]
  348. response = await gemini_llm_call(
  349. messages=final_messages,
  350. model="gemini-3.1-pro-preview"
  351. )
  352. print(f"\n[Agent 2] 📋 【最终多维度评估报告】:\n{response['content']}\n")
  353. except Exception as e:
  354. print(f"最终报告生成失败: {e}")
# Script entry point: run the async workflow on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())