Browse Source

add knowledge extract and inject

guantao 5 days ago
parent
commit
c67f78b775

+ 127 - 148
agent/core/runner.py

@@ -26,7 +26,7 @@ from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal,
 from agent.trace.models import Trace, Message
 from agent.trace.models import Trace, Message
 from agent.trace.protocols import TraceStore
 from agent.trace.protocols import TraceStore
 from agent.trace.goal_models import GoalTree
 from agent.trace.goal_models import GoalTree
-from agent.tools.builtin.knowledge import _get_structured_knowledge, _batch_update_knowledge
+from agent.tools.builtin.knowledge import _get_structured_knowledge, _batch_update_knowledge, save_knowledge
 from agent.trace.compaction import (
 from agent.trace.compaction import (
     CompressionConfig,
     CompressionConfig,
     filter_by_goal_status,
     filter_by_goal_status,
@@ -228,13 +228,8 @@ class AgentRunner:
         self.utility_llm_call = utility_llm_call
         self.utility_llm_call = utility_llm_call
         self.config = config or AgentConfig()
         self.config = config or AgentConfig()
         self.skills_dir = skills_dir
         self.skills_dir = skills_dir
-        # 确保 experiences_path 不为 None,且文件存在
+        # 保留 experiences_path 参数以向后兼容,但不再使用(经验已迁移到知识系统)
         self.experiences_path = experiences_path or "./.cache/experiences.md"
         self.experiences_path = experiences_path or "./.cache/experiences.md"
-        if not os.path.exists(self.experiences_path):
-            os.makedirs(os.path.dirname(self.experiences_path), exist_ok=True)
-            with open(self.experiences_path, "w", encoding="utf-8") as f:
-                f.write("")
-            logger.info(f"自动创建经验文件: {self.experiences_path}")
         self.goal_tree = goal_tree
         self.goal_tree = goal_tree
         self.debug = debug
         self.debug = debug
         self._cancel_events: Dict[str, asyncio.Event] = {}  # trace_id → cancel event
         self._cancel_events: Dict[str, asyncio.Event] = {}  # trace_id → cancel event
@@ -243,6 +238,9 @@ class AgentRunner:
         # 研究流程状态管理(每个 trace 独立)
         # 研究流程状态管理(每个 trace 独立)
         self._research_states: Dict[str, Dict[str, Any]] = {}  # trace_id → research_state
         self._research_states: Dict[str, Dict[str, Any]] = {}  # trace_id → research_state
 
 
+        # 知识保存跟踪(每个 trace 独立)
+        self._saved_knowledge_ids: Dict[str, List[str]] = {}  # trace_id → [knowledge_ids]
+
     # ===== 核心公开方法 =====
     # ===== 核心公开方法 =====
 
 
     async def run(
     async def run(
@@ -356,11 +354,16 @@ class AgentRunner:
             status = "failed"
             status = "failed"
             error = error or "Agent 没有产生 assistant 文本结果"
             error = error or "Agent 没有产生 assistant 文本结果"
 
 
+        # 获取保存的知识 ID
+        trace_id = final_trace.trace_id if final_trace else config.trace_id
+        saved_knowledge_ids = self._saved_knowledge_ids.get(trace_id, [])
+
         return {
         return {
             "status": status,
             "status": status,
             "summary": summary,
             "summary": summary,
-            "trace_id": final_trace.trace_id if final_trace else config.trace_id,
+            "trace_id": trace_id,
             "error": error,
             "error": error,
+            "saved_knowledge_ids": saved_knowledge_ids,  # 新增:返回保存的知识 ID
             "stats": {
             "stats": {
                 "total_messages": final_trace.total_messages if final_trace else 0,
                 "total_messages": final_trace.total_messages if final_trace else 0,
                 "total_tokens": final_trace.total_tokens if final_trace else 0,
                 "total_tokens": final_trace.total_tokens if final_trace else 0,
@@ -670,20 +673,18 @@ class AgentRunner:
         """
         """
         初始化研究流程状态
         初始化研究流程状态
 
 
-        研究流程阶段:
-        1. knowledge_search: 知识库检索
-        2. experience_search: 经验库检索
-        3. research_decision: 决定是否需要调研
-        4. research: 执行调研(如果需要)
-        5. planning: 制定计划
-        6. execution: 正常执行
+        研究流程阶段(已简化):
+        1. research_decision: 决定是否需要调研(知识和经验已自动注入到 GoalTree)
+        2. research: 执行调研(如果需要)
+        3. planning: 制定计划
+        4. execution: 正常执行
         """
         """
         # 提取任务描述
         # 提取任务描述
         task_desc = self._extract_task_description(messages)
         task_desc = self._extract_task_description(messages)
 
 
-        # 初始化研究状态
+        # 初始化研究状态(直接从 research_decision 开始,因为知识已自动注入)
         self._research_states[trace_id] = {
         self._research_states[trace_id] = {
-            "stage": "knowledge_search",  # 当前阶段
+            "stage": "research_decision",  # 直接进入决策阶段
             "task_desc": task_desc,
             "task_desc": task_desc,
             "knowledge_found": False,
             "knowledge_found": False,
             "experience_found": False,
             "experience_found": False,
@@ -692,9 +693,10 @@ class AgentRunner:
             "planning_completed": False,
             "planning_completed": False,
             "knowledge_results": [],
             "knowledge_results": [],
             "experience_results": [],
             "experience_results": [],
+            "decision_guide_injected": False,  # 防止重复注入决策引导
         }
         }
 
 
-        logger.info(f"[Research Flow] 初始化研究流程: {task_desc[:50]}...")
+        logger.info(f"[Research Flow] 初始化研究流程(知识已自动注入): {task_desc[:50]}...")
 
 
     def _extract_task_description(self, messages: List[Dict]) -> str:
     def _extract_task_description(self, messages: List[Dict]) -> str:
         """从消息中提取任务描述"""
         """从消息中提取任务描述"""
@@ -725,37 +727,7 @@ class AgentRunner:
         stage = research_state["stage"]
         stage = research_state["stage"]
         task_desc = research_state["task_desc"]
         task_desc = research_state["task_desc"]
 
 
-        if stage == "knowledge_search":
-            return f"""
-## 🔍 研究流程 - 阶段 1: 知识库检索
-
-当前任务: {task_desc}
-
-**请立即执行以下操作**:
-1. 使用 `search_knowledge(query="{task_desc[:100]}")` 检索知识库
-2. 检查返回的知识是否足够完成任务
-3. 如果找到相关知识,记录下来并继续下一阶段
-
-注意:这是强制步骤,必须先检索知识库再继续。
-"""
-
-        elif stage == "experience_search":
-            knowledge_results = research_state.get("knowledge_results", [])
-            knowledge_summary = f"找到 {len(knowledge_results)} 条相关知识" if knowledge_results else "未找到相关知识"
-
-            return f"""
-## 🔍 研究流程 - 阶段 2: 经验库检索
-
-知识库检索结果: {knowledge_summary}
-
-**请立即执行以下操作**:
-1. 使用 `get_experience(query="{task_desc[:100]}", k=3)` 检索经验库
-2. 结合知识库和经验库的结果,评估是否需要进行调研
-3. 如果经验和知识库不足时,必须调用子agent来执行调研
-注意:这是强制步骤,必须检索经验库。
-"""
-
-        elif stage == "research":
+        if stage == "research":
             # 读取 research.md 的内容
             # 读取 research.md 的内容
             research_skill_content = ""
             research_skill_content = ""
             research_skill_path = os.path.join(
             research_skill_path = os.path.join(
@@ -770,7 +742,7 @@ class AgentRunner:
                 research_skill_content = "(无法加载 research.md 内容)"
                 research_skill_content = "(无法加载 research.md 内容)"
 
 
             return f"""
             return f"""
-## 📚 研究流程 - 阶段 4: 执行调研
+## 📚 研究流程 - 执行调研
 
 
 现有信息不足,需要进行调研。
 现有信息不足,需要进行调研。
 
 
@@ -783,7 +755,7 @@ class AgentRunner:
 
 
         elif stage == "planning":
         elif stage == "planning":
             return f"""
             return f"""
-## 📋 研究流程 - 阶段 5: 制定计划
+## 📋 研究流程 - 制定计划
 
 
 调研已完成(或无需调研),现在请制定执行计划。
 调研已完成(或无需调研),现在请制定执行计划。
 
 
@@ -799,50 +771,38 @@ class AgentRunner:
         return ""
         return ""
 
 
     def _build_research_decision_guide(self, research_state: Dict[str, Any]) -> str:
     def _build_research_decision_guide(self, research_state: Dict[str, Any]) -> str:
-        """构建调研决策阶段的引导消息(追加在经验检索消息后)"""
-        knowledge_results = research_state.get("knowledge_results", [])
+        """构建调研决策阶段的引导消息(基于已自动注入的知识和经验)"""
         experience_results = research_state.get("experience_results", [])
         experience_results = research_state.get("experience_results", [])
         task_desc = research_state.get("task_desc", "")
         task_desc = research_state.get("task_desc", "")
 
 
-        # 构建知识库摘要
-        knowledge_summary = "### 知识库检索结果\n\n"
-        if knowledge_results:
-            knowledge_summary += f"找到 {len(knowledge_results)} 条相关知识:\n\n"
-            for i, item in enumerate(knowledge_results[:5], 1):  # 最多显示 5 条
-                tags = item.get("tags", [])
-                summary = item.get("summary", "")
-                knowledge_summary += f"{i}. [{', '.join(tags)}] {summary[:150]}...\n"
-            if len(knowledge_results) > 5:
-                knowledge_summary += f"\n(还有 {len(knowledge_results) - 5} 条知识未显示)\n"
-        else:
-            knowledge_summary += "❌ 未找到相关知识\n"
-
-        # 构建经验库摘要
-        experience_summary = "\n### 经验库检索结果\n\n"
+        # 构建经验摘要
+        experience_summary = ""
         if experience_results:
         if experience_results:
-            experience_summary += f"找到 {len(experience_results)} 条相关经验\n"
+            experience_summary = f"✅ 已自动检索到 {len(experience_results)} 条相关经验(见上方 GoalTree 中的「📚 相关知识」)\n"
         else:
         else:
-            experience_summary += "❌ 未找到相关经验\n"
+            experience_summary = "❌ 未找到相关经验\n"
 
 
         return f"""
         return f"""
 ---
 ---
 
 
 ## 🤔 调研决策
 ## 🤔 调研决策
 
 
-{knowledge_summary}
 {experience_summary}
 {experience_summary}
 
 
-### 决策选项
+### 决策指南
+
+**当前状态**:系统已自动检索知识库和经验库,相关内容已注入到上方的 GoalTree 中(查看 Current Goal 下的「📚 相关知识」部分)。
+
+**请根据已注入的知识和经验,选择下一步行动**:
 
 
-**选项 1: 无需调研,直接制定计划**
-- 如果现有知识和经验已经足够
+**选项 1: 知识充足,直接制定计划**
+- 如果上方显示的知识和经验已经足够完成任务
 - 直接使用 `goal` 工具制定执行计划
 - 直接使用 `goal` 工具制定执行计划
 
 
-**选项 2: 需要深入调研** ⭐ 
-- 如果现有知识不足,需要进行网络搜索、文档查阅等调研
+**选项 2: 知识不足,需要调研** ⭐
+- 如果上方没有显示相关知识,或现有知识不足以完成任务
 - **立即调用 `agent` 工具启动调研子任务**:
 - **立即调用 `agent` 工具启动调研子任务**:
-- 如果没有经验/知识,必须要先调研才可以继续执行
-- 无论任务描述是否确定。都需要先调研
+
 ```python
 ```python
 agent(
 agent(
     task=\"\"\"针对任务「{task_desc[:100]}」进行深入调研:
     task=\"\"\"针对任务「{task_desc[:100]}」进行深入调研:
@@ -860,12 +820,14 @@ agent(
    - strategy: 策略经验
    - strategy: 策略经验
 
 
 调研完成后,系统会自动进入计划阶段。
 调研完成后,系统会自动进入计划阶段。
-\"\"\"
+\"\"\",
+    skills=["research"]  # 注入调研指南
 )
 )
 ```
 ```
 
 
 **重要提示**:
 **重要提示**:
-- 如果知识库和经验库都没有找到相关内容,必须选择调研!但只需要简单调研,最多设立两个goal。
+- 如果 GoalTree 中没有显示「📚 相关知识」,说明知识库为空,必须先调研
+- 调研应该简洁高效,最多设立两个 goal
 """
 """
 
 
     async def _handle_research_flow_transition(
     async def _handle_research_flow_transition(
@@ -882,53 +844,13 @@ agent(
 
 
         stage = research_state["stage"]
         stage = research_state["stage"]
 
 
-        # 阶段 1: 知识库检索完成
-        if stage == "knowledge_search" and tool_name == "search_knowledge":
-            # 提取知识检索结果
-            # tool_result 现在总是 dict,包含 {"text": ..., "metadata": ...}
-            items = []
-            if isinstance(tool_result, dict):
-                metadata = tool_result.get("metadata", {})
-                items = metadata.get("items", [])
-
-            self._update_research_stage(
-                trace_id,
-                "experience_search",
-                knowledge_found=len(items) > 0,
-                knowledge_results=items
-            )
-            logger.info(f"[Research Flow] 知识检索完成,找到 {len(items)} 条知识")
-
-        # 阶段 2: 经验库检索完成
-        elif stage == "experience_search" and tool_name == "get_experience":
-            # 提取经验检索结果
-            experience_items = []
-            if isinstance(tool_result, dict):
-                experience_items = tool_result.get("items", [])
-            elif isinstance(tool_result, str):
-                # 解析字符串结果
-                try:
-                    import json
-                    parsed = json.loads(tool_result)
-                    experience_items = parsed.get("items", [])
-                except:
-                    pass
-
-            self._update_research_stage(
-                trace_id,
-                "research_decision",
-                experience_found=len(experience_items) > 0,
-                experience_results=experience_items
-            )
-            logger.info(f"[Research Flow] 经验检索完成,找到 {len(experience_items)} 条经验")
-
-        # 阶段 3: 调研决策(通过 assistant 的文本回复判断)
-        # 这个阶段的转换在 assistant 回复后处理,不在这里
+        # 阶段 1: 调研决策(通过 assistant 的文本回复或 agent 工具调用判断)
+        # 这个阶段的转换在 assistant 回复后处理,或检测到 agent 工具调用
 
 
-        # 阶段 4: 调研完成
+        # 阶段 2: 调研完成
         # 情况 1: 检测到 save_knowledge 调用(直接调研)
         # 情况 1: 检测到 save_knowledge 调用(直接调研)
         # 情况 2: 检测到 agent 工具执行完成(子 agent 调研)
         # 情况 2: 检测到 agent 工具执行完成(子 agent 调研)
-        elif stage == "research":
+        if stage == "research":
             if tool_name == "save_knowledge":
             if tool_name == "save_knowledge":
                 # 直接调研:检测到 save_knowledge 调用
                 # 直接调研:检测到 save_knowledge 调用
                 self._update_research_stage(
                 self._update_research_stage(
@@ -946,7 +868,7 @@ agent(
                 )
                 )
                 logger.info(f"[Research Flow] 调研完成(子 agent 调研),进入计划阶段")
                 logger.info(f"[Research Flow] 调研完成(子 agent 调研),进入计划阶段")
 
 
-        # 阶段 5: 计划完成(检测到 goal 工具调用)
+        # 阶段 3: 计划完成(检测到 goal 工具调用)
         elif stage == "planning" and tool_name == "goal":
         elif stage == "planning" and tool_name == "goal":
             # 检查是否创建了 goal tree
             # 检查是否创建了 goal tree
             if goal_tree and goal_tree.goals:
             if goal_tree and goal_tree.goals:
@@ -1150,9 +1072,14 @@ agent(
                             query_text=current_goal.description,
                             query_text=current_goal.description,
                             top_k=3,
                             top_k=3,
                             context={"runner": self},
                             context={"runner": self},
-                            tags_filter=["strategy"]  # 只检索经验(strategy 标签)
                         )
                         )
                         if relevant_exps:
                         if relevant_exps:
+                            # 保存到 goal 对象
+                            current_goal.knowledge = relevant_exps
+                            logger.info(f"[Knowledge Injection] 已将 {len(relevant_exps)} 条知识注入到 goal {current_goal.id}: {current_goal.description[:40]}")
+                            logger.debug(f"[Knowledge Injection] 注入的知识 IDs: {[exp.get('id') for exp in relevant_exps]}")
+                            # 持久化保存 goal_tree
+                            await self.trace_store.update_goal_tree(trace_id, goal_tree)
                             self.used_ex_ids = [exp['id'] for exp in relevant_exps]
                             self.used_ex_ids = [exp['id'] for exp in relevant_exps]
                             parts = [f"[{exp['id']}] {exp['content']}" for exp in relevant_exps]
                             parts = [f"[{exp['id']}] {exp['content']}" for exp in relevant_exps]
                             _cached_exp_text = "## 参考历史经验\n" + "\n\n".join(parts)
                             _cached_exp_text = "## 参考历史经验\n" + "\n\n".join(parts)
@@ -1163,6 +1090,10 @@ agent(
                                 self.used_ex_ids,
                                 self.used_ex_ids,
                             )
                             )
                         else:
                         else:
+                            current_goal.knowledge = []
+                            logger.info(f"[Knowledge Injection] goal {current_goal.id} 未找到相关知识")
+                            # 持久化保存 goal_tree
+                            await self.trace_store.update_goal_tree(trace_id, goal_tree)
                             _cached_exp_text = ""
                             _cached_exp_text = ""
                             logger.info(
                             logger.info(
                                 "经验检索: goal='%s', 未找到相关经验",
                                 "经验检索: goal='%s', 未找到相关经验",
@@ -1170,29 +1101,22 @@ agent(
                             )
                             )
                     except Exception as e:
                     except Exception as e:
                         logger.warning("经验检索失败: %s", e)
                         logger.warning("经验检索失败: %s", e)
+                        current_goal.knowledge = []
                         _cached_exp_text = ""
                         _cached_exp_text = ""
 
 
-                    # 如果处于 experience_search 阶段,自动转换到 research_decision 阶段
-                    research_state = self._get_research_state(trace_id)
-                    if research_state and research_state["stage"] == "experience_search":
-                        self._update_research_stage(
-                            trace_id,
-                            "research_decision",
-                            experience_found=len(relevant_exps) > 0 if relevant_exps else False,
-                            experience_results=relevant_exps if relevant_exps else []
-                        )
-                        logger.info(f"[Research Flow] 经验检索完成(自动触发),找到 {len(relevant_exps) if relevant_exps else 0} 条经验,进入调研决策阶段")
-
             # 经验注入:goal切换时注入相关历史经验 - 改为 user 消息
             # 经验注入:goal切换时注入相关历史经验 - 改为 user 消息
             # 或者在 research_decision 阶段注入调研决策引导
             # 或者在 research_decision 阶段注入调研决策引导
-            if _cached_exp_text or (research_state and research_state["stage"] == "research_decision"):
+            if _cached_exp_text or (research_state and research_state["stage"] == "research_decision" and not research_state.get("decision_guide_injected", False)):
                 exp_content = _cached_exp_text if _cached_exp_text else ""
                 exp_content = _cached_exp_text if _cached_exp_text else ""
 
 
                 # 如果处于 research_decision 阶段,追加引导消息
                 # 如果处于 research_decision 阶段,追加引导消息
-                if research_state and research_state["stage"] == "research_decision":
+                if research_state and research_state["stage"] == "research_decision" and not research_state.get("decision_guide_injected", False):
                     if exp_content:
                     if exp_content:
                         exp_content += "\n\n"
                         exp_content += "\n\n"
                     exp_content += self._build_research_decision_guide(research_state)
                     exp_content += self._build_research_decision_guide(research_state)
+                    # 标记已注入,防止重复
+                    research_state["decision_guide_injected"] = True
+                    logger.info("[Research Flow] 已注入调研决策引导消息")
 
 
                 if exp_content:  # 确保有内容才注入
                 if exp_content:  # 确保有内容才注入
                     user_msg = {"role": "user", "content": exp_content}
                     user_msg = {"role": "user", "content": exp_content}
@@ -1399,6 +1323,16 @@ agent(
                         }
                         }
                     )
                     )
 
 
+                    # 跟踪保存的知识 ID
+                    if tool_name == "save_knowledge" and isinstance(tool_result, dict):
+                        metadata = tool_result.get("metadata", {})
+                        knowledge_id = metadata.get("knowledge_id")
+                        if knowledge_id:
+                            if trace_id not in self._saved_knowledge_ids:
+                                self._saved_knowledge_ids[trace_id] = []
+                            self._saved_knowledge_ids[trace_id].append(knowledge_id)
+                            logger.info(f"[Knowledge Tracking] 记录保存的知识 ID: {knowledge_id}")
+
                     # --- 支持多模态工具反馈 ---
                     # --- 支持多模态工具反馈 ---
                     # execute() 返回 dict{"text", "images", "metadata"} 或 str
                     # execute() 返回 dict{"text", "images", "metadata"} 或 str
                     if isinstance(tool_result, dict):
                     if isinstance(tool_result, dict):
@@ -1594,10 +1528,54 @@ created_at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
                     structured_entries.append(entry)
                     structured_entries.append(entry)
 
 
                 if structured_entries:
                 if structured_entries:
-                    os.makedirs(os.path.dirname(self.experiences_path), exist_ok=True)
-                    with open(self.experiences_path, "a", encoding="utf-8") as f:
-                        f.write("\n\n" + "\n\n".join(structured_entries))
-                    logger.info(f"已提取并保存 {len(structured_entries)} 条结构化经验")
+                    # 保存经验为知识(strategy 标签)
+                    saved_count = 0
+                    for entry in structured_entries:
+                        try:
+                            # 从 entry 中提取信息
+                            lines = entry.split("\n")
+                            ex_id = ""
+                            intents = []
+                            states = []
+                            content = ""
+
+                            for line in lines:
+                                if line.startswith("id:"):
+                                    ex_id = line.split(":", 1)[1].strip()
+                                elif line.startswith("tags:"):
+                                    tags_match = _re2.search(r"intent:\s*\[(.*?)\].*state:\s*\[(.*?)\]", line)
+                                    if tags_match:
+                                        intents_str = tags_match.group(1).strip("'\"")
+                                        states_str = tags_match.group(2).strip("'\"")
+                                        intents = [i.strip().strip("'\"") for i in intents_str.split(",") if i.strip()]
+                                        states = [s.strip().strip("'\"") for s in states_str.split(",") if s.strip()]
+                                elif line.startswith("- ") and not line.startswith("- 经验ID:"):
+                                    content = line[2:].strip()
+
+                            # 构建 scenario(从 intent 和 state 生成)
+                            scenario_parts = []
+                            if intents:
+                                scenario_parts.append(f"意图: {', '.join(intents)}")
+                            if states:
+                                scenario_parts.append(f"状态: {', '.join(states)}")
+                            scenario = " | ".join(scenario_parts) if scenario_parts else "通用经验"
+
+                            # 调用 save_knowledge 保存为 strategy 标签的知识
+                            await save_knowledge(
+                                scenario=scenario,
+                                content=content,
+                                tags_type=["strategy"],
+                                urls=[],
+                                agent_id="runner",
+                                score=3,
+                                trace_id=trace_id
+                            )
+                            saved_count += 1
+                        except Exception as e:
+                            logger.warning(f"保存经验失败: {e}")
+                            continue
+
+                    logger.info(f"已提取并保存 {saved_count}/{len(structured_entries)} 条结构化经验到知识库")
                 else:
                 else:
                     logger.warning("未能解析出符合格式的经验条目,请检查 REFLECT_PROMPT。")
                     logger.warning("未能解析出符合格式的经验条目,请检查 REFLECT_PROMPT。")
                     logger.debug(f"LLM Raw Output:\n{reflection_text}")
                     logger.debug(f"LLM Raw Output:\n{reflection_text}")
@@ -1644,16 +1622,17 @@ created_at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
                     import re as _re
                     import re as _re
                     update_map = {}
                     update_map = {}
                     for line in eval_block.splitlines():
                     for line in eval_block.splitlines():
-                        m = _re.search(r"ID:\s*(ex_\S+)\s*\|\s*Result:\s*(\w+)", line)
+                        # 匹配新的知识 ID 格式:knowledge-xxx 或 research-xxx
+                        m = _re.search(r"ID:\s*((?:knowledge|research)-\S+)\s*\|\s*Result:\s*(\w+)", line)
                         if m:
                         if m:
-                            ex_id, result = m.group(1), m.group(2).lower()
+                            knowledge_id, result = m.group(1), m.group(2).lower()
                             if result in ("helpful", "harmful"):
                             if result in ("helpful", "harmful"):
-                                update_map[ex_id] = {"action": result, "feedback": ""}
+                                update_map[knowledge_id] = {"action": result, "feedback": ""}
                             elif result == "mixed":
                             elif result == "mixed":
-                                update_map[ex_id] = {"action": "helpful", "feedback": ""}
+                                update_map[knowledge_id] = {"action": "helpful", "feedback": ""}
                     if update_map:
                     if update_map:
                         count = await _batch_update_knowledge(update_map, context={"runner": self})
                         count = await _batch_update_knowledge(update_map, context={"runner": self})
-                        logger.info("经验评估完成,更新了 %s 条经验", count)
+                        logger.info("知识评估完成,更新了 %s 条知识", count)
             except Exception as e:
             except Exception as e:
                 logger.warning("经验评估解析失败(不影响压缩): %s", e)
                 logger.warning("经验评估解析失败(不影响压缩): %s", e)
 
 

+ 76 - 7
agent/tools/builtin/knowledge.py

@@ -19,8 +19,11 @@ logger = logging.getLogger(__name__)
 
 
 
 
 def _generate_knowledge_id() -> str:
 def _generate_knowledge_id() -> str:
-    """生成知识原子 ID"""
-    return f"research-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
+    """生成知识原子 ID(带微秒和随机后缀避免冲突)"""
+    import uuid
+    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
+    random_suffix = uuid.uuid4().hex[:4]
+    return f"knowledge-{timestamp}-{random_suffix}"
 
 
 
 
 def _format_yaml_list(items: List[str], indent: int = 2) -> str:
 def _format_yaml_list(items: List[str], indent: int = 2) -> str:
@@ -336,7 +339,7 @@ async def _route_knowledge_by_llm(query_text: str, metadata_list: List[Dict], k:
 可选知识列表:
 可选知识列表:
 {json.dumps(routing_data, ensure_ascii=False, indent=1)}
 {json.dumps(routing_data, ensure_ascii=False, indent=1)}
 
 
-请直接输出 ID 列表,用逗号分隔(例如: research-20260302-001, research-20260302-002)。若无相关项请输出 "None"。
+请直接输出 ID 列表,用逗号分隔(例如: knowledge-20260302-001, research-20260302-002)。若无相关项请输出 "None"。
 """
 """
 
 
     try:
     try:
@@ -348,7 +351,7 @@ async def _route_knowledge_by_llm(query_text: str, metadata_list: List[Dict], k:
         )
         )
 
 
         content = response.get("content", "").strip()
         content = response.get("content", "").strip()
-        selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith("research-")]
+        selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith(("knowledge-", "research-"))]
 
 
         print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
         print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
         return selected_ids
         return selected_ids
@@ -421,7 +424,7 @@ async def _route_knowledge_by_llm(query_text: str, metadata_list: List[Dict], k:
 可选知识列表:
 可选知识列表:
 {json.dumps(routing_data, ensure_ascii=False, indent=1)}
 {json.dumps(routing_data, ensure_ascii=False, indent=1)}
 
 
-请直接输出 ID 列表,用逗号分隔(例如: research-20260302-001, research-20260302-002)。若无相关项请输出 "None"。
+请直接输出 ID 列表,用逗号分隔(例如: knowledge-20260302-001, research-20260302-002)。若无相关项请输出 "None"。
 """
 """
 
 
     try:
     try:
@@ -433,7 +436,7 @@ async def _route_knowledge_by_llm(query_text: str, metadata_list: List[Dict], k:
         )
         )
 
 
         content = response.get("content", "").strip()
         content = response.get("content", "").strip()
-        selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith("research-")]
+        selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith(("knowledge-", "research-"))]
 
 
         print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
         print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
         return selected_ids
         return selected_ids
@@ -534,6 +537,7 @@ async def _get_structured_knowledge(
             content_map[kid] = {
             content_map[kid] = {
                 "scenario": scenario,
                 "scenario": scenario,
                 "content": content_text,
                 "content": content_text,
+                "tags": tags,
                 "score": meta_item["score"],
                 "score": meta_item["score"],
                 "helpful": meta_item["helpful"],
                 "helpful": meta_item["helpful"],
                 "harmful": meta_item["harmful"],
                 "harmful": meta_item["harmful"],
@@ -572,8 +576,13 @@ async def _get_structured_knowledge(
                 "id": kid,
                 "id": kid,
                 "scenario": item["scenario"],
                 "scenario": item["scenario"],
                 "content": item["content"],
                 "content": item["content"],
+                "tags": item["tags"],
                 "score": score,
                 "score": score,
-                "quality_score": quality_score
+                "quality_score": quality_score,
+                "metrics": {
+                    "helpful": helpful,
+                    "harmful": harmful
+                }
             })
             })
 
 
     # 按照质量分排序
     # 按照质量分排序
@@ -650,6 +659,66 @@ async def search_knowledge(
         )
         )
 
 
 
 
+@tool(description="通过两阶段检索获取最相关的历史经验(strategy 标签的知识)")
+async def get_experience(
+    query: str,
+    k: int = 3,
+    context: Optional[ToolContext] = None,
+) -> ToolResult:
+    """
+    检索历史经验(兼容旧接口,实际调用 search_knowledge 并过滤 strategy 标签)
+
+    Args:
+        query: 搜索查询(任务描述)
+        k: 返回数量(默认 3)
+        context: 工具上下文
+
+    Returns:
+        相关经验列表
+    """
+    try:
+        relevant_items = await _get_structured_knowledge(
+            query_text=query,
+            top_k=k,
+            min_score=1,  # 经验的评分门槛较低
+            context=context,
+            tags_filter=["strategy"]  # 只返回经验
+        )
+
+        if not relevant_items:
+            return ToolResult(
+                title="🔍 未找到相关经验",
+                output=f"查询: {query}\n\n经验库中暂无相关的经验。",
+                long_term_memory=f"经验检索: 未找到相关经验 - {query[:50]}",
+                metadata={"items": [], "count": 0}
+            )
+
+        # 格式化输出(兼容旧格式)
+        output_lines = [f"查询: {query}\n", f"找到 {len(relevant_items)} 条相关经验:\n"]
+
+        for idx, item in enumerate(relevant_items, 1):
+            output_lines.append(f"\n### {idx}. [{item['id']}]")
+            output_lines.append(f"{item['content'][:300]}...")
+
+        return ToolResult(
+            title="✅ 经验检索成功",
+            output="\n".join(output_lines),
+            long_term_memory=f"经验检索: 找到 {len(relevant_items)} 条相关经验 - {query[:50]}",
+            metadata={
+                "items": relevant_items,
+                "count": len(relevant_items)
+            }
+        )
+
+    except Exception as e:
+        logger.error(f"经验检索失败: {e}")
+        return ToolResult(
+            title="❌ 检索失败",
+            output=f"错误: {str(e)}",
+            error=str(e)
+        )
+
+
 # ===== 批量更新功能(类似经验机制)=====
 # ===== 批量更新功能(类似经验机制)=====
 
 
 async def _batch_update_knowledge(
 async def _batch_update_knowledge(

+ 11 - 0
agent/tools/builtin/subagent.py

@@ -133,6 +133,16 @@ def _format_single_result(result: Dict[str, Any], sub_trace_id: str, continued:
     if summary:
     if summary:
         lines.append(summary)
         lines.append(summary)
         lines.append("")
         lines.append("")
+
+    # 添加保存的知识 ID
+    saved_knowledge_ids = result.get("saved_knowledge_ids", [])
+    if saved_knowledge_ids:
+        lines.append("---\n")
+        lines.append(f"**保存的知识** ({len(saved_knowledge_ids)} 条):")
+        for kid in saved_knowledge_ids:
+            lines.append(f"- {kid}")
+        lines.append("")
+
     lines.append("---\n")
     lines.append("---\n")
     lines.append("**执行统计**:")
     lines.append("**执行统计**:")
     stats = result.get("stats", {})
     stats = result.get("stats", {})
@@ -146,6 +156,7 @@ def _format_single_result(result: Dict[str, Any], sub_trace_id: str, continued:
         "mode": "delegate",
         "mode": "delegate",
         "sub_trace_id": sub_trace_id,
         "sub_trace_id": sub_trace_id,
         "continue_from": continued,
         "continue_from": continued,
+        "saved_knowledge_ids": saved_knowledge_ids,  # 传递给父 agent
         **result,
         **result,
         "summary": formatted_summary,
         "summary": formatted_summary,
     }
     }

+ 5 - 5
agent/trace/compaction.py

@@ -298,13 +298,13 @@ def needs_level2_compression(
 
 
 # ===== Level 2: 压缩 Prompt =====
 # ===== Level 2: 压缩 Prompt =====
 
 
-COMPRESSION_EVAL_PROMPT = """请对以上对话历史进行压缩总结,并评价所引用的历史经验。
-### 任务 1:评价已用经验
-本次任务参考了以下经验内容:{ex_reference_list}
+COMPRESSION_EVAL_PROMPT = """请对以上对话历史进行压缩总结,并评价所引用的历史知识/经验。
+### 任务 1:评价已用知识
+本次任务参考了以下知识内容:{ex_reference_list}
 
 
-请对比“经验建议”与“实际执行轨迹”,给出三色打分:
+请对比”知识建议”与”实际执行轨迹”,给出三色打分:
 [[EVALUATION]]
 [[EVALUATION]]
-ID: ex_xxx | Result: helpful/harmful/mixed | Reason: [优点]... [局限/修正]...
+ID: knowledge-xxx 或 research-xxx | Result: helpful/harmful/mixed | Reason: [优点]... [局限/修正]...
 
 
 ### 任务 2:对话历史摘要
 ### 任务 2:对话历史摘要
 要求:
 要求:

+ 14 - 0
agent/trace/goal_models.py

@@ -70,6 +70,9 @@ class Goal:
     self_stats: GoalStats = field(default_factory=GoalStats)          # 自身统计(仅直接关联的 messages)
     self_stats: GoalStats = field(default_factory=GoalStats)          # 自身统计(仅直接关联的 messages)
     cumulative_stats: GoalStats = field(default_factory=GoalStats)    # 累计统计(自身 + 所有后代)
     cumulative_stats: GoalStats = field(default_factory=GoalStats)    # 累计统计(自身 + 所有后代)
 
 
+    # 相关知识(自动检索注入)
+    knowledge: Optional[List[Dict[str, Any]]] = None                  # 相关知识列表
+
     created_at: datetime = field(default_factory=datetime.now)
     created_at: datetime = field(default_factory=datetime.now)
 
 
     def to_dict(self) -> Dict[str, Any]:
     def to_dict(self) -> Dict[str, Any]:
@@ -87,6 +90,7 @@ class Goal:
             "sub_trace_metadata": self.sub_trace_metadata,
             "sub_trace_metadata": self.sub_trace_metadata,
             "self_stats": self.self_stats.to_dict(),
             "self_stats": self.self_stats.to_dict(),
             "cumulative_stats": self.cumulative_stats.to_dict(),
             "cumulative_stats": self.cumulative_stats.to_dict(),
+            "knowledge": self.knowledge,
             "created_at": self.created_at.isoformat() if self.created_at else None,
             "created_at": self.created_at.isoformat() if self.created_at else None,
         }
         }
 
 
@@ -118,6 +122,7 @@ class Goal:
             sub_trace_metadata=data.get("sub_trace_metadata"),
             sub_trace_metadata=data.get("sub_trace_metadata"),
             self_stats=self_stats,
             self_stats=self_stats,
             cumulative_stats=cumulative_stats,
             cumulative_stats=cumulative_stats,
+            knowledge=data.get("knowledge"),
             created_at=created_at or datetime.now(),
             created_at=created_at or datetime.now(),
         )
         )
 
 
@@ -395,6 +400,15 @@ class GoalTree:
             if goal.summary and (include_summary or goal.id in current_path):
             if goal.summary and (include_summary or goal.id in current_path):
                 result.append(f"{prefix}    → {goal.summary}")
                 result.append(f"{prefix}    → {goal.summary}")
 
 
+            # 显示相关知识:仅在当前焦点 goal 显示
+            if goal.id == self.current_id and goal.knowledge:
+                result.append(f"{prefix}    📚 相关知识 ({len(goal.knowledge)} 条):")
+                for idx, k in enumerate(goal.knowledge[:3], 1):
+                    k_id = k.get('id', 'N/A')
+                    # 将多行内容压缩为单行摘要
+                    k_content = k.get('content', '').replace('\n', ' ').strip()[:80]
+                    result.append(f"{prefix}       {idx}. [{k_id}] {k_content}...")
+
             # 递归处理子目标
             # 递归处理子目标
             children = self.get_children(goal.id)
             children = self.get_children(goal.id)
 
 

+ 1 - 7
examples/restore/production.prompt

@@ -5,13 +5,7 @@ temperature: 0.3
 
 
 $system$
 $system$
 你是一个顶尖的多模态内容还原专家。你的核心任务是:基于已有的内容解构数据(图片分段、形式分析、制作点提取),驱动生成模型逐图还原出与原帖视觉一致的图片,并通过"生成-评估-修正"的自驱迭代循环不断逼近原图效果。
 你是一个顶尖的多模态内容还原专家。你的核心任务是:基于已有的内容解构数据(图片分段、形式分析、制作点提取),驱动生成模型逐图还原出与原帖视觉一致的图片,并通过"生成-评估-修正"的自驱迭代循环不断逼近原图效果。
-
-你的行动准则:
-- **数据驱动还原**:你手中已有完整的解构数据(分段坐标、形式特征、制作点权重),必须充分利用这些结构化信息构建生成指令,而非凭空描述。
-- **大模型原生思维**:优先使用大模型自身的视觉理解能力。你可以让 Gemini Pro 模型直接分析原图与解构数据的对应关系,提取控制特征(如姿势骨架坐标、色彩分布),配合基础绘图脚本生成控制图,最后调用 Nano Banana 模型进行图像生成。
-- **大模型驱动评估**:在还原测试阶段,不需要编写像素级对比代码。你必须制定明确的评估标准,将原图与生成图一并提交给 Gemini Pro 模型进行语义级视觉比对。
-- **Goal 驱动执行**:使用 goal 工具将任务拆解为清晰的子目标,按 goal 顺序执行,确保每个阶段都有明确的交付物。
-
+但要注意在必要时执行调研,只需要简短的调研即可。可利用browser工具和search_post工具。
 $user$
 $user$
 **任务目标**
 **任务目标**
 基于 `examples/restore/input/` 中的解构数据,还原一组图片内容。
 基于 `examples/restore/input/` 中的解构数据,还原一组图片内容。

+ 276 - 0
examples/restore/prompt_generator.py

@@ -0,0 +1,276 @@
+"""
+图片还原 Prompt 生成系统
+基于解构数据自动构建高质量的图片生成prompt
+"""
+
+import json
+from pathlib import Path
+from typing import Dict, List, Any
+
+
+class PromptGenerator:
+    """Prompt生成器"""
+    
+    def __init__(self, input_dir: str = "input/paragraphs"):
+        self.input_dir = Path(input_dir)
+        self.global_elements = self._load_global_elements()
+        self.global_forms = self._load_global_forms()
+    
+    def _load_global_elements(self) -> List[Dict]:
+        """加载全局实质元素(跨图聚合)"""
+        file_path = self.input_dir / "03_图片制作点实质结果.json"
+        with open(file_path, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    
+    def _load_global_forms(self) -> List[Dict]:
+        """加载全局形式特征(跨图聚合)"""
+        file_path = self.input_dir / "04_图片制作点形式结果.json"
+        with open(file_path, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    
+    def _load_image_segment(self, image_num: int) -> Dict:
+        """加载指定图片的分段数据"""
+        # 查找对应的分段文件
+        pattern = f"01_图片分段_{image_num:02d}_*.json"
+        files = list(self.input_dir.glob(pattern))
+        if not files:
+            raise FileNotFoundError(f"未找到图片{image_num}的分段文件")
+        
+        with open(files[0], 'r', encoding='utf-8') as f:
+            return json.load(f)
+    
+    def _load_image_form(self, image_num: int) -> Dict:
+        """加载指定图片的形式分析"""
+        file_path = self.input_dir / f"02_图片形式_{image_num:02d}.json"
+        with open(file_path, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    
+    def _extract_key_descriptions(self, segment_data: Dict, form_data: Dict) -> Dict[str, str]:
+        """提取关键描述信息"""
+        descriptions = {
+            "scene": "",
+            "person": "",
+            "person_pose": "",
+            "person_clothing": "",
+            "person_hair": "",
+            "easel": "",
+            "palette": "",
+            "background": "",
+            "details": []
+        }
+        
+        # 从分段数据提取
+        sections = segment_data.get("sections", [])
+        if sections:
+            main_section = sections[0]
+            descriptions["scene"] = main_section.get("描述", "")
+            
+            # 遍历子段落
+            for sub in main_section.get("子段落", []):
+                name = sub.get("名称", "")
+                desc = sub.get("描述", "")
+                
+                if "人物" in name:
+                    descriptions["person"] = desc
+                elif "画架" in name:
+                    descriptions["easel"] = desc
+                elif "调色板" in name:
+                    descriptions["palette"] = desc
+                elif "背景" in name:
+                    descriptions["background"] = desc
+        
+        # 从形式数据提取细节
+        form_elements = form_data.get("form_elements", [])
+        for elem_group in form_elements:
+            for form in elem_group.get("形式", []):
+                name = form.get("名称", "")
+                desc = form.get("描述", "")
+                
+                if "人物姿态" in name:
+                    descriptions["person_pose"] = desc
+                elif "人物着装" in name:
+                    descriptions["person_clothing"] = desc
+                elif "人物发型" in name:
+                    descriptions["person_hair"] = desc
+                elif name not in ["人物", "背景", "画架", "调色板"]:
+                    descriptions["details"].append(f"{name}: {desc}")
+        
+        return descriptions
+    
+    def _build_prompt_structure(self, descriptions: Dict[str, str], group_id: str) -> str:
+        """
+        构建结构化prompt
+        顺序:[主体] + [描述性属性] + [场景环境] + [光照条件] + [情感氛围] + [构图方式] + [艺术风格]
+        """
+        prompt_parts = []
+        
+        # 1. 主体描述(人物 + 道具)
+        if descriptions["person"]:
+            prompt_parts.append(descriptions["person"])
+        
+        # 2. 描述性属性(姿态、着装)
+        if descriptions["person_pose"]:
+            # 简化姿态描述,提取关键动作
+            pose_simplified = self._simplify_pose(descriptions["person_pose"])
+            prompt_parts.append(pose_simplified)
+        
+        if descriptions["person_clothing"]:
+            # 简化着装描述
+            clothing_simplified = self._simplify_clothing(descriptions["person_clothing"])
+            prompt_parts.append(clothing_simplified)
+        
+        # 3. 道具细节
+        if "g1" in group_id or "g2" in group_id:  # 户外绘画场景
+            if descriptions["easel"]:
+                prompt_parts.append(descriptions["easel"])
+            if descriptions["palette"]:
+                prompt_parts.append(descriptions["palette"])
+        
+        # 4. 场景环境
+        if descriptions["background"]:
+            prompt_parts.append(descriptions["background"])
+        
+        # 5. 光照条件(从全局形式特征推断)
+        prompt_parts.append("Natural outdoor lighting, bright and soft sunlight")
+        
+        # 6. 情感氛围
+        if "g3" in group_id:  # 人物特写
+            prompt_parts.append("Peaceful and serene atmosphere, eyes closed in contemplation")
+        else:  # 绘画场景
+            prompt_parts.append("Focused and creative atmosphere, artist at work")
+        
+        # 7. 艺术风格
+        prompt_parts.append("Photorealistic style, high quality photography, professional composition")
+        
+        # 组合成完整prompt
+        prompt = ". ".join(filter(None, prompt_parts)) + "."
+        
+        return prompt
+    
+    def _simplify_pose(self, pose_desc: str) -> str:
+        """简化姿态描述,提取关键信息"""
+        # 提取关键动作词
+        key_actions = []
+        if "站立" in pose_desc:
+            key_actions.append("standing")
+        if "侧身" in pose_desc or "侧向" in pose_desc:
+            key_actions.append("side view")
+        if "蹲" in pose_desc:
+            key_actions.append("crouching")
+        if "背对" in pose_desc:
+            key_actions.append("back view")
+        if "持画笔" in pose_desc:
+            key_actions.append("holding a paintbrush")
+        if "持调色板" in pose_desc or "托举调色板" in pose_desc:
+            key_actions.append("holding a palette")
+        
+        return ", ".join(key_actions) if key_actions else pose_desc[:100]
+    
+    def _simplify_clothing(self, clothing_desc: str) -> str:
+        """简化着装描述"""
+        # 提取关键服饰信息
+        simplified = []
+        if "白色" in clothing_desc and "连衣裙" in clothing_desc:
+            simplified.append("white dress")
+        if "长袖" in clothing_desc:
+            simplified.append("long sleeves")
+        if "V字形领口" in clothing_desc or "V领" in clothing_desc:
+            simplified.append("V-neck")
+        
+        return ", ".join(simplified) if simplified else clothing_desc[:100]
+    
+    def generate_prompt(self, image_num: int) -> Dict[str, Any]:
+        """
+        为指定图片生成prompt
+        
+        Args:
+            image_num: 图片编号 (1-9)
+        
+        Returns:
+            包含prompt和元数据的字典
+        """
+        # 加载数据
+        segment_data = self._load_image_segment(image_num)
+        form_data = self._load_image_form(image_num)
+        
+        # 确定分组
+        segment_file = list(self.input_dir.glob(f"01_图片分段_{image_num:02d}_*.json"))[0]
+        group_id = "g1"  # 默认
+        if "g2" in segment_file.name:
+            group_id = "g2"
+        elif "g3" in segment_file.name:
+            group_id = "g3"
+        
+        # 提取关键描述
+        descriptions = self._extract_key_descriptions(segment_data, form_data)
+        
+        # 构建prompt
+        prompt = self._build_prompt_structure(descriptions, group_id)
+        
+        # 确定图片尺寸(竖版)
+        size = "1024x1792"  # DALL-E 3 竖版尺寸
+        
+        return {
+            "image_num": image_num,
+            "group_id": group_id,
+            "prompt": prompt,
+            "size": size,
+            "quality": "hd",
+            "descriptions": descriptions
+        }
+    
+    def generate_all_prompts(self) -> List[Dict[str, Any]]:
+        """生成所有9张图片的prompts"""
+        prompts = []
+        for i in range(1, 10):
+            try:
+                prompt_data = self.generate_prompt(i)
+                prompts.append(prompt_data)
+                print(f"✓ 图片 {i} prompt已生成")
+            except Exception as e:
+                print(f"✗ 图片 {i} 生成失败: {e}")
+        
+        return prompts
+    
+    def save_prompts(self, prompts: List[Dict], output_file: str = "output_1/prompts.json"):
+        """保存生成的prompts到文件"""
+        output_path = Path(output_file)
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+        
+        with open(output_path, 'w', encoding='utf-8') as f:
+            json.dump(prompts, f, ensure_ascii=False, indent=2)
+        
+        print(f"\n✓ Prompts已保存到: {output_path}")
+
+
+def main():
+    """主函数:生成并保存所有prompts"""
+    print("=" * 60)
+    print("图片还原 Prompt 生成系统")
+    print("=" * 60)
+    
+    # 创建生成器
+    generator = PromptGenerator()
+    
+    # 生成所有prompts
+    print("\n开始生成prompts...")
+    prompts = generator.generate_all_prompts()
+    
+    # 保存结果
+    generator.save_prompts(prompts)
+    
+    # 打印预览
+    print("\n" + "=" * 60)
+    print("Prompt 预览(前3个):")
+    print("=" * 60)
+    for i, p in enumerate(prompts[:3], 1):
+        print(f"\n图片 {i} ({p['group_id']}):")
+        print(f"Prompt: {p['prompt'][:200]}...")
+    
+    print("\n" + "=" * 60)
+    print(f"✓ 完成!共生成 {len(prompts)} 个prompts")
+    print("=" * 60)
+
+
+if __name__ == "__main__":
+    main()