- """
- V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
- """
- import argparse
- import asyncio
- import json
- import os
- import sys
- import time
- from datetime import datetime
- from pathlib import Path
- PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
- # Force runtime working directory to project root so relative trace/cache paths
- # always land in the repo root no matter where this script is launched from.
- os.chdir(PROJECT_ROOT)
- # Add project root to path
- sys.path.insert(0, str(PROJECT_ROOT))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.llm.prompts import SimplePrompt
- from agent.core.runner import AgentRunner, RunConfig
- from agent.tools.builtin.knowledge import KnowledgeConfig
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_qwen_llm_call
- from agent.llm.openrouter import create_openrouter_llm_call
- from agent.llm.claude import create_claude_llm_call
- # config from existing setup
- from examples.process_research.config import (
- OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
- BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG
- )
- from agent.utils import setup_logging
- async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False, start_trace_id: str | None = None, additional_messages: list | None = None):
- from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
- base_dir = Path(__file__).parent
- prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
- prompt = SimplePrompt(prompt_path)
- messages = prompt.build_messages(**kwargs)
- if additional_messages:
- messages.extend(additional_messages)
- target_tools = []
- if prompt_name == "extract_capabilities":
- target_tools = ["capability_search", "capability_list", "tool_search"]
- # Configure tool-group permissions per agent type
- tool_groups_map = {
- "researcher": ["core", "content"], # search + files, no browser
- "filter_and_blueprint": ["core"], # file read/write only
- "extract_capabilities": ["core"], # file read/write only (extra tools come via target_tools)
- "assemble_strategy": ["core"], # file read/write only
- }
- total_task_cost = 0.0
- task_errors = []
- out_file = kwargs.get("output_file")
- max_retries = 3
- last_trace_id = start_trace_id
- last_validation_error = None
- final_trace_id = None # trace_id of the final successful run, returned to the caller
- def _instant_validate():
- """文件写出后立即校验并尝试修复,返回 error string 或 None"""
- nonlocal last_validation_error
- if not out_file or not Path(out_file).exists():
- return None
- try:
- with open(out_file, "r", encoding="utf-8") as f:
- raw = f.read()
- try:
- data = json.loads(raw)
- except json.JSONDecodeError:
- from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
- ok, data, desc = try_fix_and_parse(raw)
- if ok:
- with open(out_file, "w", encoding="utf-8") as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- print(f" 🔧 [Instant Fix] {desc}")
- else:
- last_validation_error = "JSON parse failed, auto-fix unsuccessful"
- return last_validation_error
- filename = Path(out_file).name
- err = None
- if filename.startswith("case_"):
- err = validate_case(data)
- elif filename == "blueprint.json":
- err = validate_blueprint(data)
- elif filename == "capabilities_extracted.json":
- err = validate_capabilities(data)
- elif filename == "strategy.json":
- err = validate_strategy(data)
- if err:
- last_validation_error = err
- print(f" ⚠️ [Instant Validation] {err}")
- return err
- else:
- print(f" ✅ [Instant Validation] {filename} OK")
- return None
- except Exception as e:
- last_validation_error = str(e)
- return str(e)
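- # Retry state machine (max_retries attempts): attempt 0 runs the task fresh; if
- # schema validation fails and we still hold a trace_id, later attempts continue
- # the same trace with targeted fix instructions instead of restarting from scratch.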
- for attempt in range(max_retries):
- if attempt > 0 and last_trace_id and last_validation_error:
- # Continue mode: report the validation error back to the same agent so it can repair the file
- print(f"🔄 [Continue {attempt}/{max_retries-1}] {task_name} - sending fix instructions to existing agent")
- fix_messages = [{
- "role": "user",
- "content": (
- f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
- f"错误详情:{last_validation_error}\n\n"
- f"请立刻读取该文件,根据以上错误信息修复内容,然后重新调用 write_json 写入到同一路径 `{out_file}`。"
- f"只修复有问题的部分,不要丢弃已有的正确内容。"
- )
- }]
- fix_config = RunConfig(
- model=prompt.config.get("model") or model_name,
- temperature=0.1,
- name=f"{task_name}_Fix{attempt}",
- agent_type=prompt_name,
- tools=target_tools,
- tool_groups=tool_groups_map.get(prompt_name, ["core"]),
- trace_id=last_trace_id,
- knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
- )
- try:
- async for item in runner.run(messages=fix_messages, config=fix_config):
- if isinstance(item, Trace):
- last_trace_id = item.trace_id
- if item.status == "completed":
- total_task_cost += item.total_cost
- elif item.status == "failed":
- task_errors.append(f"{task_name} Fix Failed: {item.error_message}")
- if isinstance(item, Message) and item.role == "tool":
- content = item.content if isinstance(item.content, dict) else {}
- if content.get("tool_name") in ("write_file", "write_json"):
- print(f" 💾 [Fix File Written by {task_name}]")
- _instant_validate()
- except Exception as e:
- err_msg = f"{type(e).__name__}: {e}"
- print(f"❌ [Exception Fix] {task_name} crashed: {err_msg}")
- task_errors.append(f"{task_name} fix crashed: {err_msg}")
- elif attempt > 0:
- # No trace_id or no validation error to work with; a full restart is the only option
- print(f"🔄 [Retry {attempt}/{max_retries-1}] {task_name} - no prior trace, full restart")
- if out_file and Path(out_file).exists():
- Path(out_file).unlink()
- run_config = RunConfig(
- model=prompt.config.get("model") or model_name,
- temperature=prompt.config.get("temperature") or 0.3,
- name=f"{task_name}_A{attempt}",
- agent_type=prompt_name,
- tools=target_tools,
- tool_groups=tool_groups_map.get(prompt_name, ["core"]),
- knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
- )
- print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
- try:
- async for item in runner.run(messages=messages, config=run_config):
- if isinstance(item, Trace):
- last_trace_id = item.trace_id
- if item.status == "completed":
- total_task_cost += item.total_cost
- elif item.status == "failed":
- task_errors.append(f"{task_name} Failed: {item.error_message}")
- if isinstance(item, Message):
- if item.role == "tool":
- content = item.content if isinstance(item.content, dict) else {}
- t_name = content.get("tool_name", "unknown")
- if t_name in ("write_file", "write_json"):
- print(f" 💾 [File Written by {task_name}]")
- _instant_validate()
- except Exception as e:
- err_msg = f"{type(e).__name__}: {e}"
- print(f"❌ [Exception] {task_name} crashed: {err_msg}")
- task_errors.append(f"{task_name} crashed: {err_msg}")
- else:
- # First attempt
- run_config = RunConfig(
- model=prompt.config.get("model") or model_name,
- temperature=prompt.config.get("temperature") or 0.3,
- name=f"{task_name}_A{attempt}",
- agent_type=prompt_name,
- tools=target_tools,
- tool_groups=tool_groups_map.get(prompt_name, ["core"]),
- knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False),
- trace_id=last_trace_id
- )
- print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
- try:
- async for item in runner.run(messages=messages, config=run_config):
- if isinstance(item, Trace):
- last_trace_id = item.trace_id
- if item.status == "completed":
- total_task_cost += item.total_cost
- elif item.status == "failed":
- task_errors.append(f"{task_name} Failed: {item.error_message}")
- if isinstance(item, Message):
- if item.role == "tool":
- content = item.content if isinstance(item.content, dict) else {}
- t_name = content.get("tool_name", "unknown")
- if t_name in ("write_file", "write_json"):
- print(f" 💾 [File Written by {task_name}]")
- _instant_validate()
- except Exception as e:
- err_msg = f"{type(e).__name__}: {e}"
- print(f"❌ [Exception] {task_name} crashed: {err_msg}")
- task_errors.append(f"{task_name} crashed: {err_msg}")
- # Verification & Recovery block
- if out_file and not Path(out_file).exists() and last_trace_id:
- print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
- recovery_messages = [{
- "role": "user",
- "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
- }]
- rec_config = RunConfig(
- model=model_name,
- temperature=0.1,
- name=task_name + "_Rec",
- agent_type=prompt_name,
- trace_id=last_trace_id,
- knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
- )
- try:
- async for r_item in runner.run(messages=recovery_messages, config=rec_config):
- if isinstance(r_item, Trace):
- if r_item.status == "completed":
- total_task_cost += r_item.total_cost
- elif r_item.status == "failed":
- task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}")
- if isinstance(r_item, Message) and r_item.role == "tool":
- content = r_item.content if isinstance(r_item.content, dict) else {}
- if content.get("tool_name") in ("write_file", "write_json"):
- print(f" 💾 [Recovery File Written by {task_name}]")
- _instant_validate()
- except Exception as e:
- err_msg = f"{type(e).__name__}: {e}"
- print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}")
- task_errors.append(f"{task_name} recovery crashed: {err_msg}")
- # Schema Validation (with auto-fix layer)
- if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
- if skip_validation:
- print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
- return total_task_cost, task_errors, last_trace_id
- try:
- with open(out_file, "r", encoding="utf-8") as f:
- raw_content = f.read()
- # Layer 1: try to parse as-is
- try:
- data = json.loads(raw_content)
- except json.JSONDecodeError as parse_err:
- # Layer 2: auto-fix JSON syntax errors
- print(f" 🔧 [Auto-Fix] {Path(out_file).name} has JSON syntax error, attempting fix...")
- try:
- from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
- success, data, fix_desc = try_fix_and_parse(raw_content)
- if success:
- # Fix succeeded; write it back to the file
- with open(out_file, "w", encoding="utf-8") as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- print(f" 🔧 [Auto-Fix] Fixed: {fix_desc}")
- else:
- raise parse_err # fix failed; re-raise the original error
- except ImportError:
- raise parse_err # fix_json_quotes unavailable; re-raise the original error
- filename = Path(out_file).name
- err = None
- if filename.startswith("case_"):
- err = validate_case(data)
- elif filename == "blueprint.json":
- err = validate_blueprint(data)
- elif filename == "capabilities_extracted.json":
- err = validate_capabilities(data)
- elif filename == "strategy.json":
- err = validate_strategy(data)
- if err:
- raise ValueError(f"Schema Validation Failed: {err}")
- print(f" ✅ [Schema Validated] {Path(out_file).name}")
- final_trace_id = last_trace_id
- return total_task_cost, task_errors, final_trace_id # Success! Exit retry loop.
- except Exception as e:
- err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
- print(f"❌ [Validation Error] {task_name}: {err_msg}")
- task_errors.append(f"{task_name} Error: {e}")
- last_validation_error = str(e)
- if attempt == max_retries - 1:
- print(f"❌ [Retry Limit] {task_name} exhausted retries.")
- return total_task_cost, task_errors, last_trace_id
- else:
- print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
- last_validation_error = None
- if attempt == max_retries - 1:
- return total_task_cost, task_errors, last_trace_id
- return total_task_cost, task_errors, last_trace_id
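- # Illustrative call (hypothetical values; mirrors the Phase 1 call sites in main()):
- # cost, errors, trace_id = await run_agent_task(
- # runner_qwen, "researcher", {"task": "...", "output_file": "..."}, "P1_Research_xhs", "qwen3.5-plus")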
- async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
- """
- 备用:使用纯净的官方 Anthropic SDK 驱动。
- 跳过内部大架构的 trace 追踪,但保留对原有 Python 工具库(如 write_file/glob_files)的无缝调用。
- """
- from anthropic import AsyncAnthropic
- from agent.tools.registry import get_tool_registry
-
- base_dir = Path(__file__).parent
- prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
- prompt = SimplePrompt(prompt_path)
-
- # 1. Assemble the input messages
- raw_messages = prompt.build_messages(**kwargs)
- system_prompt = ""
- messages = []
- for msg in raw_messages:
- if msg["role"] == "system":
- system_prompt += msg["content"] + "\n\n"
- else:
- messages.append({"role": msg["role"], "content": msg["content"]})
-
- # 2. Map the target tools
- target_tools = ["write_file", "write_json", "read_file", "glob_files"]
- if prompt_name == "extract_capabilities":
- target_tools.extend(["capability_search", "capability_list", "tool_search"])
-
- registry = get_tool_registry()
- schemas = registry.get_schemas(target_tools)
- anthropic_tools = []
- for s in schemas:
- anthropic_tools.append({
- "name": s["function"]["name"],
- "description": s["function"].get("description", ""),
- "input_schema": s["function"]["parameters"]
- })
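- # The registry yields OpenAI-style function schemas; Anthropic expects
- # {"name", "description", "input_schema"}, so the loop above lifts
- # function.name / function.description / function.parameters accordingly.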
-
- # 3. Initialize the client and start the loop
- # Note: the ANTHROPIC_API_KEY environment variable must be configured in your shell
- client = AsyncAnthropic()
- total_task_cost = 0.0
- task_errors = []
- sdk_trace_id = None # would hold the Anthropic SDK response ID (currently unused)
-
- from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
- out_file = kwargs.get("output_file")
-
- max_retries = 3
- for attempt in range(max_retries):
- if attempt > 0:
- print(f"🔄 [Retry SDK {attempt}/{max_retries-1}] {task_name}")
- if out_file and Path(out_file).exists():
- Path(out_file).unlink()
-
- print(f"🚀 [Launch Anthropic SDK] {task_name} (Attempt {attempt+1})")
-
- # Reset messages for retry
- messages_copy = list(messages)
-
- max_loops = 50
- for loop_idx in range(max_loops):
- try:
- # Strip any provider prefix (e.g. names passed in OpenRouter "vendor/model" form)
- clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
-
- # Map the actual request onto the special alias claude-sonnet-4-6 available here
- target_model = "claude-sonnet-4-6" if "claude" in clean_model else clean_model
- response = await client.messages.create(
- model=target_model,
- max_tokens=4096,
- temperature=0.2,
- system=system_prompt,
- messages=messages_copy,
- tools=anthropic_tools
- )
-
- # (rough estimate at $3/M input and $15/M output tokens; not official billing)
- if hasattr(response, 'usage'):
- step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
- total_task_cost += step_cost
-
- # Append the assistant reply
- assistant_content = []
- tool_uses = []
-
- for content_block in response.content:
- if content_block.type == "text":
- text_val = content_block.text
- if text_val:
- assistant_content.append({"type": "text", "text": text_val})
- print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
- elif content_block.type == "tool_use":
- assistant_content.append({
- "type": "tool_use",
- "id": content_block.id,
- "name": content_block.name,
- "input": content_block.input
- })
- tool_uses.append(content_block)
-
- if not assistant_content:
- assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})
-
- messages_copy.append({"role": "assistant", "content": assistant_content})
-
- # Exit: no tool calls means the task is finished
- if not tool_uses:
- print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${total_task_cost:.4f})")
- break
-
- # Execute tools and feed the results back
- tool_results = []
- for tu in tool_uses:
- if tu.name in ("write_file", "write_json"):
- print(f" 💾 [File Written by SDK] {task_name}")
-
- print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
- # Execute the function in the local environment
- result_str = await registry.execute(tu.name, tu.input)
-
- tool_results.append({
- "type": "tool_result",
- "tool_use_id": tu.id,
- "content": result_str
- })
-
- messages_copy.append({"role": "user", "content": tool_results})
-
- except Exception as e:
- err_msg = str(e)
- print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
- task_errors.append(err_msg)
- break
-
- # Verification & Recovery block for SDK (porting from AgentRunner)
- if out_file and not Path(out_file).exists():
- print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
- messages_copy.append({
- "role": "user",
- "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
- })
- try:
- target_model = "claude-sonnet-4-6" if "claude" in clean_model else clean_model
- rec_response = await client.messages.create(
- model=target_model,
- max_tokens=4096,
- temperature=0.1,
- system=system_prompt,
- messages=messages_copy,
- tools=anthropic_tools,
- tool_choice={"type": "any"}
- )
- for content_block in rec_response.content:
- if content_block.type == "tool_use":
- if content_block.name in ("write_file", "write_json"):
- print(f" 💾 [Recovery File Written by SDK] {task_name}")
- await registry.execute(content_block.name, content_block.input)
- except Exception as e:
- print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
- task_errors.append(str(e))
-
- # Schema Validation
- if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
- if skip_validation:
- print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
- return total_task_cost, task_errors # match this function's other (cost, errors) returns
-
- try:
- with open(out_file, "r", encoding="utf-8") as f:
- data = json.loads(f.read())
-
- filename = Path(out_file).name
- err = None
- if filename.startswith("case_"):
- err = validate_case(data)
- elif filename == "blueprint.json":
- err = validate_blueprint(data)
- elif filename == "capabilities_extracted.json":
- err = validate_capabilities(data)
- elif filename == "strategy.json":
- err = validate_strategy(data)
-
- if err:
- raise ValueError(f"Schema Validation Failed: {err}")
-
- print(f" ✅ [Schema Validated] {Path(out_file).name}")
- return total_task_cost, task_errors # Success! Exit retry loop.
- except Exception as e:
- err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
- print(f"❌ [Validation Error] {task_name}: {err_msg}")
- task_errors.append(f"{task_name} Error: {e}")
-
- if attempt == max_retries - 1:
- print(f"❌ [Retry Limit] {task_name} exhausted retries.")
- return total_task_cost, task_errors
- else:
- print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
- if attempt == max_retries - 1:
- return total_task_cost, task_errors
-
- return total_task_cost, task_errors
- async def main():
- parser = argparse.ArgumentParser()
- parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
- STEP_NAMES = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding", "process-cluster", "process-score", "capability-enrich", "strategy"]
- parser.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube", help="Comma-separated list of platforms to search")
- parser.add_argument("--restart-mode", type=str, default="smart", help="Granular restart mode for cascading deletions")
- parser.add_argument("--only-step", type=str, choices=STEP_NAMES, help="Run only a single step")
- parser.add_argument("--phase", type=int, choices=[1, 2, 3], help="Run all steps in a phase (1=research~case-detailed, 2=process~capability, 3=strategy)")
- parser.add_argument("--start-from", type=str, choices=STEP_NAMES, help="Start from this step (inclusive)")
- parser.add_argument("--end-at", type=str, choices=STEP_NAMES, help="End at this step (inclusive)")
- parser.add_argument("--case-index", type=int, help="Rerun extraction for a specific case index (only for workflow-extract, capability-extract, apply-grounding)")
- parser.add_argument("--use-claude-sdk", action="store_true", help="Use Claude SDK (CLAUDE_CODE_KEY/URL) instead of OpenRouter")
- args = parser.parse_args()
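- # Typical invocations (script path illustrative, not taken from the repo):
- # python pipeline_v4.py --index 0 # full pipeline for requirement #0
- # python pipeline_v4.py --index 0 --only-step research --platforms xhs,zhihu
- # python pipeline_v4.py --index 0 --start-from process-cluster --end-at strategy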
- # ── Argument validation ──
- # --case-index may only be combined with extraction steps
- if args.case_index is not None:
- extraction_steps = {"workflow-extract", "capability-extract", "apply-grounding"}
- if args.only_step and args.only_step not in extraction_steps:
- print(f"❌ Error: --case-index can only be used with extraction steps: {', '.join(extraction_steps)}")
- sys.exit(1)
- if not args.only_step:
- print("❌ Error: --case-index requires --only-step to specify which extraction step to run")
- sys.exit(1)
- # ── The three selection modes are mutually exclusive ──
- mode_count = sum([
- args.only_step is not None,
- args.phase is not None,
- (args.start_from is not None or args.end_at is not None),
- ])
- if mode_count > 1:
- print("❌ Error: --only-step, --phase, and --start-from/--end-at are mutually exclusive.")
- sys.exit(1)
- # Define the step topology (non-linear: Phase 2 has two parallel branches)
- STEP_ORDER = STEP_NAMES # used for phase grouping and the pre-flight check
- def _step_in_range(step, start, end):
- """判断线性前缀中的 step 是否在 [start, end] 范围内"""
- all_steps = LINEAR_PREFIX + BRANCH_21 + BRANCH_22 + ["strategy"]
- if step not in all_steps or start not in all_steps:
- return False
- # 对于线性前缀,用索引比较
- if step in LINEAR_PREFIX:
- s_idx = LINEAR_PREFIX.index(step)
- start_idx = LINEAR_PREFIX.index(start) if start in LINEAR_PREFIX else -1
- # 如果 start 在 Phase 2 或 strategy 中,线性前缀不需要跑
- if start_idx < 0:
- return False
- return s_idx >= start_idx
- return False
- PHASE_MAP = {
- 1: {"research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding"},
- 2: {"process-cluster", "process-score", "capability-enrich"},
- 3: {"strategy"},
- }
- BRANCH_21 = ["process-cluster", "process-score"]
- BRANCH_22 = ["capability-enrich"]
- # Linear prefix (everything before Phase 2)
- LINEAR_PREFIX = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding"]
- # Compute the set of steps to execute
- active_steps = None # None = run everything
- if args.only_step:
- active_steps = {args.only_step}
- elif args.phase:
- active_steps = PHASE_MAP[args.phase]
- elif args.start_from or args.end_at:
- start = args.start_from or "research"
- end = args.end_at or "strategy"
- # Build active_steps, accounting for the parallel branches
- active = set()
- # 1. Linear prefix section
- for s in LINEAR_PREFIX:
- if _step_in_range(s, start, end):
- active.add(s)
- # 2. Phase 2 branch section
- start_in_21 = start in BRANCH_21
- start_in_22 = start in BRANCH_22
- end_in_21 = end in BRANCH_21
- end_in_22 = end in BRANCH_22
- end_is_strategy = end == "strategy"
- # If end is strategy or covers all of Phase 2, run both branches
- if end_is_strategy or (not end_in_21 and not end_in_22 and end not in LINEAR_PREFIX):
- # Include both branches (where start allows it)
- if not start_in_21 and not start_in_22:
- # start is before Phase 2 or right at its beginning
- active.update(BRANCH_21)
- active.update(BRANCH_22)
- elif start_in_21:
- # start is inside branch 2.1, but end reaches strategy, so all of 2.2 must run too
- idx = BRANCH_21.index(start)
- active.update(BRANCH_21[idx:])
- active.update(BRANCH_22)
- elif start_in_22:
- # start is inside branch 2.2, but end reaches strategy, so all of 2.1 must run too
- idx = BRANCH_22.index(start)
- active.update(BRANCH_22[idx:])
- active.update(BRANCH_21)
- elif end_in_21:
- # end is inside branch 2.1; run only 2.1
- end_idx = BRANCH_21.index(end)
- start_idx = BRANCH_21.index(start) if start_in_21 else 0
- active.update(BRANCH_21[start_idx:end_idx + 1])
- elif end_in_22:
- # end is inside branch 2.2; run only 2.2
- end_idx = BRANCH_22.index(end)
- start_idx = BRANCH_22.index(start) if start_in_22 else 0
- active.update(BRANCH_22[start_idx:end_idx + 1])
- # 3. strategy
- if end_is_strategy:
- active.add("strategy")
- active_steps = active
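- # Example: --start-from process-cluster --end-at strategy resolves to
- # {process-cluster, process-score, capability-enrich, strategy}: branch 2.1 from
- # the start step, all of branch 2.2 (since end reaches strategy), plus strategy.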
- def should_run(step_name: str) -> bool:
- if active_steps is None:
- return True
- return step_name in active_steps
- if active_steps is not None:
- active_list = [s for s in STEP_ORDER if s in active_steps]
- print(f"\n[Selective Mode] Steps: {' -> '.join(active_list)}")
- base_dir = Path(__file__).parent
- # Load requirements locally
- req_path = base_dir / "db_requirements.json"
-
- with open(req_path, encoding='utf-8') as f:
- reqs = json.load(f)
-
- if args.index < 0 or args.index >= len(reqs):
- print("Index out of bounds")
- sys.exit(1)
-
- requirement = reqs[args.index]
-
- # 0. Setup directories
- output_dir = base_dir / "output" / f"{(args.index+1):03d}"
- output_dir.mkdir(parents=True, exist_ok=True)
- raw_cases_dir = output_dir / "raw_cases"
- raw_cases_dir.mkdir(parents=True, exist_ok=True)
- # 0.5 Snapshot box for this run: output_dir/history/<run_id>/{case.json, run.log}
- # run_id doubles as the active run id for case_history, so snapshots and logs land in the same folder
- from examples.process_pipeline.script.case_history import set_run_id, snapshot_case_file
- _run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
- set_run_id(_run_id)
- _run_dir = output_dir / "history" / _run_id
- _run_dir.mkdir(parents=True, exist_ok=True)
- _run_log_path = _run_dir / "run.log"
- _run_log_file = open(_run_log_path, "w", encoding="utf-8")
- # Snapshot the existing case.json (if any) right at startup, as the rollback point for this run
- _initial_case_file = output_dir / "case.json"
- if _initial_case_file.exists():
- snapshot_case_file(_initial_case_file, step="run_start")
- class _Tee:
- def __init__(self, *streams):
- self.streams = streams
- def write(self, s):
- for st in self.streams:
- try:
- st.write(s)
- except Exception:
- pass
- self.flush()
- def flush(self):
- for st in self.streams:
- try:
- st.flush()
- except Exception:
- pass
- def isatty(self):
- return False
- sys.stdout = _Tee(sys.__stdout__, _run_log_file)
- sys.stderr = _Tee(sys.__stderr__, _run_log_file)
- import atexit as _atexit
- _atexit.register(_run_log_file.close)
- print(f"[run-log] tee active → {_run_log_path}")
- setup_logging(level=LOG_LEVEL, file=LOG_FILE)
-
- print("=" * 60)
- print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
- print("=" * 60)
- # ── Pre-flight file check ──
- # Prerequisite files for each step (if the producing step is not in active_steps, they must already exist)
- STEP_DEPS = {
- "research": [],
- "source": [("case_*.json", lambda: bool(list(raw_cases_dir.glob("case_*.json"))))],
- "generate-case": [("source.json", lambda: (raw_cases_dir / "source.json").exists())],
- "workflow-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
- "capability-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
- "apply-grounding": [("case.json", lambda: (output_dir / "case.json").exists())],
- "process-cluster": [
- ("case.json", lambda: (output_dir / "case.json").exists()),
- ],
- "process-score": [("blueprint_temp.json", lambda: (output_dir / "blueprint_temp.json").exists())],
- "capability-enrich": [
- ("capabilities_temp.json", lambda: (output_dir / "capabilities_temp.json").exists()),
- ("case.json", lambda: (output_dir / "case.json").exists()),
- ],
- "strategy": [
- ("process.json", lambda: (output_dir / "process.json").exists()),
- ("capabilities.json", lambda: (output_dir / "capabilities.json").exists()),
- ],
- }
- # Files each step produces (used to decide whether an upstream step will generate a dependency during this run)
- STEP_PRODUCES = {
- "research": {"case_*.json"},
- "source": {"source.json"},
- "generate-case": {"case.json"},
- "workflow-extract": {"case.json"}, # 原地更新
- "capability-extract": {"case.json"}, # 原地更新
- "apply-grounding": {"case.json"}, # 原地更新
- "process-cluster": {"blueprint_temp.json"},
- "process-score": {"process.json"},
- "capability-enrich": {"capabilities.json"},
- "strategy": {"strategy.json"},
- }
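- # Example: with --only-step process-score, blueprint_temp.json must already be on
- # disk, because its producer (process-cluster) is not in active_steps this run.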
- if active_steps is not None:
- # Compute the set of files this run will produce
- will_produce = set()
- for s in STEP_ORDER:
- if s in active_steps:
- will_produce.update(STEP_PRODUCES.get(s, set()))
- # Check each active step's prerequisite files
- missing = []
- for s in STEP_ORDER:
- if s not in active_steps:
- continue
- for dep_name, dep_check in STEP_DEPS.get(s, []):
- if dep_name in will_produce:
- continue # an upstream step will produce it during this run
- if not dep_check():
- missing.append((s, dep_name))
- if missing:
- print(f"\n❌ [Pre-flight Check] Missing prerequisite files:")
- for step, dep in missing:
- print(f" - Step '{step}' requires '{dep}'")
- print(f"\nRun upstream steps first, or use --start-from to include them.")
- sys.exit(1)
- else:
- print(f"✅ [Pre-flight Check] All prerequisites satisfied")
- # Load presets
- presets_path = base_dir / "presets.json"
- if presets_path.exists():
- from agent.core.presets import load_presets_from_json
- load_presets_from_json(str(presets_path))
- print("✅ Configured Agent Presets (Skills Boundaries)")
- # Browser initialization removed to save resources
-
- store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
- # Instantiate two distinct LLM orchestrators
- qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface
- # Choose the LLM provider based on --use-claude-sdk
- if args.use_claude_sdk:
- # Use the Claude SDK (CLAUDE_CODE_KEY/URL or ANTHROPIC_API_KEY/BASE_URL)
- claude_model = "claude-sonnet-4-6"
- print(f"✅ Using Claude SDK with model: {claude_model}")
- # Every stage goes through the Claude Agent SDK (OAuth / Max subscription)
- from agent.llm.claude_code_oauth import create_claude_code_oauth_llm_call
- claude_llm_call = create_claude_code_oauth_llm_call(model=claude_model)
- workflow_llm_call = claude_llm_call
- print(f"✅ All Claude operations will use Claude Agent SDK (OAuth/Max subscription)")
- else:
- # Use GPT-5.4 via the OpenRouter proxy (supports structured-output strict mode)
- claude_model = "openai/gpt-5.4"
- print(f"✅ Using OpenRouter with model: {claude_model}")
- from agent.llm.openrouter import create_openrouter_llm_call
- claude_llm_call = create_openrouter_llm_call(model=claude_model)
- workflow_llm_call = claude_llm_call # same as the default when the SDK is not enabled
-
- runner_qwen = AgentRunner(
- trace_store=store,
- llm_call=create_qwen_llm_call(model=qwen_model),
- skills_dir=SKILLS_DIR
- )
-
- runner_claude = AgentRunner(
- trace_store=store,
- llm_call=claude_llm_call,
- skills_dir=SKILLS_DIR
- )
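- # Two runners, two roles: runner_qwen drives the parallel research map (Phase 1),
- # while runner_claude is wired with claude_llm_call for the synthesis phases (2 and 3).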
- try:
- start_time = time.time()
- total_cost = 0.0
- costs_breakdown = {}
- global_errors = []
- strategy_file = None
- TARGET_QUALIFIED_CASES = 15
- # ── --only-step single-step execution mode ──────────────────────────────
- if args.only_step:
- step = args.only_step
- source_file = raw_cases_dir / "source.json"
- detailed_file = raw_cases_dir / "case_detailed.json"
- blueprint_temp_file = output_dir / "blueprint_temp.json"
- capabilities_temp_file = output_dir / "capabilities_temp.json"
- process_file = output_dir / "process.json"
- capabilities_file = output_dir / "capabilities.json"
- print(f"\n[Single Step Mode] Running only: {step}")
- if step == "research":
- # Phase 1: research only; force a rerun per --platforms, unaffected by existing case files
- single_platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
- if not single_platforms:
- print(" ❌ No platforms specified. Use --platforms xhs,zhihu,gzh,youtube")
- sys.exit(1)
- print(f" 🔍 Research platforms: {single_platforms}")
- single_is_single = args.restart_mode.startswith("single_")
- phase1_tasks = []
- for p in single_platforms:
- task_desc = f"渠道:{p.upper()}。核心需求:{requirement}。目标:至少收集 {TARGET_QUALIFIED_CASES} 条高质量案例(评分>=80、正文充实)。"
- out_file = str(raw_cases_dir / f"case_{p}.json")
- kwargs = {
- "task": task_desc,
- "output_file": out_file
- }
- phase1_tasks.append(
- run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=single_is_single)
- )
- phase1_results = await asyncio.gather(*phase1_tasks)
- for (task_cost, task_errors, trace_id), p in zip(phase1_results, single_platforms):
- print(f" ✓ {p}: cost=${task_cost:.4f}, errors={len(task_errors)}")
- elif step == "source":
- # Phase 1.5: extract the raw source.json
- from examples.process_pipeline.script.extract_sources import extract_sources_to_json
- result = extract_sources_to_json(raw_cases_dir)
- print(f" ✓ source.json: matched={result['total_matched']}, filtered={result['filtered_total']}")
- if result.get("filtered_reasons"):
- for reason, cnt in result["filtered_reasons"].items():
- print(f" - {reason}: {cnt}")
- elif step == "generate-case":
- # Phase 1.5.5: generate the normalized case.json
- from examples.process_pipeline.script.generate_case import generate_case_from_source
- result = await generate_case_from_source(raw_cases_dir)
- print(f" ✓ case.json: cases={result['total_cases']}")
- elif step == "workflow-extract":
- # Phase 1.6a: extract workflow into case.json
- case_file = output_dir / "case.json"
- if not case_file.exists():
- print(f" ❌ case.json not found. Run --only-step generate-case first.")
- sys.exit(1)
- # 如果指定了 --case-index,先过滤 case.json
- if args.case_index is not None:
- with open(case_file, "r", encoding="utf-8") as f:
- case_data = json.load(f)
- original_cases = case_data.get("cases", [])
- target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
- if not target_case:
- print(f" ❌ Case with index {args.case_index} not found in case.json")
- sys.exit(1)
- # Temporarily keep only the target case
- case_data["cases"] = [target_case]
- temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
- with open(temp_case_file, "w", encoding="utf-8") as f:
- json.dump(case_data, f, ensure_ascii=False, indent=2)
- print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
- case_file_to_use = temp_case_file
- else:
- case_file_to_use = case_file
- from examples.process_pipeline.script.extract_workflow import extract_workflow
- result = await extract_workflow(
- case_file_to_use,
- workflow_llm_call, model=claude_model
- )
- # If a temp file was used, merge the result back into the original case.json
- if args.case_index is not None:
- with open(case_file_to_use, "r", encoding="utf-8") as f:
- updated_data = json.load(f)
- updated_case = updated_data["cases"][0]
- # Update the corresponding case in the original file
- with open(case_file, "r", encoding="utf-8") as f:
- original_data = json.load(f)
- for i, c in enumerate(original_data["cases"]):
- if c.get("index") == args.case_index:
- original_data["cases"][i] = updated_case
- break
- # Pre-write snapshot: copy the old case.json into history/ as a backup
- from examples.process_pipeline.script.case_history import snapshot_case_file
- snap = snapshot_case_file(case_file, step="workflow_merge")
- if snap:
- print(f" [snapshot] {snap.name}")
- with open(case_file, "w", encoding="utf-8") as f:
- json.dump(original_data, f, ensure_ascii=False, indent=2)
- temp_case_file.unlink() # remove the temp file
- print(f" ✓ Merged case {args.case_index} back to case.json")
- print(f" ✓ case.json + workflow: success={result['success']}, failed={result['failed']}")
- elif step == "capability-extract":
- # Phase 1.6b is deprecated: capabilities are now written into case.json by workflow-extract
- case_file = output_dir / "case.json"
- if not case_file.exists():
- print(f" ❌ case.json not found. Run --only-step generate-case first.")
- sys.exit(1)
- print(" ⏭️ capability-extract is integrated into workflow-extract; skipping old extractor.")
- elif step == "apply-grounding":
- # Phase 1.7: map apply_to_draft to the final apply_to
- case_file = output_dir / "case.json"
- if not case_file.exists():
- print(f" ❌ case.json not found. Run workflow-extract first.")
- sys.exit(1)
- # If --case-index is given, filter case.json down to that case first
- if args.case_index is not None:
- with open(case_file, "r", encoding="utf-8") as f:
- case_data = json.load(f)
- original_cases = case_data.get("cases", [])
- target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
- if not target_case:
- print(f" ❌ Case with index {args.case_index} not found in case.json")
- sys.exit(1)
- case_data["cases"] = [target_case]
- temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
- with open(temp_case_file, "w", encoding="utf-8") as f:
- json.dump(case_data, f, ensure_ascii=False, indent=2)
- print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
- case_file_to_use = temp_case_file
- else:
- case_file_to_use = case_file
- from examples.process_pipeline.script.apply_to_grounding import apply_grounding
- result = await apply_grounding(
- case_file_to_use,
- claude_llm_call, model=claude_model
- )
- # If a temp file was used, merge the result back into the original case.json
- if args.case_index is not None:
- with open(case_file_to_use, "r", encoding="utf-8") as f:
- updated_data = json.load(f)
- updated_case = updated_data["cases"][0]
- with open(case_file, "r", encoding="utf-8") as f:
- original_data = json.load(f)
- for i, c in enumerate(original_data["cases"]):
- if c.get("index") == args.case_index:
- original_data["cases"][i] = updated_case
- break
- with open(case_file, "w", encoding="utf-8") as f:
- json.dump(original_data, f, ensure_ascii=False, indent=2)
- temp_case_file.unlink()
- print(f" ✓ Merged case {args.case_index} back to case.json")
- print(f" ✓ case.json + apply_to: grounded={result['grounded']}/{result['total']}, cost=${result['total_cost']:.4f}")
- elif step == "process-cluster":
- # Phase 2.1.1: 工序聚类
- if not detailed_file.exists():
- print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
- sys.exit(1)
- from examples.process_pipeline.script.cluster_processes import cluster_processes
- result = await cluster_processes(
- source_file=source_file, detailed_file=detailed_file,
- output_file=blueprint_temp_file, requirement=requirement,
- llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ blueprint_temp.json: clusters={result['clusters']}")
- elif step == "process-score":
- # Phase 2.1.2: process scoring
- if not blueprint_temp_file.exists():
- print(f" ❌ blueprint_temp.json not found. Run --only-step process-cluster first.")
- sys.exit(1)
- from examples.process_pipeline.script.score_processes import score_blueprints
- result = await score_blueprints(
- blueprint_file=blueprint_temp_file, output_file=process_file,
- requirement=requirement, llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ process.json: scored={result['scored']}")
- elif step == "capability-extract":
- # Phase 2.2.1: 能力初步聚类
- if not detailed_file.exists():
- print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
- sys.exit(1)
- from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
- result = await extract_capabilities_workflow(
- detailed_file=detailed_file, source_file=source_file,
- output_file=capabilities_temp_file, requirement=requirement,
- llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ capabilities_temp.json: capabilities={result['capabilities']}")
- elif step == "capability-enrich":
- # Phase 2.2.2: capability enrichment
- if not capabilities_temp_file.exists():
- print(f" ❌ capabilities_temp.json not found. Run --phase 2 first (capability extraction runs inside the Phase 2 workflow).")
- sys.exit(1)
- if not source_file.exists():
- print(f" ❌ source.json not found. Run --only-step source first.")
- sys.exit(1)
- from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
- result = await enrich_all_capabilities(
- capabilities_temp_file=capabilities_temp_file, source_file=source_file,
- output_file=capabilities_file, llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ capabilities.json: enriched={result['enriched']}/{result['total_capabilities']}")
- elif step == "strategy":
- # Phase 3: strategy assembly
- if not process_file.exists():
- print(f" ❌ process.json not found. Run --only-step process-score first.")
- sys.exit(1)
- if not capabilities_file.exists():
- print(f" ❌ capabilities.json not found. Run --only-step capability-enrich first.")
- sys.exit(1)
- strategy_file_path = output_dir / "strategy.json"
- from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
- result = await assemble_strategy(
- process_file=process_file, capabilities_file=capabilities_file,
- output_file=strategy_file_path, requirement=requirement,
- llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ strategy.json: workflow_steps={result['workflow_steps']}")
- elapsed = time.time() - start_time
- print(f"\n[Single Step Done] {step} completed in {elapsed:.1f}s")
- return
- # ── Normal pipeline flow ──────────────────────────────
- platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
- # Phase 0: Platform Selection (controlled by --platforms)
- if should_run("research"):
- print(f"\n--- Phase 0: Using specified platforms ---")
- print(f"📡 Will research platforms: {platforms}")
- # Note: platforms with existing files are no longer skipped, because the agent appends incrementally
- is_single_step = args.restart_mode.startswith("single_")
- # Phase 1 & 1.5: MAP (Parallel Search) and Source Extraction Loop
- if should_run("research") or should_run("source"):
- print(f"\n--- Phase 1: Distributed Research Map with Source Filter Loop ({qwen_model}) ---")
- from examples.process_pipeline.script.extract_sources import extract_sources_to_json
- MAX_ROUNDS = 50
- platform_traces = {p: None for p in platforms}
- active_platforms = platforms.copy()
- phase1_trace_ids = {}
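- # Loop control: each round re-runs the under-target platforms (continuing their
- # traces), re-extracts source.json, and recounts qualified items per platform;
- # the loop ends once every platform reaches TARGET_QUALIFIED_CASES or MAX_ROUNDS is hit.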
- if not should_run("research") and should_run("source"):
- try:
- src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=None)
- print(f"📎 [Source Extraction] matched={src_stats['total_matched']}, filtered={src_stats['filtered_total']} → {raw_cases_dir / 'source.json'}")
- if src_stats.get("filtered_reasons"):
- for reason, cnt in src_stats["filtered_reasons"].items():
- print(f" - {reason}: {cnt}")
- except Exception as e:
- err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- elif should_run("research"):
- round_idx = 0
- last_src_stats = None # filter statistics from the previous round
- last_platform_count = {} # qualified count per platform from the previous round
- while active_platforms and round_idx < MAX_ROUNDS:
- print(f"\n >>> [Research Loop] Round {round_idx+1} - Active platforms: {active_platforms}")
- if active_platforms:
- phase1_tasks = []
- for p in active_platforms:
- task_desc = f"渠道:{p.upper()}。核心需求:{requirement}。目标:至少收集 {TARGET_QUALIFIED_CASES} 条高质量案例(评分>=70、正文充实)。"
- out_file = str(raw_cases_dir / f"case_{p}.json")
- additional_msgs = None
- if round_idx > 0 and last_src_stats:
- # Build a feedback message carrying the filtering details
- p_count = last_platform_count.get(p, 0)
- # Pick out the entries filtered for this platform
- p_filtered = [d for d in last_src_stats.get("filtered_details", []) if d.get("platform") == p]
- reason_summary = last_src_stats.get("filtered_reasons", {})
- feedback_lines = [
- f"【系统反馈】你在上一轮提取的有效案例数量未达标。",
- f"当前 {p.upper()} 合格案例:{p_count}/{TARGET_QUALIFIED_CASES}",
- f"过滤统计:{dict(reason_summary)}" if reason_summary else "",
- ]
- if p_filtered:
- feedback_lines.append(f"\n以下是你提交的被过滤掉的帖子(共{len(p_filtered)}条):")
- for item in p_filtered[:10]:
- feedback_lines.append(f" - [{item['case_id']}] {item['title']} → 原因: {item['filter_reason']}")
- if len(p_filtered) > 10:
- feedback_lines.append(f" ... 还有 {len(p_filtered) - 10} 条未列出")
- feedback_lines.append(
- f"\n请继续搜索并提取更多**全新的、不同的**高质量案例,**追加**写入到你一直在维护的文件中。"
- f"注意:不要重复之前已找过的案例!针对上述被过滤的原因,请确保新案例有详实的正文内容且评分准确。"
- )
- additional_msgs = [{
- "role": "user",
- "content": "\n".join(line for line in feedback_lines if line)
- }]
- kwargs = {
- "task": task_desc,
- "output_file": out_file
- }
- phase1_tasks.append(
- run_agent_task(
- runner_qwen, "researcher", kwargs,
- f"P1_Research_{p}_R{round_idx+1}", qwen_model,
- skip_validation=is_single_step,
- start_trace_id=platform_traces[p],
- additional_messages=additional_msgs
- )
- )
- phase1_results = await asyncio.gather(*phase1_tasks)
- for (task_cost, task_errors, trace_id), p in zip(phase1_results, active_platforms):
- total_cost += task_cost
- cost_key = f"P1_Research_{p}"
- costs_breakdown[cost_key] = costs_breakdown.get(cost_key, 0.0) + round(task_cost, 4)
- platform_traces[p] = trace_id
- phase1_trace_ids[f"P1_Research_{p}"] = trace_id
- global_errors.extend(task_errors)
- expected_file = Path(raw_cases_dir / f"case_{p}.json")
- if not expected_file.exists():
- err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- if should_run("source"):
- try:
- trace_id_list = [tid for tid in phase1_trace_ids.values() if tid]
- src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=trace_id_list)
- last_src_stats = src_stats
- print(f" 📎 [Source Extraction] matched={src_stats['total_matched']}, filtered={src_stats['filtered_total']} → {raw_cases_dir / 'source.json'}")
- if src_stats.get("filtered_reasons"):
- for reason, cnt in src_stats["filtered_reasons"].items():
- print(f" - {reason}: {cnt}")
- except Exception as e:
- err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
- print(f" ⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- # Check the target by counting per-platform entries directly in source.json
- source_file = raw_cases_dir / "source.json"
- if source_file.exists():
- with open(source_file, "r", encoding="utf-8") as f:
- source_data = json.load(f)
- platform_count = {}
- for s in source_data.get("sources", []):
- p = s.get("platform")
- if p:
- platform_count[p] = platform_count.get(p, 0) + 1
- last_platform_count = platform_count
- print(f" 📊 [Source Count] Target: >={TARGET_QUALIFIED_CASES} qualified items per platform")
- platforms_to_keep = []
- for p in platforms:
- count = platform_count.get(p, 0)
- if p in active_platforms:
- print(f" - {p}: {count}/{TARGET_QUALIFIED_CASES}")
- if count < TARGET_QUALIFIED_CASES:
- platforms_to_keep.append(p)
- active_platforms = platforms_to_keep
- if not active_platforms:
- print(f" ✅ All platforms reached the target {TARGET_QUALIFIED_CASES} qualified items!")
- break
- else:
- print(f" ⚠️ source.json not found, continuing loop to retry...")
- round_idx += 1
- if round_idx >= MAX_ROUNDS and active_platforms:
- print(f" ⚠️ [Max Rounds] Reached {MAX_ROUNDS} rounds. Remaining platforms: {active_platforms}")
- else:
- print("\n⏭️ [Skip] Phase 1 & 1.5 Skipped. Using existing cases...")
- # Phase 1.3: Generate case.json from source.json
- if should_run("generate-case"):
- source_file = raw_cases_dir / "source.json"
- case_file = output_dir / "case.json"
- if source_file.exists():
- try:
- from examples.process_pipeline.script.generate_case import generate_case
- print(f"\n--- Phase 1.3: Generate case.json ---")
- case_stats = await generate_case(source_file, case_file)
- print(
- f"📦 [Case Generation] "
- f"generated {case_stats.get('total', 0)} cases "
- f"→ {case_file}"
- )
- except Exception as e:
- err_msg = f"Case generation failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- # Phase 1.6: Extract workflow and capability together → case.json
- if should_run("workflow-extract"):
- case_file = output_dir / "case.json"
- if case_file.exists():
- try:
- from examples.process_pipeline.script.extract_workflow import extract_workflow
- print(f"\n--- Phase 1.6a: Workflow Extraction ({claude_model}) ---")
- workflow_stats = await extract_workflow(
- case_file,
- workflow_llm_call,
- model=claude_model,
- max_concurrent=3
- )
- total_cost += workflow_stats.get("total_cost", 0.0)
- costs_breakdown["P1.6a_WorkflowExtraction"] = round(workflow_stats.get("total_cost", 0.0), 4)
- print(
- f"🔍 [Workflow Extraction] "
- f"success={workflow_stats['success']} "
- f"failed={workflow_stats['failed']}"
- )
- except Exception as e:
- err_msg = f"Workflow extraction failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- if should_run("capability-extract"):
- case_file = output_dir / "case.json"
- if case_file.exists():
- print("\n--- Phase 1.6b: Capability Extraction ---")
- print("🧩 [Capability Extraction] integrated into workflow-extract; skipping old extractor.")
- # Phase 1.7: Apply grounding (map apply_to_draft to apply_to)
- if should_run("apply-grounding"):
- case_file = output_dir / "case.json"
- if case_file.exists():
- try:
- from examples.process_pipeline.script.apply_to_grounding import apply_grounding
- print(f"\n--- Phase 1.7: Apply Grounding ({claude_model}) ---")
- grounding_stats = await apply_grounding(
- case_file,
- claude_llm_call,
- model=claude_model,
- max_concurrent=3
- )
- total_cost += grounding_stats.get("total_cost", 0.0)
- costs_breakdown["P1.7_ApplyGrounding"] = round(grounding_stats.get("total_cost", 0.0), 4)
- print(
- f"🗺️ [Apply Grounding] "
- f"grounded={grounding_stats['grounded']}/{grounding_stats['total']} "
- f"→ {case_file}"
- )
- except Exception as e:
- err_msg = f"Apply grounding failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- # Phase 2: Parallel Workflow (Process + Capabilities) uses Claude
- if any(should_run(s) for s in ["process-cluster", "process-score", "capability-enrich", "strategy"]):
- print(f"\n--- Phase 2: Parallel Workflow ({claude_model}) ---")
- # Output files
- process_file = str(output_dir / "process.json")
- capabilities_file = str(output_dir / "capabilities.json")
- # Intermediate files
- blueprint_temp_file = str(output_dir / "blueprint_temp.json")
- capabilities_temp_file = str(output_dir / "capabilities_temp.json")
- # Prefer structured data: source.json + case_detailed.json
- detailed_file = raw_cases_dir / "case_detailed.json"
- source_file = raw_cases_dir / "source.json"
- if detailed_file.exists():
- input_files_glob = str(raw_cases_dir / "{source,case_detailed}.json").replace("\\", "/")
- print(f" Using structured data: source.json + case_detailed.json")
- else:
- input_files_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
- print(f" Fallback to raw cases: case_*.json")
- force_strategy_rerun = False
- force_active = active_steps is not None
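- # Skip semantics: intermediate outputs already on disk are reused, unless steps
- # were explicitly selected (force_active), in which case they are regenerated.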
- # ── Step 1: run 2.1.1 (cluster_processes) and 2.2.1 (extract_capabilities) in parallel ──
- async def run_cluster_processes():
- """2.1.1: 工序聚类 → blueprint_temp.json"""
- if Path(blueprint_temp_file).exists() and not force_active:
- print(f" [2.1.1] ⏭️ blueprint_temp.json exists, skipping")
- return 0.0
- print(f" [2.1.1] Clustering processes...")
- try:
- from examples.process_pipeline.script.cluster_processes import cluster_processes
- result = await cluster_processes(
- source_file=source_file,
- detailed_file=detailed_file,
- output_file=Path(blueprint_temp_file),
- requirement=requirement,
- llm_call=claude_llm_call,
- model=claude_model,
- )
- print(f" [2.1.1] ✓ Distilled {result.get('distilled_cases', 0)} cases, "
- f"generated {result.get('blueprints', 0)} blueprints")
- return result.get("total_cost", 0.0)
- except Exception as e:
- err_msg = f"P2.1.1 ClusterProcesses failed: {type(e).__name__}: {e}"
- print(f" [2.1.1] ⚠️ {err_msg}")
- global_errors.append(err_msg)
- return 0.0
- async def run_extract_capabilities():
- """2.2.1: 能力初步聚类 → capabilities_temp.json"""
- if Path(capabilities_temp_file).exists() and not force_active:
- print(f" [2.2.1] ⏭️ capabilities_temp.json exists, skipping")
- return 0.0
- print(f" [2.2.1] Extracting capabilities...")
- try:
- from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
- result = await extract_capabilities_workflow(
- detailed_file=detailed_file,
- source_file=source_file,
- output_file=Path(capabilities_temp_file),
- requirement=requirement,
- llm_call=claude_llm_call,
- model=claude_model,
- )
- print(f" [2.2.1] ✓ Extracted {result.get('capabilities', 0)} capabilities")
- return result.get("total_cost", 0.0)
- except Exception as e:
- err_msg = f"P2.2.1 ExtractCapabilities failed: {type(e).__name__}: {e}"
- print(f" [2.2.1] ⚠️ {err_msg}")
- global_errors.append(err_msg)
- return 0.0
- if not Path(blueprint_temp_file).exists() or not Path(capabilities_temp_file).exists():
- force_strategy_rerun = True
- step1_costs = await asyncio.gather(run_cluster_processes(), run_extract_capabilities())
- for cost, name in zip(step1_costs, ["P2.1.1_ClusterProcesses", "P2.2.1_ExtractCapabilities"]):
- total_cost += cost
- costs_breakdown[name] = round(cost, 4)
- # ── Step 2: run 2.1.2 and 2.2.2 in parallel ──────────────────────────────
- async def run_score_processes():
- """2.1.2: 工序匹配度打分"""
- if Path(process_file).exists() and not force_active:
- print(f" [2.1.2] ⏭️ process.json exists, skipping")
- return 0.0
- if not Path(blueprint_temp_file).exists():
- print(f" [2.1.2] ⚠️ blueprint_temp.json not found, skipping")
- return 0.0
- print(f" [2.1.2] Scoring processes...")
- try:
- from examples.process_pipeline.script.score_processes import score_blueprints
- score_result = await score_blueprints(
- blueprint_file=Path(blueprint_temp_file),
- output_file=Path(process_file),
- requirement=requirement,
- llm_call=claude_llm_call,
- model=claude_model,
- )
- print(f" [2.1.2] ✓ Scored {score_result.get('scored', 0)} blueprints")
- return score_result.get("total_cost", 0.0)
- except Exception as e:
- err_msg = f"P2.1.2 ScoreProcesses failed: {e}"
- print(f" [2.1.2] ⚠️ {err_msg}")
- global_errors.append(err_msg)
- return 0.0
- async def run_enrich_capabilities():
- """2.2.2: 能力丰富化"""
- if Path(capabilities_file).exists() and not force_active:
- print(f" [2.2.2] ⏭️ capabilities.json exists, skipping")
- return 0.0
- if not Path(capabilities_temp_file).exists():
- print(f" [2.2.2] ⚠️ capabilities_temp.json not found, skipping")
- return 0.0
- if not source_file.exists():
- print(f" [2.2.2] ⚠️ source.json not found, skipping")
- return 0.0
- print(f" [2.2.2] Enriching capabilities...")
- try:
- from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
- enrich_result = await enrich_all_capabilities(
- capabilities_temp_file=Path(capabilities_temp_file),
- source_file=source_file,
- output_file=Path(capabilities_file),
- llm_call=claude_llm_call,
- model=claude_model,
- )
- print(f" [2.2.2] ✓ Enriched {enrich_result.get('enriched', 0)} capabilities")
- return enrich_result.get("total_cost", 0.0)
- except Exception as e:
- err_msg = f"P2.2.2 EnrichCapabilities failed: {e}"
- print(f" [2.2.2] ⚠️ {err_msg}")
- global_errors.append(err_msg)
- return 0.0
- # Run Step 2 in parallel
- step2_costs = await asyncio.gather(run_score_processes(), run_enrich_capabilities())
- for cost, name in zip(step2_costs, ["P2.1.2_ScoreProcesses", "P2.2.2_EnrichCaps"]):
- total_cost += cost
- costs_breakdown[name] = round(cost, 4)
- # Phase 3: REDUCE 2 (Final Assembly) uses Claude
- print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
- strategy_file_path = output_dir / "strategy.json"
- if args.restart_mode == "single":
- force_strategy_rerun = False
- if strategy_file_path.exists() and not force_strategy_rerun and not force_active:
- print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
- else:
- if strategy_file_path.exists():
- print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")
- print(" > Using [Workflow Core]")
- from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
- try:
- phase3_result = await assemble_strategy(
- process_file=Path(process_file),
- capabilities_file=Path(capabilities_file),
- output_file=strategy_file_path,
- requirement=requirement,
- llm_call=claude_llm_call,
- model=claude_model,
- )
- phase3_cost = phase3_result.get("total_cost", 0.0)
- print(f" ✓ Generated workflow with {phase3_result.get('workflow_steps', 0)} steps")
- except Exception as e:
- err_msg = f"P3_AssembleStrategy failed: {type(e).__name__}: {e}"
- print(f" ⚠️ {err_msg}")
- global_errors.append(err_msg)
- phase3_cost = 0.0
- total_cost += phase3_cost
- costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
- else:
- print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
-
- end_time = time.time()
- elapsed_sec = end_time - start_time
-
- # Save Metrics
- metrics_file = base_dir / "run_metrics.json"
- metrics_data = []
- if metrics_file.exists():
- with open(metrics_file, "r", encoding="utf-8") as f:
- try:
- metrics_data = json.load(f)
- except json.JSONDecodeError:
- pass
-
- # Collect trace_ids from all phases
- trace_ids = {}
- if 'phase1_trace_ids' in locals():
- trace_ids.update(phase1_trace_ids)
- metrics_data.append({
- "index": args.index,
- "requirement": requirement[:80] + "...",
- "duration_seconds": round(elapsed_sec, 2),
- "total_cost_usd": round(total_cost, 4),
- "costs_breakdown": costs_breakdown,
- "trace_ids": trace_ids,
- "errors": global_errors,
- "timestamp": datetime.now().isoformat()
- })
-
- with open(metrics_file, "w", encoding="utf-8") as f:
- json.dump(metrics_data, f, indent=2, ensure_ascii=False)
-
- print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")
-
- finally:
- pass
-
- print("✅ Pipeline run finished.")
- if strategy_file:
- print("✅ Strategy saved to:", strategy_file)
- if __name__ == "__main__":
- asyncio.run(main())