""" Phase 2.2.2: 能力丰富化 从 capabilities_temp.json 读取初步聚类的能力, 对每个能力,根据 case_references 从 source.json 提取原始帖子信息(包括图片), 调用 LLM 进行丰富化,输出到 capabilities.json """ import asyncio import json import re from pathlib import Path from typing import Any, Dict, List from examples.process_pipeline.script.llm_helper import call_llm_with_retry from examples.process_pipeline.script.validate_schema import validate_capabilities_enriched def load_prompt_template(prompt_name: str) -> str: """从 prompts 目录加载 prompt 模板""" base_dir = Path(__file__).parent.parent prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt" with open(prompt_path, "r", encoding="utf-8") as f: content = f.read() if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 3: content = parts[2] content = content.replace("$system$", "").replace("$user$", "") return content.strip() async def enrich_single_capability( capability: Dict[str, Any], source_data: Dict[str, Any], llm_call, model: str ) -> Dict[str, Any]: """ 丰富化单个能力 Args: capability: 能力信息(包含 case_references) source_data: source.json 的完整数据 llm_call: LLM 调用函数 model: 模型名称 Returns: 丰富化后的能力 """ case_refs = capability.get("case_references", []) if not case_refs: return capability # 从 source.json 中提取对应的帖子 posts_content = [] for ref in case_refs: # ref 格式可能是 "bili_BV1xxx" 或 "bili_BV1xxx 中的垫图操作" case_id = ref.split()[0] if " " in ref else ref # 在 source.json 中查找 for src in source_data.get("sources", []): src_case_id = f"{src['platform']}_{src['channel_content_id']}" if src_case_id == case_id: post = src.get("post", {}) post_info = { "case_id": case_id, "title": post.get("title", ""), "body_text": post.get("body_text", ""), "images": [] } # 提取图片 URL images = post.get("images", []) if isinstance(images, list): for img in images[:5]: if isinstance(img, str): post_info["images"].append(img) elif isinstance(img, dict) and "url" in img: post_info["images"].append(img["url"]) # 也尝试从 image_url_list 提取 image_url_list = post.get("image_url_list", []) if isinstance(image_url_list, list): for img_obj in image_url_list[:5]: if isinstance(img_obj, dict) and "image_url" in img_obj: post_info["images"].append(img_obj["image_url"]) posts_content.append(post_info) break if not posts_content: return capability # 构造 posts_content 字符串 posts_text = "" for i, post in enumerate(posts_content, 1): posts_text += f"\n### 帖子 {i}({post['case_id']})\n" posts_text += f"**标题**:{post['title']}\n\n" posts_text += f"**正文**:\n{post['body_text'][:1000]}\n\n" if post['images']: posts_text += f"**图片**:{len(post['images'])} 张\n" for img_url in post['images']: posts_text += f"- {img_url}\n" posts_text += "\n" # 构造 prompt try: prompt_template = load_prompt_template("enrich_capability") prompt = prompt_template.replace("%capability_name%", capability.get("name", "")) prompt = prompt.replace("%capability_description%", capability.get("description", "")) prompt = prompt.replace("%posts_content%", posts_text) except Exception: prompt = f"""从以下帖子中提取该能力的具体执行过程和核心参数。 能力名称:{capability.get("name", "")} 能力描述:{capability.get("description", "")} 相关帖子内容: {posts_text} 输出 JSON 格式: {{"execution_process": "...", "core_parameters": "...", "effects": "...", "visual_notes": "..."}}""" messages = [{"role": "user", "content": prompt}] def _validate_enrichment(parsed): from examples.process_pipeline.script.schema_manager import validate_with_schema return validate_with_schema(parsed, "enrich_capability") enriched_data, _ = await call_llm_with_retry( llm_call=llm_call, messages=messages, model=model, temperature=0.1, max_tokens=8000, max_retries=3, validate_fn=_validate_enrichment, task_name=f"Enrich_{capability.get('name', '')[:20]}", ) if enriched_data: capability["enriched_details"] = enriched_data return capability async def enrich_all_capabilities( capabilities_temp_file: Path, source_file: Path, output_file: Path, llm_call, model: str = "anthropic/claude-sonnet-4-6", ) -> Dict[str, Any]: """ 丰富化所有能力 Returns: 统计信息 """ with open(capabilities_temp_file, "r", encoding="utf-8") as f: capabilities_data = json.load(f) with open(source_file, "r", encoding="utf-8") as f: source_data = json.load(f) capabilities = capabilities_data.get("abilities", []) enriched_capabilities = [] total_cost = 0.0 failed_caps = [] print(f"Starting enrichment for {len(capabilities)} capabilities...", flush=True) for i, cap in enumerate(capabilities, 1): # 转换字段名:ability_name -> name, ability_description -> description # 保留 ability_id normalized_cap = { "id": cap.get("ability_id", ""), "name": cap.get("ability_name", ""), "description": cap.get("ability_description", ""), "case_references": cap.get("关联案例", []), } cap_name = normalized_cap.get("name", "unknown") print(f" [{i}/{len(capabilities)}] Enriching: {cap_name}", flush=True) enriched_cap = await enrich_single_capability(normalized_cap, source_data, llm_call, model) enriched_capabilities.append(enriched_cap) if "enriched_details" in enriched_cap: total_cost += 0.01 print(f" [{i}/{len(capabilities)}] ✓ {cap_name}", flush=True) else: failed_caps.append(cap_name) print(f" [{i}/{len(capabilities)}] ⚠️ Failed: {cap_name}", flush=True) if failed_caps: print(f" ⚠️ {len(failed_caps)} capabilities failed enrichment: {failed_caps}") # 输出结果 output_data = { "requirement": capabilities_data.get("requirement", ""), "capabilities": enriched_capabilities } schema_err = validate_capabilities_enriched(output_data) if schema_err: raise ValueError(f"Final capabilities.json schema invalid: {schema_err}") output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, "w", encoding="utf-8") as f: json.dump(output_data, f, ensure_ascii=False, indent=2) return { "total_capabilities": len(capabilities), "enriched": len([c for c in enriched_capabilities if "enriched_details" in c]), "total_cost": total_cost, } async def main(): """命令行入口""" import argparse import sys sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from agent.llm.openrouter import OpenRouterLLM parser = argparse.ArgumentParser() parser.add_argument("--capabilities-temp", required=True) parser.add_argument("--source-file", required=True) parser.add_argument("--output-file", required=True) parser.add_argument("--model", default="anthropic/claude-sonnet-4-6") args = parser.parse_args() llm = OpenRouterLLM() result = await enrich_all_capabilities( capabilities_temp_file=Path(args.capabilities_temp), source_file=Path(args.source_file), output_file=Path(args.output_file), llm_call=llm.chat, model=args.model, ) print(f"✓ Enriched {result['enriched']}/{result['total_capabilities']} capabilities") if __name__ == "__main__": asyncio.run(main())