""" V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research """ import argparse import asyncio import json import os import sys import time from datetime import datetime from pathlib import Path PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent # Force runtime working directory to project root so relative trace/cache paths # always land in the repo root no matter where this script is launched from. os.chdir(PROJECT_ROOT) # Add project root to path sys.path.insert(0, str(PROJECT_ROOT)) from dotenv import load_dotenv load_dotenv() from agent.llm.prompts import SimplePrompt from agent.core.runner import AgentRunner, RunConfig from agent.tools.builtin.knowledge import KnowledgeConfig from agent.trace import FileSystemTraceStore, Trace, Message from agent.llm import create_qwen_llm_call from agent.llm.openrouter import create_openrouter_llm_call from agent.llm.claude import create_claude_llm_call # config from existing setup from examples.process_research.config import ( OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE, BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG ) from agent.utils import setup_logging async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False, start_trace_id: str = None, additional_messages: list = None): from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy base_dir = Path(__file__).parent prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt" prompt = SimplePrompt(prompt_path) messages = prompt.build_messages(**kwargs) if additional_messages: messages.extend(additional_messages) target_tools = [] if prompt_name == "extract_capabilities": target_tools = ["capability_search", "capability_list", "tool_search"] # 按 agent 类型配置工具组权限 tool_groups_map = { "researcher": ["core", "content"], # 搜索+文件,无浏览器 "filter_and_blueprint": ["core"], # 只需文件读写 "extract_capabilities": ["core"], # 只需文件读写(额外工具由 target_tools 补充) "assemble_strategy": ["core"], # 只需文件读写 } total_task_cost = 0.0 task_errors = [] out_file = kwargs.get("output_file") max_retries = 3 last_trace_id = start_trace_id last_validation_error = None final_trace_id = None # 用于返回最终成功的 trace_id def _instant_validate(): """文件写出后立即校验并尝试修复,返回 error string 或 None""" nonlocal last_validation_error if not out_file or not Path(out_file).exists(): return None try: with open(out_file, "r", encoding="utf-8") as f: raw = f.read() try: data = json.loads(raw) except json.JSONDecodeError: from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse ok, data, desc = try_fix_and_parse(raw) if ok: with open(out_file, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f" 🔧 [Instant Fix] {desc}") else: last_validation_error = "JSON parse failed, auto-fix unsuccessful" return last_validation_error filename = Path(out_file).name err = None if filename.startswith("case_"): err = validate_case(data) elif filename == "blueprint.json": err = validate_blueprint(data) elif filename == "capabilities_extracted.json": err = validate_capabilities(data) elif filename == "strategy.json": err = validate_strategy(data) if err: last_validation_error = err print(f" ⚠️ [Instant Validation] {err}") return err else: print(f" ✅ [Instant Validation] {filename} OK") return None except Exception as e: last_validation_error = str(e) return str(e) for attempt in range(max_retries): if attempt > 0 and last_trace_id and last_validation_error: # 续跑模式:把错误信息告诉之前的 agent,让它修复 print(f"🔄 [Continue {attempt}/{max_retries-1}] {task_name} - sending fix instructions to existing agent") fix_messages = [{ "role": "user", "content": ( f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n" f"错误详情:{last_validation_error}\n\n" f"请立刻读取该文件,根据以上错误信息修复内容,然后重新调用 write_json 写入到同一路径 `{out_file}`。" f"只修复有问题的部分,不要丢弃已有的正确内容。" ) }] fix_config = RunConfig( model=prompt.config.get("model") or model_name, temperature=0.1, name=f"{task_name}_Fix{attempt}", agent_type=prompt_name, tools=target_tools, tool_groups=tool_groups_map.get(prompt_name, ["core"]), trace_id=last_trace_id, knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False) ) try: async for item in runner.run(messages=fix_messages, config=fix_config): if isinstance(item, Trace): last_trace_id = item.trace_id if item.status == "completed": total_task_cost += item.total_cost elif item.status == "failed": task_errors.append(f"{task_name} Fix Failed: {item.error_message}") if isinstance(item, Message) and item.role == "tool": content = item.content if isinstance(item.content, dict) else {} if content.get("tool_name") in ("write_file", "write_json"): print(f" 💾 [Fix File Written by {task_name}]") _instant_validate() except Exception as e: err_msg = f"{type(e).__name__}: {e}" print(f"❌ [Exception Fix] {task_name} crashed: {err_msg}") task_errors.append(f"{task_name} fix crashed: {err_msg}") elif attempt > 0: # 没有 trace_id 或没有 validation error,只能完全重跑 print(f"🔄 [Retry {attempt}/{max_retries-1}] {task_name} - no prior trace, full restart") if out_file and Path(out_file).exists(): Path(out_file).unlink() run_config = RunConfig( model=prompt.config.get("model") or model_name, temperature=prompt.config.get("temperature") or 0.3, name=f"{task_name}_A{attempt}", agent_type=prompt_name, tools=target_tools, tool_groups=tool_groups_map.get(prompt_name, ["core"]), knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False) ) print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})") try: async for item in runner.run(messages=messages, config=run_config): if isinstance(item, Trace): last_trace_id = item.trace_id if item.status == "completed": total_task_cost += item.total_cost elif item.status == "failed": task_errors.append(f"{task_name} Failed: {item.error_message}") if isinstance(item, Message): if item.role == "tool": content = item.content if isinstance(item.content, dict) else {} t_name = content.get("tool_name", "unknown") if t_name in ("write_file", "write_json"): print(f" 💾 [File Written by {task_name}]") _instant_validate() except Exception as e: err_msg = f"{type(e).__name__}: {e}" print(f"❌ [Exception] {task_name} crashed: {err_msg}") task_errors.append(f"{task_name} crashed: {err_msg}") else: # 首次执行 run_config = RunConfig( model=prompt.config.get("model") or model_name, temperature=prompt.config.get("temperature") or 0.3, name=f"{task_name}_A{attempt}", agent_type=prompt_name, tools=target_tools, tool_groups=tool_groups_map.get(prompt_name, ["core"]), knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False), trace_id=last_trace_id ) print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})") try: async for item in runner.run(messages=messages, config=run_config): if isinstance(item, Trace): last_trace_id = item.trace_id if item.status == "completed": total_task_cost += item.total_cost elif item.status == "failed": task_errors.append(f"{task_name} Failed: {item.error_message}") if isinstance(item, Message): if item.role == "tool": content = item.content if isinstance(item.content, dict) else {} t_name = content.get("tool_name", "unknown") if t_name in ("write_file", "write_json"): print(f" 💾 [File Written by {task_name}]") _instant_validate() except Exception as e: err_msg = f"{type(e).__name__}: {e}" print(f"❌ [Exception] {task_name} crashed: {err_msg}") task_errors.append(f"{task_name} crashed: {err_msg}") # Verification & Recovery block if out_file and not Path(out_file).exists() and last_trace_id: print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...") recovery_messages = [{ "role": "user", "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!" }] rec_config = RunConfig( model=model_name, temperature=0.1, name=task_name + "_Rec", agent_type=prompt_name, trace_id=last_trace_id, knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False) ) try: async for r_item in runner.run(messages=recovery_messages, config=rec_config): if isinstance(r_item, Trace): if r_item.status == "completed": total_task_cost += r_item.total_cost elif r_item.status == "failed": task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}") if isinstance(r_item, Message) and r_item.role == "tool": content = r_item.content if isinstance(r_item.content, dict) else {} if content.get("tool_name") in ("write_file", "write_json"): print(f" 💾 [Recovery File Written by {task_name}]") _instant_validate() except Exception as e: err_msg = f"{type(e).__name__}: {e}" print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}") task_errors.append(f"{task_name} recovery crashed: {err_msg}") # Schema Validation (with auto-fix layer) if out_file and Path(out_file).exists() and str(out_file).endswith(".json"): if skip_validation: print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)") return total_task_cost, task_errors, last_trace_id try: with open(out_file, "r", encoding="utf-8") as f: raw_content = f.read() # Layer 1: 尝试直接解析 try: data = json.loads(raw_content) except json.JSONDecodeError as parse_err: # Layer 2: 自动修复 JSON 语法错误 print(f" 🔧 [Auto-Fix] {Path(out_file).name} has JSON syntax error, attempting fix...") try: from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse success, data, fix_desc = try_fix_and_parse(raw_content) if success: # 修复成功,写回文件 with open(out_file, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f" 🔧 [Auto-Fix] Fixed: {fix_desc}") else: raise parse_err # 修复失败,抛出原始错误 except ImportError: raise parse_err # fix_json_quotes 不可用,抛出原始错误 filename = Path(out_file).name err = None if filename.startswith("case_"): err = validate_case(data) elif filename == "blueprint.json": err = validate_blueprint(data) elif filename == "capabilities_extracted.json": err = validate_capabilities(data) elif filename == "strategy.json": err = validate_strategy(data) if err: raise ValueError(f"Schema Validation Failed: {err}") print(f" ✅ [Schema Validated] {Path(out_file).name}") final_trace_id = last_trace_id return total_task_cost, task_errors, final_trace_id # Success! Exit retry loop. except Exception as e: err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}" print(f"❌ [Validation Error] {task_name}: {err_msg}") task_errors.append(f"{task_name} Error: {e}") last_validation_error = str(e) if attempt == max_retries - 1: print(f"❌ [Retry Limit] {task_name} exhausted retries.") return total_task_cost, task_errors, last_trace_id else: print(f"❌ [Missing File] {task_name} did not produce output file after recovery.") last_validation_error = None if attempt == max_retries - 1: return total_task_cost, task_errors, last_trace_id return total_task_cost, task_errors, last_trace_id async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False): """ 备用:使用纯净的官方 Anthropic SDK 驱动。 跳过内部大架构的 trace 追踪,但保留对原有 Python 工具库(如 write_file/glob_files)的无缝调用。 """ from anthropic import AsyncAnthropic from agent.tools.registry import get_tool_registry base_dir = Path(__file__).parent prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt" prompt = SimplePrompt(prompt_path) # 1. 组装输入 Message raw_messages = prompt.build_messages(**kwargs) system_prompt = "" messages = [] for msg in raw_messages: if msg["role"] == "system": system_prompt += msg["content"] + "\n\n" else: messages.append({"role": msg["role"], "content": msg["content"]}) # 2. 映射目标工具 target_tools = ["write_file", "write_json", "read_file", "glob_files"] if prompt_name == "extract_capabilities": target_tools.extend(["capability_search", "capability_list", "tool_search"]) registry = get_tool_registry() schemas = registry.get_schemas(target_tools) anthropic_tools = [] for s in schemas: anthropic_tools.append({ "name": s["function"]["name"], "description": s["function"].get("description", ""), "input_schema": s["function"]["parameters"] }) # 3. 初始化并开启 Loop # 提示:你需要在你的终端中配置好 ANTHROPIC_API_KEY 环境变量 client = AsyncAnthropic() total_task_cost = 0.0 task_errors = [] sdk_trace_id = None # 用于记录 Anthropic SDK 的 response ID from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy out_file = kwargs.get("output_file") max_retries = 3 for attempt in range(max_retries): if attempt > 0: print(f"🔄 [Retry SDK {attempt}/{max_retries-1}] {task_name}") if out_file and Path(out_file).exists(): Path(out_file).unlink() print(f"🚀 [Launch Anthropic SDK] {task_name} (Attempt {attempt+1})") # Reset messages for retry messages_copy = list(messages) max_loops = 50 for loop_idx in range(max_loops): try: # 去除前缀(兼容比如 openrouter 传入的名字) clean_model = model_name.split("/")[-1] if "/" in model_name else model_name # 这里专门将实际请求映射到其可用的特殊别名 claude-sonnet-4-6 target_model = "claude-sonnet-4-6" if "claude" in clean_model else clean_model response = await client.messages.create( model=target_model, max_tokens=4096, temperature=0.2, system=system_prompt, messages=messages_copy, tools=anthropic_tools ) # (简略预估,不代表真实官方开销) if hasattr(response, 'usage'): step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0) total_task_cost += step_cost # 加入助手回复 assistant_content = [] tool_uses = [] for content_block in response.content: if content_block.type == "text": text_val = content_block.text if text_val: assistant_content.append({"type": "text", "text": text_val}) print(f"\n🤖 [{task_name} Output]:\n{text_val}\n") elif content_block.type == "tool_use": assistant_content.append({ "type": "tool_use", "id": content_block.id, "name": content_block.name, "input": content_block.input }) tool_uses.append(content_block) if not assistant_content: assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"}) messages_copy.append({"role": "assistant", "content": assistant_content}) # 出口:没有调用工具说明任务结束 if not tool_uses: print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${total_task_cost:.4f})") break # 工具执行与回传 tool_results = [] for tu in tool_uses: if tu.name in ("write_file", "write_json"): print(f" 💾 [File Written by SDK] {task_name}") print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}") # 执行本地环境的函数 result_str = await registry.execute(tu.name, tu.input) tool_results.append({ "type": "tool_result", "tool_use_id": tu.id, "content": result_str }) messages_copy.append({"role": "user", "content": tool_results}) except Exception as e: err_msg = str(e) print(f"❌ [Fail SDK Core] {task_name}: {err_msg}") task_errors.append(err_msg) break # Verification & Recovery block for SDK (porting from AgentRunner) if out_file and not Path(out_file).exists(): print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...") messages_copy.append({ "role": "user", "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!" }) try: target_model = "claude-sonnet-4-6" if "claude" in clean_model else clean_model rec_response = await client.messages.create( model=target_model, max_tokens=4096, temperature=0.1, system=system_prompt, messages=messages_copy, tools=anthropic_tools, tool_choice={"type": "any"} ) for content_block in rec_response.content: if content_block.type == "tool_use": if content_block.name in ("write_file", "write_json"): print(f" 💾 [Recovery File Written by SDK] {task_name}") await registry.execute(content_block.name, content_block.input) except Exception as e: print(f"❌ [Fail SDK Recovery] {task_name}: {e}") task_errors.append(str(e)) # Schema Validation if out_file and Path(out_file).exists() and str(out_file).endswith(".json"): if skip_validation: print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)") return total_task_cost, task_errors, None try: with open(out_file, "r", encoding="utf-8") as f: data = json.loads(f.read()) filename = Path(out_file).name err = None if filename.startswith("case_"): err = validate_case(data) elif filename == "blueprint.json": err = validate_blueprint(data) elif filename == "capabilities_extracted.json": err = validate_capabilities(data) elif filename == "strategy.json": err = validate_strategy(data) if err: raise ValueError(f"Schema Validation Failed: {err}") print(f" ✅ [Schema Validated] {Path(out_file).name}") return total_task_cost, task_errors # Success! Exit retry loop. except Exception as e: err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}" print(f"❌ [Validation Error] {task_name}: {err_msg}") task_errors.append(f"{task_name} Error: {e}") if attempt == max_retries - 1: print(f"❌ [Retry Limit] {task_name} exhausted retries.") return total_task_cost, task_errors else: print(f"❌ [Missing File] {task_name} did not produce output file after recovery.") if attempt == max_retries - 1: return total_task_cost, task_errors return total_task_cost, task_errors async def main(): parser = argparse.ArgumentParser() parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json") STEP_NAMES = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding", "process-cluster", "process-score", "capability-enrich", "strategy"] parser.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube", help="Comma-separated list of platforms to search") parser.add_argument("--restart-mode", type=str, default="smart", help="Granular restart mode for cascading deletions") parser.add_argument("--only-step", type=str, choices=STEP_NAMES, help="Run only a single step") parser.add_argument("--phase", type=int, choices=[1, 2, 3], help="Run all steps in a phase (1=research~case-detailed, 2=process~capability, 3=strategy)") parser.add_argument("--start-from", type=str, choices=STEP_NAMES, help="Start from this step (inclusive)") parser.add_argument("--end-at", type=str, choices=STEP_NAMES, help="End at this step (inclusive)") parser.add_argument("--case-index", type=int, help="Rerun extraction for a specific case index (only for workflow-extract, capability-extract, apply-grounding)") parser.add_argument("--use-claude-sdk", action="store_true", help="Use Claude SDK (CLAUDE_CODE_KEY/URL) instead of OpenRouter") args = parser.parse_args() # ── 参数验证 ── # --case-index 只能与提取步骤一起使用 if args.case_index is not None: extraction_steps = {"workflow-extract", "capability-extract", "apply-grounding"} if args.only_step and args.only_step not in extraction_steps: print(f"❌ Error: --case-index can only be used with extraction steps: {', '.join(extraction_steps)}") sys.exit(1) if not args.only_step: print("❌ Error: --case-index requires --only-step to specify which extraction step to run") sys.exit(1) # ── 三种模式互斥检查 ── mode_count = sum([ args.only_step is not None, args.phase is not None, (args.start_from is not None or args.end_at is not None), ]) if mode_count > 1: print("❌ Error: --only-step, --phase, and --start-from/--end-at are mutually exclusive.") sys.exit(1) # 定义步骤拓扑(非线性,Phase 2 有两条并行分支) STEP_ORDER = STEP_NAMES # 用于 phase 分组和前置检查 def _step_in_range(step, start, end): """判断线性前缀中的 step 是否在 [start, end] 范围内""" all_steps = LINEAR_PREFIX + BRANCH_21 + BRANCH_22 + ["strategy"] if step not in all_steps or start not in all_steps: return False # 对于线性前缀,用索引比较 if step in LINEAR_PREFIX: s_idx = LINEAR_PREFIX.index(step) start_idx = LINEAR_PREFIX.index(start) if start in LINEAR_PREFIX else -1 # 如果 start 在 Phase 2 或 strategy 中,线性前缀不需要跑 if start_idx < 0: return False return s_idx >= start_idx return False PHASE_MAP = { 1: {"research", "source", "case-detailed"}, 2: {"process-cluster", "process-score", "capability-extract", "capability-enrich"}, 3: {"strategy"}, } BRANCH_21 = ["process-cluster", "process-score"] BRANCH_22 = ["capability-enrich"] # 线性前缀(Phase 2 之前) LINEAR_PREFIX = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding"] # 计算需要执行的步骤集合 active_steps = None # None = 全部执行 if args.only_step: active_steps = {args.only_step} elif args.phase: active_steps = PHASE_MAP[args.phase] elif args.start_from or args.end_at: start = args.start_from or "research" end = args.end_at or "strategy" # 构建 active_steps,考虑并行分支 active = set() # 1. 线性前缀部分 for s in LINEAR_PREFIX: if _step_in_range(s, start, end): active.add(s) # 2. Phase 2 分支部分 start_in_21 = start in BRANCH_21 start_in_22 = start in BRANCH_22 end_in_21 = end in BRANCH_21 end_in_22 = end in BRANCH_22 end_is_strategy = end == "strategy" # 如果 end 是 strategy 或覆盖了整个 Phase 2,两条分支都跑 if end_is_strategy or (not end_in_21 and not end_in_22 and end not in LINEAR_PREFIX): # 两条分支都包含(如果 start 允许的话) if not start_in_21 and not start_in_22: # start 在 Phase 2 之前或就是 Phase 2 的开头 active.update(BRANCH_21) active.update(BRANCH_22) elif start_in_21: # start 在 2.1 分支内,但 end 到了 strategy,所以 2.2 也要全跑 idx = BRANCH_21.index(start) active.update(BRANCH_21[idx:]) active.update(BRANCH_22) elif start_in_22: # start 在 2.2 分支内,但 end 到了 strategy,所以 2.1 也要全跑 idx = BRANCH_22.index(start) active.update(BRANCH_22[idx:]) active.update(BRANCH_21) elif end_in_21: # end 在 2.1 分支内,只跑 2.1 end_idx = BRANCH_21.index(end) start_idx = BRANCH_21.index(start) if start_in_21 else 0 active.update(BRANCH_21[start_idx:end_idx + 1]) elif end_in_22: # end 在 2.2 分支内,只跑 2.2 end_idx = BRANCH_22.index(end) start_idx = BRANCH_22.index(start) if start_in_22 else 0 active.update(BRANCH_22[start_idx:end_idx + 1]) # 3. strategy if end_is_strategy: active.add("strategy") active_steps = active def should_run(step_name: str) -> bool: if active_steps is None: return True return step_name in active_steps if active_steps is not None: active_list = [s for s in STEP_ORDER if s in active_steps] print(f"\n[Selective Mode] Steps: {' -> '.join(active_list)}") base_dir = Path(__file__).parent # Load requirements locally req_path = base_dir / "db_requirements.json" with open(req_path, encoding='utf-8') as f: reqs = json.load(f) if args.index < 0 or args.index >= len(reqs): print("Index out of bounds") sys.exit(1) requirement = reqs[args.index] # 0. Setup directories output_dir = base_dir / "output" / f"{(args.index+1):03d}" output_dir.mkdir(parents=True, exist_ok=True) raw_cases_dir = output_dir / "raw_cases" raw_cases_dir.mkdir(parents=True, exist_ok=True) # 0.5 本次运行的快照盒:output_dir/history//{case.json, run.log} # run_id 同时作为 case_history 的活动 run id,让快照和 log 落在同一个文件夹 from examples.process_pipeline.script.case_history import set_run_id, snapshot_case_file _run_id = datetime.now().strftime("%Y%m%d_%H%M%S") set_run_id(_run_id) _run_dir = output_dir / "history" / _run_id _run_dir.mkdir(parents=True, exist_ok=True) _run_log_path = _run_dir / "run.log" _run_log_file = open(_run_log_path, "w", encoding="utf-8") # 启动时立刻快照一次原 case.json(如果已存在),作为本次运行的回滚点 _initial_case_file = output_dir / "case.json" if _initial_case_file.exists(): snapshot_case_file(_initial_case_file, step="run_start") class _Tee: def __init__(self, *streams): self.streams = streams def write(self, s): for st in self.streams: try: st.write(s) except Exception: pass self.flush() def flush(self): for st in self.streams: try: st.flush() except Exception: pass def isatty(self): return False sys.stdout = _Tee(sys.__stdout__, _run_log_file) sys.stderr = _Tee(sys.__stderr__, _run_log_file) import atexit as _atexit _atexit.register(_run_log_file.close) print(f"[run-log] tee active → {_run_log_path}") setup_logging(level=LOG_LEVEL, file=LOG_FILE) print("=" * 60) print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...") print("=" * 60) # ── 前置文件检查 ── # 每个 step 需要的前置文件(如果该 step 不在 active_steps 中,则需要预先存在) STEP_DEPS = { "research": [], "source": [("case_*.json", lambda: bool(list(raw_cases_dir.glob("case_*.json"))))], "generate-case": [("source.json", lambda: (raw_cases_dir / "source.json").exists())], "workflow-extract": [("case.json", lambda: (output_dir / "case.json").exists())], "capability-extract": [("case.json", lambda: (output_dir / "case.json").exists())], "apply-grounding": [("case.json", lambda: (output_dir / "case.json").exists())], "process-cluster": [ ("case.json", lambda: (output_dir / "case.json").exists()), ], "process-score": [("blueprint_temp.json", lambda: (output_dir / "blueprint_temp.json").exists())], "capability-enrich": [ ("capabilities_temp.json", lambda: (output_dir / "capabilities_temp.json").exists()), ("case.json", lambda: (output_dir / "case.json").exists()), ], "strategy": [ ("process.json", lambda: (output_dir / "process.json").exists()), ("capabilities.json", lambda: (output_dir / "capabilities.json").exists()), ], } # 每个 step 会生成的文件(用于判断上游 step 是否会在本次运行中生成依赖) STEP_PRODUCES = { "research": {"case_*.json"}, "source": {"source.json"}, "generate-case": {"case.json"}, "workflow-extract": {"case.json"}, # 原地更新 "capability-extract": {"case.json"}, # 原地更新 "apply-grounding": {"case.json"}, # 原地更新 "process-cluster": {"blueprint_temp.json"}, "process-score": {"process.json"}, "capability-enrich": {"capabilities.json"}, "strategy": {"strategy.json"}, } if active_steps is not None: # 计算本次运行会生成的文件集合 will_produce = set() for s in STEP_ORDER: if s in active_steps: will_produce.update(STEP_PRODUCES.get(s, set())) # 检查每个 active step 的前置文件 missing = [] for s in STEP_ORDER: if s not in active_steps: continue for dep_name, dep_check in STEP_DEPS.get(s, []): if dep_name in will_produce: continue # 上游 step 会在本次运行中生成 if not dep_check(): missing.append((s, dep_name)) if missing: print(f"\n❌ [Pre-flight Check] Missing prerequisite files:") for step, dep in missing: print(f" - Step '{step}' requires '{dep}'") print(f"\nRun upstream steps first, or use --start-from to include them.") sys.exit(1) else: print(f"✅ [Pre-flight Check] All prerequisites satisfied") # Load presets presets_path = base_dir / "presets.json" if presets_path.exists(): from agent.core.presets import load_presets_from_json load_presets_from_json(str(presets_path)) print("✅ Configured Agent Presets (Skills Boundaries)") # Browser initialization removed to save resources store = FileSystemTraceStore(base_path=TRACE_STORE_PATH) # Instantiate two distinct LLM orchestrators qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface # 根据 --use-claude-sdk 参数选择 LLM 提供商 if args.use_claude_sdk: # 使用 Claude SDK (CLAUDE_CODE_KEY/URL 或 ANTHROPIC_API_KEY/BASE_URL) claude_model = "claude-sonnet-4-6" print(f"✅ Using Claude SDK with model: {claude_model}") # 所有环节走 Claude Agent SDK(OAuth / Max 订阅) from agent.llm.claude_code_oauth import create_claude_code_oauth_llm_call claude_llm_call = create_claude_code_oauth_llm_call(model=claude_model) workflow_llm_call = claude_llm_call print(f"✅ All Claude operations will use Claude Agent SDK (OAuth/Max subscription)") else: # 使用 OpenRouter 代理的 GPT-5.4(支持结构化输出 strict mode) claude_model = "openai/gpt-5.4" print(f"✅ Using OpenRouter with model: {claude_model}") from agent.llm.openrouter import create_openrouter_llm_call claude_llm_call = create_openrouter_llm_call(model=claude_model) workflow_llm_call = claude_llm_call # 没开 SDK 时与默认一致 runner_qwen = AgentRunner( trace_store=store, llm_call=create_qwen_llm_call(model=qwen_model), skills_dir=SKILLS_DIR ) runner_claude = AgentRunner( trace_store=store, llm_call=claude_llm_call, skills_dir=SKILLS_DIR ) try: start_time = time.time() total_cost = 0.0 costs_breakdown = {} global_errors = [] strategy_file = None TARGET_QUALIFIED_CASES = 15 # ── --only-step 单步执行模式 ────────────────────────────── if args.only_step: step = args.only_step source_file = raw_cases_dir / "source.json" detailed_file = raw_cases_dir / "case_detailed.json" blueprint_temp_file = output_dir / "blueprint_temp.json" capabilities_temp_file = output_dir / "capabilities_temp.json" process_file = output_dir / "process.json" capabilities_file = output_dir / "capabilities.json" print(f"\n[Single Step Mode] Running only: {step}") if step == "research": # Phase 1: 只跑调研,强制按 --platforms 重跑,不受已有 case 文件影响 single_platforms = [p.strip() for p in args.platforms.split(",") if p.strip()] if not single_platforms: print(" ❌ No platforms specified. Use --platforms xhs,zhihu,gzh,youtube") sys.exit(1) print(f" 🔍 Research platforms: {single_platforms}") single_is_single = args.restart_mode.startswith("single_") phase1_tasks = [] for p in single_platforms: task_desc = f"渠道:{p.upper()}。核心需求:{requirement}。目标:至少收集 {TARGET_QUALIFIED_CASES} 条高质量案例(评分>=80、正文充实)。" out_file = str(raw_cases_dir / f"case_{p}.json") kwargs = { "task": task_desc, "output_file": out_file } phase1_tasks.append( run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=single_is_single) ) phase1_results = await asyncio.gather(*phase1_tasks) for (task_cost, task_errors, trace_id), p in zip(phase1_results, single_platforms): print(f" ✓ {p}: cost=${task_cost:.4f}, errors={len(task_errors)}") elif step == "source": # Phase 1.5: 提取原始 source.json from examples.process_pipeline.script.extract_sources import extract_sources_to_json result = extract_sources_to_json(raw_cases_dir) print(f" ✓ source.json: matched={result['total_matched']}, filtered={result['filtered_total']}") if result.get("filtered_reasons"): for reason, cnt in result["filtered_reasons"].items(): print(f" - {reason}: {cnt}") elif step == "generate-case": # Phase 1.5.5: 生成标准化 case.json from examples.process_pipeline.script.generate_case import generate_case_from_source result = await generate_case_from_source(raw_cases_dir) print(f" ✓ case.json: cases={result['total_cases']}") elif step == "workflow-extract": # Phase 1.6a: 提取 workflow 到 case.json case_file = output_dir / "case.json" if not case_file.exists(): print(f" ❌ case.json not found. Run --only-step generate-case first.") sys.exit(1) # 如果指定了 --case-index,先过滤 case.json if args.case_index is not None: with open(case_file, "r", encoding="utf-8") as f: case_data = json.load(f) original_cases = case_data.get("cases", []) target_case = next((c for c in original_cases if c.get("index") == args.case_index), None) if not target_case: print(f" ❌ Case with index {args.case_index} not found in case.json") sys.exit(1) # 临时只保留目标 case case_data["cases"] = [target_case] temp_case_file = output_dir / f"case_temp_{args.case_index}.json" with open(temp_case_file, "w", encoding="utf-8") as f: json.dump(case_data, f, ensure_ascii=False, indent=2) print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}") case_file_to_use = temp_case_file else: case_file_to_use = case_file from examples.process_pipeline.script.extract_workflow import extract_workflow result = await extract_workflow( case_file_to_use, workflow_llm_call, model=claude_model ) # 如果使用了临时文件,需要合并回原始 case.json if args.case_index is not None: with open(case_file_to_use, "r", encoding="utf-8") as f: updated_data = json.load(f) updated_case = updated_data["cases"][0] # 更新原始文件中的对应 case with open(case_file, "r", encoding="utf-8") as f: original_data = json.load(f) for i, c in enumerate(original_data["cases"]): if c.get("index") == args.case_index: original_data["cases"][i] = updated_case break # 写前快照:把旧 case.json 复制到 history/ 留底 from examples.process_pipeline.script.case_history import snapshot_case_file snap = snapshot_case_file(case_file, step="workflow_merge") if snap: print(f" [snapshot] {snap.name}") with open(case_file, "w", encoding="utf-8") as f: json.dump(original_data, f, ensure_ascii=False, indent=2) temp_case_file.unlink() # 删除临时文件 print(f" ✓ Merged case {args.case_index} back to case.json") print(f" ✓ case.json + workflow: success={result['success']}, failed={result['failed']}") elif step == "capability-extract": # Phase 1.6b 已废弃:capability 现在由 workflow-extract 一并写入 case.json case_file = output_dir / "case.json" if not case_file.exists(): print(f" ❌ case.json not found. Run --only-step generate-case first.") sys.exit(1) print(" ⏭️ capability-extract is integrated into workflow-extract; skipping old extractor.") elif step == "apply-grounding": # Phase 1.7: 将 apply_to_draft 映射为正式 apply_to case_file = output_dir / "case.json" if not case_file.exists(): print(f" ❌ case.json not found. Run workflow-extract first.") sys.exit(1) # 如果指定了 --case-index,先过滤 case.json if args.case_index is not None: with open(case_file, "r", encoding="utf-8") as f: case_data = json.load(f) original_cases = case_data.get("cases", []) target_case = next((c for c in original_cases if c.get("index") == args.case_index), None) if not target_case: print(f" ❌ Case with index {args.case_index} not found in case.json") sys.exit(1) case_data["cases"] = [target_case] temp_case_file = output_dir / f"case_temp_{args.case_index}.json" with open(temp_case_file, "w", encoding="utf-8") as f: json.dump(case_data, f, ensure_ascii=False, indent=2) print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}") case_file_to_use = temp_case_file else: case_file_to_use = case_file from examples.process_pipeline.script.apply_to_grounding import apply_grounding result = await apply_grounding( case_file_to_use, claude_llm_call, model=claude_model ) # 如果使用了临时文件,需要合并回原始 case.json if args.case_index is not None: with open(case_file_to_use, "r", encoding="utf-8") as f: updated_data = json.load(f) updated_case = updated_data["cases"][0] with open(case_file, "r", encoding="utf-8") as f: original_data = json.load(f) for i, c in enumerate(original_data["cases"]): if c.get("index") == args.case_index: original_data["cases"][i] = updated_case break with open(case_file, "w", encoding="utf-8") as f: json.dump(original_data, f, ensure_ascii=False, indent=2) temp_case_file.unlink() print(f" ✓ Merged case {args.case_index} back to case.json") print(f" ✓ case.json + apply_to: grounded={result['grounded']}/{result['total']}, cost=${result['total_cost']:.4f}") elif step == "process-cluster": # Phase 2.1.1: 工序聚类 if not detailed_file.exists(): print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.") sys.exit(1) from examples.process_pipeline.script.cluster_processes import cluster_processes result = await cluster_processes( source_file=source_file, detailed_file=detailed_file, output_file=blueprint_temp_file, requirement=requirement, llm_call=claude_llm_call, model=claude_model, ) print(f" ✓ blueprint_temp.json: clusters={result['clusters']}") elif step == "process-score": # Phase 2.1.2: 工序打分 if not blueprint_temp_file.exists(): print(f" ❌ blueprint_temp.json not found. Run --only-step process-cluster first.") sys.exit(1) from examples.process_pipeline.script.score_processes import score_blueprints result = await score_blueprints( blueprint_file=blueprint_temp_file, output_file=process_file, requirement=requirement, llm_call=claude_llm_call, model=claude_model, ) print(f" ✓ process.json: scored={result['scored']}") elif step == "capability-extract": # Phase 2.2.1: 能力初步聚类 if not detailed_file.exists(): print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.") sys.exit(1) from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow result = await extract_capabilities_workflow( detailed_file=detailed_file, source_file=source_file, output_file=capabilities_temp_file, requirement=requirement, llm_call=claude_llm_call, model=claude_model, ) print(f" ✓ capabilities_temp.json: capabilities={result['capabilities']}") elif step == "capability-enrich": # Phase 2.2.2: 能力丰富化 if not capabilities_temp_file.exists(): print(f" ❌ capabilities_temp.json not found. Run --only-step capability-extract first.") sys.exit(1) if not source_file.exists(): print(f" ❌ source.json not found. Run --only-step source first.") sys.exit(1) from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities result = await enrich_all_capabilities( capabilities_temp_file=capabilities_temp_file, source_file=source_file, output_file=capabilities_file, llm_call=claude_llm_call, model=claude_model, ) print(f" ✓ capabilities.json: enriched={result['enriched']}/{result['total_capabilities']}") elif step == "strategy": # Phase 3: 策略组装 if not process_file.exists(): print(f" ❌ process.json not found. Run --only-step process-score first.") sys.exit(1) if not capabilities_file.exists(): print(f" ❌ capabilities.json not found. Run --only-step capability-enrich first.") sys.exit(1) strategy_file_path = output_dir / "strategy.json" from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy result = await assemble_strategy( process_file=process_file, capabilities_file=capabilities_file, output_file=strategy_file_path, requirement=requirement, llm_call=claude_llm_call, model=claude_model, ) print(f" ✓ strategy.json: workflow_steps={result['workflow_steps']}") elapsed = time.time() - start_time print(f"\n[Single Step Done] {step} completed in {elapsed:.1f}s") return # ── 正常 pipeline 流程 ────────────────────────────── platforms = [p.strip() for p in args.platforms.split(",") if p.strip()] # Phase 0: Platform Selection (controlled by --platforms) if should_run("research"): print(f"\n--- Phase 0: Using specified platforms ---") print(f"📡 Will research platforms: {platforms}") # 注:不再跳过已有平台,因为 agent 是增量追加模式 is_single_step = args.restart_mode.startswith("single_") # Phase 1 & 1.5: MAP (Parallel Search) and Source Extraction Loop if should_run("research") or should_run("source"): print(f"\n--- Phase 1: Distributed Research Map with Source Filter Loop ({qwen_model}) ---") from examples.process_pipeline.script.extract_sources import extract_sources_to_json MAX_ROUNDS = 50 platform_traces = {p: None for p in platforms} active_platforms = platforms.copy() phase1_trace_ids = {} if not should_run("research") and should_run("source"): try: src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=None) print(f"📎 [Source Extraction] matched={src_stats['total_matched']}, filtered={src_stats['filtered_total']} → {raw_cases_dir / 'source.json'}") if src_stats.get("filtered_reasons"): for reason, cnt in src_stats["filtered_reasons"].items(): print(f" - {reason}: {cnt}") except Exception as e: err_msg = f"Source extraction failed: {type(e).__name__}: {e}" print(f"⚠️ [Warning] {err_msg}") global_errors.append(err_msg) elif should_run("research"): round_idx = 0 last_src_stats = None # 保存上一轮的过滤统计 last_platform_count = {} # 保存上一轮每个平台的合格数 while active_platforms and round_idx < MAX_ROUNDS: print(f"\n >>> [Research Loop] Round {round_idx+1} - Active platforms: {active_platforms}") if active_platforms: phase1_tasks = [] for p in active_platforms: task_desc = f"渠道:{p.upper()}。核心需求:{requirement}。目标:至少收集 {TARGET_QUALIFIED_CASES} 条高质量案例(评分>=70、正文充实)。" out_file = str(raw_cases_dir / f"case_{p}.json") additional_msgs = None if round_idx > 0 and last_src_stats: # 构建带过滤详情的 feedback message p_count = last_platform_count.get(p, 0) # 筛选出该平台被过滤的条目 p_filtered = [d for d in last_src_stats.get("filtered_details", []) if d.get("platform") == p] reason_summary = last_src_stats.get("filtered_reasons", {}) feedback_lines = [ f"【系统反馈】你在上一轮提取的有效案例数量未达标。", f"当前 {p.upper()} 合格案例:{p_count}/{TARGET_QUALIFIED_CASES}", f"过滤统计:{dict(reason_summary)}" if reason_summary else "", ] if p_filtered: feedback_lines.append(f"\n以下是你提交的被过滤掉的帖子(共{len(p_filtered)}条):") for item in p_filtered[:10]: feedback_lines.append(f" - [{item['case_id']}] {item['title']} → 原因: {item['filter_reason']}") if len(p_filtered) > 10: feedback_lines.append(f" ... 还有 {len(p_filtered) - 10} 条未列出") feedback_lines.append( f"\n请继续搜索并提取更多**全新的、不同的**高质量案例,**追加**写入到你一直在维护的文件中。" f"注意:不要重复之前已找过的案例!针对上述被过滤的原因,请确保新案例有详实的正文内容且评分准确。" ) additional_msgs = [{ "role": "user", "content": "\n".join(line for line in feedback_lines if line) }] kwargs = { "task": task_desc, "output_file": out_file } phase1_tasks.append( run_agent_task( runner_qwen, "researcher", kwargs, f"P1_Research_{p}_R{round_idx+1}", qwen_model, skip_validation=is_single_step, start_trace_id=platform_traces[p], additional_messages=additional_msgs ) ) phase1_results = await asyncio.gather(*phase1_tasks) for (task_cost, task_errors, trace_id), p in zip(phase1_results, active_platforms): total_cost += task_cost cost_key = f"P1_Research_{p}" costs_breakdown[cost_key] = costs_breakdown.get(cost_key, 0.0) + round(task_cost, 4) platform_traces[p] = trace_id phase1_trace_ids[f"P1_Research_{p}"] = trace_id global_errors.extend(task_errors) expected_file = Path(raw_cases_dir / f"case_{p}.json") if not expected_file.exists(): err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving." print(f"⚠️ [Warning] {err_msg}") global_errors.append(err_msg) if should_run("source"): try: trace_id_list = [tid for tid in phase1_trace_ids.values() if tid] src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=trace_id_list) last_src_stats = src_stats print(f" 📎 [Source Extraction] matched={src_stats['total_matched']}, filtered={src_stats['filtered_total']} → {raw_cases_dir / 'source.json'}") if src_stats.get("filtered_reasons"): for reason, cnt in src_stats["filtered_reasons"].items(): print(f" - {reason}: {cnt}") except Exception as e: err_msg = f"Source extraction failed: {type(e).__name__}: {e}" print(f" ⚠️ [Warning] {err_msg}") global_errors.append(err_msg) # 判断是否达标:直接看 source.json 里每个平台的条目数 source_file = raw_cases_dir / "source.json" if source_file.exists(): with open(source_file, "r", encoding="utf-8") as f: source_data = json.load(f) platform_count = {} for s in source_data.get("sources", []): p = s.get("platform") if p: platform_count[p] = platform_count.get(p, 0) + 1 last_platform_count = platform_count print(f" 📊 [Source Count] Target: >={TARGET_QUALIFIED_CASES} qualified items per platform") platforms_to_keep = [] for p in platforms: count = platform_count.get(p, 0) if p in active_platforms: print(f" - {p}: {count}/{TARGET_QUALIFIED_CASES}") if count < TARGET_QUALIFIED_CASES: platforms_to_keep.append(p) active_platforms = platforms_to_keep if not active_platforms: print(f" ✅ All platforms reached the target {TARGET_QUALIFIED_CASES} qualified items!") break else: print(f" ⚠️ source.json not found, continuing loop to retry...") round_idx += 1 if round_idx >= MAX_ROUNDS and active_platforms: print(f" ⚠️ [Max Rounds] Reached {MAX_ROUNDS} rounds. Remaining platforms: {active_platforms}") else: print("\n⏭️ [Skip] Phase 1 & 1.5 Skipped. Using existing cases...") # Phase 1.3: Generate case.json from source.json if should_run("generate-case"): source_file = raw_cases_dir / "source.json" case_file = output_dir / "case.json" if source_file.exists(): try: from examples.process_pipeline.script.generate_case import generate_case print(f"\n--- Phase 1.3: Generate case.json ---") case_stats = await generate_case(source_file, case_file) print( f"📦 [Case Generation] " f"generated {case_stats.get('total', 0)} cases " f"→ {case_file}" ) except Exception as e: err_msg = f"Case generation failed: {type(e).__name__}: {e}" print(f"⚠️ [Warning] {err_msg}") global_errors.append(err_msg) # Phase 1.6: Extract workflow and capability together → case.json if should_run("workflow-extract"): case_file = output_dir / "case.json" if case_file.exists(): try: from examples.process_pipeline.script.extract_workflow import extract_workflow print(f"\n--- Phase 1.6a: Workflow Extraction ({claude_model}) ---") workflow_stats = await extract_workflow( case_file, workflow_llm_call, model=claude_model, max_concurrent=3 ) total_cost += workflow_stats.get("total_cost", 0.0) costs_breakdown["P1.6a_WorkflowExtraction"] = round(workflow_stats.get("total_cost", 0.0), 4) print( f"🔍 [Workflow Extraction] " f"success={workflow_stats['success']} " f"failed={workflow_stats['failed']}" ) except Exception as e: err_msg = f"Workflow extraction failed: {type(e).__name__}: {e}" print(f"⚠️ [Warning] {err_msg}") global_errors.append(err_msg) if should_run("capability-extract"): case_file = output_dir / "case.json" if case_file.exists(): print("\n--- Phase 1.6b: Capability Extraction ---") print("🧩 [Capability Extraction] integrated into workflow-extract; skipping old extractor.") # Phase 1.7: Apply grounding (map apply_to_draft to apply_to) if should_run("apply-grounding"): case_file = output_dir / "case.json" if case_file.exists(): try: from examples.process_pipeline.script.apply_to_grounding import apply_grounding print(f"\n--- Phase 1.7: Apply Grounding ({claude_model}) ---") grounding_stats = await apply_grounding( case_file, claude_llm_call, model=claude_model, max_concurrent=3 ) total_cost += grounding_stats.get("total_cost", 0.0) costs_breakdown["P1.7_ApplyGrounding"] = round(grounding_stats.get("total_cost", 0.0), 4) print( f"🗺️ [Apply Grounding] " f"grounded={grounding_stats['grounded']}/{grounding_stats['total']} " f"→ {case_file}" ) except Exception as e: err_msg = f"Apply grounding failed: {type(e).__name__}: {e}" print(f"⚠️ [Warning] {err_msg}") global_errors.append(err_msg) # Phase 2: Parallel Workflow (Process + Capabilities) uses Claude if any(should_run(s) for s in ["process-cluster", "process-score", "capability-enrich", "strategy"]): print(f"\n--- Phase 2: Parallel Workflow ({claude_model}) ---") # 输出文件 process_file = str(output_dir / "process.json") capabilities_file = str(output_dir / "capabilities.json") # 中间文件 blueprint_temp_file = str(output_dir / "blueprint_temp.json") capabilities_temp_file = str(output_dir / "capabilities_temp.json") # 优先使用结构化数据:source.json + case_detailed.json detailed_file = raw_cases_dir / "case_detailed.json" source_file = raw_cases_dir / "source.json" if detailed_file.exists(): input_files_glob = str(raw_cases_dir / "{source,case_detailed}.json").replace("\\", "/") print(f" Using structured data: source.json + case_detailed.json") else: input_files_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/") print(f" Fallback to raw cases: case_*.json") force_strategy_rerun = False force_active = active_steps is not None # ── Step 1: 并行执行 2.1.1 (cluster_processes) 和 2.2.1 (extract_capabilities) ── async def run_cluster_processes(): """2.1.1: 工序聚类 → blueprint_temp.json""" if Path(blueprint_temp_file).exists() and not force_active: print(f" [2.1.1] ⏭️ blueprint_temp.json exists, skipping") return 0.0 print(f" [2.1.1] Clustering processes...") try: from examples.process_pipeline.script.cluster_processes import cluster_processes result = await cluster_processes( source_file=source_file, detailed_file=detailed_file, output_file=Path(blueprint_temp_file), requirement=requirement, llm_call=claude_llm_call, model=claude_model, ) print(f" [2.1.1] ✓ Distilled {result.get('distilled_cases', 0)} cases, " f"generated {result.get('blueprints', 0)} blueprints") return result.get("total_cost", 0.0) except Exception as e: err_msg = f"P2.1.1 ClusterProcesses failed: {type(e).__name__}: {e}" print(f" [2.1.1] ⚠️ {err_msg}") global_errors.append(err_msg) return 0.0 async def run_extract_capabilities(): """2.2.1: 能力初步聚类 → capabilities_temp.json""" if Path(capabilities_temp_file).exists() and not force_active: print(f" [2.2.1] ⏭️ capabilities_temp.json exists, skipping") return 0.0 print(f" [2.2.1] Extracting capabilities...") try: from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow result = await extract_capabilities_workflow( detailed_file=detailed_file, source_file=source_file, output_file=Path(capabilities_temp_file), requirement=requirement, llm_call=claude_llm_call, model=claude_model, ) print(f" [2.2.1] ✓ Extracted {result.get('capabilities', 0)} capabilities") return result.get("total_cost", 0.0) except Exception as e: err_msg = f"P2.2.1 ExtractCapabilities failed: {type(e).__name__}: {e}" print(f" [2.2.1] ⚠️ {err_msg}") global_errors.append(err_msg) return 0.0 if not Path(blueprint_temp_file).exists() or not Path(capabilities_temp_file).exists(): force_strategy_rerun = True step1_costs = await asyncio.gather(run_cluster_processes(), run_extract_capabilities()) for cost, name in zip(step1_costs, ["P2.1.1_ClusterProcesses", "P2.2.1_ExtractCapabilities"]): total_cost += cost costs_breakdown[name] = round(cost, 4) # ── Step 2: 并行执行 2.1.2 和 2.2.2 ────────────────────────────── async def run_score_processes(): """2.1.2: 工序匹配度打分""" if Path(process_file).exists() and not force_active: print(f" [2.1.2] ⏭️ process.json exists, skipping") return 0.0 if not Path(blueprint_temp_file).exists(): print(f" [2.1.2] ⚠️ blueprint_temp.json not found, skipping") return 0.0 print(f" [2.1.2] Scoring processes...") try: from examples.process_pipeline.script.score_processes import score_blueprints score_result = await score_blueprints( blueprint_file=Path(blueprint_temp_file), output_file=Path(process_file), requirement=requirement, llm_call=claude_llm_call, model=claude_model, ) print(f" [2.1.2] ✓ Scored {score_result.get('scored', 0)} blueprints") return score_result.get("total_cost", 0.0) except Exception as e: err_msg = f"P2.1.2 ScoreProcesses failed: {e}" print(f" [2.1.2] ⚠️ {err_msg}") global_errors.append(err_msg) return 0.0 async def run_enrich_capabilities(): """2.2.2: 能力丰富化""" if Path(capabilities_file).exists() and not force_active: print(f" [2.2.2] ⏭️ capabilities.json exists, skipping") return 0.0 if not Path(capabilities_temp_file).exists(): print(f" [2.2.2] ⚠️ capabilities_temp.json not found, skipping") return 0.0 if not source_file.exists(): print(f" [2.2.2] ⚠️ source.json not found, skipping") return 0.0 print(f" [2.2.2] Enriching capabilities...") try: from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities enrich_result = await enrich_all_capabilities( capabilities_temp_file=Path(capabilities_temp_file), source_file=source_file, output_file=Path(capabilities_file), llm_call=claude_llm_call, model=claude_model, ) print(f" [2.2.2] ✓ Enriched {enrich_result.get('enriched', 0)} capabilities") return enrich_result.get("total_cost", 0.0) except Exception as e: err_msg = f"P2.2.2 EnrichCapabilities failed: {e}" print(f" [2.2.2] ⚠️ {err_msg}") global_errors.append(err_msg) return 0.0 # 并行执行 Step 2 step2_costs = await asyncio.gather(run_score_processes(), run_enrich_capabilities()) for cost, name in zip(step2_costs, ["P2.1.2_ScoreProcesses", "P2.2.2_EnrichCaps"]): total_cost += cost costs_breakdown[name] = round(cost, 4) # Phase 3: REDUCE 2 (Final Assembly) uses Claude print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---") strategy_file_path = output_dir / "strategy.json" if args.restart_mode == "single": force_strategy_rerun = False if strategy_file_path.exists() and not force_strategy_rerun and not force_active: print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.") else: if strategy_file_path.exists(): print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...") print(" > Using [Workflow Core]") from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy try: phase3_result = await assemble_strategy( process_file=Path(process_file), capabilities_file=Path(capabilities_file), output_file=strategy_file_path, requirement=requirement, llm_call=claude_llm_call, model=claude_model, ) phase3_cost = phase3_result.get("total_cost", 0.0) print(f" ✓ Generated workflow with {phase3_result.get('workflow_steps', 0)} steps") except Exception as e: err_msg = f"P3_AssembleStrategy failed: {type(e).__name__}: {e}" print(f" ⚠️ {err_msg}") global_errors.append(err_msg) phase3_cost = 0.0 total_cost += phase3_cost costs_breakdown["P3_Assembler"] = round(phase3_cost, 4) else: print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---") end_time = time.time() elapsed_sec = end_time - start_time # Save Metrics metrics_file = base_dir / "run_metrics.json" metrics_data = [] if metrics_file.exists(): with open(metrics_file, "r", encoding="utf-8") as f: try: metrics_data = json.load(f) except json.JSONDecodeError: pass # Collect trace_ids from all phases trace_ids = {} if 'phase1_trace_ids' in dir(): trace_ids.update(phase1_trace_ids) metrics_data.append({ "index": args.index, "requirement": requirement[:80] + "...", "duration_seconds": round(elapsed_sec, 2), "total_cost_usd": round(total_cost, 4), "costs_breakdown": costs_breakdown, "trace_ids": trace_ids, "errors": global_errors, "timestamp": datetime.now().isoformat() }) with open(metrics_file, "w", encoding="utf-8") as f: json.dump(metrics_data, f, indent=2, ensure_ascii=False) print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}") finally: pass print("✅ Pipeline run finished.") if strategy_file: print("✅ Strategy saved to:", strategy_file) if __name__ == "__main__": asyncio.run(main())