elksmmx há 1 dia atrás
pai
commit
7c791135a6

+ 5 - 4
examples/process_pipeline/prompts/apply_to_grounding.prompt

@@ -3,7 +3,7 @@
 # 绝对规则
 
 - 不要调用任何工具,不要查树;只能使用本 prompt 中给出的完整内容树。
-- 只处理:{target}
+- 只处理:fragments 数组中的每一条 fragment
 - **只输出 apply_to 字段**,不要回显 inputs、outputs、action、body、effects、stage、tools、criterion、unstructured_what 等字段。
 - apply_to.实质 只能选择 source_type=实质 的节点;apply_to.形式 只能选择 source_type=形式 的节点。
 - category_id 和 category_path 必须逐字来自内容树里的 id/path。
@@ -36,8 +36,10 @@
 
 # 输出格式
 
-对于 strategy:输出 `{ "strategy": { "steps": [ { "order": 1, "apply_to": {...} } ] } }`
-对于 capability:输出 `{ "capabilities": [ { "apply_to": {...} } ] }`
+输出 `{ "fragments": [ { "fragment_id": "f_s1_0", "apply_to": {...} } ] }`
+
+- fragments 数组长度和输入 fragments 保持一致。
+- 每个输出项必须带原输入的 fragment_id,逐字照抄。
 
 每个 apply_to 条目格式:`{ "category_id": 123, "category_path": "...", "ideal_path": "...", "element": null, "rationale": "..." }`
 
@@ -47,4 +49,3 @@
 - 不要任何前言、解释、标题。
 - 数字 id 用整数,不要加引号。
 - 字符串值内禁止出现 ASCII 双引号;需要引号请用中文书名号「」或《》。
-

+ 106 - 237
examples/process_pipeline/script/apply_to_grounding.py

@@ -1,8 +1,7 @@
 """
 Stage 2: 将 apply_to_draft 映射为正式 apply_to
 
-从 case.json 读取,优先对每个 case 的 fragments 中的 apply_to_draft 做映射;
-没有 fragments 时,回退处理 workflow steps / capabilities 中的 apply_to_draft。
+从 case.json 读取,只对每个 case 的 fragments 中的 apply_to_draft 做映射。
 调用 LLM 映射到内容树的正式节点,原位回填到 case.json
 
 改造版本:通过远程 API 获取内容树,不再依赖本地文件
@@ -25,6 +24,7 @@ load_dotenv(PROJECT_ROOT / ".env")
 
 # 搜索 API 配置
 SEARCH_API = os.getenv("SEARCH_API", "http://8.147.104.190:8001").rstrip("/")
+FRAGMENT_GROUNDING_BATCH_SIZE = int(os.getenv("FRAGMENT_GROUNDING_BATCH_SIZE", "5"))
 
 # 本地文件路径(作为回退方案)
 EXTRACT_DIR = Path(__file__).resolve().parent / "resource"
@@ -196,6 +196,20 @@ def build_valid_ids(cats: List[Dict]) -> Dict[int, Dict]:
     return {c["id"]: c for c in cats if "id" in c}
 
 
+def iter_batches(items: List[Any], batch_size: int) -> List[List[Any]]:
+    """按固定大小切分列表"""
+    batch_size = max(1, batch_size)
+    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
+
+
+def build_fragment_grounding_input(frag: Dict[str, Any]) -> Dict[str, Any]:
+    """只保留 fragment grounding 需要的最小字段"""
+    return {
+        "fragment_id": frag.get("fragment_id"),
+        "apply_to_draft": frag.get("apply_to_draft", {}),
+    }
+
+
 def render_grounding_prompt(
     template: str,
     task: str,
@@ -204,10 +218,7 @@ def render_grounding_prompt(
     reference_paths: List[str] = None,
 ) -> str:
     """渲染 Stage 2 prompt"""
-    if task == "capability":
-        target = "capabilities 数组中的每一条 capability"
-    else:
-        target = "strategy;如果 strategy 为 null,则原样返回"
+    target = "fragments 数组中的每一条 fragment"
     paths_str = json.dumps(reference_paths or [], ensure_ascii=False)
     return (
         template
@@ -227,239 +238,107 @@ async def ground_single_case(
     compact_tree: str = None,
 ) -> tuple[Dict[str, Any], float]:
     """
-    对单个 case 做 apply_to 映射
+    对单个 case 的 fragments[*].apply_to_draft 做 apply_to 映射
 
-    优先级:
-    1. 如果存在 fragments,只处理 fragments[*].apply_to_draft,并回填到 fragments[*].apply_to
-    2. 没有 fragments 时,处理旧格式 workflow.steps[*].apply_to_draft
-    3. workflow 没有 draft 时,再处理 capabilities[*].apply_to_draft
-
-    Args:
-        case_item: 案例数据
-        template: prompt 模板
-        llm_call: LLM 调用函数
-        model: 模型名称
-        use_api: 是否使用 API 动态搜索
-        compact_tree: 预加载的完整内容树(use_api=False 时使用)
+    只处理 fragments,不读取 workflow/capabilities。
     """
     total_cost = 0.0
     result = dict(case_item)
     title = case_item.get("title", "")[:20] or "untitled"
 
-    # 处理 fragments - 整体处理,保持上下文。只要存在 fragments,就不再读取 workflow/capabilities。
     fragments = case_item.get("fragments")
-    has_fragments = isinstance(fragments, list) and bool(fragments)
-    if has_fragments:
-        draft_fragment_pairs = [
-            (idx, frag)
-            for idx, frag in enumerate(fragments)
-            if isinstance(frag, dict) and "apply_to_draft" in frag
+    if not isinstance(fragments, list) or not fragments:
+        return result, total_cost
+
+    draft_fragment_pairs = [
+        (idx, frag)
+        for idx, frag in enumerate(fragments)
+        if isinstance(frag, dict) and "apply_to_draft" in frag
+    ]
+    if not draft_fragment_pairs:
+        return result, total_cost
+
+    # 收集 fragment 的关键词(用于 API 搜索)
+    if use_api:
+        all_keywords = []
+        for _, frag in draft_fragment_pairs:
+            apply_to_draft = frag.get("apply_to_draft", {})
+            for key in ["实质", "形式"]:
+                for draft_text in apply_to_draft.get(key, []):
+                    all_keywords.extend(extract_keywords_from_draft(draft_text))
+        all_keywords = list(dict.fromkeys(all_keywords))[:10]
+
+        if all_keywords:
+            categories = await search_categories_by_keywords(all_keywords, top_k=5)
+            frag_compact_tree = build_compact_tree(categories)
+            frag_ref_paths = list(dict.fromkeys(
+                c["path"] for c in categories if c.get("path")
+            ))
+        else:
+            frag_compact_tree = compact_tree or "[]"
+            frag_ref_paths = []
+    else:
+        frag_compact_tree = compact_tree or "[]"
+        frag_ref_paths = []
+
+    updated_fragments = [
+        dict(frag) if isinstance(frag, dict) else frag
+        for frag in fragments
+    ]
+    id_to_index = {
+        frag.get("fragment_id"): idx
+        for idx, frag in draft_fragment_pairs
+        if isinstance(frag.get("fragment_id"), str)
+    }
+
+    batches = iter_batches(draft_fragment_pairs, FRAGMENT_GROUNDING_BATCH_SIZE)
+    for batch_idx, batch_pairs in enumerate(batches, start=1):
+        draft_fragments = [
+            build_fragment_grounding_input(frag)
+            for _, frag in batch_pairs
         ]
-        has_draft = bool(draft_fragment_pairs)
-
-        if has_draft:
-            # 收集所有 fragment 的关键词(用于 API 搜索)
-            if use_api:
-                all_keywords = []
-                for _, frag in draft_fragment_pairs:
-                    apply_to_draft = frag.get("apply_to_draft", {})
-                    for key in ["实质", "形式"]:
-                        for draft_text in apply_to_draft.get(key, []):
-                            all_keywords.extend(extract_keywords_from_draft(draft_text))
-                all_keywords = list(dict.fromkeys(all_keywords))[:10]
-
-                if all_keywords:
-                    categories = await search_categories_by_keywords(all_keywords, top_k=5)
-                    frag_compact_tree = build_compact_tree(categories)
-                    frag_ref_paths = list(dict.fromkeys(
-                        c["path"] for c in categories if c.get("path")
-                    ))
-                else:
-                    frag_compact_tree = compact_tree or "[]"
-                    frag_ref_paths = []
-            else:
-                frag_compact_tree = compact_tree or "[]"
-                frag_ref_paths = []
-
-            # 复用 capability grounding 的 prompt/schema,只把数据源从 workflow step 换成 fragment。
-            # 只发送带 apply_to_draft 的 fragment,再按原始下标回填,避免数组错位。
-            draft_fragments = [frag for _, frag in draft_fragment_pairs]
-            draft = {"capabilities": draft_fragments}
-            prompt = render_grounding_prompt(template, "capability", draft, frag_compact_tree, frag_ref_paths)
-            messages = [{"role": "user", "content": prompt}]
-
-            grounded, cost = await call_llm_with_retry(
-                llm_call=llm_call,
-                messages=messages,
-                model=model,
-                temperature=0.1,
-                max_tokens=4000,
-                max_retries=3,
-                schema_name="apply_to_grounding_capability",
-                task_name=f"Ground_F_{title}",
-            )
-            total_cost += cost
-
-            # 按索引回填 apply_to。输入数组来自 fragments,输出数组使用 capability schema。
-            if grounded and isinstance(grounded.get("capabilities"), list):
-                grounded_frags = grounded["capabilities"]
-                updated_fragments = [
-                    dict(frag) if isinstance(frag, dict) else frag
-                    for frag in fragments
-                ]
-                for draft_idx, (frag_idx, _) in enumerate(draft_fragment_pairs):
-                    if draft_idx < len(grounded_frags) and isinstance(grounded_frags[draft_idx], dict):
-                        apply_to = grounded_frags[draft_idx].get("apply_to")
-                        if apply_to is not None:
-                            updated_fragments[frag_idx]["apply_to"] = apply_to
-                            updated_fragments[frag_idx].pop("apply_to_draft", None)
-                result["fragments"] = updated_fragments
-
-    # 没有 fragments 时,回退处理旧格式 workflow step draft。
-    workflow = case_item.get("workflow")
-    handled_workflow = False
-    if not has_fragments and isinstance(workflow, dict) and "steps" in workflow:
-        steps = workflow.get("steps", [])
-
-        has_draft = any(
-            isinstance(step, dict) and "apply_to_draft" in step
-            for step in steps
+        draft = {"fragments": draft_fragments}
+        prompt = render_grounding_prompt(template, "fragment", draft, frag_compact_tree, frag_ref_paths)
+        messages = [{"role": "user", "content": prompt}]
+
+        grounded, cost = await call_llm_with_retry(
+            llm_call=llm_call,
+            messages=messages,
+            model=model,
+            temperature=0.1,
+            max_tokens=4000,
+            max_retries=3,
+            schema_name="apply_to_grounding_fragment",
+            task_name=f"Ground_F_{title}_B{batch_idx}/{len(batches)}",
         )
+        total_cost += cost
 
-        if has_draft:
-            handled_workflow = True
-            # 收集所有 step 的关键词(用于 API 搜索)
-            if use_api:
-                all_keywords = []
-                for step in steps:
-                    if isinstance(step, dict) and "apply_to_draft" in step:
-                        apply_to_draft = step.get("apply_to_draft", {})
-                        for key in ["实质", "形式"]:
-                            for draft_text in apply_to_draft.get(key, []):
-                                all_keywords.extend(extract_keywords_from_draft(draft_text))
-                all_keywords = list(dict.fromkeys(all_keywords))[:10]
-
-                if all_keywords:
-                    categories = await search_categories_by_keywords(all_keywords, top_k=5)
-                    workflow_compact_tree = build_compact_tree(categories)
-                    workflow_ref_paths = list(dict.fromkeys(
-                        c["path"] for c in categories if c.get("path")
-                    ))
-                else:
-                    workflow_compact_tree = compact_tree or "[]"
-                    workflow_ref_paths = []
-            else:
-                workflow_compact_tree = compact_tree or "[]"
-                workflow_ref_paths = []
-
-            # 整个 workflow 传给 LLM(保持上下文)
-            draft = {"strategy": workflow}
-            prompt = render_grounding_prompt(template, "strategy", draft, workflow_compact_tree, workflow_ref_paths)
-            messages = [{"role": "user", "content": prompt}]
-
-            grounded, cost = await call_llm_with_retry(
-                llm_call=llm_call,
-                messages=messages,
-                model=model,
-                temperature=0.1,
-                max_tokens=4000,
-                max_retries=3,
-                schema_name="apply_to_grounding_strategy",
-                task_name=f"Ground_W_{title}",
-            )
-            total_cost += cost
-
-            # 按 order 回填 apply_to
-            if grounded and isinstance(grounded.get("strategy"), dict):
-                grounded_steps = grounded["strategy"].get("steps", [])
-                order_to_apply_to = {}
-                for grounded_step in grounded_steps:
-                    if isinstance(grounded_step, dict):
-                        order = grounded_step.get("order")
-                        apply_to = grounded_step.get("apply_to")
-                        if order is not None and apply_to is not None:
-                            order_to_apply_to[order] = apply_to
-
-                updated_steps = []
-                for step in steps:
-                    updated_step = dict(step)
-                    order = step.get("order")
-                    if order in order_to_apply_to:
-                        updated_step["apply_to"] = order_to_apply_to[order]
-                        updated_step.pop("apply_to_draft", None)
-                    updated_steps.append(updated_step)
-
-                result["workflow"] = dict(workflow)
-                result["workflow"]["steps"] = updated_steps
-
-    # 没有 fragments 且 workflow 没处理时,才回退处理 capabilities。
-    capabilities = case_item.get("capabilities")
-    if not has_fragments and not handled_workflow and isinstance(capabilities, list) and capabilities:
-        has_draft = any(
-            isinstance(cap, dict) and "apply_to_draft" in cap
-            for cap in capabilities
-        )
+        if not grounded or not isinstance(grounded.get("fragments"), list):
+            continue
 
-        if has_draft:
-            # 收集所有 capability 的关键词
-            if use_api:
-                all_keywords = []
-                for cap in capabilities:
-                    if isinstance(cap, dict) and "apply_to_draft" in cap:
-                        apply_to_draft = cap.get("apply_to_draft", {})
-                        for key in ["实质", "形式"]:
-                            for draft_text in apply_to_draft.get(key, []):
-                                all_keywords.extend(extract_keywords_from_draft(draft_text))
-                all_keywords = list(dict.fromkeys(all_keywords))[:10]
-
-                if all_keywords:
-                    categories = await search_categories_by_keywords(all_keywords, top_k=5)
-                    cap_compact_tree = build_compact_tree(categories)
-                    cap_ref_paths = list(dict.fromkeys(
-                        c["path"] for c in categories if c.get("path")
-                    ))
-                else:
-                    cap_compact_tree = compact_tree or "[]"
-                    cap_ref_paths = []
-            else:
-                cap_compact_tree = compact_tree or "[]"
-                cap_ref_paths = []
-
-            # 整个 capabilities 传给 LLM(保持上下文)
-            draft = {"capabilities": capabilities}
-            prompt = render_grounding_prompt(template, "capability", draft, cap_compact_tree, cap_ref_paths)
-            messages = [{"role": "user", "content": prompt}]
-
-            grounded, cost = await call_llm_with_retry(
-                llm_call=llm_call,
-                messages=messages,
-                model=model,
-                temperature=0.1,
-                max_tokens=4000,
-                max_retries=3,
-                schema_name="apply_to_grounding_capability",
-                task_name=f"Ground_C_{title}",
-            )
-            total_cost += cost
-
-            # 回填 apply_to(按索引匹配)
-            if grounded and isinstance(grounded.get("capabilities"), list):
-                grounded_caps = grounded["capabilities"]
-                updated_capabilities = []
-                for idx, cap in enumerate(capabilities):
-                    updated_cap = dict(cap)
-                    # 如果有对应的 grounded capability,提取 apply_to
-                    if idx < len(grounded_caps) and isinstance(grounded_caps[idx], dict):
-                        apply_to = grounded_caps[idx].get("apply_to")
-                        if apply_to is not None:
-                            updated_cap["apply_to"] = apply_to
-                    updated_cap.pop("apply_to_draft", None)
-                    updated_capabilities.append(updated_cap)
-                result["capabilities"] = updated_capabilities
+        grounded_frags = grounded["fragments"]
+        used_indices = set()
+        for output_idx, grounded_frag in enumerate(grounded_frags):
+            if not isinstance(grounded_frag, dict):
+                continue
+            frag_idx = None
+            fragment_id = grounded_frag.get("fragment_id")
+            if isinstance(fragment_id, str):
+                frag_idx = id_to_index.get(fragment_id)
+            if frag_idx is None and output_idx < len(batch_pairs):
+                frag_idx = batch_pairs[output_idx][0]
+            if frag_idx is None or frag_idx in used_indices:
+                continue
+            apply_to = grounded_frag.get("apply_to")
+            if apply_to is not None and isinstance(updated_fragments[frag_idx], dict):
+                updated_fragments[frag_idx]["apply_to"] = apply_to
+                updated_fragments[frag_idx].pop("apply_to_draft", None)
+                used_indices.add(frag_idx)
+
+    result["fragments"] = updated_fragments
 
     return result, total_cost
 
-
 async def apply_grounding(
     case_file: Path,
     llm_call: Any,
@@ -498,24 +377,14 @@ async def apply_grounding(
     # 加载 prompt 模板
     template = load_prompt_template("apply_to_grounding")
 
-    # 过滤出需要处理的 case(有 apply_to_draft 的
+    # 过滤出需要处理的 case(只看 fragments[*].apply_to_draft)
     needs_grounding = []
     for case in cases:
         fragments = case.get("fragments")
-        workflow = case.get("workflow")
-        capabilities = case.get("capabilities")
-        has_fragments = isinstance(fragments, list) and bool(fragments)
         has_frag_draft = isinstance(fragments, list) and any(
             isinstance(frag, dict) and "apply_to_draft" in frag for frag in fragments
         )
-        has_workflow_draft = not has_fragments and isinstance(workflow, dict) and any(
-            isinstance(step, dict) and "apply_to_draft" in step
-            for step in workflow.get("steps", [])
-        )
-        has_cap_draft = not has_fragments and not has_workflow_draft and isinstance(capabilities, list) and any(
-            isinstance(c, dict) and "apply_to_draft" in c for c in capabilities
-        )
-        if has_frag_draft or has_workflow_draft or has_cap_draft:
+        if has_frag_draft:
             needs_grounding.append(case)
 
     print(f"Grounding apply_to for {len(needs_grounding)}/{len(cases)} cases...")