| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- """
- Phase 2.2.2: 能力丰富化
- 从 capabilities_temp.json 读取初步聚类的能力,
- 对每个能力,根据 case_references 从 source.json 提取原始帖子信息(包括图片),
- 调用 LLM 进行丰富化,输出到 capabilities.json
- """
- import asyncio
- import json
- import re
- from pathlib import Path
- from typing import Any, Dict, List
- from examples.process_pipeline.script.llm_helper import call_llm_with_retry
- from examples.process_pipeline.script.validate_schema import validate_capabilities_enriched
def load_prompt_template(prompt_name: str) -> str:
    """Load a prompt template from the sibling ``prompts`` directory.

    Strips an optional ``---``-delimited front-matter header and removes
    the ``$system$`` / ``$user$`` role markers before returning the text.

    Args:
        prompt_name: Template base name (without the ``.prompt`` suffix).

    Returns:
        The cleaned, stripped template body.
    """
    prompt_file = Path(__file__).parent.parent / "prompts" / f"{prompt_name}.prompt"
    text = prompt_file.read_text(encoding="utf-8")

    # Drop YAML-style front matter: "---<meta>---<body>" -> keep <body>.
    if text.startswith("---"):
        pieces = text.split("---", 2)
        if len(pieces) >= 3:
            text = pieces[2]

    # Role markers are placeholders, not template content -- remove them.
    for marker in ("$system$", "$user$"):
        text = text.replace(marker, "")
    return text.strip()
async def enrich_single_capability(
    capability: Dict[str, Any],
    source_data: Dict[str, Any],
    llm_call,
    model: str
) -> Dict[str, Any]:
    """Enrich one capability with details extracted from its source posts.

    Resolves the posts referenced by ``capability["case_references"]``
    against ``source_data``, renders them into a prompt, and asks the LLM
    for execution details. On success the parsed result is attached under
    the ``enriched_details`` key; on any miss (no references, no matching
    posts, LLM failure) the capability is returned unchanged.

    Args:
        capability: Capability info (expects ``case_references``).
        source_data: Full parsed content of source.json.
        llm_call: Async LLM call function.
        model: Model name.

    Returns:
        The (possibly mutated in place) capability dict.
    """
    case_refs = capability.get("case_references", [])
    if not case_refs:
        return capability

    posts_content = _collect_referenced_posts(case_refs, source_data)
    if not posts_content:
        return capability

    prompt = _build_enrichment_prompt(capability, _format_posts(posts_content))
    messages = [{"role": "user", "content": prompt}]

    def _validate_enrichment(parsed):
        # Local import keeps the schema dependency off the module import path.
        from examples.process_pipeline.script.schema_manager import validate_with_schema
        return validate_with_schema(parsed, "enrich_capability")

    enriched_data, _ = await call_llm_with_retry(
        llm_call=llm_call,
        messages=messages,
        model=model,
        temperature=0.1,
        max_tokens=8000,
        max_retries=3,
        validate_fn=_validate_enrichment,
        task_name=f"Enrich_{capability.get('name', '')[:20]}",
    )
    if enriched_data:
        capability["enriched_details"] = enriched_data
    return capability


def _collect_referenced_posts(
    case_refs: List[str], source_data: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Resolve case references against source.json and extract post records.

    A reference may be a bare case id ("bili_BV1xxx") or a case id followed
    by a free-text note ("bili_BV1xxx 中的垫图操作"); only the first
    whitespace-delimited token is used for matching.
    """
    posts: List[Dict[str, Any]] = []
    for ref in case_refs:
        case_id = ref.split()[0] if " " in ref else ref
        for src in source_data.get("sources", []):
            # .get with defaults: a malformed source entry must not raise
            # KeyError and abort the whole enrichment pass.
            src_case_id = f"{src.get('platform', '')}_{src.get('channel_content_id', '')}"
            if src_case_id == case_id:
                posts.append(_extract_post_info(src.get("post") or {}, case_id))
                break
    return posts


def _extract_post_info(post: Dict[str, Any], case_id: str) -> Dict[str, Any]:
    """Build the compact post record (title, body, image URLs) for the prompt."""
    images: List[str] = []
    raw_images = post.get("images", [])
    if isinstance(raw_images, list):
        for img in raw_images[:5]:  # cap per-field images at 5
            if isinstance(img, str):
                images.append(img)
            elif isinstance(img, dict) and "url" in img:
                images.append(img["url"])
    # Some sources store images under image_url_list instead.
    image_url_list = post.get("image_url_list", [])
    if isinstance(image_url_list, list):
        for img_obj in image_url_list[:5]:
            if isinstance(img_obj, dict) and "image_url" in img_obj:
                images.append(img_obj["image_url"])
    # De-duplicate while preserving order: both fields may repeat a URL.
    images = list(dict.fromkeys(images))
    return {
        "case_id": case_id,
        "title": post.get("title", ""),
        "body_text": post.get("body_text", ""),
        "images": images,
    }


def _format_posts(posts_content: List[Dict[str, Any]]) -> str:
    """Render the extracted posts as the markdown fragment the prompt expects."""
    parts: List[str] = []
    for i, post in enumerate(posts_content, 1):
        parts.append(f"\n### 帖子 {i}({post['case_id']})\n")
        parts.append(f"**标题**:{post['title']}\n\n")
        # Cap body text so a single long post cannot blow the prompt budget.
        parts.append(f"**正文**:\n{post['body_text'][:1000]}\n\n")
        if post['images']:
            parts.append(f"**图片**:{len(post['images'])} 张\n")
            for img_url in post['images']:
                parts.append(f"- {img_url}\n")
        parts.append("\n")
    return "".join(parts)


def _build_enrichment_prompt(capability: Dict[str, Any], posts_text: str) -> str:
    """Fill the enrich_capability template; fall back to an inline prompt."""
    name = capability.get("name", "")
    description = capability.get("description", "")
    try:
        template = load_prompt_template("enrich_capability")
        return (
            template.replace("%capability_name%", name)
            .replace("%capability_description%", description)
            .replace("%posts_content%", posts_text)
        )
    except Exception:
        # Template missing or unreadable: use a minimal built-in prompt.
        return f"""从以下帖子中提取该能力的具体执行过程和核心参数。
能力名称:{name}
能力描述:{description}
相关帖子内容:
{posts_text}
输出 JSON 格式:
{{"execution_process": "...", "core_parameters": "...", "effects": "...", "visual_notes": "..."}}"""
async def enrich_all_capabilities(
    capabilities_temp_file: Path,
    source_file: Path,
    output_file: Path,
    llm_call,
    model: str = "anthropic/claude-sonnet-4-6",
) -> Dict[str, Any]:
    """Enrich every capability from the temp clustering file.

    Reads the preliminary abilities plus the raw source posts, enriches
    each ability sequentially via the LLM, validates the aggregate result
    against the enriched-capabilities schema, and writes it to
    *output_file*.

    Returns:
        Statistics dict: total count, enriched count, total cost.
    """
    capabilities_data = json.loads(capabilities_temp_file.read_text(encoding="utf-8"))
    source_data = json.loads(source_file.read_text(encoding="utf-8"))

    capabilities = capabilities_data.get("abilities", [])
    total = len(capabilities)
    enriched_capabilities: List[Dict[str, Any]] = []
    failed_caps: List[str] = []
    total_cost = 0.0

    print(f"Starting enrichment for {total} capabilities...", flush=True)
    for idx, raw_cap in enumerate(capabilities, 1):
        # Map the temp-file field names (ability_id / ability_name /
        # ability_description / 关联案例) onto the names the enricher expects.
        normalized = {
            "id": raw_cap.get("ability_id", ""),
            "name": raw_cap.get("ability_name", ""),
            "description": raw_cap.get("ability_description", ""),
            "case_references": raw_cap.get("关联案例", []),
        }
        cap_name = normalized.get("name", "unknown")
        print(f" [{idx}/{total}] Enriching: {cap_name}", flush=True)
        result = await enrich_single_capability(normalized, source_data, llm_call, model)
        enriched_capabilities.append(result)
        if "enriched_details" in result:
            total_cost += 0.01
            print(f" [{idx}/{total}] ✓ {cap_name}", flush=True)
        else:
            failed_caps.append(cap_name)
            print(f" [{idx}/{total}] ⚠️ Failed: {cap_name}", flush=True)

    if failed_caps:
        print(f" ⚠️ {len(failed_caps)} capabilities failed enrichment: {failed_caps}")

    # Assemble, validate, and persist the final capabilities.json payload.
    output_data = {
        "requirement": capabilities_data.get("requirement", ""),
        "capabilities": enriched_capabilities,
    }
    schema_err = validate_capabilities_enriched(output_data)
    if schema_err:
        raise ValueError(f"Final capabilities.json schema invalid: {schema_err}")

    output_file.parent.mkdir(parents=True, exist_ok=True)
    output_file.write_text(
        json.dumps(output_data, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    return {
        "total_capabilities": total,
        "enriched": sum(1 for c in enriched_capabilities if "enriched_details" in c),
        "total_cost": total_cost,
    }
async def main():
    """CLI entry point: parse arguments and run the enrichment pipeline."""
    import argparse
    import sys

    # Make the repository root importable so `agent.llm` resolves.
    sys.path.insert(0, str(Path(__file__).parent.parent.parent))
    from agent.llm.openrouter import OpenRouterLLM

    parser = argparse.ArgumentParser()
    parser.add_argument("--capabilities-temp", required=True)
    parser.add_argument("--source-file", required=True)
    parser.add_argument("--output-file", required=True)
    parser.add_argument("--model", default="anthropic/claude-sonnet-4-6")
    opts = parser.parse_args()

    stats = await enrich_all_capabilities(
        capabilities_temp_file=Path(opts.capabilities_temp),
        source_file=Path(opts.source_file),
        output_file=Path(opts.output_file),
        llm_call=OpenRouterLLM().chat,
        model=opts.model,
    )
    print(f"✓ Enriched {stats['enriched']}/{stats['total_capabilities']} capabilities")


if __name__ == "__main__":
    asyncio.run(main())
|