| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508 |
- """
- V4 Pipeline: Hardcoded Map-Reduce Orchestration for AIGC Process Research
- """
- import argparse
- import asyncio
- import json
- import os
- import sys
- import time
- from datetime import datetime
- from pathlib import Path
- PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
- # Force runtime working directory to project root so relative trace/cache paths
- # always land in the repo root no matter where this script is launched from.
- os.chdir(PROJECT_ROOT)
- # Add project root to path
- sys.path.insert(0, str(PROJECT_ROOT))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.llm.prompts import SimplePrompt
- from agent.core.runner import AgentRunner, RunConfig
- from agent.tools.builtin.knowledge import KnowledgeConfig
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_qwen_llm_call
- from agent.llm.openrouter import create_openrouter_llm_call
- from agent.llm.claude import create_claude_llm_call
- # config from existing setup
- from examples.process_research.config import (
- OUTPUT_DIR, TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
- BROWSER_TYPE, HEADLESS, COORDINATOR_RUN_CONFIG
- )
- from agent.utils import setup_logging
async def run_agent_task(runner: AgentRunner, prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False, start_trace_id: str = None, additional_messages: list = None):
    """Run one pipeline step through AgentRunner with retry, recovery and schema validation.

    Builds messages from the named prompt template, launches the agent with a
    tool allow-list derived from the agent type, then layers three safety nets:

      1. Instant validation: every time the agent writes its output file, the
         file is parsed (with an automatic JSON syntax repair pass) and checked
         against the schema matching its filename.
      2. Forced recovery: if the run ends without the output file existing, the
         same trace is continued with a "write the file now" instruction.
      3. Retry loop (up to ``max_retries``): when a validation error is known
         and a trace exists, the existing agent is resumed with fix
         instructions; otherwise the step is restarted from scratch.

    Args:
        runner: AgentRunner to drive.
        prompt_name: Prompt template name under ./prompts (also the agent_type).
        kwargs: Template variables; ``output_file`` (if present) is the file
            whose existence and schema are verified.
        task_name: Human-readable label used in log lines and run names.
        model_name: Fallback model when the prompt config does not set one.
        skip_validation: Skip schema validation (single-step mode) and return
            right after the file exists.
        start_trace_id: Optional trace to resume instead of starting fresh.
        additional_messages: Extra messages appended after the template output.

    Returns:
        (total_task_cost, task_errors, trace_id): accumulated LLM cost, the
        collected error strings, and the trace id of the last (or successful) run.
    """
    # Imported lazily so module import stays light.
    from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
    base_dir = Path(__file__).parent
    prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
    prompt = SimplePrompt(prompt_path)
    messages = prompt.build_messages(**kwargs)
    if additional_messages:
        messages.extend(additional_messages)
    target_tools = []
    if prompt_name == "extract_capabilities":
        target_tools = ["capability_search", "capability_list", "tool_search"]
    # Tool-group permissions per agent type.
    tool_groups_map = {
        "researcher": ["core", "content"],  # search + files, no browser
        "filter_and_blueprint": ["core"],  # file read/write only
        "extract_capabilities": ["core"],  # file read/write only (extras come from target_tools)
        "assemble_strategy": ["core"],  # file read/write only
    }
    total_task_cost = 0.0
    task_errors = []
    out_file = kwargs.get("output_file")
    max_retries = 3
    last_trace_id = start_trace_id
    last_validation_error = None
    final_trace_id = None  # trace id returned on successful validation

    def _instant_validate():
        """Validate the output file right after a write; returns an error string or None."""
        nonlocal last_validation_error
        if not out_file or not Path(out_file).exists():
            return None
        try:
            with open(out_file, "r", encoding="utf-8") as f:
                raw = f.read()
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                # Malformed JSON: attempt automatic syntax repair and persist the fix.
                from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
                ok, data, desc = try_fix_and_parse(raw)
                if ok:
                    with open(out_file, "w", encoding="utf-8") as f:
                        json.dump(data, f, ensure_ascii=False, indent=2)
                    print(f" 🔧 [Instant Fix] {desc}")
                else:
                    last_validation_error = "JSON parse failed, auto-fix unsuccessful"
                    return last_validation_error
            # Dispatch to the schema validator matching the output filename.
            filename = Path(out_file).name
            err = None
            if filename.startswith("case_"):
                err = validate_case(data)
            elif filename == "blueprint.json":
                err = validate_blueprint(data)
            elif filename == "capabilities_extracted.json":
                err = validate_capabilities(data)
            elif filename == "strategy.json":
                err = validate_strategy(data)
            if err:
                last_validation_error = err
                print(f" ⚠️ [Instant Validation] {err}")
                return err
            else:
                print(f" ✅ [Instant Validation] (unknown) OK")
                return None
        except Exception as e:
            last_validation_error = str(e)
            return str(e)

    for attempt in range(max_retries):
        if attempt > 0 and last_trace_id and last_validation_error:
            # Continue mode: resume the prior trace and tell the agent exactly
            # what failed so it can repair the file in place.
            print(f"🔄 [Continue {attempt}/{max_retries-1}] {task_name} - sending fix instructions to existing agent")
            fix_messages = [{
                "role": "user",
                "content": (
                    f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
                    f"错误详情:{last_validation_error}\n\n"
                    f"请立刻读取该文件,根据以上错误信息修复内容,然后重新调用 write_json 写入到同一路径 `{out_file}`。"
                    f"只修复有问题的部分,不要丢弃已有的正确内容。"
                )
            }]
            fix_config = RunConfig(
                model=prompt.config.get("model") or model_name,
                temperature=0.1,  # low temperature for a targeted repair
                name=f"{task_name}_Fix{attempt}",
                agent_type=prompt_name,
                tools=target_tools,
                tool_groups=tool_groups_map.get(prompt_name, ["core"]),
                trace_id=last_trace_id,  # resume the previous conversation
                knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
            )
            try:
                async for item in runner.run(messages=fix_messages, config=fix_config):
                    if isinstance(item, Trace):
                        last_trace_id = item.trace_id
                        if item.status == "completed":
                            total_task_cost += item.total_cost
                        elif item.status == "failed":
                            task_errors.append(f"{task_name} Fix Failed: {item.error_message}")
                    if isinstance(item, Message) and item.role == "tool":
                        content = item.content if isinstance(item.content, dict) else {}
                        if content.get("tool_name") in ("write_file", "write_json"):
                            print(f" 💾 [Fix File Written by {task_name}]")
                            _instant_validate()
            except Exception as e:
                err_msg = f"{type(e).__name__}: {e}"
                print(f"❌ [Exception Fix] {task_name} crashed: {err_msg}")
                task_errors.append(f"{task_name} fix crashed: {err_msg}")
        elif attempt > 0:
            # No trace id or no validation error to act on: delete any stale
            # output and rerun the step from scratch.
            print(f"🔄 [Retry {attempt}/{max_retries-1}] {task_name} - no prior trace, full restart")
            if out_file and Path(out_file).exists():
                Path(out_file).unlink()
            run_config = RunConfig(
                model=prompt.config.get("model") or model_name,
                temperature=prompt.config.get("temperature") or 0.3,
                name=f"{task_name}_A{attempt}",
                agent_type=prompt_name,
                tools=target_tools,
                tool_groups=tool_groups_map.get(prompt_name, ["core"]),
                knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
            )
            print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
            try:
                async for item in runner.run(messages=messages, config=run_config):
                    if isinstance(item, Trace):
                        last_trace_id = item.trace_id
                        if item.status == "completed":
                            total_task_cost += item.total_cost
                        elif item.status == "failed":
                            task_errors.append(f"{task_name} Failed: {item.error_message}")
                    if isinstance(item, Message):
                        if item.role == "tool":
                            content = item.content if isinstance(item.content, dict) else {}
                            t_name = content.get("tool_name", "unknown")
                            if t_name in ("write_file", "write_json"):
                                print(f" 💾 [File Written by {task_name}]")
                                _instant_validate()
            except Exception as e:
                err_msg = f"{type(e).__name__}: {e}"
                print(f"❌ [Exception] {task_name} crashed: {err_msg}")
                task_errors.append(f"{task_name} crashed: {err_msg}")
        else:
            # First attempt. Note trace_id=last_trace_id so a caller-supplied
            # start_trace_id resumes that trace instead of opening a new one.
            run_config = RunConfig(
                model=prompt.config.get("model") or model_name,
                temperature=prompt.config.get("temperature") or 0.3,
                name=f"{task_name}_A{attempt}",
                agent_type=prompt_name,
                tools=target_tools,
                tool_groups=tool_groups_map.get(prompt_name, ["core"]),
                knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False),
                trace_id=last_trace_id
            )
            print(f"🚀 [Launch] {task_name} (Attempt {attempt+1})")
            try:
                async for item in runner.run(messages=messages, config=run_config):
                    if isinstance(item, Trace):
                        last_trace_id = item.trace_id
                        if item.status == "completed":
                            total_task_cost += item.total_cost
                        elif item.status == "failed":
                            task_errors.append(f"{task_name} Failed: {item.error_message}")
                    if isinstance(item, Message):
                        if item.role == "tool":
                            content = item.content if isinstance(item.content, dict) else {}
                            t_name = content.get("tool_name", "unknown")
                            if t_name in ("write_file", "write_json"):
                                print(f" 💾 [File Written by {task_name}]")
                                _instant_validate()
            except Exception as e:
                err_msg = f"{type(e).__name__}: {e}"
                print(f"❌ [Exception] {task_name} crashed: {err_msg}")
                task_errors.append(f"{task_name} crashed: {err_msg}")
        # Verification & Recovery block: the run ended but the file was never
        # written — continue the same trace with a forced "write now" order.
        if out_file and not Path(out_file).exists() and last_trace_id:
            print(f"⚠️ [Recovery] {task_name} missing output file. Triggering forced wrap-up continuation...")
            recovery_messages = [{
                "role": "user",
                "content": f"【系统强制指令】你的任务阶段已终止,但尚未将结果写入文件。请立刻调用 write_json 工具,将你目前已经搜集或处理到的原生结构化内容直接作为 json_data 参数对象写入到绝对路径 `{out_file}`,如果搜集失败也请写入空的总结对象。必须立刻执行写入!"
            }]
            rec_config = RunConfig(
                model=model_name,
                temperature=0.1,
                name=task_name + "_Rec",
                agent_type=prompt_name,
                trace_id=last_trace_id,
                knowledge=KnowledgeConfig(enable_completion_extraction=False, enable_extraction=False, enable_injection=False)
            )
            try:
                async for r_item in runner.run(messages=recovery_messages, config=rec_config):
                    if isinstance(r_item, Trace):
                        if r_item.status == "completed":
                            total_task_cost += r_item.total_cost
                        elif r_item.status == "failed":
                            task_errors.append(f"{task_name} Recovery Failed: {r_item.error_message}")
                    if isinstance(r_item, Message) and r_item.role == "tool":
                        content = r_item.content if isinstance(r_item.content, dict) else {}
                        if content.get("tool_name") in ("write_file", "write_json"):
                            print(f" 💾 [Recovery File Written by {task_name}]")
                            _instant_validate()
            except Exception as e:
                err_msg = f"{type(e).__name__}: {e}"
                print(f"❌ [Exception Recovery] {task_name} crashed: {err_msg}")
                task_errors.append(f"{task_name} recovery crashed: {err_msg}")
        # Schema Validation (with auto-fix layer): success exits the retry loop.
        if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
            if skip_validation:
                print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
                return total_task_cost, task_errors, last_trace_id
            try:
                with open(out_file, "r", encoding="utf-8") as f:
                    raw_content = f.read()
                # Layer 1: try a direct parse.
                try:
                    data = json.loads(raw_content)
                except json.JSONDecodeError as parse_err:
                    # Layer 2: automatic repair of JSON syntax errors.
                    print(f" 🔧 [Auto-Fix] {Path(out_file).name} has JSON syntax error, attempting fix...")
                    try:
                        from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
                        success, data, fix_desc = try_fix_and_parse(raw_content)
                        if success:
                            # Repair succeeded — write the fixed JSON back.
                            with open(out_file, "w", encoding="utf-8") as f:
                                json.dump(data, f, ensure_ascii=False, indent=2)
                            print(f" 🔧 [Auto-Fix] Fixed: {fix_desc}")
                        else:
                            raise parse_err  # repair failed — surface the original error
                    except ImportError:
                        raise parse_err  # fix_json_quotes unavailable — surface the original error
                filename = Path(out_file).name
                err = None
                if filename.startswith("case_"):
                    err = validate_case(data)
                elif filename == "blueprint.json":
                    err = validate_blueprint(data)
                elif filename == "capabilities_extracted.json":
                    err = validate_capabilities(data)
                elif filename == "strategy.json":
                    err = validate_strategy(data)
                if err:
                    raise ValueError(f"Schema Validation Failed: {err}")
                print(f" ✅ [Schema Validated] {Path(out_file).name}")
                final_trace_id = last_trace_id
                return total_task_cost, task_errors, final_trace_id  # Success! Exit retry loop.
            except Exception as e:
                err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
                print(f"❌ [Validation Error] {task_name}: {err_msg}")
                task_errors.append(f"{task_name} Error: {e}")
                # Remember the error so the next attempt can use continue mode.
                last_validation_error = str(e)
                if attempt == max_retries - 1:
                    print(f"❌ [Retry Limit] {task_name} exhausted retries.")
                    return total_task_cost, task_errors, last_trace_id
        else:
            print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
            # No file at all — a continue-mode fix has nothing to act on.
            last_validation_error = None
            if attempt == max_retries - 1:
                return total_task_cost, task_errors, last_trace_id
    return total_task_cost, task_errors, last_trace_id
async def run_anthropic_sdk_task(prompt_name: str, kwargs: dict, task_name: str, model_name: str, skip_validation: bool = False):
    """Fallback driver: run one pipeline step through the official Anthropic SDK.

    Skips the in-house trace infrastructure but keeps seamless access to the
    local Python tool registry (write_file / glob_files, ...). Mirrors
    ``run_agent_task``'s retry / recovery / schema-validation behavior.

    Args:
        prompt_name: Prompt template name under ./prompts.
        kwargs: Template variables; ``output_file`` (if present) is verified.
        task_name: Human-readable label used in log lines.
        model_name: Model id; any provider prefix (e.g. "openrouter/") is stripped.
        skip_validation: Skip schema validation (single-step mode).

    Returns:
        (total_task_cost, task_errors, trace_id): trace_id is always ``sdk_trace_id``
        (currently None) so the return shape matches ``run_agent_task`` and callers
        can unpack both functions identically. (Previously only the
        skip_validation path returned a 3-tuple while every other exit returned
        a 2-tuple, which broke uniform unpacking.)
    """
    from anthropic import AsyncAnthropic
    from agent.tools.registry import get_tool_registry

    base_dir = Path(__file__).parent
    prompt_path = base_dir / "prompts" / f"{prompt_name}.prompt"
    prompt = SimplePrompt(prompt_path)

    # 1. Assemble the input messages: fold all system prompts into one string,
    #    keep the remaining turns in order.
    raw_messages = prompt.build_messages(**kwargs)
    system_prompt = ""
    messages = []
    for msg in raw_messages:
        if msg["role"] == "system":
            system_prompt += msg["content"] + "\n\n"
        else:
            messages.append({"role": msg["role"], "content": msg["content"]})

    # 2. Map the target tools' OpenAI-style schemas to Anthropic tool schemas.
    target_tools = ["write_file", "write_json", "read_file", "glob_files"]
    if prompt_name == "extract_capabilities":
        target_tools.extend(["capability_search", "capability_list", "tool_search"])

    registry = get_tool_registry()
    schemas = registry.get_schemas(target_tools)
    anthropic_tools = [
        {
            "name": s["function"]["name"],
            "description": s["function"].get("description", ""),
            "input_schema": s["function"]["parameters"],
        }
        for s in schemas
    ]

    # 3. Initialize the client and run the agent loop.
    #    Note: ANTHROPIC_API_KEY must be configured in your environment.
    client = AsyncAnthropic()
    total_task_cost = 0.0
    task_errors = []
    sdk_trace_id = None  # placeholder for an Anthropic response id; kept for interface parity

    # Normalize the model name once, up front (previously computed inside the
    # inner try, so an early API failure could leave `clean_model` unbound when
    # the recovery block referenced it). Strips provider prefixes such as
    # "openrouter/..." and maps claude models to the available alias.
    clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
    target_model = "claude-sonnet-4-6" if "claude" in clean_model else clean_model

    from examples.process_pipeline.script.validate_schema import validate_case, validate_blueprint, validate_capabilities, validate_strategy
    out_file = kwargs.get("output_file")

    max_retries = 3
    for attempt in range(max_retries):
        if attempt > 0:
            print(f"🔄 [Retry SDK {attempt}/{max_retries-1}] {task_name}")
            # Remove stale output so validation below cannot pass on old data.
            if out_file and Path(out_file).exists():
                Path(out_file).unlink()

        print(f"🚀 [Launch Anthropic SDK] {task_name} (Attempt {attempt+1})")

        # Fresh copy of the seed conversation for each retry.
        messages_copy = list(messages)

        max_loops = 50  # hard cap on tool-use rounds per attempt
        for loop_idx in range(max_loops):
            try:
                response = await client.messages.create(
                    model=target_model,
                    max_tokens=4096,
                    temperature=0.2,
                    system=system_prompt,
                    messages=messages_copy,
                    tools=anthropic_tools
                )

                # Rough cost estimate only — not an official billing figure.
                if hasattr(response, 'usage'):
                    step_cost = (response.usage.input_tokens / 1e6 * 3.0) + (response.usage.output_tokens / 1e6 * 15.0)
                    total_task_cost += step_cost

                # Collect the assistant reply and any tool calls it made.
                assistant_content = []
                tool_uses = []
                for content_block in response.content:
                    if content_block.type == "text":
                        text_val = content_block.text
                        if text_val:
                            assistant_content.append({"type": "text", "text": text_val})
                            print(f"\n🤖 [{task_name} Output]:\n{text_val}\n")
                    elif content_block.type == "tool_use":
                        assistant_content.append({
                            "type": "tool_use",
                            "id": content_block.id,
                            "name": content_block.name,
                            "input": content_block.input
                        })
                        tool_uses.append(content_block)

                # The API rejects an assistant turn with empty content.
                if not assistant_content:
                    assistant_content.append({"type": "text", "text": "(Thinking completed but no output)"})

                messages_copy.append({"role": "assistant", "content": assistant_content})

                # Exit: no tool call means the agent considers the task finished.
                if not tool_uses:
                    print(f"✅ [Done Anthropic SDK] {task_name} (Cost: ${total_task_cost:.4f})")
                    break

                # Execute each tool against the local registry and feed results back.
                tool_results = []
                for tu in tool_uses:
                    if tu.name in ("write_file", "write_json"):
                        print(f" 💾 [File Written by SDK] {task_name}")
                    print(f" 🛠️ [Tool Exec Debug] name_is={tu.name}, input_is={tu.input}, type_is={type(tu.input)}")
                    result_str = await registry.execute(tu.name, tu.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": tu.id,
                        "content": result_str
                    })

                messages_copy.append({"role": "user", "content": tool_results})

            except Exception as e:
                err_msg = str(e)
                print(f"❌ [Fail SDK Core] {task_name}: {err_msg}")
                task_errors.append(err_msg)
                break

        # Verification & Recovery block for SDK (ported from the AgentRunner path):
        # the loop ended without the output file, so force one tool-use turn.
        if out_file and not Path(out_file).exists():
            print(f"⚠️ [Recovery SDK] {task_name} missing output file. Triggering forced wrap-up continuation...")
            messages_copy.append({
                "role": "user",
                "content": f"【系统强制指令】你的任务阶段已完成分析,但尚未将最终结果写入目标文件。请立刻调用 write_json (或 write_file) 工具,将你的成果数据直接写入到绝对路径 `{out_file}`,务必立刻执行写入动作!"
            })
            try:
                rec_response = await client.messages.create(
                    model=target_model,
                    max_tokens=4096,
                    temperature=0.1,
                    system=system_prompt,
                    messages=messages_copy,
                    tools=anthropic_tools,
                    tool_choice={"type": "any"}  # compel the model to call a tool
                )
                for content_block in rec_response.content:
                    if content_block.type == "tool_use":
                        if content_block.name in ("write_file", "write_json"):
                            print(f" 💾 [Recovery File Written by SDK] {task_name}")
                        await registry.execute(content_block.name, content_block.input)
            except Exception as e:
                print(f"❌ [Fail SDK Recovery] {task_name}: {e}")
                task_errors.append(str(e))

        # Schema Validation: success exits the retry loop.
        if out_file and Path(out_file).exists() and str(out_file).endswith(".json"):
            if skip_validation:
                print(f" ⏭️ [Schema Validation Skipped] {Path(out_file).name} (Single-Step Mode)")
                return total_task_cost, task_errors, sdk_trace_id

            try:
                with open(out_file, "r", encoding="utf-8") as f:
                    data = json.loads(f.read())

                # Dispatch to the schema validator matching the output filename.
                filename = Path(out_file).name
                err = None
                if filename.startswith("case_"):
                    err = validate_case(data)
                elif filename == "blueprint.json":
                    err = validate_blueprint(data)
                elif filename == "capabilities_extracted.json":
                    err = validate_capabilities(data)
                elif filename == "strategy.json":
                    err = validate_strategy(data)

                if err:
                    raise ValueError(f"Schema Validation Failed: {err}")

                print(f" ✅ [Schema Validated] {Path(out_file).name}")
                return total_task_cost, task_errors, sdk_trace_id  # Success! Exit retry loop.
            except Exception as e:
                err_msg = f"Invalid JSON or Schema in {Path(out_file).name}: {e}"
                print(f"❌ [Validation Error] {task_name}: {err_msg}")
                task_errors.append(f"{task_name} Error: {e}")

                if attempt == max_retries - 1:
                    print(f"❌ [Retry Limit] {task_name} exhausted retries.")
                    return total_task_cost, task_errors, sdk_trace_id
        else:
            print(f"❌ [Missing File] {task_name} did not produce output file after recovery.")
            if attempt == max_retries - 1:
                return total_task_cost, task_errors, sdk_trace_id

    return total_task_cost, task_errors, sdk_trace_id
- async def main():
- parser = argparse.ArgumentParser()
- parser.add_argument("--index", type=int, required=True, help="Index of requirement in db_requirements.json")
- STEP_NAMES = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding", "process-cluster", "process-score", "capability-enrich", "strategy"]
- parser.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube", help="Comma-separated list of platforms to search")
- parser.add_argument("--restart-mode", type=str, default="smart", help="Granular restart mode for cascading deletions")
- parser.add_argument("--only-step", type=str, choices=STEP_NAMES, help="Run only a single step")
- parser.add_argument("--phase", type=int, choices=[1, 2, 3], help="Run all steps in a phase (1=research~case-detailed, 2=process~capability, 3=strategy)")
- parser.add_argument("--start-from", type=str, choices=STEP_NAMES, help="Start from this step (inclusive)")
- parser.add_argument("--end-at", type=str, choices=STEP_NAMES, help="End at this step (inclusive)")
- parser.add_argument("--case-index", type=int, help="Rerun extraction for a specific case index (only for workflow-extract, capability-extract, apply-grounding)")
- parser.add_argument("--use-claude-sdk", action="store_true", help="Use Claude SDK (CLAUDE_CODE_KEY/URL) instead of OpenRouter")
- args = parser.parse_args()
- # ── 参数验证 ──
- # --case-index 只能与提取步骤一起使用
- if args.case_index is not None:
- extraction_steps = {"workflow-extract", "capability-extract", "apply-grounding"}
- if args.only_step and args.only_step not in extraction_steps:
- print(f"❌ Error: --case-index can only be used with extraction steps: {', '.join(extraction_steps)}")
- sys.exit(1)
- if not args.only_step:
- print("❌ Error: --case-index requires --only-step to specify which extraction step to run")
- sys.exit(1)
- # ── 三种模式互斥检查 ──
- mode_count = sum([
- args.only_step is not None,
- args.phase is not None,
- (args.start_from is not None or args.end_at is not None),
- ])
- if mode_count > 1:
- print("❌ Error: --only-step, --phase, and --start-from/--end-at are mutually exclusive.")
- sys.exit(1)
- # 定义步骤拓扑(非线性,Phase 2 有两条并行分支)
- STEP_ORDER = STEP_NAMES # 用于 phase 分组和前置检查
- def _step_in_range(step, start, end):
- """判断线性前缀中的 step 是否在 [start, end] 范围内"""
- all_steps = LINEAR_PREFIX + BRANCH_21 + BRANCH_22 + ["strategy"]
- if step not in all_steps or start not in all_steps:
- return False
- # 对于线性前缀,用索引比较
- if step in LINEAR_PREFIX:
- s_idx = LINEAR_PREFIX.index(step)
- start_idx = LINEAR_PREFIX.index(start) if start in LINEAR_PREFIX else -1
- # 如果 start 在 Phase 2 或 strategy 中,线性前缀不需要跑
- if start_idx < 0:
- return False
- return s_idx >= start_idx
- return False
- PHASE_MAP = {
- 1: {"research", "source", "case-detailed"},
- 2: {"process-cluster", "process-score", "capability-extract", "capability-enrich"},
- 3: {"strategy"},
- }
- BRANCH_21 = ["process-cluster", "process-score"]
- BRANCH_22 = ["capability-enrich"]
- # 线性前缀(Phase 2 之前)
- LINEAR_PREFIX = ["research", "source", "generate-case", "workflow-extract", "capability-extract", "apply-grounding"]
- # 计算需要执行的步骤集合
- active_steps = None # None = 全部执行
- if args.only_step:
- active_steps = {args.only_step}
- elif args.phase:
- active_steps = PHASE_MAP[args.phase]
- elif args.start_from or args.end_at:
- start = args.start_from or "research"
- end = args.end_at or "strategy"
- # 构建 active_steps,考虑并行分支
- active = set()
- # 1. 线性前缀部分
- for s in LINEAR_PREFIX:
- if _step_in_range(s, start, end):
- active.add(s)
- # 2. Phase 2 分支部分
- start_in_21 = start in BRANCH_21
- start_in_22 = start in BRANCH_22
- end_in_21 = end in BRANCH_21
- end_in_22 = end in BRANCH_22
- end_is_strategy = end == "strategy"
- # 如果 end 是 strategy 或覆盖了整个 Phase 2,两条分支都跑
- if end_is_strategy or (not end_in_21 and not end_in_22 and end not in LINEAR_PREFIX):
- # 两条分支都包含(如果 start 允许的话)
- if not start_in_21 and not start_in_22:
- # start 在 Phase 2 之前或就是 Phase 2 的开头
- active.update(BRANCH_21)
- active.update(BRANCH_22)
- elif start_in_21:
- # start 在 2.1 分支内,但 end 到了 strategy,所以 2.2 也要全跑
- idx = BRANCH_21.index(start)
- active.update(BRANCH_21[idx:])
- active.update(BRANCH_22)
- elif start_in_22:
- # start 在 2.2 分支内,但 end 到了 strategy,所以 2.1 也要全跑
- idx = BRANCH_22.index(start)
- active.update(BRANCH_22[idx:])
- active.update(BRANCH_21)
- elif end_in_21:
- # end 在 2.1 分支内,只跑 2.1
- end_idx = BRANCH_21.index(end)
- start_idx = BRANCH_21.index(start) if start_in_21 else 0
- active.update(BRANCH_21[start_idx:end_idx + 1])
- elif end_in_22:
- # end 在 2.2 分支内,只跑 2.2
- end_idx = BRANCH_22.index(end)
- start_idx = BRANCH_22.index(start) if start_in_22 else 0
- active.update(BRANCH_22[start_idx:end_idx + 1])
- # 3. strategy
- if end_is_strategy:
- active.add("strategy")
- active_steps = active
- def should_run(step_name: str) -> bool:
- if active_steps is None:
- return True
- return step_name in active_steps
- if active_steps is not None:
- active_list = [s for s in STEP_ORDER if s in active_steps]
- print(f"\n[Selective Mode] Steps: {' -> '.join(active_list)}")
- base_dir = Path(__file__).parent
- # Load requirements locally
- req_path = base_dir / "db_requirements.json"
-
- with open(req_path, encoding='utf-8') as f:
- reqs = json.load(f)
-
- if args.index < 0 or args.index >= len(reqs):
- print("Index out of bounds")
- sys.exit(1)
-
- requirement = reqs[args.index]
-
- # 0. Setup directories
- output_dir = base_dir / "output" / f"{(args.index+1):03d}"
- output_dir.mkdir(parents=True, exist_ok=True)
- raw_cases_dir = output_dir / "raw_cases"
- raw_cases_dir.mkdir(parents=True, exist_ok=True)
-
- setup_logging(level=LOG_LEVEL, file=LOG_FILE)
-
- print("=" * 60)
- print(f"V4 Hardcoded Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
- print("=" * 60)
- # ── 前置文件检查 ──
- # 每个 step 需要的前置文件(如果该 step 不在 active_steps 中,则需要预先存在)
- STEP_DEPS = {
- "research": [],
- "source": [("case_*.json", lambda: bool(list(raw_cases_dir.glob("case_*.json"))))],
- "generate-case": [("source.json", lambda: (raw_cases_dir / "source.json").exists())],
- "workflow-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
- "capability-extract": [("case.json", lambda: (output_dir / "case.json").exists())],
- "apply-grounding": [("case.json", lambda: (output_dir / "case.json").exists())],
- "process-cluster": [
- ("case.json", lambda: (output_dir / "case.json").exists()),
- ],
- "process-score": [("blueprint_temp.json", lambda: (output_dir / "blueprint_temp.json").exists())],
- "capability-enrich": [
- ("capabilities_temp.json", lambda: (output_dir / "capabilities_temp.json").exists()),
- ("case.json", lambda: (output_dir / "case.json").exists()),
- ],
- "strategy": [
- ("process.json", lambda: (output_dir / "process.json").exists()),
- ("capabilities.json", lambda: (output_dir / "capabilities.json").exists()),
- ],
- }
- # 每个 step 会生成的文件(用于判断上游 step 是否会在本次运行中生成依赖)
- STEP_PRODUCES = {
- "research": {"case_*.json"},
- "source": {"source.json"},
- "generate-case": {"case.json"},
- "workflow-extract": {"case.json"}, # 原地更新
- "capability-extract": {"case.json"}, # 原地更新
- "apply-grounding": {"case.json"}, # 原地更新
- "process-cluster": {"blueprint_temp.json"},
- "process-score": {"process.json"},
- "capability-enrich": {"capabilities.json"},
- "strategy": {"strategy.json"},
- }
- if active_steps is not None:
- # 计算本次运行会生成的文件集合
- will_produce = set()
- for s in STEP_ORDER:
- if s in active_steps:
- will_produce.update(STEP_PRODUCES.get(s, set()))
- # 检查每个 active step 的前置文件
- missing = []
- for s in STEP_ORDER:
- if s not in active_steps:
- continue
- for dep_name, dep_check in STEP_DEPS.get(s, []):
- if dep_name in will_produce:
- continue # 上游 step 会在本次运行中生成
- if not dep_check():
- missing.append((s, dep_name))
- if missing:
- print(f"\n❌ [Pre-flight Check] Missing prerequisite files:")
- for step, dep in missing:
- print(f" - Step '{step}' requires '{dep}'")
- print(f"\nRun upstream steps first, or use --start-from to include them.")
- sys.exit(1)
- else:
- print(f"✅ [Pre-flight Check] All prerequisites satisfied")
- # Load presets
- presets_path = base_dir / "presets.json"
- if presets_path.exists():
- from agent.core.presets import load_presets_from_json
- load_presets_from_json(str(presets_path))
- print("✅ Configured Agent Presets (Skills Boundaries)")
- # Browser initialization removed to save resources
-
- store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
- # Instantiate two distinct LLM orchestrators
- qwen_model = "qwen3.5-plus" # maps to qwen3.5-plus via Qwen interface
- # 根据 --use-claude-sdk 参数选择 LLM 提供商
- if args.use_claude_sdk:
- # 使用 Claude SDK (CLAUDE_CODE_KEY/URL 或 ANTHROPIC_API_KEY/BASE_URL)
- claude_model = "claude-sonnet-4-6"
- print(f"✅ Using Claude SDK with model: {claude_model}")
- # 所有环节走 Claude Agent SDK(OAuth / Max 订阅)
- from agent.llm.claude_code_oauth import create_claude_code_oauth_llm_call
- claude_llm_call = create_claude_code_oauth_llm_call(model=claude_model)
- workflow_llm_call = claude_llm_call
- print(f"✅ All Claude operations will use Claude Agent SDK (OAuth/Max subscription)")
- else:
- # 使用 OpenRouter 代理的 GPT-5.4(支持结构化输出 strict mode)
- claude_model = "openai/gpt-5.4"
- print(f"✅ Using OpenRouter with model: {claude_model}")
- from agent.llm.openrouter import create_openrouter_llm_call
- claude_llm_call = create_openrouter_llm_call(model=claude_model)
- workflow_llm_call = claude_llm_call # 没开 SDK 时与默认一致
-
- runner_qwen = AgentRunner(
- trace_store=store,
- llm_call=create_qwen_llm_call(model=qwen_model),
- skills_dir=SKILLS_DIR
- )
-
- runner_claude = AgentRunner(
- trace_store=store,
- llm_call=claude_llm_call,
- skills_dir=SKILLS_DIR
- )
- try:
- start_time = time.time()
- total_cost = 0.0
- costs_breakdown = {}
- global_errors = []
- strategy_file = None
- TARGET_QUALIFIED_CASES = 15
- # ── --only-step 单步执行模式 ──────────────────────────────
- if args.only_step:
- step = args.only_step
- source_file = raw_cases_dir / "source.json"
- detailed_file = raw_cases_dir / "case_detailed.json"
- blueprint_temp_file = output_dir / "blueprint_temp.json"
- capabilities_temp_file = output_dir / "capabilities_temp.json"
- process_file = output_dir / "process.json"
- capabilities_file = output_dir / "capabilities.json"
- print(f"\n[Single Step Mode] Running only: {step}")
- if step == "research":
- # Phase 1: 只跑调研,强制按 --platforms 重跑,不受已有 case 文件影响
- single_platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
- if not single_platforms:
- print(" ❌ No platforms specified. Use --platforms xhs,zhihu,gzh,youtube")
- sys.exit(1)
- print(f" 🔍 Research platforms: {single_platforms}")
- single_is_single = args.restart_mode.startswith("single_")
- phase1_tasks = []
- for p in single_platforms:
- task_desc = f"渠道:{p.upper()}。核心需求:{requirement}。目标:至少收集 {TARGET_QUALIFIED_CASES} 条高质量案例(评分>=80、正文充实)。"
- out_file = str(raw_cases_dir / f"case_{p}.json")
- kwargs = {
- "task": task_desc,
- "output_file": out_file
- }
- phase1_tasks.append(
- run_agent_task(runner_qwen, "researcher", kwargs, f"P1_Research_{p}", qwen_model, skip_validation=single_is_single)
- )
- phase1_results = await asyncio.gather(*phase1_tasks)
- for (task_cost, task_errors, trace_id), p in zip(phase1_results, single_platforms):
- print(f" ✓ {p}: cost=${task_cost:.4f}, errors={len(task_errors)}")
- elif step == "source":
- # Phase 1.5: 提取原始 source.json
- from examples.process_pipeline.script.extract_sources import extract_sources_to_json
- result = extract_sources_to_json(raw_cases_dir)
- print(f" ✓ source.json: matched={result['total_matched']}, filtered={result['filtered_total']}")
- if result.get("filtered_reasons"):
- for reason, cnt in result["filtered_reasons"].items():
- print(f" - {reason}: {cnt}")
- elif step == "generate-case":
- # Phase 1.5.5: 生成标准化 case.json
- from examples.process_pipeline.script.generate_case import generate_case_from_source
- result = await generate_case_from_source(raw_cases_dir)
- print(f" ✓ case.json: cases={result['total_cases']}")
- elif step == "workflow-extract":
- # Phase 1.6a: 提取 workflow 到 case.json
- case_file = output_dir / "case.json"
- if not case_file.exists():
- print(f" ❌ case.json not found. Run --only-step generate-case first.")
- sys.exit(1)
- # 如果指定了 --case-index,先过滤 case.json
- if args.case_index is not None:
- with open(case_file, "r", encoding="utf-8") as f:
- case_data = json.load(f)
- original_cases = case_data.get("cases", [])
- target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
- if not target_case:
- print(f" ❌ Case with index {args.case_index} not found in case.json")
- sys.exit(1)
- # 临时只保留目标 case
- case_data["cases"] = [target_case]
- temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
- with open(temp_case_file, "w", encoding="utf-8") as f:
- json.dump(case_data, f, ensure_ascii=False, indent=2)
- print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
- case_file_to_use = temp_case_file
- else:
- case_file_to_use = case_file
- from examples.process_pipeline.script.extract_workflow import extract_workflow
- result = await extract_workflow(
- case_file_to_use,
- workflow_llm_call, model=claude_model
- )
- # 如果使用了临时文件,需要合并回原始 case.json
- if args.case_index is not None:
- with open(case_file_to_use, "r", encoding="utf-8") as f:
- updated_data = json.load(f)
- updated_case = updated_data["cases"][0]
- # 更新原始文件中的对应 case
- with open(case_file, "r", encoding="utf-8") as f:
- original_data = json.load(f)
- for i, c in enumerate(original_data["cases"]):
- if c.get("index") == args.case_index:
- original_data["cases"][i] = updated_case
- break
- with open(case_file, "w", encoding="utf-8") as f:
- json.dump(original_data, f, ensure_ascii=False, indent=2)
- temp_case_file.unlink() # 删除临时文件
- print(f" ✓ Merged case {args.case_index} back to case.json")
- print(f" ✓ case.json + workflow: success={result['success']}, failed={result['failed']}")
- elif step == "capability-extract":
- # Phase 1.6b 已废弃:capability 现在由 workflow-extract 一并写入 case.json
- case_file = output_dir / "case.json"
- if not case_file.exists():
- print(f" ❌ case.json not found. Run --only-step generate-case first.")
- sys.exit(1)
- print(" ⏭️ capability-extract is integrated into workflow-extract; skipping old extractor.")
- elif step == "apply-grounding":
- # Phase 1.7: 将 apply_to_draft 映射为正式 apply_to
- case_file = output_dir / "case.json"
- if not case_file.exists():
- print(f" ❌ case.json not found. Run workflow-extract first.")
- sys.exit(1)
- # 如果指定了 --case-index,先过滤 case.json
- if args.case_index is not None:
- with open(case_file, "r", encoding="utf-8") as f:
- case_data = json.load(f)
- original_cases = case_data.get("cases", [])
- target_case = next((c for c in original_cases if c.get("index") == args.case_index), None)
- if not target_case:
- print(f" ❌ Case with index {args.case_index} not found in case.json")
- sys.exit(1)
- case_data["cases"] = [target_case]
- temp_case_file = output_dir / f"case_temp_{args.case_index}.json"
- with open(temp_case_file, "w", encoding="utf-8") as f:
- json.dump(case_data, f, ensure_ascii=False, indent=2)
- print(f" [Target] Filtering to case index {args.case_index}: {target_case.get('title', 'untitled')[:30]}")
- case_file_to_use = temp_case_file
- else:
- case_file_to_use = case_file
- from examples.process_pipeline.script.apply_to_grounding import apply_grounding
- result = await apply_grounding(
- case_file_to_use,
- claude_llm_call, model=claude_model
- )
- # 如果使用了临时文件,需要合并回原始 case.json
- if args.case_index is not None:
- with open(case_file_to_use, "r", encoding="utf-8") as f:
- updated_data = json.load(f)
- updated_case = updated_data["cases"][0]
- with open(case_file, "r", encoding="utf-8") as f:
- original_data = json.load(f)
- for i, c in enumerate(original_data["cases"]):
- if c.get("index") == args.case_index:
- original_data["cases"][i] = updated_case
- break
- with open(case_file, "w", encoding="utf-8") as f:
- json.dump(original_data, f, ensure_ascii=False, indent=2)
- temp_case_file.unlink()
- print(f" ✓ Merged case {args.case_index} back to case.json")
- print(f" ✓ case.json + apply_to: grounded={result['grounded']}/{result['total']}, cost=${result['total_cost']:.4f}")
- elif step == "process-cluster":
- # Phase 2.1.1: 工序聚类
- if not detailed_file.exists():
- print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
- sys.exit(1)
- from examples.process_pipeline.script.cluster_processes import cluster_processes
- result = await cluster_processes(
- source_file=source_file, detailed_file=detailed_file,
- output_file=blueprint_temp_file, requirement=requirement,
- llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ blueprint_temp.json: clusters={result['clusters']}")
- elif step == "process-score":
- # Phase 2.1.2: 工序打分
- if not blueprint_temp_file.exists():
- print(f" ❌ blueprint_temp.json not found. Run --only-step process-cluster first.")
- sys.exit(1)
- from examples.process_pipeline.script.score_processes import score_blueprints
- result = await score_blueprints(
- blueprint_file=blueprint_temp_file, output_file=process_file,
- requirement=requirement, llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ process.json: scored={result['scored']}")
- elif step == "capability-extract":
- # Phase 2.2.1: 能力初步聚类
- if not detailed_file.exists():
- print(f" ❌ case_detailed.json not found. Run --only-step case-detailed first.")
- sys.exit(1)
- from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
- result = await extract_capabilities_workflow(
- detailed_file=detailed_file, source_file=source_file,
- output_file=capabilities_temp_file, requirement=requirement,
- llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ capabilities_temp.json: capabilities={result['capabilities']}")
- elif step == "capability-enrich":
- # Phase 2.2.2: 能力丰富化
- if not capabilities_temp_file.exists():
- print(f" ❌ capabilities_temp.json not found. Run --only-step capability-extract first.")
- sys.exit(1)
- if not source_file.exists():
- print(f" ❌ source.json not found. Run --only-step source first.")
- sys.exit(1)
- from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
- result = await enrich_all_capabilities(
- capabilities_temp_file=capabilities_temp_file, source_file=source_file,
- output_file=capabilities_file, llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ capabilities.json: enriched={result['enriched']}/{result['total_capabilities']}")
- elif step == "strategy":
- # Phase 3: 策略组装
- if not process_file.exists():
- print(f" ❌ process.json not found. Run --only-step process-score first.")
- sys.exit(1)
- if not capabilities_file.exists():
- print(f" ❌ capabilities.json not found. Run --only-step capability-enrich first.")
- sys.exit(1)
- strategy_file_path = output_dir / "strategy.json"
- from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
- result = await assemble_strategy(
- process_file=process_file, capabilities_file=capabilities_file,
- output_file=strategy_file_path, requirement=requirement,
- llm_call=claude_llm_call, model=claude_model,
- )
- print(f" ✓ strategy.json: workflow_steps={result['workflow_steps']}")
- elapsed = time.time() - start_time
- print(f"\n[Single Step Done] {step} completed in {elapsed:.1f}s")
- return
- # ── 正常 pipeline 流程 ──────────────────────────────
- platforms = [p.strip() for p in args.platforms.split(",") if p.strip()]
- # Phase 0: Platform Selection (controlled by --platforms)
- if should_run("research"):
- print(f"\n--- Phase 0: Using specified platforms ---")
- print(f"📡 Will research platforms: {platforms}")
- # 注:不再跳过已有平台,因为 agent 是增量追加模式
- is_single_step = args.restart_mode.startswith("single_")
- # Phase 1 & 1.5: MAP (Parallel Search) and Source Extraction Loop
- if should_run("research") or should_run("source"):
- print(f"\n--- Phase 1: Distributed Research Map with Source Filter Loop ({qwen_model}) ---")
- from examples.process_pipeline.script.extract_sources import extract_sources_to_json
- MAX_ROUNDS = 50
- platform_traces = {p: None for p in platforms}
- active_platforms = platforms.copy()
- phase1_trace_ids = {}
- if not should_run("research") and should_run("source"):
- try:
- src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=None)
- print(f"📎 [Source Extraction] matched={src_stats['total_matched']}, filtered={src_stats['filtered_total']} → {raw_cases_dir / 'source.json'}")
- if src_stats.get("filtered_reasons"):
- for reason, cnt in src_stats["filtered_reasons"].items():
- print(f" - {reason}: {cnt}")
- except Exception as e:
- err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- elif should_run("research"):
- round_idx = 0
- last_src_stats = None # 保存上一轮的过滤统计
- last_platform_count = {} # 保存上一轮每个平台的合格数
- while active_platforms and round_idx < MAX_ROUNDS:
- print(f"\n >>> [Research Loop] Round {round_idx+1} - Active platforms: {active_platforms}")
- if active_platforms:
- phase1_tasks = []
- for p in active_platforms:
- task_desc = f"渠道:{p.upper()}。核心需求:{requirement}。目标:至少收集 {TARGET_QUALIFIED_CASES} 条高质量案例(评分>=70、正文充实)。"
- out_file = str(raw_cases_dir / f"case_{p}.json")
- additional_msgs = None
- if round_idx > 0 and last_src_stats:
- # 构建带过滤详情的 feedback message
- p_count = last_platform_count.get(p, 0)
- # 筛选出该平台被过滤的条目
- p_filtered = [d for d in last_src_stats.get("filtered_details", []) if d.get("platform") == p]
- reason_summary = last_src_stats.get("filtered_reasons", {})
- feedback_lines = [
- f"【系统反馈】你在上一轮提取的有效案例数量未达标。",
- f"当前 {p.upper()} 合格案例:{p_count}/{TARGET_QUALIFIED_CASES}",
- f"过滤统计:{dict(reason_summary)}" if reason_summary else "",
- ]
- if p_filtered:
- feedback_lines.append(f"\n以下是你提交的被过滤掉的帖子(共{len(p_filtered)}条):")
- for item in p_filtered[:10]:
- feedback_lines.append(f" - [{item['case_id']}] {item['title']} → 原因: {item['filter_reason']}")
- if len(p_filtered) > 10:
- feedback_lines.append(f" ... 还有 {len(p_filtered) - 10} 条未列出")
- feedback_lines.append(
- f"\n请继续搜索并提取更多**全新的、不同的**高质量案例,**追加**写入到你一直在维护的文件中。"
- f"注意:不要重复之前已找过的案例!针对上述被过滤的原因,请确保新案例有详实的正文内容且评分准确。"
- )
- additional_msgs = [{
- "role": "user",
- "content": "\n".join(line for line in feedback_lines if line)
- }]
- kwargs = {
- "task": task_desc,
- "output_file": out_file
- }
- phase1_tasks.append(
- run_agent_task(
- runner_qwen, "researcher", kwargs,
- f"P1_Research_{p}_R{round_idx+1}", qwen_model,
- skip_validation=is_single_step,
- start_trace_id=platform_traces[p],
- additional_messages=additional_msgs
- )
- )
- phase1_results = await asyncio.gather(*phase1_tasks)
- for (task_cost, task_errors, trace_id), p in zip(phase1_results, active_platforms):
- total_cost += task_cost
- cost_key = f"P1_Research_{p}"
- costs_breakdown[cost_key] = costs_breakdown.get(cost_key, 0.0) + round(task_cost, 4)
- platform_traces[p] = trace_id
- phase1_trace_ids[f"P1_Research_{p}"] = trace_id
- global_errors.extend(task_errors)
- expected_file = Path(raw_cases_dir / f"case_{p}.json")
- if not expected_file.exists():
- err_msg = f"Missing case file for {p}! Agent likely hit max_iterations without saving."
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- if should_run("source"):
- try:
- trace_id_list = [tid for tid in phase1_trace_ids.values() if tid]
- src_stats = extract_sources_to_json(raw_cases_dir, trace_ids=trace_id_list)
- last_src_stats = src_stats
- print(f" 📎 [Source Extraction] matched={src_stats['total_matched']}, filtered={src_stats['filtered_total']} → {raw_cases_dir / 'source.json'}")
- if src_stats.get("filtered_reasons"):
- for reason, cnt in src_stats["filtered_reasons"].items():
- print(f" - {reason}: {cnt}")
- except Exception as e:
- err_msg = f"Source extraction failed: {type(e).__name__}: {e}"
- print(f" ⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- # 判断是否达标:直接看 source.json 里每个平台的条目数
- source_file = raw_cases_dir / "source.json"
- if source_file.exists():
- with open(source_file, "r", encoding="utf-8") as f:
- source_data = json.load(f)
- platform_count = {}
- for s in source_data.get("sources", []):
- p = s.get("platform")
- if p:
- platform_count[p] = platform_count.get(p, 0) + 1
- last_platform_count = platform_count
- print(f" 📊 [Source Count] Target: >={TARGET_QUALIFIED_CASES} qualified items per platform")
- platforms_to_keep = []
- for p in platforms:
- count = platform_count.get(p, 0)
- if p in active_platforms:
- print(f" - {p}: {count}/{TARGET_QUALIFIED_CASES}")
- if count < TARGET_QUALIFIED_CASES:
- platforms_to_keep.append(p)
- active_platforms = platforms_to_keep
- if not active_platforms:
- print(f" ✅ All platforms reached the target {TARGET_QUALIFIED_CASES} qualified items!")
- break
- else:
- print(f" ⚠️ source.json not found, continuing loop to retry...")
- round_idx += 1
- if round_idx >= MAX_ROUNDS and active_platforms:
- print(f" ⚠️ [Max Rounds] Reached {MAX_ROUNDS} rounds. Remaining platforms: {active_platforms}")
- else:
- print("\n⏭️ [Skip] Phase 1 & 1.5 Skipped. Using existing cases...")
- # Phase 1.3: Generate case.json from source.json
- if should_run("generate-case"):
- source_file = raw_cases_dir / "source.json"
- case_file = output_dir / "case.json"
- if source_file.exists():
- try:
- from examples.process_pipeline.script.generate_case import generate_case
- print(f"\n--- Phase 1.3: Generate case.json ---")
- case_stats = await generate_case(source_file, case_file)
- print(
- f"📦 [Case Generation] "
- f"generated {case_stats.get('total', 0)} cases "
- f"→ {case_file}"
- )
- except Exception as e:
- err_msg = f"Case generation failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- # Phase 1.6: Extract workflow and capability together → case.json
- if should_run("workflow-extract"):
- case_file = output_dir / "case.json"
- if case_file.exists():
- try:
- from examples.process_pipeline.script.extract_workflow import extract_workflow
- print(f"\n--- Phase 1.6a: Workflow Extraction ({claude_model}) ---")
- workflow_stats = await extract_workflow(
- case_file,
- workflow_llm_call,
- model=claude_model,
- max_concurrent=3
- )
- total_cost += workflow_stats.get("total_cost", 0.0)
- costs_breakdown["P1.6a_WorkflowExtraction"] = round(workflow_stats.get("total_cost", 0.0), 4)
- print(
- f"🔍 [Workflow Extraction] "
- f"success={workflow_stats['success']} "
- f"failed={workflow_stats['failed']}"
- )
- except Exception as e:
- err_msg = f"Workflow extraction failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- if should_run("capability-extract"):
- case_file = output_dir / "case.json"
- if case_file.exists():
- print("\n--- Phase 1.6b: Capability Extraction ---")
- print("🧩 [Capability Extraction] integrated into workflow-extract; skipping old extractor.")
- # Phase 1.7: Apply grounding (map apply_to_draft to apply_to)
- if should_run("apply-grounding"):
- case_file = output_dir / "case.json"
- if case_file.exists():
- try:
- from examples.process_pipeline.script.apply_to_grounding import apply_grounding
- print(f"\n--- Phase 1.7: Apply Grounding ({claude_model}) ---")
- grounding_stats = await apply_grounding(
- case_file,
- claude_llm_call,
- model=claude_model,
- max_concurrent=3
- )
- total_cost += grounding_stats.get("total_cost", 0.0)
- costs_breakdown["P1.7_ApplyGrounding"] = round(grounding_stats.get("total_cost", 0.0), 4)
- print(
- f"🗺️ [Apply Grounding] "
- f"grounded={grounding_stats['grounded']}/{grounding_stats['total']} "
- f"→ {case_file}"
- )
- except Exception as e:
- err_msg = f"Apply grounding failed: {type(e).__name__}: {e}"
- print(f"⚠️ [Warning] {err_msg}")
- global_errors.append(err_msg)
- # Phase 2: Parallel Workflow (Process + Capabilities) uses Claude
- if any(should_run(s) for s in ["process-cluster", "process-score", "capability-enrich", "strategy"]):
- print(f"\n--- Phase 2: Parallel Workflow ({claude_model}) ---")
- # 输出文件
- process_file = str(output_dir / "process.json")
- capabilities_file = str(output_dir / "capabilities.json")
- # 中间文件
- blueprint_temp_file = str(output_dir / "blueprint_temp.json")
- capabilities_temp_file = str(output_dir / "capabilities_temp.json")
- # 优先使用结构化数据:source.json + case_detailed.json
- detailed_file = raw_cases_dir / "case_detailed.json"
- source_file = raw_cases_dir / "source.json"
- if detailed_file.exists():
- input_files_glob = str(raw_cases_dir / "{source,case_detailed}.json").replace("\\", "/")
- print(f" Using structured data: source.json + case_detailed.json")
- else:
- input_files_glob = str(raw_cases_dir / "case_*.json").replace("\\", "/")
- print(f" Fallback to raw cases: case_*.json")
- force_strategy_rerun = False
- force_active = active_steps is not None
- # ── Step 1: 并行执行 2.1.1 (cluster_processes) 和 2.2.1 (extract_capabilities) ──
async def run_cluster_processes():
    """Phase 2.1.1: cluster processes into blueprint_temp.json.

    Returns the LLM cost (float) incurred by this step; 0.0 when skipped
    or when clustering fails (the error is recorded in global_errors).
    """
    # Reuse an existing blueprint_temp.json unless this step was explicitly
    # forced via active step selection.
    already_done = Path(blueprint_temp_file).exists()
    if already_done and not force_active:
        print(f" [2.1.1] ⏭️ blueprint_temp.json exists, skipping")
        return 0.0
    print(f" [2.1.1] Clustering processes...")
    try:
        from examples.process_pipeline.script.cluster_processes import cluster_processes
        result = await cluster_processes(
            source_file=source_file,
            detailed_file=detailed_file,
            output_file=Path(blueprint_temp_file),
            requirement=requirement,
            llm_call=claude_llm_call,
            model=claude_model,
        )
    except Exception as e:
        # Best-effort step: log and record the failure, keep the pipeline alive.
        err_msg = f"P2.1.1 ClusterProcesses failed: {type(e).__name__}: {e}"
        print(f" [2.1.1] ⚠️ {err_msg}")
        global_errors.append(err_msg)
        return 0.0
    else:
        print(f" [2.1.1] ✓ Distilled {result.get('distilled_cases', 0)} cases, "
              f"generated {result.get('blueprints', 0)} blueprints")
        return result.get("total_cost", 0.0)
async def run_extract_capabilities():
    """Phase 2.2.1: first-pass capability clustering into capabilities_temp.json.

    Returns the LLM cost (float) of this step; 0.0 when skipped or on
    failure (failures are appended to global_errors).
    """
    # Skip when the intermediate output already exists and the step is not
    # explicitly forced by the active-step selection.
    if not force_active and Path(capabilities_temp_file).exists():
        print(f" [2.2.1] ⏭️ capabilities_temp.json exists, skipping")
        return 0.0
    print(f" [2.2.1] Extracting capabilities...")
    try:
        from examples.process_pipeline.script.extract_capabilities_workflow import extract_capabilities_workflow
        result = await extract_capabilities_workflow(
            detailed_file=detailed_file,
            source_file=source_file,
            output_file=Path(capabilities_temp_file),
            requirement=requirement,
            llm_call=claude_llm_call,
            model=claude_model,
        )
    except Exception as e:
        # Record the failure and continue; downstream steps guard on the file.
        err_msg = f"P2.2.1 ExtractCapabilities failed: {type(e).__name__}: {e}"
        print(f" [2.2.1] ⚠️ {err_msg}")
        global_errors.append(err_msg)
        return 0.0
    else:
        print(f" [2.2.1] ✓ Extracted {result.get('capabilities', 0)} capabilities")
        return result.get("total_cost", 0.0)
- if not Path(blueprint_temp_file).exists() or not Path(capabilities_temp_file).exists():
- force_strategy_rerun = True
- step1_costs = await asyncio.gather(run_cluster_processes(), run_extract_capabilities())
- for cost, name in zip(step1_costs, ["P2.1.1_ClusterProcesses", "P2.2.1_ExtractCapabilities"]):
- total_cost += cost
- costs_breakdown[name] = round(cost, 4)
- # ── Step 2: 并行执行 2.1.2 和 2.2.2 ──────────────────────────────
async def run_score_processes():
    """Phase 2.1.2: score blueprint/requirement fit, writing process.json.

    Reads blueprint_temp.json produced by Phase 2.1.1. Returns the LLM
    cost (float) of this step; 0.0 when skipped or on failure (failures
    are appended to global_errors).
    """
    # Idempotence: keep an existing process.json unless the step is forced.
    if Path(process_file).exists() and not force_active:
        print(f" [2.1.2] ⏭️ process.json exists, skipping")
        return 0.0
    # Upstream dependency guard: 2.1.1 may have failed or been skipped.
    if not Path(blueprint_temp_file).exists():
        print(f" [2.1.2] ⚠️ blueprint_temp.json not found, skipping")
        return 0.0
    print(f" [2.1.2] Scoring processes...")
    try:
        from examples.process_pipeline.script.score_processes import score_blueprints
        score_result = await score_blueprints(
            blueprint_file=Path(blueprint_temp_file),
            output_file=Path(process_file),
            requirement=requirement,
            llm_call=claude_llm_call,
            model=claude_model,
        )
        print(f" [2.1.2] ✓ Scored {score_result.get('scored', 0)} blueprints")
        return score_result.get("total_cost", 0.0)
    except Exception as e:
        # Include the exception type for consistency with the sibling
        # step handlers (2.1.1 / 2.2.1), which report type + message.
        err_msg = f"P2.1.2 ScoreProcesses failed: {type(e).__name__}: {e}"
        print(f" [2.1.2] ⚠️ {err_msg}")
        global_errors.append(err_msg)
        return 0.0
async def run_enrich_capabilities():
    """Phase 2.2.2: enrich clustered capabilities into capabilities.json.

    Reads capabilities_temp.json (Phase 2.2.1) plus source.json. Returns
    the LLM cost (float) of this step; 0.0 when skipped or on failure
    (failures are appended to global_errors).
    """
    # Idempotence: keep an existing capabilities.json unless forced.
    if Path(capabilities_file).exists() and not force_active:
        print(f" [2.2.2] ⏭️ capabilities.json exists, skipping")
        return 0.0
    # Upstream dependency guards: both inputs must be present.
    if not Path(capabilities_temp_file).exists():
        print(f" [2.2.2] ⚠️ capabilities_temp.json not found, skipping")
        return 0.0
    if not source_file.exists():
        print(f" [2.2.2] ⚠️ source.json not found, skipping")
        return 0.0
    print(f" [2.2.2] Enriching capabilities...")
    try:
        from examples.process_pipeline.script.enrich_capabilities import enrich_all_capabilities
        enrich_result = await enrich_all_capabilities(
            capabilities_temp_file=Path(capabilities_temp_file),
            source_file=source_file,
            output_file=Path(capabilities_file),
            llm_call=claude_llm_call,
            model=claude_model,
        )
        print(f" [2.2.2] ✓ Enriched {enrich_result.get('enriched', 0)} capabilities")
        return enrich_result.get("total_cost", 0.0)
    except Exception as e:
        # Include the exception type for consistency with the sibling
        # step handlers (2.1.1 / 2.2.1), which report type + message.
        err_msg = f"P2.2.2 EnrichCapabilities failed: {type(e).__name__}: {e}"
        print(f" [2.2.2] ⚠️ {err_msg}")
        global_errors.append(err_msg)
        return 0.0
- # 并行执行 Step 2
- step2_costs = await asyncio.gather(run_score_processes(), run_enrich_capabilities())
- for cost, name in zip(step2_costs, ["P2.1.2_ScoreProcesses", "P2.2.2_EnrichCaps"]):
- total_cost += cost
- costs_breakdown[name] = round(cost, 4)
- # Phase 3: REDUCE 2 (Final Assembly) uses Claude
- print(f"\n--- Phase 3: Final Strategy Assembly ({claude_model}) ---")
- strategy_file_path = output_dir / "strategy.json"
- if args.restart_mode == "single":
- force_strategy_rerun = False
- if strategy_file_path.exists() and not force_strategy_rerun and not force_active:
- print(f"⏭️ [Skip P3] strategy.json already exists. Skipping P3_Assembler.")
- else:
- if strategy_file_path.exists():
- print(f"⚠️ [Force P3] Upstream dependencies were regenerated. Forcing re-run of P3_Assembler...")
- print(" > Using [Workflow Core]")
- from examples.process_pipeline.script.assemble_strategy_workflow import assemble_strategy
- try:
- phase3_result = await assemble_strategy(
- process_file=Path(process_file),
- capabilities_file=Path(capabilities_file),
- output_file=strategy_file_path,
- requirement=requirement,
- llm_call=claude_llm_call,
- model=claude_model,
- )
- phase3_cost = phase3_result.get("total_cost", 0.0)
- print(f" ✓ Generated workflow with {phase3_result.get('workflow_steps', 0)} steps")
- except Exception as e:
- err_msg = f"P3_AssembleStrategy failed: {type(e).__name__}: {e}"
- print(f" ⚠️ {err_msg}")
- global_errors.append(err_msg)
- phase3_cost = 0.0
- total_cost += phase3_cost
- costs_breakdown["P3_Assembler"] = round(phase3_cost, 4)
- else:
- print("\n--- [Research Only] Stopping early. Skipping Phase 2 and Phase 3 ---")
-
- end_time = time.time()
- elapsed_sec = end_time - start_time
-
- # Save Metrics
- metrics_file = base_dir / "run_metrics.json"
- metrics_data = []
- if metrics_file.exists():
- with open(metrics_file, "r", encoding="utf-8") as f:
- try:
- metrics_data = json.load(f)
- except json.JSONDecodeError:
- pass
-
- # Collect trace_ids from all phases
- trace_ids = {}
- if 'phase1_trace_ids' in dir():
- trace_ids.update(phase1_trace_ids)
- metrics_data.append({
- "index": args.index,
- "requirement": requirement[:80] + "...",
- "duration_seconds": round(elapsed_sec, 2),
- "total_cost_usd": round(total_cost, 4),
- "costs_breakdown": costs_breakdown,
- "trace_ids": trace_ids,
- "errors": global_errors,
- "timestamp": datetime.now().isoformat()
- })
-
- with open(metrics_file, "w", encoding="utf-8") as f:
- json.dump(metrics_data, f, indent=2, ensure_ascii=False)
-
- print(f"\n📊 [Metrics] Pipeline completed in {elapsed_sec:.1f}s. Total Cost: ${total_cost:.4f}")
-
- finally:
- pass
-
- print("✅ Pipeline run finished.")
- if strategy_file:
- print("✅ Strategy saved to:", strategy_file)
# Script entry point: run the async pipeline driver under asyncio's event loop.
if __name__ == "__main__":
    asyncio.run(main())
|