"""
Pipeline: Research → Source → Generate-Case → Decode-Workflow [→ Apply-Grounding]
================================================================================

CLI quick reference
────────────────────────────────────────────────────────────────────────────────
Required:
  --index N          Requirement index (0-based) into db_requirements.json.
                     Output directory: output/{(index+1):03d}/.

Step topology (linear, 5 steps):
  research → source → generate-case → decode-workflow → apply-grounding
      ↑────────┘
  (phase 1 loops research ⇄ source internally; this is transparent to the CLI)

Default behaviour:
  Runs research → source → generate-case → decode-workflow (4 steps).
  apply-grounding only runs when requested explicitly
  (--only-step apply-grounding, or an explicit --end-at apply-grounding).

Mode selection (mutually exclusive):
  (default)                   run research .. decode-workflow
  --only-step STEP            run a single step
  --start-from / --end-at     run a range (both ends inclusive); either end may be omitted

Optional arguments:
  --case-index N              only supported by decode-workflow / apply-grounding
  --platforms xhs,zhihu,gzh,youtube,douyin,sph
                              platform filter for the research step
  --skip-existing             incremental mode: only process a case that has no
                              decode output yet. The default is to overwrite:
                              every run regenerates all cases. Only affects
                              decode-workflow batch mode; single-case mode
                              always re-runs.
  --eval-model MODEL          LLM used for the rubric evaluation in the source
                              step (choices: llm_evaluate_sources.EVAL_MODELS)
  --use-claude-sdk            apply-grounding via the Claude Agent SDK (OAuth)
  --model {claude,gpt,gemini} apply-grounding via the OpenRouter backend
  Note: --use-claude-sdk and --model are mutually exclusive, and are only used
  when the active steps include apply-grounding.

Typical usage:
  # Run the first 4 steps (default)
  python run_pipeline.py --index 107

  # Re-run decode-workflow for a single case
  python run_pipeline.py --index 107 --only-step decode-workflow --case-index 3

  # Run apply-grounding on its own (all cases by default)
  python run_pipeline.py --index 107 --only-step apply-grounding

  # Only collect xhs / zhihu
  python run_pipeline.py --index 107 --platforms xhs,zhihu
"""
import argparse
import asyncio
import atexit
import json
import os
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

# Project imports below resolve relative to the repository root, so the root
# must be on sys.path (and the cwd) before they run.
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
os.chdir(PROJECT_ROOT)
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv

load_dotenv()

from agent.llm.prompts import SimplePrompt
from agent.core.runner import AgentRunner, RunConfig
from agent.tools.builtin.knowledge import KnowledgeConfig
from agent.trace import FileSystemTraceStore, Trace, Message
from agent.llm import create_qwen_llm_call
from agent.utils import setup_logging
from examples.process_research.config import (
    TRACE_STORE_PATH,
    SKILLS_DIR,
    LOG_LEVEL,
    LOG_FILE,
)
from examples.process_pipeline.script.case_history import set_run_id, snapshot_case_file
from examples.process_pipeline.script.extract_sources import extract_sources_to_json
from examples.process_pipeline.script.llm_evaluate_sources import (
    evaluate_sources_with_llm,
    build_eval_llm_call,
    EVAL_MODELS,
    DEFAULT_EVAL_MODEL,
)
from examples.process_pipeline.script.generate_case import generate_case_from_source
from examples.process_pipeline.script.extract_decode_workflow import extract_decode_workflow
from examples.process_pipeline.script.apply_to_grounding_agent import apply_grounding
from examples.process_pipeline.script.validate_schema import validate_case
from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse


# ──── Topology / Constants ────────────────────────────────────────────────────

STEPS: List[str] = [
    "research",
    "source",
    "generate-case",
    "decode-workflow",
    "apply-grounding",
]
DEFAULT_END = "decode-workflow"  # apply-grounding is not part of the default pipeline
CASE_INDEX_STEPS = {"decode-workflow", "apply-grounding"}
LLM_CONFIGURABLE_STEPS = {"apply-grounding"}

TARGET_QUALIFIED_CASES = 15
MAX_RESEARCH_ROUNDS = 50
QWEN_MODEL = "qwen3.5-plus"

# Rubric-evaluation model for the source step is chosen with --eval-model
# (qwen/sonnet/gemini/gpt); the backend mapping lives in
# llm_evaluate_sources.EVAL_MODELS.
# Currently annotate-only: the LLM verdict is written into each post's
# llm_evaluation but posts are NOT removed based on its decision. The rubric
# thresholds/weights are still being calibrated from collected scores; flip
# this to True once calibrated to enforce the quality gate.
EVAL_APPLY_DECISION = False


# ──── Logging Tee ─────────────────────────────────────────────────────────────

class _Tee:
    """Minimal text stream that mirrors every write to several underlying streams.

    Used to copy stdout/stderr into the per-run log file. Errors raised by any
    single stream are swallowed so that a broken sink cannot break printing.
    """

    def __init__(self, *streams):
        self.streams = streams

    def write(self, s):
        for st in self.streams:
            try:
                st.write(s)
            except Exception:
                pass
        # Flush after every write so the log file is readable while the run is live.
        self.flush()
        # Text-stream contract: write() returns the number of characters written.
        return len(s)

    def flush(self):
        for st in self.streams:
            try:
                st.flush()
            except Exception:
                pass

    def isatty(self):
        return False


# ──── Pipeline Context ────────────────────────────────────────────────────────

@dataclass
class PipelineContext:
    """Shared, mutable state passed to every pipeline step."""

    args: argparse.Namespace
    requirement: str
    output_dir: Path
    raw_cases_dir: Path
    base_dir: Path
    runner_qwen: AgentRunner
    active_steps: Set[str]
    # Per-label cost, rounded to 4 decimals on every update.
    costs_breakdown: Dict[str, float] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)
    total_cost: float = 0.0
    # Latest trace id per research task label (e.g. "P1_Research_xhs").
    phase1_trace_ids: Dict[str, Optional[str]] = field(default_factory=dict)

    def track(self, name: str, cost: float) -> None:
        """Add ``cost`` to the running total and to the ``name`` bucket."""
        self.total_cost += cost
        self.costs_breakdown[name] = round(self.costs_breakdown.get(name, 0.0) + cost, 4)

    def error(self, msg: str) -> None:
        """Print ``msg`` and record it in ``errors`` (saved to run metrics)."""
        print(f"⚠️ [Error] {msg}")
        self.errors.append(msg)


# ──── researcher runner (the only LLM-driven step left) ──────────────────────

async def run_agent_task(
    runner: AgentRunner,
    prompt_name: str,
    kwargs: dict,
    task_name: str,
    model_name: str,
    *,
    start_trace_id: Optional[str] = None,
    additional_messages: Optional[list] = None,
):
    """Run ``prompts/<prompt_name>.prompt`` through the agent loop and validate its output.

    Whenever the agent calls write_file / write_json, the file named by
    ``kwargs["output_file"]`` is re-read immediately: invalid JSON is
    auto-repaired via ``try_fix_and_parse`` and ``case_*`` files are checked
    with ``validate_case``.

    Up to 3 attempts are made. After a failed validation the previous trace is
    continued with the error as feedback when a trace id is available;
    otherwise the output file is deleted and the task restarts from scratch.
    If an attempt ends without the output file, one extra "recovery" turn asks
    the agent to write it.

    Args:
        runner: agent runner used for every attempt.
        prompt_name: prompt file stem under ``prompts/``; "researcher" also
            enables the "content" tool group.
        kwargs: template variables for the prompt; ``output_file`` (if given)
            is the file that gets validated.
        task_name: label used in logs and attempt names.
        model_name: fallback model when the prompt config does not set one.
        start_trace_id: trace to continue on the first attempt.
        additional_messages: extra messages appended after the prompt messages.

    Returns:
        ``(cost, errors, trace_id)``: total cost across attempts, collected
        error strings, and the id of the last trace that ran (may be None).
    """
    prompt_path = Path(__file__).parent / "prompts" / f"{prompt_name}.prompt"
    prompt = SimplePrompt(prompt_path)
    base_messages = prompt.build_messages(**kwargs)
    if additional_messages:
        base_messages = list(base_messages) + list(additional_messages)

    out_file = kwargs.get("output_file")
    knowledge = KnowledgeConfig(
        enable_completion_extraction=False,
        enable_extraction=False,
        enable_injection=False,
    )

    def _instant_validate() -> Optional[str]:
        """Validate ``out_file`` right after a write, auto-fixing JSON syntax if needed.

        Returns an error description, or None when the file is absent or valid.
        """
        if not out_file or not Path(out_file).exists():
            return None
        try:
            with open(out_file, "r", encoding="utf-8") as f:
                raw = f.read()
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                ok, data, desc = try_fix_and_parse(raw)
                if not ok:
                    return "JSON parse failed, auto-fix unsuccessful"
                # Persist the repaired JSON so later steps read valid content.
                with open(out_file, "w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                print(f" 🔧 [Auto-Fix] {desc}")
            filename = Path(out_file).name
            if filename.startswith("case_"):
                err = validate_case(data)
                if err:
                    print(f" ⚠️ [Validation] {err}")
                    return err
                print(f" ✅ [Validation] {filename} OK")
            return None
        except Exception as e:
            return str(e)

    async def _run_attempt(
        messages: list,
        attempt_name: str,
        trace_id: Optional[str],
        temperature: float,
    ):
        """Run one agent session; return ``(cost, errors, last_trace_id)``.

        Cost is only counted for traces that reach status "completed".
        """
        cost = 0.0
        errs: List[str] = []
        last_tid = trace_id
        cfg = RunConfig(
            model=prompt.config.get("model") or model_name,
            temperature=temperature,
            name=attempt_name,
            agent_type=prompt_name,
            tool_groups=["core", "content"] if prompt_name == "researcher" else ["core"],
            trace_id=trace_id,
            knowledge=knowledge,
        )
        try:
            async for item in runner.run(messages=messages, config=cfg):
                if isinstance(item, Trace):
                    last_tid = item.trace_id
                    if item.status == "completed":
                        cost += item.total_cost
                    elif item.status == "failed":
                        errs.append(f"{attempt_name} failed: {item.error_message}")
                elif isinstance(item, Message) and item.role == "tool":
                    content = item.content if isinstance(item.content, dict) else {}
                    if content.get("tool_name") in ("write_file", "write_json"):
                        print(f" 💾 [Write] {task_name}")
                        _instant_validate()
        except Exception as e:
            errs.append(f"{attempt_name} crashed: {type(e).__name__}: {e}")
            print(f"❌ [Exception] {attempt_name}: {e}")
        return cost, errs, last_tid

    total_cost = 0.0
    total_errors: List[str] = []
    last_trace_id = start_trace_id
    last_validation_error: Optional[str] = None
    # An explicit temperature of 0 is a valid setting; only fall back when unset.
    configured_temp = prompt.config.get("temperature")
    default_temp = 0.3 if configured_temp is None else configured_temp

    for attempt in range(3):
        if attempt == 0:
            print(f"🚀 [Launch] {task_name}")
            cost, errs, last_trace_id = await _run_attempt(
                base_messages, f"{task_name}_A0", last_trace_id, default_temp,
            )
        elif last_trace_id and last_validation_error:
            # Continue the previous trace and tell the agent what was wrong.
            print(f"🔄 [Continue {attempt}/2] {task_name}")
            fix_msg = [{
                "role": "user",
                "content": (
                    f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
                    f"错误:{last_validation_error}\n\n"
                    f"请重新读取该文件,根据错误修复后再次 write_json 到 `{out_file}`。"
                    f"只改有问题的部分,不要丢弃已正确的内容。"
                ),
            }]
            cost, errs, last_trace_id = await _run_attempt(
                fix_msg, f"{task_name}_Fix{attempt}", last_trace_id, 0.1,
            )
        else:
            # No trace to continue: start over from a clean output file.
            print(f"🔄 [Retry {attempt}/2] {task_name}")
            if out_file and Path(out_file).exists():
                Path(out_file).unlink()
            cost, errs, last_trace_id = await _run_attempt(
                base_messages, f"{task_name}_A{attempt}", None, default_temp,
            )

        total_cost += cost
        total_errors.extend(errs)

        # Recovery: the agent stopped without writing the output file; force a wrap-up turn.
        if out_file and not Path(out_file).exists() and last_trace_id:
            print(f"⚠️ [Recovery] {task_name} missing output, forcing wrap-up")
            rec_msg = [{
                "role": "user",
                "content": (
                    f"【系统强制指令】任务终止但未写入文件。请立刻调用 write_json,"
                    f"将已搜集到的结构化内容作为 json_data 写入到 `{out_file}`。"
                    f"即使空也要写入。"
                ),
            }]
            r_cost, r_errs, last_trace_id = await _run_attempt(
                rec_msg, f"{task_name}_Rec", last_trace_id, 0.1,
            )
            total_cost += r_cost
            total_errors.extend(r_errs)

        # Final check: only JSON outputs are validated.
        if not out_file or not str(out_file).endswith(".json"):
            return total_cost, total_errors, last_trace_id

        if not Path(out_file).exists():
            print(f"❌ [Missing] {task_name}: no output file after recovery")
            last_validation_error = None
            continue

        err = _instant_validate()
        if err is None:
            return total_cost, total_errors, last_trace_id

        last_validation_error = err
        total_errors.append(f"{task_name} validation: {err}")

    print(f"❌ [Retry Limit] {task_name} exhausted retries")
    return total_cost, total_errors, last_trace_id


# ──── Step 1+2: Research ⇄ Source ─────────────────────────────────────────────

def _build_research_feedback(
    platform: str,
    last_platform_count: Dict[str, int],
    last_src_stats: Dict[str, Any],
) -> List[Dict[str, str]]:
    """Build the user message telling a platform's researcher why it fell short.

    Includes the current qualified count, the filter-reason histogram, and up
    to 10 of this platform's filtered posts. Filtered-post entries may come
    from the LLM evaluator, whose schema is not guaranteed here, so missing
    keys fall back to placeholders instead of raising.
    """
    p_count = last_platform_count.get(platform, 0)
    p_filtered = [
        d for d in last_src_stats.get("filtered_details", [])
        if d.get("platform") == platform
    ]
    reason_summary = last_src_stats.get("filtered_reasons", {})

    lines = [
        "【系统反馈】你在上一轮提取的有效案例数量未达标。",
        f"当前 {platform.upper()} 合格案例:{p_count}/{TARGET_QUALIFIED_CASES}",
    ]
    if reason_summary:
        lines.append(f"过滤统计:{dict(reason_summary)}")
    if p_filtered:
        lines.append(f"\n以下是你提交的被过滤掉的帖子(共 {len(p_filtered)} 条):")
        for item in p_filtered[:10]:
            lines.append(
                f" - [{item.get('case_id', '?')}] {item.get('title', '')} "
                f"→ 原因: {item.get('filter_reason', 'unknown')}"
            )
        if len(p_filtered) > 10:
            lines.append(f" ... 还有 {len(p_filtered) - 10} 条未列出")
    lines.append(
        "\n请继续搜索并提取更多**全新的、不同的**高质量案例,**追加**写入到原文件。"
        "不要重复之前已找过的案例!针对过滤原因,确保正文详实、点名工具与参数、重在教做法。"
    )
    return [{"role": "user", "content": "\n".join(lines)}]


async def _run_research_round(
    ctx: PipelineContext,
    active_platforms: List[str],
    round_idx: int,
    platform_traces: Dict[str, Optional[str]],
    last_src_stats: Optional[Dict[str, Any]],
    last_platform_count: Dict[str, int],
) -> None:
    """Run one research round for all active platforms in parallel.

    Each platform's resulting trace id is stored in ``platform_traces`` (so the
    next round continues it) and in ``ctx.phase1_trace_ids``.
    """
    tasks = []
    for p in active_platforms:
        task_desc = (
            f"渠道:{p.upper()}。核心需求:{ctx.requirement}。"
            f"目标:至少收集 {TARGET_QUALIFIED_CASES} 条高质量案例"
            f"(正文详实、点名所用工具+参数+prompt、有展示结果,重在『教做法』而非只炫成品)。"
        )
        out_file = str(ctx.raw_cases_dir / f"case_{p}.json")
        kwargs = {"task": task_desc, "output_file": out_file}

        additional_msgs = None
        if round_idx > 0 and last_src_stats:
            additional_msgs = _build_research_feedback(p, last_platform_count, last_src_stats)

        tasks.append(run_agent_task(
            ctx.runner_qwen, "researcher", kwargs,
            f"P1_Research_{p}_R{round_idx+1}", QWEN_MODEL,
            start_trace_id=platform_traces[p],
            additional_messages=additional_msgs,
        ))

    results = await asyncio.gather(*tasks)
    for (cost, errs, tid), p in zip(results, active_platforms):
        ctx.track(f"P1_Research_{p}", cost)
        platform_traces[p] = tid
        ctx.phase1_trace_ids[f"P1_Research_{p}"] = tid
        ctx.errors.extend(errs)
        if not (ctx.raw_cases_dir / f"case_{p}.json").exists():
            ctx.error(f"Missing case file for {p}; agent likely hit max_iterations")


def _extract_sources(ctx: PipelineContext, trace_ids: Optional[List[str]]) -> Optional[Dict[str, Any]]:
    """Build raw_cases/source.json from the case_*.json files; return its stats or None on failure."""
    try:
        stats = extract_sources_to_json(ctx.raw_cases_dir, trace_ids=trace_ids)
        print(
            f"📎 [Source] matched={stats['total_matched']}, "
            f"filtered={stats['filtered_total']} → {ctx.raw_cases_dir / 'source.json'}"
        )
        for reason, cnt in stats.get("filtered_reasons", {}).items():
            print(f" - {reason}: {cnt}")
        return stats
    except Exception as e:
        ctx.error(f"Source extraction failed: {type(e).__name__}: {e}")
        return None


async def run_evaluate_sources(ctx: PipelineContext) -> Optional[Dict[str, Any]]:
    """Source sub-step: LLM rubric evaluation of the posts matched in source.json.

    Runs after the cheap extract_sources pre-filter and writes each post's
    rubric result into its ``llm_evaluation``. With EVAL_APPLY_DECISION=False
    (annotate-only) every post is kept and only ``would_discard`` is counted;
    once thresholds are calibrated and it is set True, the LLM decision
    gates posts and discarded ones move to filtered_cases.json.

    Returns the evaluator's stats, or None if source.json is missing or the
    evaluation (including building the LLM client) failed.
    """
    source_file = ctx.raw_cases_dir / "source.json"
    if not source_file.exists():
        return None

    try:
        # Built inside the try so a backend/config problem is recorded, not fatal.
        llm_call, model_id = build_eval_llm_call(ctx.args.eval_model)
        stats = await evaluate_sources_with_llm(
            source_file=source_file,
            llm_call=llm_call,
            model=model_id,
            requirement=ctx.requirement,
            max_concurrent=3,
            apply_decision=EVAL_APPLY_DECISION,
        )
    except Exception as e:
        ctx.error(f"LLM source evaluation failed: {type(e).__name__}: {e}")
        return None

    ctx.track("LLM_Eval_Sources", stats.get("total_cost", 0.0))
    mode = "enforce" if EVAL_APPLY_DECISION else "annotate-only"
    print(
        f"🧠 [LLM-Eval/{ctx.args.eval_model}→{model_id}/{mode}] evaluated={stats.get('evaluated', 0)} "
        f"report={stats.get('reported', 0)} would_discard={stats.get('would_discard', 0)} "
        f"discarded={stats.get('discarded', 0)} skipped={stats.get('skipped', 0)} "
        f"cost=${stats.get('total_cost', 0.0):.4f}"
    )
    return stats


def _merge_eval_into_feedback(
    src_stats: Optional[Dict[str, Any]],
    eval_stats: Optional[Dict[str, Any]],
) -> None:
    """Fold LLM-rejected posts into ``src_stats`` so next round's feedback explains them."""
    if not src_stats or not eval_stats:
        return
    discards = eval_stats.get("llm_discard_details", [])
    if discards:
        src_stats.setdefault("filtered_details", []).extend(discards)
    n = eval_stats.get("discarded", 0)
    if n:
        reasons = src_stats.setdefault("filtered_reasons", {})
        reasons["llm_discard"] = reasons.get("llm_discard", 0) + n


async def run_research_source_loop(ctx: PipelineContext) -> None:
    """Full phase 1: loop research ⇄ source until each platform reaches the target.

    A platform drops out of the active set once its source.json count reaches
    TARGET_QUALIFIED_CASES; the loop stops when none remain or after
    MAX_RESEARCH_ROUNDS rounds.
    """
    platforms = [p.strip() for p in ctx.args.platforms.split(",") if p.strip()]
    print(f"\n--- Phase 1: Research ⇄ Source loop ({QWEN_MODEL}) ---")
    print(f"📡 Platforms: {platforms}")

    platform_traces: Dict[str, Optional[str]] = {p: None for p in platforms}
    active_platforms = list(platforms)
    last_src_stats: Optional[Dict[str, Any]] = None
    last_platform_count: Dict[str, int] = {}
    round_idx = 0

    while active_platforms and round_idx < MAX_RESEARCH_ROUNDS:
        print(f"\n >>> [Round {round_idx+1}] Active: {active_platforms}")

        await _run_research_round(
            ctx, active_platforms, round_idx, platform_traces,
            last_src_stats, last_platform_count,
        )

        trace_id_list = [tid for tid in ctx.phase1_trace_ids.values() if tid]
        last_src_stats = _extract_sources(ctx, trace_id_list)

        # Source sub-step: LLM rubric evaluation annotating each post.
        # In annotate-only mode (EVAL_APPLY_DECISION=False) it does not change
        # source.json's size, so the per-platform counts below match the
        # pre-evaluation counts. It runs before counting so that, once
        # enforcement is enabled, the counts immediately reflect removals.
        eval_stats = await run_evaluate_sources(ctx)
        _merge_eval_into_feedback(last_src_stats, eval_stats)

        source_file = ctx.raw_cases_dir / "source.json"
        if not source_file.exists():
            print(" ⚠️ source.json not found, continuing loop")
            round_idx += 1
            continue

        try:
            with open(source_file, "r", encoding="utf-8") as f:
                source_data = json.load(f)
        except json.JSONDecodeError as e:
            # Record and keep looping; the next extraction rewrites source.json.
            ctx.error(f"source.json is not valid JSON: {e}")
            round_idx += 1
            continue

        platform_count: Dict[str, int] = {}
        for s in source_data.get("sources", []):
            p = s.get("platform")
            if p:
                platform_count[p] = platform_count.get(p, 0) + 1
        last_platform_count = platform_count

        print(f" 📊 Target: >={TARGET_QUALIFIED_CASES}/platform")
        next_active = []
        for p in platforms:
            count = platform_count.get(p, 0)
            if p in active_platforms:
                print(f" - {p}: {count}/{TARGET_QUALIFIED_CASES}")
                if count < TARGET_QUALIFIED_CASES:
                    next_active.append(p)

        active_platforms = next_active
        if not active_platforms:
            print(f" ✅ All platforms reached target {TARGET_QUALIFIED_CASES}")
            break
        round_idx += 1

    if round_idx >= MAX_RESEARCH_ROUNDS and active_platforms:
        print(f" ⚠️ Max {MAX_RESEARCH_ROUNDS} rounds reached. Remaining: {active_platforms}")


async def run_research_only(ctx: PipelineContext) -> None:
    """Single-step research: one round over every --platforms entry, no source loop."""
    platforms = [p.strip() for p in ctx.args.platforms.split(",") if p.strip()]
    if not platforms:
        print(" ❌ No platforms specified")
        sys.exit(1)
    print(f"\n--- Single Step: Research ({QWEN_MODEL}) ---")
    print(f"📡 Platforms: {platforms}")
    platform_traces: Dict[str, Optional[str]] = {p: None for p in platforms}
    await _run_research_round(ctx, platforms, 0, platform_traces, None, {})


async def run_source_only(ctx: PipelineContext) -> None:
    """Single-step source: build source.json from existing case_*.json, then run the LLM rubric."""
    print("\n--- Single Step: Source Extraction ---")
    _extract_sources(ctx, trace_ids=None)
    await run_evaluate_sources(ctx)


# ──── Step 3: generate-case ──────────────────────────────────────────────────

async def run_generate_case(ctx: PipelineContext) -> None:
    """Generate case.json from raw_cases/source.json."""
    print("\n--- Phase 1.3: Generate case.json ---")
    source_file = ctx.raw_cases_dir / "source.json"
    if not source_file.exists():
        ctx.error("source.json not found; run research/source first")
        return
    try:
        result = await generate_case_from_source(ctx.raw_cases_dir)
        print(f"📦 [Generate Case] cases={result['total_cases']} → {result['output_file']}")
    except Exception as e:
        ctx.error(f"Generate case failed: {type(e).__name__}: {e}")


# ──── Step 4: decode-workflow ────────────────────────────────────────────────

async def run_decode_workflow(ctx: PipelineContext) -> None:
    """Decode each case's workflow from case.json (one case if --case-index is given)."""
    print("\n--- Phase 2: Decode Workflow (Gemini + LangChain) ---")
    case_file = ctx.output_dir / "case.json"
    if not case_file.exists():
        ctx.error("case.json not found; run generate-case first")
        return
    try:
        result = await extract_decode_workflow(
            case_file=case_file,
            case_index=ctx.args.case_index,  # None = all cases
            skip_existing=ctx.args.skip_existing,
        )
        cost = result.get("total_cost", 0.0)
        ctx.track("Decode_Workflow", cost)
        print(
            f" ✓ decode-workflow: succeeded={result['succeeded']}/{result['total']} "
            f"skipped={result['skipped']} failed={result['failed']} "
            f"merged={result.get('merged', 0)} cost=${cost:.4f}"
        )
        print(f" output_dir: {result['output_dir']}")
    except ImportError as e:
        ctx.error(
            f"decode-workflow 依赖未安装: {e}; "
            f"需要 pip install langchain langchain-google-genai 以及 .env GOOGLE_API_KEY"
        )
    except Exception as e:
        ctx.error(f"Decode workflow failed: {type(e).__name__}: {e}")


# ──── Step 5: apply-grounding ────────────────────────────────────────────────

def _build_main_llm_call(args: argparse.Namespace) -> tuple:
    """Create the grounding ``(llm_call, model)`` pair from --use-claude-sdk / --model."""
    if args.use_claude_sdk:
        model = "claude-sonnet-4-6"
        from agent.llm.claude_code_oauth import create_claude_code_oauth_llm_call
        print(f"✅ apply-grounding via Claude Agent SDK (OAuth): {model}")
        return create_claude_code_oauth_llm_call(model=model), model

    model_map = {
        "claude": "claude-sonnet-4-6",
        "gpt": "gpt-5.4",
        "gemini": "~google/gemini-pro-latest",
    }
    model = model_map.get(args.model, "claude-sonnet-4-6")
    from agent.llm.openrouter import create_openrouter_llm_call
    print(f"✅ apply-grounding via OpenRouter: {model}")
    return create_openrouter_llm_call(model=model), model


def _filter_case_to_single(case_file: Path, case_index: int) -> Path:
    """Write the case whose ``index`` equals ``case_index`` to a temp file; return its path.

    Raises:
        ValueError: no case has that index (or case_file is not valid JSON).
    """
    with open(case_file, "r", encoding="utf-8") as f:
        data = json.load(f)
    cases = data.get("cases", [])
    target = next((c for c in cases if c.get("index") == case_index), None)
    if target is None:
        raise ValueError(f"Case with index {case_index} not found in {case_file.name}")
    data["cases"] = [target]
    temp_file = case_file.parent / f"case_temp_{case_index}.json"
    with open(temp_file, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f" [Target] case {case_index}: {target.get('title', 'untitled')[:30]}")
    return temp_file


def _merge_single_case_back(case_file: Path, temp_file: Path, case_index: int) -> None:
    """Merge the updated case from ``temp_file`` back into case.json, snapshotting first.

    The temp file is deleted only after a successful write.

    Raises:
        ValueError: case.json no longer contains ``case_index``; nothing is
            written and the temp file is kept for manual recovery.
    """
    with open(temp_file, "r", encoding="utf-8") as f:
        updated_case = json.load(f)["cases"][0]
    with open(case_file, "r", encoding="utf-8") as f:
        original = json.load(f)

    for i, c in enumerate(original["cases"]):
        if c.get("index") == case_index:
            original["cases"][i] = updated_case
            break
    else:
        raise ValueError(f"Case with index {case_index} not found in {case_file.name}")

    snap = snapshot_case_file(case_file, step="grounding_merge")
    if snap:
        print(f" [snapshot] {snap.name}")
    with open(case_file, "w", encoding="utf-8") as f:
        json.dump(original, f, ensure_ascii=False, indent=2)
    temp_file.unlink()
    print(f" ✓ Merged case {case_index} back to case.json")


async def run_apply_grounding(ctx: PipelineContext) -> None:
    """Apply grounding to case.json (or to a single case via a temp file)."""
    print("\n--- Phase 2: Apply Grounding ---")
    case_file = ctx.output_dir / "case.json"
    if not case_file.exists():
        ctx.error("case.json not found; run generate-case first")
        return

    llm_call, model = _build_main_llm_call(ctx.args)
    single_case = ctx.args.case_index is not None

    if single_case:
        try:
            target_file = _filter_case_to_single(case_file, ctx.args.case_index)
        except ValueError as e:
            ctx.error(str(e))
            return
    else:
        target_file = case_file

    # Keep the try narrow: a failure while *reporting* a successful run must
    # not delete the single-case temp file that holds the grounded result.
    try:
        result = await apply_grounding(
            target_file, llm_call, model=model, max_concurrent=3,
        )
    except Exception as e:
        ctx.error(f"Apply grounding failed: {type(e).__name__}: {e}")
        if single_case and target_file != case_file:
            target_file.unlink(missing_ok=True)
        return

    ctx.track("Apply_Grounding", result.get("total_cost", 0.0))
    print(
        f"🗺️ [Grounding] grounded={result.get('grounded', 0)}/{result.get('total', 0)} "
        f"cost=${result.get('total_cost', 0.0):.4f}"
    )

    if single_case:
        try:
            _merge_single_case_back(case_file, target_file, ctx.args.case_index)
        except Exception as e:
            ctx.error(
                f"Merge grounding result failed: {type(e).__name__}: {e}; "
                f"temp file kept at {target_file}"
            )


# ──── CLI parsing & validation ───────────────────────────────────────────────

def _parse_args() -> argparse.Namespace:
    """Parse CLI arguments; --only-step cannot be combined with --start-from/--end-at."""
    parser = argparse.ArgumentParser(description="AIGC Process Pipeline (5-step)")
    parser.add_argument("--index", type=int, required=True,
                        help="Index of requirement in db_requirements.json (0-based)")
    parser.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube,douyin,sph",
                        help="Comma-separated platforms for research step")
    parser.add_argument("--case-index", type=int, default=None,
                        help="Re-run a single case in decode-workflow / apply-grounding")
    parser.add_argument("--skip-existing", action="store_true",
                        help="Skip cases whose output already exists "
                             "(default: re-run / overwrite everything; "
                             "currently affects decode-workflow batch mode)")

    parser.add_argument("--only-step", type=str, choices=STEPS,
                        help="Run only a single step (mutex with --start-from/--end-at)")
    parser.add_argument("--start-from", type=str, choices=STEPS,
                        help="Start from this step (inclusive)")
    parser.add_argument("--end-at", type=str, choices=STEPS,
                        help="End at this step (inclusive)")

    llm_group = parser.add_mutually_exclusive_group()
    llm_group.add_argument("--use-claude-sdk", action="store_true",
                           help="apply-grounding via Anthropic SDK (mutex with --model)")
    llm_group.add_argument("--model", type=str, choices=["claude", "gpt", "gemini"],
                           default="claude",
                           help="apply-grounding LLM family via OpenRouter")

    parser.add_argument("--eval-model", type=str, default=DEFAULT_EVAL_MODEL,
                        choices=list(EVAL_MODELS),
                        help=f"source 阶段 LLM 评估模型(默认 {DEFAULT_EVAL_MODEL})")

    args = parser.parse_args()

    if args.only_step and (args.start_from or args.end_at):
        parser.error("--only-step is mutually exclusive with --start-from / --end-at")
    return args


def _resolve_active_steps(args: argparse.Namespace) -> Set[str]:
    """Return the set of steps to run; exits if --start-from comes after --end-at."""
    if args.only_step:
        return {args.only_step}
    start = args.start_from or STEPS[0]
    end = args.end_at or DEFAULT_END
    start_idx = STEPS.index(start)
    end_idx = STEPS.index(end)
    if start_idx > end_idx:
        print(f"❌ --start-from '{start}' is after --end-at '{end}'")
        sys.exit(1)
    return set(STEPS[start_idx:end_idx + 1])


def _validate_args(args: argparse.Namespace, active_steps: Set[str]) -> None:
    """Reject --case-index without a step that uses it; warn about unused LLM flags."""
    if args.case_index is not None and not (active_steps & CASE_INDEX_STEPS):
        print(
            f"❌ --case-index only applies to {sorted(CASE_INDEX_STEPS)}; "
            f"none of those are in active steps {sorted(active_steps)}"
        )
        sys.exit(1)

    # --use-claude-sdk / --model only matter when apply-grounding is active.
    # --model has a default, so inspect argv to tell whether it was given explicitly.
    llm_flags_explicit = (
        args.use_claude_sdk
        or any(a == "--model" or a.startswith("--model=") for a in sys.argv[1:])
    )
    if llm_flags_explicit and not (active_steps & LLM_CONFIGURABLE_STEPS):
        print(
            f"⚠️ --use-claude-sdk / --model are only used by apply-grounding; "
            f"ignored because active steps {sorted(active_steps)} don't include it"
        )


# ──── Bootstrap ──────────────────────────────────────────────────────────────

def _setup_run_log(output_dir: Path) -> Path:
    """Create ``history/<run_id>/`` and tee stdout/stderr into its run.log.

    Also snapshots an existing case.json as "run_start". Returns the log path.
    """
    run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    set_run_id(run_id)
    run_dir = output_dir / "history" / run_id
    run_dir.mkdir(parents=True, exist_ok=True)
    log_path = run_dir / "run.log"
    # Kept open for the whole process; closed by the atexit hook below.
    log_file = open(log_path, "w", encoding="utf-8")

    initial_case = output_dir / "case.json"
    if initial_case.exists():
        snapshot_case_file(initial_case, step="run_start")

    sys.stdout = _Tee(sys.__stdout__, log_file)
    sys.stderr = _Tee(sys.__stderr__, log_file)
    atexit.register(log_file.close)
    print(f"[run-log] tee active → {log_path}")
    return log_path


def _load_requirement(base_dir: Path, index: int) -> str:
    """Return entry ``index`` of db_requirements.json; exits if out of range."""
    req_path = base_dir / "db_requirements.json"
    with open(req_path, encoding="utf-8") as f:
        reqs = json.load(f)
    if index < 0 or index >= len(reqs):
        print(f"❌ Index {index} out of bounds (db has {len(reqs)} entries)")
        sys.exit(1)
    return reqs[index]


def _save_metrics(ctx: PipelineContext, elapsed_sec: float) -> None:
    """Append this run's summary to ``<base_dir>/run_metrics.json`` (a JSON list)."""
    metrics_file = ctx.base_dir / "run_metrics.json"
    metrics_data: List[Any] = []
    if metrics_file.exists():
        try:
            with open(metrics_file, "r", encoding="utf-8") as f:
                metrics_data = json.load(f)
        except json.JSONDecodeError:
            # A corrupt history file is replaced rather than aborting the run.
            metrics_data = []
        if not isinstance(metrics_data, list):
            # Preserve a non-list payload instead of crashing on .append().
            metrics_data = [metrics_data]

    metrics_data.append({
        "index": ctx.args.index,
        "requirement": ctx.requirement[:80] + "...",
        "duration_seconds": round(elapsed_sec, 2),
        "total_cost_usd": round(ctx.total_cost, 4),
        "costs_breakdown": ctx.costs_breakdown,
        "trace_ids": {k: v for k, v in ctx.phase1_trace_ids.items() if v},
        "errors": ctx.errors,
        "active_steps": sorted(ctx.active_steps),
        "timestamp": datetime.now().isoformat(),
    })
    with open(metrics_file, "w", encoding="utf-8") as f:
        json.dump(metrics_data, f, indent=2, ensure_ascii=False)


# ──── Main dispatch ──────────────────────────────────────────────────────────

async def _run_active_steps(ctx: PipelineContext) -> None:
    """Run every active step in topological order."""
    active = ctx.active_steps
    # research and source are coupled: when both are active they run as the
    # phase-1 loop; otherwise each runs once on its own.
    if "research" in active and "source" in active:
        await run_research_source_loop(ctx)
    elif "research" in active:
        await run_research_only(ctx)
    elif "source" in active:
        await run_source_only(ctx)

    if "generate-case" in active:
        await run_generate_case(ctx)
    if "decode-workflow" in active:
        await run_decode_workflow(ctx)
    if "apply-grounding" in active:
        await run_apply_grounding(ctx)


async def main() -> None:
    """CLI entry point: parse args, set up logging and the runner, run steps, save metrics."""
    args = _parse_args()
    active_steps = _resolve_active_steps(args)
    _validate_args(args, active_steps)

    base_dir = Path(__file__).parent
    requirement = _load_requirement(base_dir, args.index)
    output_dir = base_dir / "output" / f"{(args.index+1):03d}"
    raw_cases_dir = output_dir / "raw_cases"
    output_dir.mkdir(parents=True, exist_ok=True)
    raw_cases_dir.mkdir(parents=True, exist_ok=True)

    _setup_run_log(output_dir)
    setup_logging(level=LOG_LEVEL, file=LOG_FILE)

    print("=" * 60)
    print(f"Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
    print(f"Active steps: {' → '.join(s for s in STEPS if s in active_steps)}")
    print("=" * 60)

    # Load agent presets if available.
    presets_path = base_dir / "presets.json"
    if presets_path.exists():
        from agent.core.presets import load_presets_from_json
        load_presets_from_json(str(presets_path))
        print("✅ Loaded agent presets")

    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
    runner_qwen = AgentRunner(
        trace_store=store,
        llm_call=create_qwen_llm_call(model=QWEN_MODEL),
        skills_dir=SKILLS_DIR,
    )

    ctx = PipelineContext(
        args=args,
        requirement=requirement,
        output_dir=output_dir,
        raw_cases_dir=raw_cases_dir,
        base_dir=base_dir,
        runner_qwen=runner_qwen,
        active_steps=active_steps,
    )

    start_time = time.monotonic()
    try:
        await _run_active_steps(ctx)
    except Exception as e:
        # Record the abort reason so it lands in run_metrics.json, then re-raise.
        ctx.error(f"Pipeline aborted: {type(e).__name__}: {e}")
        raise
    finally:
        # Always persist cost/error metrics, even when a step crashed.
        elapsed = time.monotonic() - start_time
        try:
            _save_metrics(ctx, elapsed)
        except OSError as e:
            print(f"⚠️ Failed to save metrics: {e}")
        print(f"\n📊 [Metrics] Completed in {elapsed:.1f}s. Total cost: ${ctx.total_cost:.4f}")
        if ctx.errors:
            print(f"⚠️ {len(ctx.errors)} error(s) encountered:")
            for e in ctx.errors[:10]:
                print(f" - {e}")
        print("✅ Pipeline run finished.")


if __name__ == "__main__":
    asyncio.run(main())