elksmmx 6 дней назад
Родитель
Коммит
a6e77dc276
1 изменённый файл: 43 добавления и 60 удалений
  1. 43 60
      examples/process_pipeline/script/apply_to_grounding.py

+ 43 - 60
examples/process_pipeline/script/apply_to_grounding.py

@@ -243,24 +243,22 @@ async def ground_single_case(
     result = dict(case_item)
     title = case_item.get("title", "")[:20] or "untitled"
 
-    # 处理 workflow (strategy) - 整体处理,保持上下文
-    workflow = case_item.get("workflow")
-    if isinstance(workflow, dict) and "steps" in workflow:
-        steps = workflow.get("steps", [])
-
-        # 检查是否有任何 step 包含 apply_to_draft
+    # 处理 fragments - 整体处理,保持上下文。只要存在 fragments,就不再读取 capabilities。
+    fragments = case_item.get("fragments")
+    has_fragments = isinstance(fragments, list) and bool(fragments)
+    if has_fragments:
         has_draft = any(
-            isinstance(step, dict) and "apply_to_draft" in step
-            for step in steps
+            isinstance(frag, dict) and "apply_to_draft" in frag
+            for frag in fragments
         )
 
         if has_draft:
-            # 收集所有 step 的关键词(用于 API 搜索)
+            # 收集所有 fragment 的关键词(用于 API 搜索)
             if use_api:
                 all_keywords = []
-                for step in steps:
-                    if isinstance(step, dict) and "apply_to_draft" in step:
-                        apply_to_draft = step.get("apply_to_draft", {})
+                for frag in fragments:
+                    if isinstance(frag, dict) and "apply_to_draft" in frag:
+                        apply_to_draft = frag.get("apply_to_draft", {})
                         for key in ["实质", "形式"]:
                             for draft_text in apply_to_draft.get(key, []):
                                 all_keywords.extend(extract_keywords_from_draft(draft_text))
@@ -268,20 +266,20 @@ async def ground_single_case(
 
                 if all_keywords:
                     categories = await search_categories_by_keywords(all_keywords, top_k=5)
-                    workflow_compact_tree = build_compact_tree(categories)
-                    workflow_ref_paths = list(dict.fromkeys(
+                    frag_compact_tree = build_compact_tree(categories)
+                    frag_ref_paths = list(dict.fromkeys(
                         c["path"] for c in categories if c.get("path")
                     ))
                 else:
-                    workflow_compact_tree = compact_tree or "[]"
-                    workflow_ref_paths = []
+                    frag_compact_tree = compact_tree or "[]"
+                    frag_ref_paths = []
             else:
-                workflow_compact_tree = compact_tree or "[]"
-                workflow_ref_paths = []
+                frag_compact_tree = compact_tree or "[]"
+                frag_ref_paths = []
 
-            # 整个 workflow 传给 LLM(保持上下文)
-            draft = {"strategy": workflow}
-            prompt = render_grounding_prompt(template, "strategy", draft, workflow_compact_tree, workflow_ref_paths)
+            # 复用 capability grounding 的 prompt/schema,只把数据源从 workflow step 换成 fragment。
+            draft = {"capabilities": fragments}
+            prompt = render_grounding_prompt(template, "capability", draft, frag_compact_tree, frag_ref_paths)
             messages = [{"role": "user", "content": prompt}]
 
             grounded, cost = await call_llm_with_retry(
@@ -291,39 +289,28 @@ async def ground_single_case(
                 temperature=0.1,
                 max_tokens=4000,
                 max_retries=3,
-                schema_name="apply_to_grounding_strategy",
-                task_name=f"Ground_W_{title}",
+                schema_name="apply_to_grounding_capability",
+                task_name=f"Ground_F_{title}",
             )
             total_cost += cost
 
-            # 按 order 回填 apply_to
-            if grounded and isinstance(grounded.get("strategy"), dict):
-                grounded_steps = grounded["strategy"].get("steps", [])
-                # 建立 order -> apply_to 的映射
-                order_to_apply_to = {}
-                for grounded_step in grounded_steps:
-                    if isinstance(grounded_step, dict):
-                        order = grounded_step.get("order")
-                        apply_to = grounded_step.get("apply_to")
-                        if order is not None and apply_to is not None:
-                            order_to_apply_to[order] = apply_to
-
-                # 回填到原 steps
-                updated_steps = []
-                for step in steps:
-                    updated_step = dict(step)
-                    order = step.get("order")
-                    if order in order_to_apply_to:
-                        updated_step["apply_to"] = order_to_apply_to[order]
-                    updated_step.pop("apply_to_draft", None)
-                    updated_steps.append(updated_step)
-
-                result["workflow"] = dict(workflow)
-                result["workflow"]["steps"] = updated_steps
-
-    # 处理 capabilities - 整体处理,保持上下文
+            # 按索引回填 apply_to。输入数组来自 fragments,输出数组使用 capability schema。
+            if grounded and isinstance(grounded.get("capabilities"), list):
+                grounded_frags = grounded["capabilities"]
+                updated_fragments = []
+                for idx, frag in enumerate(fragments):
+                    updated_frag = dict(frag)
+                    if idx < len(grounded_frags) and isinstance(grounded_frags[idx], dict):
+                        apply_to = grounded_frags[idx].get("apply_to")
+                        if apply_to is not None:
+                            updated_frag["apply_to"] = apply_to
+                    updated_frag.pop("apply_to_draft", None)
+                    updated_fragments.append(updated_frag)
+                result["fragments"] = updated_fragments
+
+    # 没有 fragments 时,才回退处理 capabilities。
     capabilities = case_item.get("capabilities")
-    if isinstance(capabilities, list) and capabilities:
+    if not has_fragments and isinstance(capabilities, list) and capabilities:
         has_draft = any(
             isinstance(cap, dict) and "apply_to_draft" in cap
             for cap in capabilities
@@ -430,20 +417,16 @@ async def apply_grounding(
     # 过滤出需要处理的 case(有 apply_to_draft 的)
     needs_grounding = []
     for case in cases:
-        workflow = case.get("workflow")
+        fragments = case.get("fragments")
         capabilities = case.get("capabilities")
-        # 检查 step 级别的 apply_to_draft
-        has_workflow_draft = (
-            isinstance(workflow, dict) and
-            any(
-                isinstance(step, dict) and "apply_to_draft" in step
-                for step in workflow.get("steps", [])
-            )
+        has_fragments = isinstance(fragments, list) and bool(fragments)
+        has_frag_draft = isinstance(fragments, list) and any(
+            isinstance(frag, dict) and "apply_to_draft" in frag for frag in fragments
         )
-        has_cap_draft = isinstance(capabilities, list) and any(
+        has_cap_draft = not has_fragments and isinstance(capabilities, list) and any(
             isinstance(c, dict) and "apply_to_draft" in c for c in capabilities
         )
-        if has_workflow_draft or has_cap_draft:
+        if has_frag_draft or has_cap_draft:
             needs_grounding.append(case)
 
     print(f"Grounding apply_to for {len(needs_grounding)}/{len(cases)} cases...")