| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562 |
- """
- V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
- """
- import argparse
- import asyncio
- import json
- import sys
- import time
- from datetime import datetime
- from pathlib import Path
- # Add project root to path
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.llm.prompts import SimplePrompt
- from agent.core.runner import AgentRunner, RunConfig
- from agent.tools.builtin.knowledge import KnowledgeConfig
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_qwen_llm_call
- from agent.llm.openrouter import create_openrouter_llm_call
- from agent.llm.claude import create_claude_llm_call
- # config from existing setup
- from examples.process_research.config import (
- OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
- BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG
- )
- from agent.utils import setup_logging
async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str):
    """Run one prompt-driven agent task through ``runner`` and return ``(cost, errors)``.

    Loads ``prompts/<prompt_name>.prompt`` next to this file, renders it with
    ``kwargs`` and streams ``runner.run``. If ``kwargs["output_file"]`` is given but
    the file still does not exist afterwards (and a trace id was observed), one
    recovery continuation is issued on the same trace instructing the agent to
    write its results with ``write_json``.

    Args:
        runner: AgentRunner that executes the task.
        prompt_name: Prompt file stem; also used as ``agent_type`` and to select tool groups.
        kwargs: Template variables for the prompt. ``output_file`` (optional) is the
            path the task is expected to produce.
        task_name: Human-readable task name used in logs and as the run name.
        model_name: Model used when the prompt config does not pin one.

    Returns:
        ``(task_cost, task_errors)``: the cost reported by completed traces (main run
        plus recovery) and a list of error strings.
    """
    base_dir = Path(__file__).parent
    prompt = SimplePrompt(base_dir / "prompts" / f"{prompt_name}.prompt")
    messages = prompt.build_messages(**kwargs)

    def knowledge_off():
        # Knowledge extraction, completion extraction and injection are all disabled for pipeline tasks.
        return KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)

    # Extra tools on top of the tool groups; only the capability extractor needs them.
    target_tools = []
    if prompt_name == "extract_capabilities":
        target_tools = ["capability_search", "capability_list", "tool_search"]
    # Tool-group permissions per agent type.
    tool_groups_map = {
        "researcher": ["core", "content"],   # search + files, no browser
        "filter_and_blueprint": ["core"],    # file read/write only
        "extract_capabilities": ["core"],    # file read/write only (extra tools come from target_tools)
        "assemble_strategy": ["core"],       # file read/write only
    }

    # A prompt may pin its own model/temperature. Temperature is compared with None
    # so that an explicit 0 is honoured instead of being replaced by the default.
    effective_model = prompt.config.get("model") or model_name
    temperature = prompt.config.get("temperature")
    if temperature is None:
        temperature = 0.3

    run_config = RunConfig(
        model=effective_model,
        temperature=temperature,
        name=task_name,
        agent_type=prompt_name,
        tools=target_tools,
        tool_groups=tool_groups_map.get(prompt_name, ["core"]),
        knowledge=knowledge_off(),
    )

    task_cost = 0.0
    task_errors = []
    last_trace_id = None

    print(f"🚀 [Launch] {task_name}")
    try:
        async for item in runner.run(messages=messages, config=run_config):
            if isinstance(item, Trace):
                last_trace_id = item.trace_id
                if item.status == "completed":
                    print(f"✅ [Done] {task_name} (Cost: ${item.total_cost:.4f})")
                    task_cost = item.total_cost
                elif item.status == "failed":
                    print(f"❌ [Fail] {task_name}: {item.error_message}")
                    task_errors.append(f"{task_name} Failed: {item.error_message}")
            if isinstance(item, Message) and item.role == "tool":
                content = item.content if isinstance(item.content, dict) else {}
                if content.get("tool_name", "unknown") in ("write_file", "write_json"):
                    print(f" 💾 [File Written by {task_name}]")
    except Exception as e:
        err_msg = f"{type(e).__name__}: {e}"
        print(f"❌ [Exception] {task_name} crashed: {err_msg}")
        task_errors.append(f"{task_name} crashed: {err_msg}")

    # Verification & recovery: if the expected output file is missing, continue the
    # same trace once with a forced "write your results now" instruction.
    out_file = kwargs.get("output_file")
    if out_file and not Path(out_file).exists() and last_trace_id:
        print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
        recovery_messages = [{
            "role": "user",
            "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
        }]
        rec_config = RunConfig(
            # Continue the trace with the same model that produced it.
            model=effective_model,
            temperature=0.1,
            name=task_name + "_Rec",
            agent_type=prompt_name,
            trace_id=last_trace_id,
            knowledge=knowledge_off(),
        )
        try:
            async for r_item in runner.run(messages=recovery_messages, config=rec_config):
                if isinstance(r_item, Trace):
                    if r_item.status == "completed":
                        task_cost += r_item.total_cost
                    elif r_item.status == "failed":
                        task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}")
                if isinstance(r_item, Message) and r_item.role == "tool":
                    content = r_item.content if isinstance(r_item.content, dict) else {}
                    if content.get("tool_name") in ("write_file", "write_json"):
                        print(f" 💾 [Recovery File Written by {task_name}]")
        except Exception as e:
            err_msg = f"{type(e).__name__}: {e}"
            print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}")
            task_errors.append(f"{task_name} recovery crashed: {err_msg}")

    return task_cost, task_errors
def _sdk_estimated_cost(usage) -> float:
    """Rough USD cost of one Messages API call at $3 / $15 per million input / output tokens.

    This is a local estimate, not the officially billed amount. ``usage`` is a
    response's ``usage`` object (``input_tokens`` / ``output_tokens``); ``None``
    counts as zero.
    """
    if usage is None:
        return 0.0
    return (usage.input_tokens / 1e6 * 3.0) + (usage.output_tokens / 1e6 * 15.0)


def _sdk_tool_result_content(result) -> str:
    """Coerce a local tool's return value into the string carried by a ``tool_result`` block.

    NOTE(review): ``registry.execute`` presumably returns ``str``. Any other value is
    JSON-serialised instead of being sent raw, because the Messages API rejects
    arbitrary objects as tool_result content.
    """
    if isinstance(result, str):
        return result
    return json.dumps(result, ensure_ascii=False, default=str)


async def _sdk_run_tool(registry, tool_use) -> dict:
    """Execute one ``tool_use`` block through the local registry and wrap the outcome as a ``tool_result``.

    A failing tool is reported back to the model with ``is_error: True`` instead of
    aborting the whole task. This also guarantees every ``tool_use`` in the history is
    paired with a ``tool_result``, which the next Messages API request requires.
    """
    try:
        result = await registry.execute(tool_use.name, tool_use.input)
    except Exception as e:
        err_msg = f"{type(e).__name__}: {e}"
        print(f" ⚠️ [Tool Error SDK] {tool_use.name}: {err_msg}")
        return {
            "type": "tool_result",
            "tool_use_id": tool_use.id,
            "content": f"Tool execution failed: {err_msg}",
            "is_error": True,
        }
    return {
        "type": "tool_result",
        "tool_use_id": tool_use.id,
        "content": _sdk_tool_result_content(result),
    }


async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str):
    """Fallback driver: run a prompt with the official Anthropic SDK and a hand-written tool loop.

    This bypasses the internal AgentRunner/trace machinery but still executes the
    project's Python tools (e.g. write_file / glob_files) through the tool registry.
    If ``kwargs["output_file"]`` is missing after the loop, one forced recovery
    request (``tool_choice={"type": "any"}``) asks the model to write it.

    Requires ``ANTHROPIC_API_KEY`` in the environment.

    Args:
        prompt_name: Prompt file stem under ``prompts/``.
        kwargs: Template variables for the prompt; ``output_file`` (optional) is the expected output path.
        task_name: Human-readable task name used in logs.
        model_name: Requested model. A provider prefix ("vendor/...") is stripped, and any
            Claude model is mapped to the ``claude-sonnet-4-5`` alias.

    Returns:
        ``(task_cost, task_errors)``: the estimated cost (including recovery) and a list of error strings.
    """
    from anthropic import AsyncAnthropic
    from agent.tools.registry import get_tool_registry

    base_dir = Path(__file__).parent
    prompt = SimplePrompt(base_dir / "prompts" / f"{prompt_name}.prompt")

    # 1. Split the rendered prompt: system messages go into the `system` parameter.
    system_prompt = ""
    messages = []
    for msg in prompt.build_messages(**kwargs):
        if msg["role"] == "system":
            system_prompt += msg["content"] + "\n\n"
        else:
            messages.append({"role": msg["role"], "content": msg["content"]})

    # 2. Expose the selected registry tools in Anthropic's tool format.
    target_tools = ["write_file", "write_json", "read_file", "glob_files"]
    if prompt_name == "extract_capabilities":
        target_tools.extend(["capability_search", "capability_list", "tool_search"])

    registry = get_tool_registry()
    anthropic_tools = [
        {
            "name": s["function"]["name"],
            "description": s["function"].get("description", ""),
            "input_schema": s["function"]["parameters"],
        }
        for s in registry.get_schemas(target_tools)
    ]

    # 3. Resolve the model once so the main loop and the recovery call agree.
    # Strip any provider prefix (e.g. names passed in OpenRouter form), then map
    # every Claude model to the available claude-sonnet-4-5 alias.
    clean_model = model_name.split("/")[-1]
    target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model

    client = AsyncAnthropic()
    task_cost = 0.0
    task_errors = []
    print(f"🚀 [Launch Anthropic SDK] {task_name}")

    max_loops = 50
    for _ in range(max_loops):
        try:
            response = await client.messages.create(
                model=target_model,
                max_tokens=4096,
                temperature=0.2,
                system=system_prompt,
                messages=messages,
                tools=anthropic_tools
            )
            task_cost += _sdk_estimated_cost(getattr(response, "usage", None))

            # Record the assistant turn (text + tool_use blocks) in the history.
            assistant_content = []
            tool_uses = []
            for content_block in response.content:
                if content_block.type == "text":
                    text_val = content_block.text
                    if text_val:
                        assistant_content.append({"type": "text", "text": text_val})
                        print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
                elif content_block.type == "tool_use":
                    assistant_content.append({
                        "type": "tool_use",
                        "id": content_block.id,
                        "name": content_block.name,
                        "input": content_block.input
                    })
                    tool_uses.append(content_block)

            if not assistant_content:
                assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})
            messages.append({"role": "assistant", "content": assistant_content})

            # Exit: a turn without tool calls means the model is done.
            if not tool_uses:
                print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${task_cost:.4f})")
                break

            # Execute tools locally and send every result back in one user turn.
            tool_results = []
            for tu in tool_uses:
                if tu.name in ("write_file", "write_json"):
                    print(f" 💾 [File Written by SDK] {task_name}")
                print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
                tool_results.append(await _sdk_run_tool(registry, tu))
            messages.append({"role": "user", "content": tool_results})

        except Exception as e:
            err_msg = str(e)
            print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
            task_errors.append(err_msg)
            break
    else:
        # The loop never hit `break`: the model was still calling tools after max_loops turns.
        warn_msg = f"{task_name} reached max_loops={max_loops} without finishing"
        print(f"⚠️ [Max Loops SDK] {warn_msg}")
        task_errors.append(warn_msg)

    # Verification & recovery (mirrors run_agent_task): force a write if the output is missing.
    out_file = kwargs.get("output_file")
    if out_file and not Path(out_file).exists():
        print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
        messages.append({
            "role": "user",
            "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
        })
        try:
            rec_response = await client.messages.create(
                model=target_model,
                max_tokens=4096,
                temperature=0.1,
                system=system_prompt,
                messages=messages,
                tools=anthropic_tools,
                tool_choice={"type": "any"}
            )
            # The recovery call is billed too; count it like the main loop does.
            task_cost += _sdk_estimated_cost(getattr(rec_response, "usage", None))
            for content_block in rec_response.content:
                if content_block.type == "tool_use":
                    if content_block.name in ("write_file", "write_json"):
                        print(f" 💾 [Recovery File Written by SDK] {task_name}")
                    await registry.execute(content_block.name, content_block.input)
        except Exception as e:
            print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
            task_errors.append(str(e))

    return task_cost, task_errors
# Platforms the pipeline can research; also used to recognise existing case_<platform>.json files.
_VALID_PLATFORMS = ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]
# Number of distinct platform case files the research phases aim for per requirement.
_TARGET_PLATFORM_COUNT = 4


class _RunLedger:
    """Accumulates the total cost, the per-task cost breakdown and the error list of one pipeline run."""

    def __init__(self):
        self.total_cost = 0.0
        self.costs_breakdown = {}
        self.errors = []

    def record(self, name, cost, errors=()):
        """Add ``cost`` to the total, store it (rounded to 4 dp) under ``name`` and collect ``errors``."""
        self.total_cost += cost
        self.costs_breakdown[name] = round(cost, 4)
        self.errors.extend(errors)


async def _route_platforms(runner, model, base_dir, requirement, existing_platforms, needed_count,
                           default_platforms, ledger):
    """Phase 0: ask the router prompt which new platforms to research.

    Returns at most ``needed_count`` platforms from ``_VALID_PLATFORMS`` that are not
    already in ``existing_platforms``. If the router raises, returns no answer, or
    names no valid platform, it falls back to ``default_platforms`` minus the existing
    ones (truncated to ``needed_count``).
    """
    import re

    def delta_default():
        # Requested platforms that are not covered yet, capped at the number still needed.
        return [p for p in default_platforms if p not in existing_platforms][:needed_count]

    print(f"\n--- Phase 0: Dynamic Platform Routing ({model}) ---")
    print(f"📡 Found existing cases: {existing_platforms}. Requesting {needed_count} new platforms...")
    try:
        router_prompt = SimplePrompt(base_dir / "prompts" / "router.prompt")
        rmessages = router_prompt.build_messages(
            requirement=requirement,
            existing_platforms=",".join(existing_platforms) if existing_platforms else "无",
            needed_count=str(needed_count)
        )
        rconfig = RunConfig(
            model=model,
            temperature=0.1,
            name="P0_Router",
            agent_type="router",
            knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
        )
        print("🚀 [Launch] P0_Router calculating optimal platforms...")

        router_response = ""
        async for item in runner.run(messages=rmessages, config=rconfig):
            # Keep the last plain-text assistant answer (not a tool-calling turn).
            if isinstance(item, Message) and item.role == "assistant" and isinstance(item.content, dict):
                text = item.content.get("text", "")
                if text and not item.content.get("tool_calls"):
                    router_response = text
            if isinstance(item, Trace) and item.status == "completed":
                ledger.record("P0_Router", item.total_cost)

        if not router_response:
            # Previously an empty answer left the raw --platforms list in place, which
            # could re-research existing platforms and exceed needed_count.
            platforms = delta_default()
            print(f"⚠️ [Router Fallback] Empty output. Using delta default: {platforms}")
            return platforms

        # Match whole lowercase words so markdown or extra prose around the answer is tolerated.
        words = set(re.findall(r'\b[a-z]+\b', router_response.lower()))
        selected = [p for p in _VALID_PLATFORMS if p in words and p not in existing_platforms]
        if selected:
            platforms = selected[:needed_count]
            print(f"🎯 [Router Decision] Selected {len(platforms)} new platforms: {platforms}")
        else:
            platforms = delta_default()
            print(f"⚠️ [Router Fallback] Invalid output '{router_response}'. Using delta default: {platforms}")
        return platforms
    except Exception as e:
        platforms = delta_default()
        print(f"⚠️ [Router Logic Failed] Using delta default platforms ({platforms}). Error: {e}")
        return platforms


async def _run_research(runner, model, raw_cases_dir, requirement, platforms, ledger):
    """Phase 1 (map): research every platform in parallel, writing ``raw_cases_dir/case_<platform>.json``."""
    print(f"\n--- Phase 1: Distributed Research Map ({model}) ---")
    jobs = []
    for p in platforms:
        kwargs = {
            "task": f"渠道:{p.upper()}。核心需求:{requirement}",
            "output_file": str(raw_cases_dir / f"case_{p}.json")
        }
        jobs.append(run_agent_task(runner, "researcher", kwargs, f"P1_Research_{p}", model))

    results = await asyncio.gather(*jobs)
    for p, (task_cost, task_errors) in zip(platforms, results):
        ledger.record(f"P1_Research_{p}", task_cost, task_errors)
        # Check that the case file was actually written.
        if not (raw_cases_dir / f"case_{p}.json").exists():
            err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
            print(f"⚠️ [Warning] {err_msg}")
            ledger.errors.append(err_msg)


async def _run_distillation(runner, model, use_sdk, requirement, raw_glob, blueprint_file, capabilities_file, ledger):
    """Phase 2 (reduce 1): build the blueprint and extract capabilities in parallel.

    Each task is skipped if its output file already exists, so reruns resume.
    """
    print(f"\n--- Phase 2: Parallel Distillation ({model}) ---")
    specs = [
        ("filter_and_blueprint", "P2_FilterBlueprint", blueprint_file, "blueprint.json"),
        ("extract_capabilities", "P2_ExtractCaps", capabilities_file, "capabilities_extracted.json"),
    ]
    jobs = []
    names = []
    for prompt_name, task_name, out_file, label in specs:
        if Path(out_file).exists():
            print(f"⏭️ [Skip P2] {label} already exists. Skipping {task_name}.")
            continue
        kwargs = {
            "requirement": requirement,
            "raw_files_glob": raw_glob,
            "output_file": out_file
        }
        if use_sdk:
            print(f" > Using [Anthropic SDK Core] for {task_name}")
            jobs.append(run_anthropic_sdk_task(prompt_name, kwargs, task_name, model))
        else:
            print(f" > Using [AgentRunner Core] for {task_name}")
            jobs.append(run_agent_task(runner, prompt_name, kwargs, task_name, model))
        names.append(task_name)

    if jobs:
        results = await asyncio.gather(*jobs)
        for task_name, (cost, errs) in zip(names, results):
            ledger.record(task_name, cost, errs)


async def _run_assembly(runner, model, use_sdk, requirement, blueprint_file, capabilities_file, strategy_file, ledger):
    """Phase 3 (reduce 2): assemble the final strategy from the two Phase 2 outputs.

    Skipped if ``strategy_file`` already exists. Also skipped, with an error recorded,
    when a Phase 2 output is missing. Otherwise the assembler would run without its
    inputs and might write a strategy.json that blocks future reruns.
    """
    print(f"\n--- Phase 3: Final Strategy Assembly ({model}) ---")
    if Path(strategy_file).exists():
        print("⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
        return

    missing_inputs = [f for f in (blueprint_file, capabilities_file) if not Path(f).exists()]
    if missing_inputs:
        err_msg = f"P3_Assembler skipped: missing Phase 2 output(s) {missing_inputs}"
        print(f"⚠️ [Skip P3] {err_msg}")
        ledger.errors.append(err_msg)
        return

    kwargs = {
        "requirement": requirement,
        "blueprint_file": blueprint_file,
        "capabilities_file": capabilities_file,
        "output_file": strategy_file
    }
    if use_sdk:
        print(" > Using [Anthropic SDK Core]")
        cost, errs = await run_anthropic_sdk_task("assemble_strategy", kwargs, "P3_Assembler", model)
    else:
        print(" > Using [AgentRunner Core]")
        cost, errs = await run_agent_task(runner, "assemble_strategy", kwargs, "P3_Assembler", model)
    ledger.record("P3_Assembler", cost, errs)


def _append_run_metrics(metrics_file, entry):
    """Append ``entry`` to the JSON list stored in ``metrics_file`` (a ``Path``).

    - If the file cannot be decoded, it is replaced by a fresh list.
    - If it holds a single non-list value, that value becomes the first list element
      instead of crashing on ``append``.
    - The file is written to a temporary sibling and renamed into place, so an
      interrupted write cannot destroy the accumulated history.
    """
    metrics_data = []
    if metrics_file.exists():
        with open(metrics_file, "r", encoding="utf-8") as f:
            try:
                loaded = json.load(f)
            except json.JSONDecodeError:
                loaded = []
        metrics_data = loaded if isinstance(loaded, list) else [loaded]

    metrics_data.append(entry)

    tmp_file = metrics_file.with_name(metrics_file.name + ".tmp")
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump(metrics_data, f, indent=2, ensure_ascii=False)
    tmp_file.replace(metrics_file)


async def main():
    """CLI entry point: run the V4 pipeline for one requirement from ``db_requirements.json``.

    Phases:
      0. Platform routing (Qwen).
      1. Parallel per-platform research (Qwen).
      2. Parallel blueprint and capability distillation (Claude, via AgentRunner or the
         raw Anthropic SDK with ``--use-claude-sdk``).
      3. Strategy assembly (Claude).

    Existing outputs under ``output/<NNN>/`` are reused, so interrupted runs resume.
    Each run appends a record with duration, cost and errors to ``run_metrics.json``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
    parser.add_argument("--skip-research", action="store_true", help="Skip Phase 1 and use existing raw cases")
    parser.add_argument("--research-only", action="store_true", help="Only run research phases, skip Phase 2 and 3")
    parser.add_argument("--platforms", type=str, default="xhs,youtube,bili,x", help="Comma-separated list of platforms to search")
    parser.add_argument("--use-claude-sdk", action="store_true", help="Use pure Anthropic SDK (run_anthropic_sdk_task) instead of internal AgentRunner for Phase 2/3")
    args = parser.parse_args()
    base_dir = Path(__file__).parent

    with open(base_dir / "db_requirements.json", encoding='utf-8') as f:
        reqs = json.load(f)
    if args.index < 0 or args.index >= len(reqs):
        print("Index out of bounds")
        sys.exit(1)
    requirement = reqs[args.index]

    # 0. Output layout: output/<NNN>/ (1-based index) with a raw_cases/ subfolder.
    output_dir = base_dir / "output" / f"{(args.index+1):03d}"
    raw_cases_dir = output_dir / "raw_cases"
    raw_cases_dir.mkdir(parents=True, exist_ok=True)  # also creates output_dir

    setup_logging(level=LOG_LEVEL, file=LOG_FILE)

    print("=" * 60)
    print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
    print("=" * 60)

    presets_path = base_dir / "presets.json"
    if presets_path.exists():
        from agent.core.presets import load_presets_from_json
        load_presets_from_json(str(presets_path))
        print("✅ Configured Agent Presets (Skills Boundaries)")

    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
    # Two orchestrators: Qwen for routing/research, native Claude (ANTHROPIC_API_KEY) for distillation/assembly.
    qwen_model = "qwen3.5-plus"
    claude_model = "claude-sonnet-4-5"
    claude_llm_call = create_claude_llm_call(model=claude_model)
    runner_qwen = AgentRunner(
        trace_store=store,
        llm_call=create_qwen_llm_call(model=qwen_model),
        skills_dir=SKILLS_DIR
    )
    runner_claude = AgentRunner(
        trace_store=store,
        llm_call=claude_llm_call,
        skills_dir=SKILLS_DIR
    )

    start_time = time.time()
    ledger = _RunLedger()
    strategy_file = None

    # Platforms that already have a case file count towards the target.
    existing_platforms = []
    for case_path in sorted(raw_cases_dir.glob("case_*.json")):
        plat = case_path.stem.replace("case_", "")
        if plat in _VALID_PLATFORMS:
            existing_platforms.append(plat)

    platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
    needed_count = max(0, _TARGET_PLATFORM_COUNT - len(existing_platforms))

    if args.skip_research:
        print("\n⏭️ [Skip] Phase 1 Skipped via --skip-research. Using existing cases...")
    else:
        # Phase 0: dynamic routing (only if more platforms are needed).
        if needed_count == 0:
            print(f"\n--- Phase 0: Skipping Routing (Already have {len(existing_platforms)} existing cases: {existing_platforms}) ---")
            platforms = []
        else:
            platforms = await _route_platforms(
                runner_qwen, qwen_model, base_dir, requirement,
                existing_platforms, needed_count, platforms, ledger,
            )
        # Phase 1: map (parallel research) on Qwen.
        await _run_research(runner_qwen, qwen_model, raw_cases_dir, requirement, platforms, ledger)

    if args.research_only:
        print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
    else:
        blueprint_file = str(output_dir / "blueprint.json")
        capabilities_file = str(output_dir / "capabilities_extracted.json")
        # Forward slashes so the glob pattern is portable inside prompts.
        raw_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
        await _run_distillation(
            runner_claude, claude_model, args.use_claude_sdk, requirement,
            raw_glob, blueprint_file, capabilities_file, ledger,
        )
        strategy_file = str(output_dir / "strategy.json")
        await _run_assembly(
            runner_claude, claude_model, args.use_claude_sdk, requirement,
            blueprint_file, capabilities_file, strategy_file, ledger,
        )

    elapsed_sec = time.time() - start_time
    _append_run_metrics(base_dir / "run_metrics.json", {
        "index": args.index,
        "requirement": requirement[:80] + "...",
        "duration_seconds": round(elapsed_sec, 2),
        "total_cost_usd": round(ledger.total_cost, 4),
        "costs_breakdown": ledger.costs_breakdown,
        "errors": ledger.errors,
        "timestamp": datetime.now().isoformat()
    })
    print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${ledger.total_cost:.4f}")

    print("✅ Pipeline run finished.")
    # Only claim a saved strategy when the file really exists (Phase 3 may have been skipped).
    if strategy_file and Path(strategy_file).exists():
        print("✅ Strategy saved to:", strategy_file)


if __name__ == "__main__":
    asyncio.run(main())
|