| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470 |
- """
- V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
- """
- import argparse
- import asyncio
- import json
- import os
- import sys
- import time
- from datetime import datetime
- from pathlib import Path
- PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
- # Force runtime working directory to project root so relative trace/cache paths
- # always land in the repo root no matter where this script is launched from.
- os.chdir(PROJECT_ROOT)
- # Add project root to path
- sys.path.insert(0, str(PROJECT_ROOT))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.llm.prompts import SimplePrompt
- from agent.core.runner import AgentRunner, RunConfig
- from agent.tools.builtin.knowledge import KnowledgeConfig
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_qwen_llm_call
- from agent.llm.openrouter import create_openrouter_llm_call
- from agent.llm.claude import create_claude_llm_call
- # config from existing setup
- from examples.process_research.config import (
- OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
- BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG
- )
- from agent.utils import setup_logging
async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
    """Run one pipeline stage through AgentRunner with validate/repair retries.

    Flow per attempt: launch (or continue) the agent, instant-validate every
    file it writes, force a wrap-up write if the output file is still missing,
    then schema-validate the final JSON. On validation failure the error text
    is fed back into the same trace so the agent can repair its own output;
    if no trace/error is available the attempt falls back to a full restart.

    Args:
        runner: AgentRunner instance that drives the LLM/tool loop.
        prompt_name: Prompt file stem under ./prompts (also used as agent_type).
        kwargs: Prompt template variables; "output_file" (if present) is the
            JSON artifact this stage must produce.
        task_name: Human-readable label used in logs and error strings.
        model_name: Fallback model when the prompt config does not pin one.
        skip_validation: When True, skip final schema validation
            (single-step mode).

    Returns:
        (total_task_cost, task_errors, trace_id) where trace_id is the last
        trace observed from the runner (None if it never yielded one).
    """
    from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
    base_dir = Path(__file__).parent
    prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
    prompt = SimplePrompt(prompt_path)
    messages = prompt.build_messages(**kwargs)

    target_tools = []
    if prompt_name == "extract_capabilities":
        target_tools = ["capability_search", "capability_list", "tool_search"]

    # Tool-group permissions per agent type.
    tool_groups_map = {
        "researcher": ["core", "content"],   # search + files, no browser
        "filter_and_blueprint": ["core"],    # file read/write only
        "extract_capabilities": ["core"],    # extras come from target_tools
        "assemble_strategy": ["core"],       # file read/write only
    }

    total_task_cost = 0.0
    task_errors = []
    out_file = kwargs.get("output_file")
    max_retries = 3
    last_trace_id = None
    last_validation_error = None

    def _no_knowledge():
        """Fresh KnowledgeConfig with all knowledge features disabled."""
        return KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)

    def _validate_payload(data):
        """Dispatch schema validation by output filename; return error or None."""
        filename = Path(out_file).name
        if filename.startswith("case_"):
            return validate_case(data)
        if filename == "blueprint.json":
            return validate_blueprint(data)
        if filename == "capabilities_extracted.json":
            return validate_capabilities(data)
        if filename == "strategy.json":
            return validate_strategy(data)
        return None  # unknown filename: nothing to validate

    def _instant_validate():
        """Validate out_file right after it is written; auto-fix broken JSON.

        Returns an error string (also stored in last_validation_error) or None.
        """
        nonlocal last_validation_error
        if not out_file or not Path(out_file).exists():
            return None
        try:
            raw = Path(out_file).read_text(encoding="utf-8")
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
                ok, data, desc = try_fix_and_parse(raw)
                if not ok:
                    last_validation_error = "JSON parse failed, auto-fix unsuccessful"
                    return last_validation_error
                # Persist the repaired JSON so downstream readers see valid data.
                with open(out_file, "w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                print(f" 🔧 [Instant Fix] {desc}")
            err = _validate_payload(data)
            if err:
                last_validation_error = err
                print(f" ⚠️ [Instant Validation] {err}")
                return err
            print(f" ✅ [Instant Validation] OK")
            return None
        except Exception as e:
            last_validation_error = str(e)
            return str(e)

    async def _consume(run_messages, config, fail_label, crash_print, crash_suffix, write_note):
        """Drain one runner.run() stream: track trace/cost, validate file writes.

        fail_label / crash_print / crash_suffix / write_note parameterize the
        log and error strings so launch, fix and recovery runs stay distinct.
        """
        nonlocal total_task_cost, last_trace_id
        try:
            async for item in runner.run(messages=run_messages, config=config):
                if isinstance(item, Trace):
                    last_trace_id = item.trace_id
                    if item.status == "completed":
                        total_task_cost += item.total_cost
                    elif item.status == "failed":
                        task_errors.append(f"{task_name} {fail_label}: {item.error_message}")
                if isinstance(item, Message) and item.role == "tool":
                    content = item.content if isinstance(item.content, dict) else {}
                    if content.get("tool_name") in ("write_file", "write_json"):
                        print(f" 💾 [{write_note} by {task_name}]")
                        _instant_validate()  # catch bad output as early as possible
        except Exception as e:
            err_msg = f"{type(e).__name__}: {e}"
            print(f"❌ [{crash_print}] {task_name} crashed: {err_msg}")
            task_errors.append(f"{task_name} {crash_suffix}: {err_msg}")

    for attempt in range(max_retries):
        if attempt > 0 and last_trace_id and last_validation_error:
            # Continuation mode: tell the existing agent what failed so it can
            # patch its own file instead of redoing the whole task.
            print(f"🔄 [Continue {attempt}/{max_retries-1}] {task_name} - sending fix instructions to existing agent")
            fix_messages = [{
                "role": "user",
                "content": (
                    f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
                    f"错误详情:{last_validation_error}\n\n"
                    f"请立刻读取该文件,根据以上错误信息修复内容,然后重新调用 write_json 写入到同一路径 `{out_file}`。"
                    f"只修复有问题的部分,不要丢弃已有的正确内容。"
                )
            }]
            fix_config = RunConfig(
                model=prompt.config.get("model") or model_name,
                temperature=0.1,  # low temperature: surgical repair, not creativity
                name=f"{task_name}_Fix{attempt}",
                agent_type=prompt_name,
                tools=target_tools,
                tool_groups=tool_groups_map.get(prompt_name, ["core"]),
                trace_id=last_trace_id,  # continue the existing trace
                knowledge=_no_knowledge()
            )
            await _consume(fix_messages, fix_config, "Fix Failed", "Exception Fix", "fix crashed", "Fix File Written")
        else:
            # Fresh (re)start: first attempt, or no trace/error to continue from.
            if attempt > 0:
                print(f"🔄 [Retry {attempt}/{max_retries-1}] {task_name} - no prior trace, full restart")
                if out_file and Path(out_file).exists():
                    Path(out_file).unlink()  # drop stale output before rerunning
            run_config = RunConfig(
                model=prompt.config.get("model") or model_name,
                temperature=prompt.config.get("temperature") or 0.3,
                name=f"{task_name}_A{attempt}",
                agent_type=prompt_name,
                tools=target_tools,
                tool_groups=tool_groups_map.get(prompt_name, ["core"]),
                knowledge=_no_knowledge()
            )
            print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
            await _consume(messages, run_config, "Failed", "Exception", "crashed", "File Written")

        # Recovery: the agent finished without writing its output file --
        # continue the trace with a forced "write now" instruction.
        if out_file and not Path(out_file).exists() and last_trace_id:
            print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
            recovery_messages = [{
                "role": "user",
                "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
            }]
            rec_config = RunConfig(
                model=model_name,
                temperature=0.1,
                name=task_name + "_Rec",
                agent_type=prompt_name,
                trace_id=last_trace_id,
                knowledge=_no_knowledge()
            )
            await _consume(recovery_messages, rec_config, "Recovery Failed", "Exception Recovery", "recovery crashed", "Recovery File Written")

        # Final schema validation (with JSON auto-fix layer).
        if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
            if skip_validation:
                print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
                return total_task_cost, task_errors, last_trace_id
            try:
                raw_content = Path(out_file).read_text(encoding="utf-8")
                # Layer 1: try a straight parse.
                try:
                    data = json.loads(raw_content)
                except json.JSONDecodeError as parse_err:
                    # Layer 2: attempt automatic repair of JSON syntax errors.
                    print(f" 🔧 [Auto-Fix] {Path(out_file).name} has JSON syntax error, attempting fix...")
                    try:
                        from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
                        success, data, fix_desc = try_fix_and_parse(raw_content)
                        if success:
                            # Fix succeeded: write the repaired JSON back.
                            with open(out_file, "w", encoding="utf-8") as f:
                                json.dump(data, f, ensure_ascii=False, indent=2)
                            print(f" 🔧 [Auto-Fix] Fixed: {fix_desc}")
                        else:
                            raise parse_err  # fix failed: surface the original error
                    except ImportError:
                        raise parse_err  # fixer unavailable: surface the original error
                err = _validate_payload(data)
                if err:
                    raise ValueError(f"Schema Validation Failed: {err}")
                print(f" ✅ [Schema Validated] {Path(out_file).name}")
                return total_task_cost, task_errors, last_trace_id  # success: exit retry loop
            except Exception as e:
                err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
                print(f"❌ [Validation Error] {task_name}: {err_msg}")
                task_errors.append(f"{task_name} Error: {e}")
                last_validation_error = str(e)  # drives continuation mode next attempt
                if attempt == max_retries - 1:
                    print(f"❌ [Retry Limit] {task_name} exhausted retries.")
                    return total_task_cost, task_errors, last_trace_id
        else:
            print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
            last_validation_error = None  # no file -> nothing to "fix"; force full restart
            if attempt == max_retries - 1:
                return total_task_cost, task_errors, last_trace_id
    return total_task_cost, task_errors, last_trace_id
async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
    """Fallback driver: run one pipeline stage via the official Anthropic SDK.

    Bypasses the in-house AgentRunner trace machinery but keeps seamless
    access to the local Python tool registry (write_file / glob_files / ...).
    Requires ANTHROPIC_API_KEY to be configured in the environment.

    Returns:
        (total_task_cost, task_errors, sdk_trace_id) -- matches the 3-tuple
        shape of run_agent_task so callers can unpack both drivers uniformly.
        sdk_trace_id is the last Anthropic response id (None if no call
        succeeded). The cost figure is a rough estimate, not an official bill.
    """
    from anthropic import AsyncAnthropic
    from agent.tools.registry import get_tool_registry
    from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy

    base_dir = Path(__file__).parent
    prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
    prompt = SimplePrompt(prompt_path)

    # 1. Split the rendered prompt into system text and conversation turns.
    raw_messages = prompt.build_messages(**kwargs)
    system_prompt = ""
    messages = []
    for msg in raw_messages:
        if msg["role"] == "system":
            system_prompt += msg["content"] + "\n\n"
        else:
            messages.append({"role": msg["role"], "content": msg["content"]})

    # 2. Map the tool set exposed to the model.
    target_tools = ["write_file", "write_json", "read_file", "glob_files"]
    if prompt_name == "extract_capabilities":
        target_tools.extend(["capability_search", "capability_list", "tool_search"])

    registry = get_tool_registry()
    # Convert OpenAI-style function schemas to Anthropic tool definitions.
    anthropic_tools = [
        {
            "name": s["function"]["name"],
            "description": s["function"].get("description", ""),
            "input_schema": s["function"]["parameters"],
        }
        for s in registry.get_schemas(target_tools)
    ]

    # 3. Client init and agent loop.
    client = AsyncAnthropic()
    total_task_cost = 0.0
    task_errors = []
    sdk_trace_id = None  # last Anthropic response id, returned as a pseudo trace id

    out_file = kwargs.get("output_file")

    # Resolve the concrete model once (invariant across attempts): strip any
    # provider prefix (e.g. "openrouter/...") and pin claude to its alias.
    clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
    target_model = "claude-sonnet-4-5" if "claude" in clean_model else clean_model

    def _validate_payload(data):
        """Dispatch schema validation by output filename; return error or None."""
        filename = Path(out_file).name
        if filename.startswith("case_"):
            return validate_case(data)
        if filename == "blueprint.json":
            return validate_blueprint(data)
        if filename == "capabilities_extracted.json":
            return validate_capabilities(data)
        if filename == "strategy.json":
            return validate_strategy(data)
        return None  # unknown filename: nothing to validate

    max_retries = 3
    for attempt in range(max_retries):
        if attempt > 0:
            print(f"🔄 [Retry SDK {attempt}/{max_retries-1}] {task_name}")
            if out_file and Path(out_file).exists():
                Path(out_file).unlink()  # drop stale output before rerunning

        print(f"🚀 [Launch Anthropic SDK] {task_name} (Attempt {attempt+1})")

        # Fresh conversation copy per attempt so retries don't accumulate history.
        messages_copy = list(messages)

        max_loops = 50  # hard cap on tool-use round-trips per attempt
        for loop_idx in range(max_loops):
            try:
                response = await client.messages.create(
                    model=target_model,
                    max_tokens=4096,
                    temperature=0.2,
                    system=system_prompt,
                    messages=messages_copy,
                    tools=anthropic_tools
                )
                sdk_trace_id = getattr(response, "id", sdk_trace_id)

                # Rough cost estimate only -- not the official billing figure.
                if hasattr(response, 'usage'):
                    step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
                    total_task_cost += step_cost

                # Collect assistant output and any tool calls.
                assistant_content = []
                tool_uses = []
                for content_block in response.content:
                    if content_block.type == "text":
                        text_val = content_block.text
                        if text_val:
                            assistant_content.append({"type": "text", "text": text_val})
                            print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
                    elif content_block.type == "tool_use":
                        assistant_content.append({
                            "type": "tool_use",
                            "id": content_block.id,
                            "name": content_block.name,
                            "input": content_block.input
                        })
                        tool_uses.append(content_block)

                if not assistant_content:
                    assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})

                messages_copy.append({"role": "assistant", "content": assistant_content})

                # Exit: no tool calls means the agent considers the task done.
                if not tool_uses:
                    print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${total_task_cost:.4f})")
                    break

                # Execute tools locally and feed results back.
                tool_results = []
                for tu in tool_uses:
                    if tu.name in ("write_file", "write_json"):
                        print(f" 💾 [File Written by SDK] {task_name}")
                    print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
                    result_str = await registry.execute(tu.name, tu.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": tu.id,
                        "content": result_str
                    })
                messages_copy.append({"role": "user", "content": tool_results})

            except Exception as e:
                err_msg = str(e)
                print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
                task_errors.append(err_msg)
                break

        # Verification & recovery: force one final tool call to write the
        # output file if the agent finished without producing it.
        if out_file and not Path(out_file).exists():
            print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
            messages_copy.append({
                "role": "user",
                "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
            })
            try:
                rec_response = await client.messages.create(
                    model=target_model,
                    max_tokens=4096,
                    temperature=0.1,
                    system=system_prompt,
                    messages=messages_copy,
                    tools=anthropic_tools,
                    tool_choice={"type": "any"}  # force the model to call a tool
                )
                sdk_trace_id = getattr(rec_response, "id", sdk_trace_id)
                for content_block in rec_response.content:
                    if content_block.type == "tool_use":
                        if content_block.name in ("write_file", "write_json"):
                            print(f" 💾 [Recovery File Written by SDK] {task_name}")
                        # Execute whatever tool was called so the write lands.
                        await registry.execute(content_block.name, content_block.input)
            except Exception as e:
                print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
                task_errors.append(str(e))

        # Schema validation of the produced JSON file.
        if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
            if skip_validation:
                print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
                return total_task_cost, task_errors, sdk_trace_id
            try:
                data = json.loads(Path(out_file).read_text(encoding="utf-8"))
                err = _validate_payload(data)
                if err:
                    raise ValueError(f"Schema Validation Failed: {err}")
                print(f" ✅ [Schema Validated] {Path(out_file).name}")
                return total_task_cost, task_errors, sdk_trace_id  # success: exit retry loop
            except Exception as e:
                err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
                print(f"❌ [Validation Error] {task_name}: {err_msg}")
                task_errors.append(f"{task_name} Error: {e}")
                if attempt == max_retries - 1:
                    print(f"❌ [Retry Limit] {task_name} exhausted retries.")
                    return total_task_cost, task_errors, sdk_trace_id
        else:
            print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
            if attempt == max_retries - 1:
                return total_task_cost, task_errors, sdk_trace_id

    return total_task_cost, task_errors, sdk_trace_id
- async def main():
- parser = argparse.ArgumentParser()
- parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
- STEP_NAMES = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding", "process-cluster", "process-score", "capability-enrich", "strategy"]
- parser.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube", help="Comma-separated list of platforms to search")
- parser.add_argument("--restart-mode", type=str, default="smart", help="Granular restart mode for cascading deletions")
- parser.add_argument("--only-step", type=str, choices=STEP_NAMES, help="Run only a single step")
- parser.add_argument("--phase", type=int, choices=[1, 2, 3], help="Run all steps in a phase (1=research~case-detailed, 2=process~capability, 3=strategy)")
- parser.add_argument("--start-from", type=str, choices=STEP_NAMES, help="Start from this step (inclusive)")
- parser.add_argument("--end-at", type=str, choices=STEP_NAMES, help="End at this step (inclusive)")
- parser.add_argument("--case-index", type=int, help="Rerun extraction for a specific case index (only for workflow-extract, capability-extract, apply-grounding)")
- parser.add_argument("--use-claude-sdk", action="store_true", help="Use Claude SDK (CLAUDE_CODE_KEY/URL) instead of OpenRouter")
- args = parser.parse_args()
- # ── 参数验证 ──
- # --case-index 只能与提取步骤一起使用
- if args.case_index is not None:
- extraction_steps = {"workflow-extract", "capability-extract", "apply-grounding"}
- if args.only_step and args.only_step not in extraction_steps:
- print(f"❌ Error: --case-index can only be used with extraction steps: {', '.join(extraction_steps)}")
- sys.exit(1)
- if not args.only_step:
- print("❌ Error: --case-index requires --only-step to specify which extraction step to run")
- sys.exit(1)
- # ── 三种模式互斥检查 ──
- mode_count = sum([
- args.only_step is not None,
- args.phase is not None,
- (args.start_from is not None or args.end_at is not None),
- ])
- if mode_count > 1:
- print("❌ Error: --only-step, --phase, and --start-from/--end-at are mutually exclusive.")
- sys.exit(1)
- # 定义步骤拓扑(非线性,Phase 2 有两条并行分支)
- STEP_ORDER = STEP_NAMES # 用于 phase 分组和前置检查
- def _step_in_range(step, start, end):
- """判断线性前缀中的 step 是否在 [start, end] 范围内"""
- all_steps = LINEAR_PREFIX + BRANCH_21 + BRANCH_22 + ["strategy"]
- if step not in all_steps or start not in all_steps:
- return False
- # 对于线性前缀,用索引比较
- if step in LINEAR_PREFIX:
- s_idx = LINEAR_PREFIX.index(step)
- start_idx = LINEAR_PREFIX.index(start) if start in LINEAR_PREFIX else -1
- # 如果 start 在 Phase 2 或 strategy 中,线性前缀不需要跑
- if start_idx < 0:
- return False
- return s_idx >= start_idx
- return False
- PHASE_MAP = {
- 1: {"research", "source", "case-detailed"},
- 2: {"process-cluster", "process-score", "capability-extract", "capability-enrich"},
- 3: {"strategy"},
- }
- BRANCH_21 = ["process-cluster", "process-score"]
- BRANCH_22 = ["capability-enrich"]
- # 线性前缀(Phase 2 之前)
- LINEAR_PREFIX = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding"]
- # 计算需要执行的步骤集合
- active_steps = None # None = 全部执行
- if args.only_step:
- active_steps = {args.only_step}
- elif args.phase:
- active_steps = PHASE_MAP[args.phase]
- elif args.start_from or args.end_at:
- start = args.start_from or "research"
- end = args.end_at or "strategy"
- # 构建 active_steps,考虑并行分支
- active = set()
- # 1. 线性前缀部分
- for s in LINEAR_PREFIX:
- if _step_in_range(s, start, end):
- active.add(s)
- # 2. Phase 2 分支部分
- start_in_21 = start in BRANCH_21
- start_in_22 = start in BRANCH_22
- end_in_21 = end in BRANCH_21
- end_in_22 = end in BRANCH_22
- end_is_strategy = end == "strategy"
- # 如果 end 是 strategy 或覆盖了整个 Phase 2,两条分支都跑
- if end_is_strategy or (not end_in_21 and not end_in_22 and end not in LINEAR_PREFIX):
- # 两条分支都包含(如果 start 允许的话)
- if not start_in_21 and not start_in_22:
- # start 在 Phase 2 之前或就是 Phase 2 的开头
- active.update(BRANCH_21)
- active.update(BRANCH_22)
- elif start_in_21:
- # start 在 2.1 分支内,但 end 到了 strategy,所以 2.2 也要全跑
- idx = BRANCH_21.index(start)
- active.update(BRANCH_21[idx:])
- active.update(BRANCH_22)
- elif start_in_22:
- # start 在 2.2 分支内,但 end 到了 strategy,所以 2.1 也要全跑
- idx = BRANCH_22.index(start)
- active.update(BRANCH_22[idx:])
- active.update(BRANCH_21)
- elif end_in_21:
- # end 在 2.1 分支内,只跑 2.1
- end_idx = BRANCH_21.index(end)
- start_idx = BRANCH_21.index(start) if start_in_21 else 0
- active.update(BRANCH_21[start_idx:end_idx + 1])
- elif end_in_22:
- # end 在 2.2 分支内,只跑 2.2
- end_idx = BRANCH_22.index(end)
- start_idx = BRANCH_22.index(start) if start_in_22 else 0
- active.update(BRANCH_22[start_idx:end_idx + 1])
- # 3. strategy
- if end_is_strategy:
- active.add("strategy")
- active_steps = active
- def should_run(step_name: str) -> bool:
- if active_steps is None:
- return True
- return step_name in active_steps
- if active_steps is not None:
- active_list = [s for s in STEP_ORDER if s in active_steps]
- print(f"\n[Selective Mode] Steps: {' -> '.join(active_list)}")
- base_dir = Path(__file__).parent
- # Load requirements locally
- req_path = base_dir / "db_requirements.json"
-
- with open(req_path, encoding='utf-8') as f:
- reqs = json.load(f)
-
- if args.index < 0 or args.index >= len(reqs):
- print("Index out of bounds")
- sys.exit(1)
-
- requirement = reqs[args.index]
-
- # 0. Setup directories
- output_dir = base_dir / "output" / f"{(args.index+1):03d}"
- output_dir.mkdir(parents=True, exist_ok=True)
- raw_cases_dir = output_dir / "raw_cases"
- raw_cases_dir.mkdir(parents=True, exist_ok=True)
-
- setup_logging(level=LOG_LEVEL, file=LOG_FILE)
-
- print("=" * 60)
- print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
- print("=" * 60)
- # ── 前置文件检查 ──
- # 每个 step 需要的前置文件(如果该 step 不在 active_steps 中,则需要预先存在)
- STEP_DEPS = {
- "research": [],
- "source": [("case_*.json", lambda: bool(list(raw_cases_dir.glob("case_*.json"))))],
- "generate-case": [("source.json", lambda: (raw_cases_dir / "source.json").exists())],
- "workflow-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
- "capability-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
- "apply-grounding": [("case.json", lambda: (output_dir / "case.json").exists())],
- "process-cluster": [
- ("case.json", lambda: (output_dir / "case.json").exists()),
- ],
- "process-score": [("blueprint_temp.json", lambda: (output_dir / "blueprint_temp.json").exists())],
- "capability-enrich": [
- ("capabilities_temp.json", lambda: (output_dir / "capabilities_temp.json").exists()),
- ("case.json", lambda: (output_dir / "case.json").exists()),
- ],
- "strategy": [
- ("process.json", lambda: (output_dir / "process.json").exists()),
- ("capabilities.json", lambda: (output_dir / "capabilities.json").exists()),
- ],
- }
- # 每个 step 会生成的文件(用于判断上游 step 是否会在本次运行中生成依赖)
- STEP_PRODUCES = {
- "research": {"case_*.json"},
- "source": {"source.json"},
- "generate-case": {"case.json"},
- "workflow-extract": {"case.json"}, # 原地更新
- "capability-extract": {"case.json"}, # 原地更新
- "apply-grounding": {"case.json"}, # 原地更新
- "process-cluster": {"blueprint_temp.json"},
- "process-score": {"process.json"},
- "capability-enrich": {"capabilities.json"},
- "strategy": {"strategy.json"},
- }
- if active_steps is not None:
- # 计算本次运行会生成的文件集合
- will_produce = set()
- for s in STEP_ORDER:
- if s in active_steps:
- will_produce.update(STEP_PRODUCES.get(s, set()))
- # 检查每个 active step 的前置文件
- missing = []
- for s in STEP_ORDER:
- if s not in active_steps:
- continue
- for dep_name, dep_check in STEP_DEPS.get(s, []):
- if dep_name in will_produce:
- continue # 上游 step 会在本次运行中生成
- if not dep_check():
- missing.append((s, dep_name))
- if missing:
- print(f"\n❌ [Pre-flight Check] Missing prerequisite files:")
- for step, dep in missing:
- print(f" - Step '{step}' requires '{dep}'")
- print(f"\nRun upstream steps first, or use --start-from to include them.")
- sys.exit(1)
- else:
- print(f"✅ [Pre-flight Check] All prerequisites satisfied")
- # Load presets
- presets_path = base_dir / "presets.json"
- if presets_path.exists():
- from agent.core.presets import load_presets_from_json
- load_presets_from_json(str(presets_path))
- print("✅ Configured Agent Presets (Skills Boundaries)")
- # Browser initialization removed to save resources
-
- store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
- # Instantiate two distinct LLM orchestrators
- qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface
- # 根据 --use-claude-sdk 参数选择 LLM 提供商
- if args.use_claude_sdk:
- # 使用 Claude SDK (CLAUDE_CODE_KEY/URL 或 ANTHROPIC_API_KEY/BASE_URL)
- claude_model = "claude-sonnet-4-5"
- print(f"✅ Using Claude SDK with model: {claude_model}")
- print(f" API Key: {os.getenv('CLAUDE_CODE_KEY', 'N/A')[:20]}...")
- print(f" Base URL: {os.getenv('CLAUDE_CODE_URL', os.getenv('ANTHROPIC_BASE_URL', 'https://api.anthropic.com'))}")
- claude_llm_call = create_claude_llm_call(model=claude_model)
- else:
- # 使用 OpenRouter 代理的 GPT-5.4(支持结构化输出 strict mode)
- claude_model = "openai/gpt-5.4"
- print(f"✅ Using OpenRouter with model: {claude_model}")
- from agent.llm.openrouter import create_openrouter_llm_call
- claude_llm_call = create_openrouter_llm_call(model=claude_model)
-
- runner_qwen = AgentRunner(
- trace_store=store,
- llm_call=create_qwen_llm_call(model=qwen_model),
- skills_dir=SKILLS_DIR
- )
-
- runner_claude = AgentRunner(
- trace_store=store,
- llm_call=claude_llm_call,
- skills_dir=SKILLS_DIR
- )
- try:
- start_time = time.time()
- total_cost = 0.0
- costs_breakdown = {}
- global_errors = []
- strategy_file = None
- # ── --only-step 单步执行模式 ──────────────────────────────
- if args.only_step:
- step = args.only_step
- source_file = raw_cases_dir / "source.json"
- detailed_file = raw_cases_dir / "case_detailed.json"
- blueprint_temp_file = output_dir / "blueprint_temp.json"
- capabilities_temp_file = output_dir / "capabilities_temp.json"
- process_file = output_dir / "process.json"
- capabilities_file = output_dir / "capabilities.json"
- print(f"\n[Single Step Mode] Running only: {step}")
- if step == "research":
- # Phase 1: 只跑调研,强制按 --platforms 重跑,不受已有 case 文件影响
- single_platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
- if not single_platforms:
- print(" ❌ No platforms specified. Use --platforms xhs,zhihu,gzh,youtube")
- sys.exit(1)
- print(f" 🔍 Research platforms: {single_platforms}")
- single_is_single = args.restart_mode.startswith("single_")
- phase1_tasks = []
- for p in single_platforms:
- task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
- out_file = str(raw_cases_dir / f"case_{p}.json")
- kwargs = {
- "task": task_desc,
- "output_file": out_file
- }
- phase1_tasks.append(
- run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=single_is_single)
- )
- phase1_results = await asyncio.gather(*phase1_tasks)
- for (task_cost, task_errors, trace_id), p in zip(phase1_results, single_platforms):
- print(f" ✓ {p}: cost=${task_cost:.4f}, errors={len(task_errors)}")
- elif step == "source":
- # Phase 1.5: 提取原始 source.json
- from examples.process_pipeline.script.extract_sources import extract_sources_to_json
- result = extract_sources_to_json(raw_cases_dir)
- print(f" ✓ source.json: matched={result['total_matched']}")
- elif step == "generate-case":
- # Phase 1.5.5: 生成标准化 case.json
- from examples.process_pipeline.script.generate_case import generate_case_from_source
- result = await generate_case_from_source(raw_cases_dir)
- print(f" ✓ case.json: cases={result['total_cases']}")
- elif step == "workflow-extract":
- # Phase 1.6a: 提取 workflow 到 case.json
- case_file = output_dir / "case.json"
- if not case_file.exists():
- print(f" ❌ case.json not found. Run --only-step generate-case first.")
- sys.exit(1)
- # 如果指定了 --case-index,先过滤 case.json
- if args.case_index is not None:
- with open(case_file, "r", encoding="utf-8") as f:
- case_data = json.load(f)
- original_cases = case_data.get("cases", [])
- target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
- if not target_case:
- print(f" ❌ Case with index {args.case_index} not found in case.json")
- sys.exit(1)
- # 临时只保留目标 case
- case_data["cases"] = [target_case]
- temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
- with open(temp_case_file, "w", encoding="utf-8") as f:
- json.dump(case_data, f, ensure_ascii=False, indent=2)
- print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
- case_file_to_use = temp_case_file
- else:
- case_file_to_use = case_file
- from examples.process_pipeline.script.extract_workflow import extract_workflow
- result = await extract_workflow(
- case_file_to_use,
- claude_llm_call, model=claude_model
- )
- # 如果使用了临时文件,需要合并回原始 case.json
- if args.case_index is not None:
- with open(case_file_to_use, "r", encoding="utf-8") as f:
- updated_data = json.load(f)
- updated_case = updated_data["cases"][0]
- # 更新原始文件中的对应 case
- with open(case_file, "r", encoding="utf-8") as f:
- original_data = json.load(f)
- for i, c in enumerate(original_data["cases"]):
- if c.get("index") == args.case_index:
- original_data["cases"][i] = updated_case
- break
- with open(case_file, "w", encoding="utf-8") as f:
- json.dump(original_data, f, ensure_ascii=False, indent=2)
- temp_case_file.unlink() # 删除临时文件
- print(f" ✓ Merged case {args.case_index} back to case.json")
- print(f" ✓ case.json + workflow: success={result['success']}, failed={result['failed']}")
- elif step == "capability-extract":
- # Phase 1.6b: 提取 capabilities 到 case.json
- case_file = output_dir / "case.json"
- if not case_file.exists():
- print(f" ❌ case.json not found. Run --only-step generate-case first.")
- sys.exit(1)
- # 如果指定了 --case-index,先过滤 case.json
- if args.case_index is not None:
- with open(case_file, "r", encoding="utf-8") as f:
- case_data = json.load(f)
- original_cases = case_data.get("cases", [])
- target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
- if not target_case:
- print(f" ❌ Case with index {args.case_index} not found in case.json")
- sys.exit(1)
- case_data["cases"] = [target_case]
- temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
- with open(temp_case_file, "w", encoding="utf-8") as f:
- json.dump(case_data, f, ensure_ascii=False, indent=2)
- print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
- case_file_to_use = temp_case_file
- else:
- case_file_to_use = case_file
- from examples.process_pipeline.script.extract_capability import extract_capability
- result = await extract_capability(
- case_file_to_use,
- claude_llm_call, model=claude_model
- )
- # 如果使用了临时文件,需要合并回原始 case.json
- if args.case_index is not None:
- with open(case_file_to_use, "r", encoding="utf-8") as f:
- updated_data = json.load(f)
- updated_case = updated_data["cases"][0]
- with open(case_file, "r", encoding="utf-8") as f:
- original_data = json.load(f)
- for i, c in enumerate(original_data["cases"]):
- if c.get("index") == args.case_index:
- original_data["cases"][i] = updated_case
- break
- with open(case_file, "w", encoding="utf-8") as f:
- json.dump(original_data, f, ensure_ascii=False, indent=2)
- temp_case_file.unlink()
- print(f" ✓ Merged case {args.case_index} back to case.json")
- print(f" ✓ case.json + capabilities: success={result['success']}, failed={result['failed']}")
- elif step == "apply-grounding":
- # Phase 1.7: 将 apply_to_draft 映射为正式 apply_to
- case_file = output_dir / "case.json"
- if not case_file.exists():
- print(f" ❌ case.json not found. Run workflow-extract and capability-extract first.")
- sys.exit(1)
- # 如果指定了 --case-index,先过滤 case.json
- if args.case_index is not None:
- with open(case_file, "r", encoding="utf-8") as f:
- case_data = json.load(f)
- original_cases = case_data.get("cases", [])
- target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
- if not target_case:
- print(f" ❌ Case with index {args.case_index} not found in case.json")
- sys.exit(1)
- case_data["cases"] = [target_case]
- temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
- with open(temp_case_file, "w", encoding="utf-8") as f:
- json.dump(case_data, f, ensure_ascii=False, indent=2)
- print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
- case_file_to_use = temp_case_file
- else:
- case_file_to_use = case_file
- from examples.process_pipeline.script.apply_to_grounding import apply_grounding
- result = await apply_grounding(
- case_file_to_use,
- claude_llm_call, model=claude_model
- )
- # 如果使用了临时文件,需要合并回原始 case.json
- if args.case_index is not None:
- with open(case_file_to_use, "r", encoding="utf-8") as f:
- updated_data = json.load(f)
- updated_case = updated_data["cases"][0]
- with open(case_file, "r", encoding="utf-8") as f:
- original_data = json.load(f)
- for i, c in enumerate(original_data["cases"]):
- if c.get("index") == args.case_index:
- original_data["cases"][i] = updated_case
- break
- with open(case_file, "w", encoding="utf-8") as f:
- json.dump(original_data, f, ensure_ascii=False, indent=2)
- temp_case_file.unlink()
- print(f" ✓ Merged case {args.case_index} back to case.json")
- print(f" ✓ case.json + apply_to: grounded={result['grounded']}/{result['total']}, cost=${result['total_cost']:.4f}")
- elif step == "process-cluster":
- # Phase 2.1.1: 工序聚类
- if not detailed_file.exists():
- print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
- sys.exit(1)
- from examples.process_pipeline.script.cluster_processes import cluster_processes
- result = await cluster_processes(
- source_file=source_file, detailed_file=detailed_file,
- output_file=blueprint_temp_file, requirement=requirement,
- llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ blueprint_temp.json: clusters={result['clusters']}")
- elif step == "process-score":
- # Phase 2.1.2: 工序打分
- if not blueprint_temp_file.exists():
- print(f" ❌ blueprint_temp.json not found. Run --only-step process-cluster first.")
- sys.exit(1)
- from examples.process_pipeline.script.score_processes import score_blueprints
- result = await score_blueprints(
- blueprint_file=blueprint_temp_file, output_file=process_file,
- requirement=requirement, llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ process.json: scored={result['scored']}")
- elif step == "capability-extract":
- # Phase 2.2.1: 能力初步聚类
- if not detailed_file.exists():
- print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
- sys.exit(1)
- from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
- result = await extract_capabilities_workflow(
- detailed_file=detailed_file, source_file=source_file,
- output_file=capabilities_temp_file, requirement=requirement,
- llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ capabilities_temp.json: capabilities={result['capabilities']}")
- elif step == "capability-enrich":
- # Phase 2.2.2: 能力丰富化
- if not capabilities_temp_file.exists():
- print(f" ❌ capabilities_temp.json not found. Run --only-step capability-extract first.")
- sys.exit(1)
- if not source_file.exists():
- print(f" ❌ source.json not found. Run --only-step source first.")
- sys.exit(1)
- from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
- result = await enrich_all_capabilities(
- capabilities_temp_file=capabilities_temp_file, source_file=source_file,
- output_file=capabilities_file, llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ capabilities.json: enriched={result['enriched']}/{result['total_capabilities']}")
- elif step == "strategy":
- # Phase 3: 策略组装
- if not process_file.exists():
- print(f" ❌ process.json not found. Run --only-step process-score first.")
- sys.exit(1)
- if not capabilities_file.exists():
- print(f" ❌ capabilities.json not found. Run --only-step capability-enrich first.")
- sys.exit(1)
- strategy_file_path = output_dir / "strategy.json"
- from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
- result = await assemble_strategy(
- process_file=process_file, capabilities_file=capabilities_file,
- output_file=strategy_file_path, requirement=requirement,
- llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ strategy.json: workflow_steps={result['workflow_steps']}")
- elapsed = time.time() - start_time
- print(f"\n[Single Step Done] {step} completed in {elapsed:.1f}s")
- return
- # ── 正常 pipeline 流程 ──────────────────────────────
- existing_platforms = []
- if raw_cases_dir.exists():
- for f in raw_cases_dir.glob("case_*.json"):
- plat = f.stem.replace("case_", "")
- if plat in ["xhs", "youtube", "bili", "x", "zhihu", "gzh"]:
- existing_platforms.append(plat)
- platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
- # Phase 0: Platform Selection (controlled by --platforms)
- if should_run("research"):
- new_platforms = [p for p in platforms if p not in existing_platforms]
- if not new_platforms:
- print(f"\n--- Phase 0: Skipping Research (All specified platforms already exist: {existing_platforms}) ---")
- platforms = []
- else:
- print(f"\n--- Phase 0: Using specified platforms ---")
- print(f"📡 Found existing cases: {existing_platforms}. Will research new platforms: {new_platforms}")
- platforms = new_platforms
- is_single_step = args.restart_mode.startswith("single_")
- # Phase 1: MAP (Parallel Search) uses Qwen
- if should_run("research"):
- print(f"\n--- Phase 1: Distributed Research Map ({qwen_model}) ---")
- phase1_tasks = []
- for p in platforms:
- task_desc = f"渠道:{p.upper()}。核心需求:{requirement}"
- out_file = str(raw_cases_dir / f"case_{p}.json")
- kwargs = {
- "task": task_desc,
- "output_file": out_file
- }
- phase1_tasks.append(run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=is_single_step))
-
- phase1_results = await asyncio.gather(*phase1_tasks)
- phase1_trace_ids = {}
- for (task_cost, task_errors, trace_id), p in zip(phase1_results, platforms):
- total_cost += task_cost
- costs_breakdown[f"P1_Research_{p}"] = round(task_cost, 4)
- phase1_trace_ids[f"P1_Research_{p}"] = trace_id
- global_errors.extend(task_errors)
- # Check if cases actually got written
- expected_file = Path(raw_cases_dir / f"case_{p}.json")
- if not expected_file.exists():
- err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- else:
- print("\n⏭️ [Skip] Phase 1 Skipped. Using existing cases...")
- # Phase 1.5: Extract raw post data from cache → raw_cases/source.json
- if should_run("source"):
- try:
- from examples.process_pipeline.script.extract_sources import extract_sources_to_json
- trace_id_list = [tid for tid in phase1_trace_ids.values() if tid] if 'phase1_trace_ids' in dir() else None
- src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=trace_id_list)
- print(
- f"📎 [Source Extraction] "
- f"matched={src_stats['total_matched']} "
- f"→ {raw_cases_dir / 'source.json'}"
- )
- except Exception as e:
- err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- # Phase 1.3: Generate case.json from source.json
- if should_run("generate-case"):
- source_file = raw_cases_dir / "source.json"
- case_file = output_dir / "case.json"
- if source_file.exists():
- try:
- from examples.process_pipeline.script.generate_case import generate_case
- print(f"\n--- Phase 1.3: Generate case.json ---")
- case_stats = await generate_case(source_file, case_file)
- print(
- f"📦 [Case Generation] "
- f"generated {case_stats.get('total', 0)} cases "
- f"→ {case_file}"
- )
- except Exception as e:
- err_msg = f"Case generation failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- # Phase 1.6: Extract workflow and capabilities sequentially → case.json
- if should_run("workflow-extract"):
- case_file = output_dir / "case.json"
- if case_file.exists():
- try:
- from examples.process_pipeline.script.extract_workflow import extract_workflow
- print(f"\n--- Phase 1.6a: Workflow Extraction ({claude_model}) ---")
- workflow_stats = await extract_workflow(
- case_file,
- claude_llm_call,
- model=claude_model,
- max_concurrent=3
- )
- total_cost += workflow_stats.get("total_cost", 0.0)
- costs_breakdown["P1.6a_WorkflowExtraction"] = round(workflow_stats.get("total_cost", 0.0), 4)
- print(
- f"🔍 [Workflow Extraction] "
- f"success={workflow_stats['success']} "
- f"failed={workflow_stats['failed']}"
- )
- except Exception as e:
- err_msg = f"Workflow extraction failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- if should_run("capability-extract"):
- case_file = output_dir / "case.json"
- if case_file.exists():
- try:
- from examples.process_pipeline.script.extract_capability import extract_capability
- print(f"\n--- Phase 1.6b: Capability Extraction ({claude_model}) ---")
- capability_stats = await extract_capability(
- case_file,
- claude_llm_call,
- model=claude_model,
- max_concurrent=3
- )
- total_cost += capability_stats.get("total_cost", 0.0)
- costs_breakdown["P1.6b_CapabilityExtraction"] = round(capability_stats.get("total_cost", 0.0), 4)
- print(
- f"🧩 [Capability Extraction] "
- f"success={capability_stats['success']} "
- f"failed={capability_stats['failed']} "
- f"→ {case_file}"
- )
- except Exception as e:
- err_msg = f"Capability extraction failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- # Phase 1.7: Apply grounding (map apply_to_draft to apply_to)
- if should_run("apply-grounding"):
- case_file = output_dir / "case.json"
- if case_file.exists():
- try:
- from examples.process_pipeline.script.apply_to_grounding import apply_grounding
- print(f"\n--- Phase 1.7: Apply Grounding ({claude_model}) ---")
- grounding_stats = await apply_grounding(
- case_file,
- claude_llm_call,
- model=claude_model,
- max_concurrent=3
- )
- total_cost += grounding_stats.get("total_cost", 0.0)
- costs_breakdown["P1.7_ApplyGrounding"] = round(grounding_stats.get("total_cost", 0.0), 4)
- print(
- f"🗺️ [Apply Grounding] "
- f"grounded={grounding_stats['grounded']}/{grounding_stats['total']} "
- f"→ {case_file}"
- )
- except Exception as e:
- err_msg = f"Apply grounding failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- # Phase 2: Parallel Workflow (Process + Capabilities) uses Claude
- if any(should_run(s) for s in ["process-cluster", "process-score", "capability-enrich", "strategy"]):
- print(f"\n--- Phase 2: Parallel Workflow ({claude_model}) ---")
- # 输出文件
- process_file = str(output_dir / "process.json")
- capabilities_file = str(output_dir / "capabilities.json")
- # 中间文件
- blueprint_temp_file = str(output_dir / "blueprint_temp.json")
- capabilities_temp_file = str(output_dir / "capabilities_temp.json")
- # 优先使用结构化数据:source.json + case_detailed.json
- detailed_file = raw_cases_dir / "case_detailed.json"
- source_file = raw_cases_dir / "source.json"
- if detailed_file.exists():
- input_files_glob = str(raw_cases_dir / "{source,case_detailed}.json").replace("\\", "/")
- print(f" Using structured data: source.json + case_detailed.json")
- else:
- input_files_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
- print(f" Fallback to raw cases: case_*.json")
- force_strategy_rerun = False
- force_active = active_steps is not None
- # ── Step 1: 并行执行 2.1.1 (cluster_processes) 和 2.2.1 (extract_capabilities) ──
async def run_cluster_processes():
    """Step 2.1.1: cluster processes into blueprint_temp.json; return USD cost."""
    # Skip when the artifact already exists, unless this step was explicitly requested.
    already_done = Path(blueprint_temp_file).exists()
    if already_done and not force_active:
        print(f" [2.1.1] ⏭️ blueprint_temp.json exists, skipping")
        return 0.0
    print(f" [2.1.1] Clustering processes...")
    try:
        from examples.process_pipeline.script.cluster_processes import cluster_processes
        stats = await cluster_processes(
            source_file=source_file,
            detailed_file=detailed_file,
            output_file=Path(blueprint_temp_file),
            requirement=requirement,
            llm_call=claude_llm_call,
            model=claude_model,
        )
        distilled = stats.get('distilled_cases', 0)
        blueprints = stats.get('blueprints', 0)
        print(f" [2.1.1] ✓ Distilled {distilled} cases, generated {blueprints} blueprints")
        return stats.get("total_cost", 0.0)
    except Exception as exc:
        # Record the failure but keep the pipeline alive; cost is unknown, report 0.
        err_msg = f"P2.1.1 ClusterProcesses failed: {type(exc).__name__}: {exc}"
        print(f" [2.1.1] ⚠️ {err_msg}")
        global_errors.append(err_msg)
        return 0.0
async def run_extract_capabilities():
    """Step 2.2.1: first-pass capability clustering into capabilities_temp.json; return USD cost."""
    # Idempotence guard: reuse a previous run's output unless a rerun was forced.
    if Path(capabilities_temp_file).exists() and not force_active:
        print(f" [2.2.1] ⏭️ capabilities_temp.json exists, skipping")
        return 0.0
    print(f" [2.2.1] Extracting capabilities...")
    try:
        from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
        stats = await extract_capabilities_workflow(
            detailed_file=detailed_file,
            source_file=source_file,
            output_file=Path(capabilities_temp_file),
            requirement=requirement,
            llm_call=claude_llm_call,
            model=claude_model,
        )
        extracted = stats.get('capabilities', 0)
        print(f" [2.2.1] ✓ Extracted {extracted} capabilities")
        return stats.get("total_cost", 0.0)
    except Exception as exc:
        # Swallow the error into the shared error list so the sibling branch can still run.
        err_msg = f"P2.2.1 ExtractCapabilities failed: {type(exc).__name__}: {exc}"
        print(f" [2.2.1] ⚠️ {err_msg}")
        global_errors.append(err_msg)
        return 0.0
- if not Path(blueprint_temp_file).exists() or not Path(capabilities_temp_file).exists():
- force_strategy_rerun = True
- step1_costs = await asyncio.gather(run_cluster_processes(), run_extract_capabilities())
- for cost, name in zip(step1_costs, ["P2.1.1_ClusterProcesses", "P2.2.1_ExtractCapabilities"]):
- total_cost += cost
- costs_breakdown[name] = round(cost, 4)
- # ── Step 2: 并行执行 2.1.2 和 2.2.2 ──────────────────────────────
async def run_score_processes():
    """Step 2.1.2: score blueprint match quality, writing process.json.

    Returns:
        float: the LLM cost in USD for this step (0.0 when skipped or failed).
    """
    # Skip when the output already exists, unless the step was explicitly requested.
    if Path(process_file).exists() and not force_active:
        print(f" [2.1.2] ⏭️ process.json exists, skipping")
        return 0.0
    # Upstream dependency check: 2.1.1 must have produced the blueprint file.
    if not Path(blueprint_temp_file).exists():
        print(f" [2.1.2] ⚠️ blueprint_temp.json not found, skipping")
        return 0.0
    print(f" [2.1.2] Scoring processes...")
    try:
        from examples.process_pipeline.script.score_processes import score_blueprints
        score_result = await score_blueprints(
            blueprint_file=Path(blueprint_temp_file),
            output_file=Path(process_file),
            requirement=requirement,
            llm_call=claude_llm_call,
            model=claude_model,
        )
        print(f" [2.1.2] ✓ Scored {score_result.get('scored', 0)} blueprints")
        return score_result.get("total_cost", 0.0)
    except Exception as e:
        # Include the exception type, matching the sibling handlers' message format.
        err_msg = f"P2.1.2 ScoreProcesses failed: {type(e).__name__}: {e}"
        print(f" [2.1.2] ⚠️ {err_msg}")
        global_errors.append(err_msg)
        return 0.0
async def run_enrich_capabilities():
    """Step 2.2.2: enrich clustered capabilities into capabilities.json.

    Returns:
        float: the LLM cost in USD for this step (0.0 when skipped or failed).
    """
    # Skip when the output already exists, unless the step was explicitly requested.
    if Path(capabilities_file).exists() and not force_active:
        print(f" [2.2.2] ⏭️ capabilities.json exists, skipping")
        return 0.0
    # Upstream dependency checks: 2.2.1 output and the raw source extraction.
    if not Path(capabilities_temp_file).exists():
        print(f" [2.2.2] ⚠️ capabilities_temp.json not found, skipping")
        return 0.0
    if not source_file.exists():
        print(f" [2.2.2] ⚠️ source.json not found, skipping")
        return 0.0
    print(f" [2.2.2] Enriching capabilities...")
    try:
        from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
        enrich_result = await enrich_all_capabilities(
            capabilities_temp_file=Path(capabilities_temp_file),
            source_file=source_file,
            output_file=Path(capabilities_file),
            llm_call=claude_llm_call,
            model=claude_model,
        )
        print(f" [2.2.2] ✓ Enriched {enrich_result.get('enriched', 0)} capabilities")
        return enrich_result.get("total_cost", 0.0)
    except Exception as e:
        # Include the exception type, matching the sibling handlers' message format.
        err_msg = f"P2.2.2 EnrichCapabilities failed: {type(e).__name__}: {e}"
        print(f" [2.2.2] ⚠️ {err_msg}")
        global_errors.append(err_msg)
        return 0.0
- # 并行执行 Step 2
- step2_costs = await asyncio.gather(run_score_processes(), run_enrich_capabilities())
- for cost, name in zip(step2_costs, ["P2.1.2_ScoreProcesses", "P2.2.2_EnrichCaps"]):
- total_cost += cost
- costs_breakdown[name] = round(cost, 4)
- # Phase 3: REDUCE 2 (Final Assembly) uses Claude
- print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
- strategy_file_path = output_dir / "strategy.json"
- if args.restart_mode == "single":
- force_strategy_rerun = False
- if strategy_file_path.exists() and not force_strategy_rerun and not force_active:
- print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
- else:
- if strategy_file_path.exists():
- print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")
- print(" > Using [Workflow Core]")
- from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
- try:
- phase3_result = await assemble_strategy(
- process_file=Path(process_file),
- capabilities_file=Path(capabilities_file),
- output_file=strategy_file_path,
- requirement=requirement,
- llm_call=claude_llm_call,
- model=claude_model,
- )
- phase3_cost = phase3_result.get("total_cost", 0.0)
- print(f" ✓ Generated workflow with {phase3_result.get('workflow_steps', 0)} steps")
- except Exception as e:
- err_msg = f"P3_AssembleStrategy failed: {type(e).__name__}: {e}"
- print(f" ⚠️ {err_msg}")
- global_errors.append(err_msg)
- phase3_cost = 0.0
- total_cost += phase3_cost
- costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
- else:
- print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
-
- end_time = time.time()
- elapsed_sec = end_time - start_time
-
- # Save Metrics
- metrics_file = base_dir / "run_metrics.json"
- metrics_data = []
- if metrics_file.exists():
- with open(metrics_file, "r", encoding="utf-8") as f:
- try:
- metrics_data = json.load(f)
- except json.JSONDecodeError:
- pass
-
- # Collect trace_ids from all phases
- trace_ids = {}
- if 'phase1_trace_ids' in dir():
- trace_ids.update(phase1_trace_ids)
- metrics_data.append({
- "index": args.index,
- "requirement": requirement[:80] + "...",
- "duration_seconds": round(elapsed_sec, 2),
- "total_cost_usd": round(total_cost, 4),
- "costs_breakdown": costs_breakdown,
- "trace_ids": trace_ids,
- "errors": global_errors,
- "timestamp": datetime.now().isoformat()
- })
-
- with open(metrics_file, "w", encoding="utf-8") as f:
- json.dump(metrics_data, f, indent=2, ensure_ascii=False)
-
- print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")
-
- finally:
- pass
-
- print("✅ Pipeline run finished.")
- if strategy_file:
- print("✅ Strategy saved to:", strategy_file)
# Script entry point: drive the whole async pipeline on the default event loop.
if __name__ == "__main__":
    asyncio.run(main())
|