| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807 |
- """
- Pipeline: Research → Source → Generate-Case → Decode-Workflow [→ Apply-Grounding]
- ================================================================================
- CLI 速查
- ────────────────────────────────────────────────────────────────────────────────
- 必填:
- --index N 需求索引 (0-based)。输出目录 output/{(index+1):03d}/。
- 步骤拓扑(线性 5 步):
- research → source → generate-case → decode-workflow → apply-grounding
- ↑──────┘
- (phase-1 内部 research⇄source 循环,对 CLI 透明)
- 默认行为:
- 跑 research → source → generate-case → decode-workflow(4 步)。
- apply-grounding 仅手工触发(--only-step apply-grounding 或显式 --end-at)。
- 模式选择(互斥):
- 默认 跑 research..decode-workflow
- --only-step STEP 只跑单步
- --start-from / --end-at 区间跑(含两端),任填一端
- 可选参数:
- --case-index N 仅 decode-workflow / apply-grounding 支持
- --platforms xhs,zhihu,gzh,youtube,douyin,sph research 阶段平台过滤
- --skip-existing 仅在某 case 还没生成 decode 输出时才跑(增量模式)。
- 默认行为是全覆盖:每次跑都把所有 case 重新生成。
- 仅对 decode-workflow 批量模式生效;单 case 模式本身就总是重跑。
- --use-claude-sdk apply-grounding 走 Anthropic 官方 SDK
- --model {claude,gpt,gemini} apply-grounding 走 OpenRouter 后端
- 注:--use-claude-sdk 与 --model 互斥,且只在 active_steps 含 apply-grounding 时生效
- 典型用法:
- # 默认跑完前 4 步
- python run_pipeline.py --index 107
- # 只重跑某 case 的 decode-workflow
- python run_pipeline.py --index 107 --only-step decode-workflow --case-index 3
- # 单独跑 apply-grounding(默认全 case)
- python run_pipeline.py --index 107 --only-step apply-grounding
- # 只采集 xhs / zhihu
- python run_pipeline.py --index 107 --platforms xhs,zhihu
- """
- import argparse
- import asyncio
- import json
- import os
- import sys
- import time
- from dataclasses import dataclass, field
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List, Optional, Set
- PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
- os.chdir(PROJECT_ROOT)
- sys.path.insert(0, str(PROJECT_ROOT))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.llm.prompts import SimplePrompt
- from agent.core.runner import AgentRunner, RunConfig
- from agent.tools.builtin.knowledge import KnowledgeConfig
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_qwen_llm_call
- from agent.utils import setup_logging
- from examples.process_research.config import (
- TRACE_STORE_PATH, SKILLS_DIR, LOG_LEVEL, LOG_FILE,
- )
- from examples.process_pipeline.script.case_history import set_run_id, snapshot_case_file
- from examples.process_pipeline.script.extract_sources import extract_sources_to_json
- from examples.process_pipeline.script.generate_case import generate_case_from_source
- from examples.process_pipeline.script.extract_decode_workflow import extract_decode_workflow
- from examples.process_pipeline.script.apply_to_grounding_agent import apply_grounding
- from examples.process_pipeline.script.validate_schema import validate_case
- from examples.process_pipeline.script.fix_json_quotes import try_fix_and_parse
# ──── Topology / Constants ────────────────────────────────────────────────────
# Ordered step list; the order defines the linear topology used when resolving
# --start-from / --end-at ranges.
STEPS: List[str] = [
    "research", "source", "generate-case", "decode-workflow", "apply-grounding",
]
DEFAULT_END = "decode-workflow"  # apply-grounding is excluded from the default pipeline
# Steps that accept the --case-index flag.
CASE_INDEX_STEPS = {"decode-workflow", "apply-grounding"}
# Steps whose LLM backend is selectable via --use-claude-sdk / --model.
LLM_CONFIGURABLE_STEPS = {"apply-grounding"}
# Research loop stops once every platform has at least this many qualified cases.
TARGET_QUALIFIED_CASES = 15
# Hard cap on research ⇄ source rounds so the loop cannot run forever.
MAX_RESEARCH_ROUNDS = 50
# Model used for the phase-1 research agent.
QWEN_MODEL = "qwen3.5-plus"
- # ──── Logging Tee ─────────────────────────────────────────────────────────────
- class _Tee:
- def __init__(self, *streams):
- self.streams = streams
- def write(self, s):
- for st in self.streams:
- try:
- st.write(s)
- except Exception:
- pass
- self.flush()
- def flush(self):
- for st in self.streams:
- try:
- st.flush()
- except Exception:
- pass
- def isatty(self):
- return False
# ──── Pipeline Context ────────────────────────────────────────────────────────
@dataclass
class PipelineContext:
    """Shared mutable state threaded through every pipeline step."""
    args: argparse.Namespace
    requirement: str
    output_dir: Path
    raw_cases_dir: Path
    base_dir: Path
    runner_qwen: AgentRunner
    active_steps: Set[str]
    costs_breakdown: Dict[str, float] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)
    total_cost: float = 0.0
    phase1_trace_ids: Dict[str, Optional[str]] = field(default_factory=dict)

    def track(self, name: str, cost: float) -> None:
        """Accumulate *cost* into the grand total and the per-step breakdown."""
        self.total_cost += cost
        previous = self.costs_breakdown.get(name, 0.0)
        self.costs_breakdown[name] = round(previous + cost, 4)

    def error(self, msg: str) -> None:
        """Print a warning and record *msg* for the final metrics report."""
        print(f"⚠️ [Error] {msg}")
        self.errors.append(msg)
# ──── researcher runner (the only LLM-driven step left) ──────────────────────
async def run_agent_task(
    runner: AgentRunner,
    prompt_name: str,
    kwargs: dict,
    task_name: str,
    model_name: str,
    *,
    start_trace_id: Optional[str] = None,
    additional_messages: Optional[list] = None,
):
    """Run one prompt through the agent loop with validation-driven retries.

    Builds messages from ``prompts/{prompt_name}.prompt``, streams the agent
    run, and schema-validates (plus JSON auto-fixes) the output file as soon as
    a write_file/write_json tool call is observed.  On failure, up to 3
    attempts are made: the existing trace is continued with error feedback when
    possible, otherwise the run restarts from scratch.

    Returns:
        Tuple ``(total_cost, total_errors, last_trace_id)``.
    """
    prompt_path = Path(__file__).parent / "prompts" / f"{prompt_name}.prompt"
    prompt = SimplePrompt(prompt_path)
    base_messages = prompt.build_messages(**kwargs)
    if additional_messages:
        base_messages = list(base_messages) + list(additional_messages)
    out_file = kwargs.get("output_file")
    # Knowledge extraction/injection is irrelevant for this pipeline; disable it.
    knowledge = KnowledgeConfig(
        enable_completion_extraction=False, enable_extraction=False, enable_injection=False,
    )

    def _instant_validate() -> Optional[str]:
        """Validate the output file right after a write; auto-fix JSON syntax.

        Returns an error description, or None when the file is valid or absent.
        """
        if not out_file or not Path(out_file).exists():
            return None
        try:
            with open(out_file, "r", encoding="utf-8") as f:
                raw = f.read()
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                ok, data, desc = try_fix_and_parse(raw)
                if not ok:
                    return "JSON parse failed, auto-fix unsuccessful"
                # Persist the repaired JSON so later readers see valid content.
                with open(out_file, "w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                print(f" 🔧 [Auto-Fix] {desc}")
            filename = Path(out_file).name
            # Only case_* files carry a schema to check.
            if filename.startswith("case_"):
                err = validate_case(data)
                if err:
                    print(f" ⚠️ [Validation] {err}")
                    return err
            # Fix: report the actual file name (previously printed a literal
            # "(unknown)" placeholder in a placeholder-free f-string).
            print(f" ✅ [Validation] {filename} OK")
            return None
        except Exception as e:
            return str(e)

    async def _run_attempt(messages: list, attempt_name: str, trace_id: Optional[str], temperature: float):
        """Run a single agent attempt; returns (cost, errors, last_trace_id)."""
        cost = 0.0
        errs: List[str] = []
        last_tid = trace_id
        cfg = RunConfig(
            model=prompt.config.get("model") or model_name,
            temperature=temperature,
            name=attempt_name,
            agent_type=prompt_name,
            # Only the researcher gets the content tool group.
            tool_groups=["core", "content"] if prompt_name == "researcher" else ["core"],
            trace_id=trace_id,
            knowledge=knowledge,
        )
        try:
            async for item in runner.run(messages=messages, config=cfg):
                if isinstance(item, Trace):
                    last_tid = item.trace_id
                    if item.status == "completed":
                        cost += item.total_cost
                    elif item.status == "failed":
                        errs.append(f"{attempt_name} failed: {item.error_message}")
                elif isinstance(item, Message) and item.role == "tool":
                    content = item.content if isinstance(item.content, dict) else {}
                    # Validate eagerly on every file write so errors surface
                    # while the trace can still be continued.
                    if content.get("tool_name") in ("write_file", "write_json"):
                        print(f" 💾 [Write] {task_name}")
                        _instant_validate()
        except Exception as e:
            errs.append(f"{attempt_name} crashed: {type(e).__name__}: {e}")
            print(f"❌ [Exception] {attempt_name}: {e}")
        return cost, errs, last_tid

    total_cost = 0.0
    total_errors: List[str] = []
    last_trace_id = start_trace_id
    last_validation_error: Optional[str] = None
    default_temp = prompt.config.get("temperature") or 0.3
    for attempt in range(3):
        if attempt == 0:
            print(f"🚀 [Launch] {task_name}")
            cost, errs, last_trace_id = await _run_attempt(
                base_messages, f"{task_name}_A0", last_trace_id, default_temp,
            )
        elif last_trace_id and last_validation_error:
            # Continue the previous trace, telling the agent what failed.
            print(f"🔄 [Continue {attempt}/2] {task_name}")
            fix_msg = [{
                "role": "user",
                "content": (
                    f"【系统校验失败】你上一次写入的文件 `{out_file}` 未通过 schema 校验。\n"
                    f"错误:{last_validation_error}\n\n"
                    f"请重新读取该文件,根据错误修复后再次 write_json 到 `{out_file}`。"
                    f"只改有问题的部分,不要丢弃已正确的内容。"
                ),
            }]
            cost, errs, last_trace_id = await _run_attempt(
                fix_msg, f"{task_name}_Fix{attempt}", last_trace_id, 0.1,
            )
        else:
            # No trace to continue: restart from scratch with a clean slate.
            print(f"🔄 [Retry {attempt}/2] {task_name}")
            if out_file and Path(out_file).exists():
                Path(out_file).unlink()
            cost, errs, last_trace_id = await _run_attempt(
                base_messages, f"{task_name}_A{attempt}", None, default_temp,
            )
        total_cost += cost
        total_errors.extend(errs)
        # Recovery: if the output file never got written, force a wrap-up write.
        if out_file and not Path(out_file).exists() and last_trace_id:
            print(f"⚠️ [Recovery] {task_name} missing output, forcing wrap-up")
            rec_msg = [{
                "role": "user",
                "content": (
                    f"【系统强制指令】任务终止但未写入文件。请立刻调用 write_json,"
                    f"将已搜集到的结构化内容作为 json_data 写入到 `{out_file}`。"
                    f"即使空也要写入。"
                ),
            }]
            r_cost, r_errs, last_trace_id = await _run_attempt(
                rec_msg, f"{task_name}_Rec", last_trace_id, 0.1,
            )
            total_cost += r_cost
            total_errors.extend(r_errs)
        # Final validation: only JSON outputs are schema-checked.
        if not out_file or not str(out_file).endswith(".json"):
            return total_cost, total_errors, last_trace_id
        if not Path(out_file).exists():
            print(f"❌ [Missing] {task_name}: no output file after recovery")
            last_validation_error = None
            continue
        err = _instant_validate()
        if err is None:
            return total_cost, total_errors, last_trace_id
        last_validation_error = err
        total_errors.append(f"{task_name} validation: {err}")
    print(f"❌ [Retry Limit] {task_name} exhausted retries")
    return total_cost, total_errors, last_trace_id
# ──── Step 1+2: Research ⇄ Source ─────────────────────────────────────────────
def _build_research_feedback(
    platform: str,
    last_platform_count: Dict[str, int],
    last_src_stats: Dict[str, Any],
) -> List[Dict[str, str]]:
    """Compose one user message telling the researcher why the last round fell short."""
    qualified = last_platform_count.get(platform, 0)
    rejected = [
        detail for detail in last_src_stats.get("filtered_details", [])
        if detail.get("platform") == platform
    ]
    reasons = last_src_stats.get("filtered_reasons", {})
    parts: List[str] = [
        f"【系统反馈】你在上一轮提取的有效案例数量未达标。",
        f"当前 {platform.upper()} 合格案例:{qualified}/{TARGET_QUALIFIED_CASES}",
    ]
    if reasons:
        parts.append(f"过滤统计:{dict(reasons)}")
    if rejected:
        parts.append(f"\n以下是你提交的被过滤掉的帖子(共 {len(rejected)} 条):")
        # Cap the listing at 10 entries to keep the message short.
        parts.extend(
            f" - [{entry['case_id']}] {entry['title']} → 原因: {entry['filter_reason']}"
            for entry in rejected[:10]
        )
        if len(rejected) > 10:
            parts.append(f" ... 还有 {len(rejected) - 10} 条未列出")
    parts.append(
        "\n请继续搜索并提取更多**全新的、不同的**高质量案例,**追加**写入到原文件。"
        "不要重复之前已找过的案例!针对过滤原因,确保正文详实、评分准确。"
    )
    return [{"role": "user", "content": "\n".join(parts)}]
async def _run_research_round(
    ctx: PipelineContext,
    active_platforms: List[str],
    round_idx: int,
    platform_traces: Dict[str, Optional[str]],
    last_src_stats: Optional[Dict[str, Any]],
    last_platform_count: Dict[str, int],
) -> None:
    """Run one research round over all active platforms in parallel.

    Trace ids are written back into *platform_traces* and ctx; costs and
    errors accumulate on ctx.
    """
    coroutines = []
    for p in active_platforms:
        task_desc = (
            f"渠道:{p.upper()}。核心需求:{ctx.requirement}。"
            f"目标:至少收集 {TARGET_QUALIFIED_CASES} 条高质量案例(评分>=70、正文充实)。"
        )
        kwargs = {
            "task": task_desc,
            "output_file": str(ctx.raw_cases_dir / f"case_{p}.json"),
        }
        feedback = None
        if round_idx > 0 and last_src_stats:
            # From round 2 onward, tell the agent what got filtered and why.
            feedback = _build_research_feedback(p, last_platform_count, last_src_stats)
        coroutines.append(run_agent_task(
            ctx.runner_qwen, "researcher", kwargs,
            f"P1_Research_{p}_R{round_idx+1}", QWEN_MODEL,
            start_trace_id=platform_traces[p],
            additional_messages=feedback,
        ))
    # gather preserves submission order, so results align with platforms.
    results = await asyncio.gather(*coroutines)
    for p, (cost, errs, trace_id) in zip(active_platforms, results):
        ctx.track(f"P1_Research_{p}", cost)
        platform_traces[p] = trace_id
        ctx.phase1_trace_ids[f"P1_Research_{p}"] = trace_id
        ctx.errors.extend(errs)
        if not (ctx.raw_cases_dir / f"case_{p}.json").exists():
            ctx.error(f"Missing case file for {p}; agent likely hit max_iterations")
def _extract_sources(ctx: PipelineContext, trace_ids: Optional[List[str]]) -> Optional[Dict[str, Any]]:
    """Extract source.json from raw_cases_dir; return stats, or None on failure."""
    try:
        stats = extract_sources_to_json(ctx.raw_cases_dir, trace_ids=trace_ids)
        print(
            f"📎 [Source] matched={stats['total_matched']}, "
            f"filtered={stats['filtered_total']} → {ctx.raw_cases_dir / 'source.json'}"
        )
        for reason, count in stats.get("filtered_reasons", {}).items():
            print(f" - {reason}: {count}")
        return stats
    except Exception as e:
        # Any failure (including a malformed stats dict) is recorded, not fatal.
        ctx.error(f"Source extraction failed: {type(e).__name__}: {e}")
        return None
async def run_research_source_loop(ctx: PipelineContext) -> None:
    """Full phase-1 loop: research ⇄ source, per-round stop check per platform.

    Each round re-runs research for every platform still under target, then
    re-extracts source.json and recounts qualified cases.  Stops when all
    platforms reach TARGET_QUALIFIED_CASES or MAX_RESEARCH_ROUNDS is hit.
    """
    platforms = [p.strip() for p in ctx.args.platforms.split(",") if p.strip()]
    print(f"\n--- Phase 1: Research ⇄ Source loop ({QWEN_MODEL}) ---")
    print(f"📡 Platforms: {platforms}")
    platform_traces: Dict[str, Optional[str]] = {p: None for p in platforms}
    active_platforms = list(platforms)
    last_src_stats: Optional[Dict[str, Any]] = None
    last_platform_count: Dict[str, int] = {}
    round_idx = 0
    while active_platforms and round_idx < MAX_RESEARCH_ROUNDS:
        print(f"\n >>> [Round {round_idx+1}] Active: {active_platforms}")
        await _run_research_round(
            ctx, active_platforms, round_idx, platform_traces,
            last_src_stats, last_platform_count,
        )
        # Re-extract from ALL traces seen so far, not just this round's.
        trace_id_list = [tid for tid in ctx.phase1_trace_ids.values() if tid]
        last_src_stats = _extract_sources(ctx, trace_id_list)
        source_file = ctx.raw_cases_dir / "source.json"
        if not source_file.exists():
            print(" ⚠️ source.json not found, continuing loop")
            round_idx += 1
            continue
        with open(source_file, "r", encoding="utf-8") as f:
            source_data = json.load(f)
        # Count qualified cases per platform from the freshly extracted sources.
        platform_count: Dict[str, int] = {}
        for s in source_data.get("sources", []):
            p = s.get("platform")
            if p:
                platform_count[p] = platform_count.get(p, 0) + 1
        last_platform_count = platform_count
        print(f" 📊 Target: >={TARGET_QUALIFIED_CASES}/platform")
        # A platform stays active only while it is both still active and
        # below target; platforms that reached target never re-enter.
        next_active = []
        for p in platforms:
            count = platform_count.get(p, 0)
            if p in active_platforms:
                print(f" - {p}: {count}/{TARGET_QUALIFIED_CASES}")
                if count < TARGET_QUALIFIED_CASES:
                    next_active.append(p)
        active_platforms = next_active
        if not active_platforms:
            print(f" ✅ All platforms reached target {TARGET_QUALIFIED_CASES}")
            break
        round_idx += 1
    if round_idx >= MAX_RESEARCH_ROUNDS and active_platforms:
        print(f" ⚠️ Max {MAX_RESEARCH_ROUNDS} rounds reached. Remaining: {active_platforms}")
async def run_research_only(ctx: PipelineContext) -> None:
    """Single-step research: one round over all --platforms, no source loop."""
    platforms = [part.strip() for part in ctx.args.platforms.split(",") if part.strip()]
    if not platforms:
        print(" ❌ No platforms specified")
        sys.exit(1)
    print(f"\n--- Single Step: Research ({QWEN_MODEL}) ---")
    print(f"📡 Platforms: {platforms}")
    # Fresh trace map: every platform starts without a prior trace.
    fresh_traces: Dict[str, Optional[str]] = dict.fromkeys(platforms)
    await _run_research_round(ctx, platforms, 0, fresh_traces, None, {})
async def run_source_only(ctx: PipelineContext) -> None:
    """Single-step source: build source.json from existing case_*.json files."""
    print("\n--- Single Step: Source Extraction ---")
    _extract_sources(ctx, trace_ids=None)
# ──── Step 3: generate-case ──────────────────────────────────────────────────
async def run_generate_case(ctx: PipelineContext) -> None:
    """Consolidate raw_cases/source.json into the top-level case.json."""
    print("\n--- Phase 1.3: Generate case.json ---")
    if not (ctx.raw_cases_dir / "source.json").exists():
        ctx.error("source.json not found; run research/source first")
        return
    try:
        result = await generate_case_from_source(ctx.raw_cases_dir)
        print(f"📦 [Generate Case] cases={result['total_cases']} → {result['output_file']}")
    except Exception as e:
        ctx.error(f"Generate case failed: {type(e).__name__}: {e}")
# ──── Step 4: decode-workflow ────────────────────────────────────────────────
async def run_decode_workflow(ctx: PipelineContext) -> None:
    """Extract decode workflows for all (or one) case via Gemini + LangChain."""
    print("\n--- Phase 2: Decode Workflow (Gemini + LangChain) ---")
    case_file = ctx.output_dir / "case.json"
    if not case_file.exists():
        ctx.error("case.json not found; run generate-case first")
        return
    try:
        summary = await extract_decode_workflow(
            case_file=case_file,
            case_index=ctx.args.case_index,  # None = all cases
            skip_existing=ctx.args.skip_existing,
        )
        ctx.track("Decode_Workflow", summary.get("total_cost", 0.0))
        print(
            f" ✓ decode-workflow: succeeded={summary['succeeded']}/{summary['total']} "
            f"skipped={summary['skipped']} failed={summary['failed']} "
            f"merged={summary.get('merged', 0)} cost=${summary['total_cost']}"
        )
        print(f" output_dir: {summary['output_dir']}")
    except ImportError as e:
        # LangChain / Gemini deps are optional; surface a precise install hint.
        ctx.error(
            f"decode-workflow 依赖未安装: {e}; "
            f"需要 pip install langchain langchain-google-genai 以及 .env GOOGLE_API_KEY"
        )
    except Exception as e:
        ctx.error(f"Decode workflow failed: {type(e).__name__}: {e}")
# ──── Step 5: apply-grounding ────────────────────────────────────────────────
def _build_main_llm_call(args: argparse.Namespace) -> tuple:
    """Create the llm_call used by apply-grounding, per --use-claude-sdk / --model.

    Returns:
        Tuple ``(llm_call, model_name)``.
    """
    if args.use_claude_sdk:
        model = "claude-sonnet-4-6"
        # Imported lazily: the SDK is only needed when this path is taken.
        from agent.llm.claude_code_oauth import create_claude_code_oauth_llm_call
        print(f"✅ apply-grounding via Claude Agent SDK (OAuth): {model}")
        return create_claude_code_oauth_llm_call(model=model), model
    # OpenRouter model ids follow "<vendor>/<model>".  Fix: the gemini entry
    # previously carried a stray leading "~", which is not a valid id.
    model_map = {
        "claude": "claude-sonnet-4-6",
        "gpt": "gpt-5.4",
        "gemini": "google/gemini-pro-latest",
    }
    model = model_map.get(args.model, "claude-sonnet-4-6")
    from agent.llm.openrouter import create_openrouter_llm_call
    print(f"✅ apply-grounding via OpenRouter: {model}")
    return create_openrouter_llm_call(model=model), model
- def _filter_case_to_single(case_file: Path, case_index: int) -> Path:
- """挑出指定 case 写入临时文件,返回临时文件路径。"""
- with open(case_file, "r", encoding="utf-8") as f:
- data = json.load(f)
- cases = data.get("cases", [])
- target = next((c for c in cases if c.get("index") == case_index), None)
- if not target:
- raise ValueError(f"Case with index {case_index} not found in {case_file.name}")
- data["cases"] = [target]
- temp_file = case_file.parent / f"case_temp_{case_index}.json"
- with open(temp_file, "w", encoding="utf-8") as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- print(f" [Target] case {case_index}: {target.get('title', 'untitled')[:30]}")
- return temp_file
def _merge_single_case_back(case_file: Path, temp_file: Path, case_index: int) -> None:
    """Merge the edited single-case temp file back into case.json.

    A snapshot of case.json is taken before the write; the temp file is
    deleted afterwards.
    """
    with open(temp_file, "r", encoding="utf-8") as f:
        updated_case = json.load(f)["cases"][0]
    with open(case_file, "r", encoding="utf-8") as f:
        original = json.load(f)
    # Replace the matching case in place; first match wins.
    for position, candidate in enumerate(original["cases"]):
        if candidate.get("index") == case_index:
            original["cases"][position] = updated_case
            break
    snap = snapshot_case_file(case_file, step="grounding_merge")
    if snap:
        print(f" [snapshot] {snap.name}")
    with open(case_file, "w", encoding="utf-8") as f:
        json.dump(original, f, ensure_ascii=False, indent=2)
    temp_file.unlink()
    print(f" ✓ Merged case {case_index} back to case.json")
async def run_apply_grounding(ctx: PipelineContext) -> None:
    """Apply grounding to all cases in case.json, or to one (--case-index)."""
    print(f"\n--- Phase 2: Apply Grounding ---")
    case_file = ctx.output_dir / "case.json"
    if not case_file.exists():
        ctx.error("case.json not found; run generate-case first")
        return
    llm_call, model = _build_main_llm_call(ctx.args)
    if ctx.args.case_index is not None:
        # Single-case mode: operate on a temp copy holding just that case.
        try:
            target_file = _filter_case_to_single(case_file, ctx.args.case_index)
        except ValueError as e:
            ctx.error(str(e))
            return
    else:
        target_file = case_file
    try:
        result = await apply_grounding(
            target_file, llm_call, model=model, max_concurrent=3,
        )
        ctx.track("Apply_Grounding", result.get("total_cost", 0.0))
        print(
            f"🗺️ [Grounding] grounded={result['grounded']}/{result['total']} "
            f"cost=${result.get('total_cost', 0.0):.4f}"
        )
    except Exception as e:
        # On failure in single-case mode, discard the temp file without merging.
        ctx.error(f"Apply grounding failed: {type(e).__name__}: {e}")
        if ctx.args.case_index is not None and target_file != case_file:
            target_file.unlink(missing_ok=True)
        return
    if ctx.args.case_index is not None:
        # Success in single-case mode: fold the edited case back into case.json.
        _merge_single_case_back(case_file, target_file, ctx.args.case_index)
# ──── CLI parsing & validation ───────────────────────────────────────────────
def _parse_args() -> argparse.Namespace:
    """Build the CLI parser, parse sys.argv, and enforce flag exclusivity."""
    ap = argparse.ArgumentParser(description="AIGC Process Pipeline (5-step)")
    ap.add_argument("--index", type=int, required=True,
                    help="Index of requirement in db_requirements.json (0-based)")
    ap.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube,douyin,sph",
                    help="Comma-separated platforms for research step")
    ap.add_argument("--case-index", type=int, default=None,
                    help="Re-run a single case in decode-workflow / apply-grounding")
    ap.add_argument("--skip-existing", action="store_true",
                    help="Skip cases whose output already exists "
                         "(default: re-run / overwrite everything; "
                         "currently affects decode-workflow batch mode)")
    ap.add_argument("--only-step", type=str, choices=STEPS,
                    help="Run only a single step (mutex with --start-from/--end-at)")
    ap.add_argument("--start-from", type=str, choices=STEPS,
                    help="Start from this step (inclusive)")
    ap.add_argument("--end-at", type=str, choices=STEPS,
                    help="End at this step (inclusive)")
    backend = ap.add_mutually_exclusive_group()
    backend.add_argument("--use-claude-sdk", action="store_true",
                         help="apply-grounding via Anthropic SDK (mutex with --model)")
    backend.add_argument("--model", type=str, choices=["claude", "gpt", "gemini"],
                         default="claude",
                         help="apply-grounding LLM family via OpenRouter")
    args = ap.parse_args()
    # argparse can't express "this flag excludes that group" natively, so
    # the --only-step vs range-flags exclusivity is checked by hand.
    if args.only_step and (args.start_from or args.end_at):
        ap.error("--only-step is mutually exclusive with --start-from / --end-at")
    return args
def _resolve_active_steps(args: argparse.Namespace) -> Set[str]:
    """Translate --only-step / --start-from / --end-at into a set of step names."""
    if args.only_step:
        return {args.only_step}
    # Defaults: start at the first step, stop at DEFAULT_END (apply-grounding
    # is never part of the default range).
    begin = STEPS.index(args.start_from or STEPS[0])
    finish = STEPS.index(args.end_at or DEFAULT_END)
    if begin > finish:
        print(f"❌ --start-from '{STEPS[begin]}' is after --end-at '{STEPS[finish]}'")
        sys.exit(1)
    return set(STEPS[begin:finish + 1])
def _validate_args(args: argparse.Namespace, active_steps: Set[str]) -> None:
    """Fail fast (or warn) on flag combinations that would silently do nothing."""
    if args.case_index is not None and not (active_steps & CASE_INDEX_STEPS):
        print(
            f"❌ --case-index only applies to {sorted(CASE_INDEX_STEPS)}; "
            f"none of those are in active steps {sorted(active_steps)}"
        )
        sys.exit(1)
    # --model has a default value, so whether the user actually typed the flag
    # is detected by scanning argv; --use-claude-sdk is a plain store_true.
    typed_model_flag = any(
        token == "--model" or token.startswith("--model=") for token in sys.argv[1:]
    )
    if (args.use_claude_sdk or typed_model_flag) and not (active_steps & LLM_CONFIGURABLE_STEPS):
        print(
            f"⚠️ --use-claude-sdk / --model are only used by apply-grounding; "
            f"ignored because active steps {sorted(active_steps)} don't include it"
        )
# ──── Bootstrap ──────────────────────────────────────────────────────────────
def _setup_run_log(output_dir: Path) -> Path:
    """Create history/<run_id>/ for this run and tee stdout/stderr into run.log.

    Side effects: sets the global run id, snapshots any pre-existing case.json,
    and replaces sys.stdout / sys.stderr with tee wrappers for the remainder of
    the process.  Returns the path of the log file.
    """
    run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    set_run_id(run_id)
    run_dir = output_dir / "history" / run_id
    run_dir.mkdir(parents=True, exist_ok=True)
    log_path = run_dir / "run.log"
    # Kept open for the whole process; closed via atexit below.
    log_file = open(log_path, "w", encoding="utf-8")
    initial_case = output_dir / "case.json"
    if initial_case.exists():
        # Preserve the pre-run state of case.json before any step mutates it.
        snapshot_case_file(initial_case, step="run_start")
    # Tee over the ORIGINAL std streams (sys.__stdout__/__stderr__), not the
    # current ones, so repeated setup cannot nest tees.
    sys.stdout = _Tee(sys.__stdout__, log_file)
    sys.stderr = _Tee(sys.__stderr__, log_file)
    import atexit
    atexit.register(log_file.close)
    print(f"[run-log] tee active → {log_path}")
    return log_path
- def _load_requirement(base_dir: Path, index: int) -> str:
- req_path = base_dir / "db_requirements.json"
- with open(req_path, encoding="utf-8") as f:
- reqs = json.load(f)
- if index < 0 or index >= len(reqs):
- print(f"❌ Index {index} out of bounds (db has {len(reqs)} entries)")
- sys.exit(1)
- return reqs[index]
def _save_metrics(ctx: PipelineContext, elapsed_sec: float) -> None:
    """Append one run record to run_metrics.json (created if absent).

    A corrupt/unreadable metrics file is silently replaced rather than
    aborting at the very end of a run.
    """
    metrics_file = ctx.base_dir / "run_metrics.json"
    metrics_data: List[Any] = []
    if metrics_file.exists():
        try:
            with open(metrics_file, "r", encoding="utf-8") as f:
                metrics_data = json.load(f)
        except json.JSONDecodeError:
            # Corrupt history: start fresh instead of crashing the run here.
            pass
    # Fix: add an ellipsis only when the requirement was actually truncated
    # (previously "..." was appended unconditionally).
    requirement = ctx.requirement
    if len(requirement) > 80:
        requirement = requirement[:80] + "..."
    metrics_data.append({
        "index": ctx.args.index,
        "requirement": requirement,
        "duration_seconds": round(elapsed_sec, 2),
        "total_cost_usd": round(ctx.total_cost, 4),
        "costs_breakdown": ctx.costs_breakdown,
        # Only record traces that actually materialized.
        "trace_ids": {k: v for k, v in ctx.phase1_trace_ids.items() if v},
        "errors": ctx.errors,
        "active_steps": sorted(ctx.active_steps),
        "timestamp": datetime.now().isoformat(),
    })
    with open(metrics_file, "w", encoding="utf-8") as f:
        json.dump(metrics_data, f, indent=2, ensure_ascii=False)
# ──── Main dispatch ──────────────────────────────────────────────────────────
async def main() -> None:
    """Entry point: resolve active steps, build the context, run steps in order."""
    args = _parse_args()
    active_steps = _resolve_active_steps(args)
    _validate_args(args, active_steps)
    base_dir = Path(__file__).parent
    requirement = _load_requirement(base_dir, args.index)
    # Output lives under output/{index+1:03d}/ (1-based, zero-padded dir name).
    output_dir = base_dir / "output" / f"{(args.index+1):03d}"
    raw_cases_dir = output_dir / "raw_cases"
    output_dir.mkdir(parents=True, exist_ok=True)
    raw_cases_dir.mkdir(parents=True, exist_ok=True)
    _setup_run_log(output_dir)
    setup_logging(level=LOG_LEVEL, file=LOG_FILE)
    print("=" * 60)
    print(f"Pipeline | Demand: [{args.index+1:03d}] {requirement[:40]}...")
    print(f"Active steps: {' → '.join(s for s in STEPS if s in active_steps)}")
    print("=" * 60)
    # Load agent presets if available
    presets_path = base_dir / "presets.json"
    if presets_path.exists():
        from agent.core.presets import load_presets_from_json
        load_presets_from_json(str(presets_path))
        print("✅ Loaded agent presets")
    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
    runner_qwen = AgentRunner(
        trace_store=store,
        llm_call=create_qwen_llm_call(model=QWEN_MODEL),
        skills_dir=SKILLS_DIR,
    )
    ctx = PipelineContext(
        args=args,
        requirement=requirement,
        output_dir=output_dir,
        raw_cases_dir=raw_cases_dir,
        base_dir=base_dir,
        runner_qwen=runner_qwen,
        active_steps=active_steps,
    )
    start_time = time.time()
    try:
        # research ⇄ source are coupled: run the joint loop when both are
        # active, otherwise run whichever one is active on its own.
        if "research" in active_steps and "source" in active_steps:
            await run_research_source_loop(ctx)
        elif "research" in active_steps:
            await run_research_only(ctx)
        elif "source" in active_steps:
            await run_source_only(ctx)
        if "generate-case" in active_steps:
            await run_generate_case(ctx)
        if "decode-workflow" in active_steps:
            await run_decode_workflow(ctx)
        if "apply-grounding" in active_steps:
            await run_apply_grounding(ctx)
        elapsed = time.time() - start_time
        _save_metrics(ctx, elapsed)
        print(f"\n📊 [Metrics] Completed in {elapsed:.1f}s. Total cost: ${ctx.total_cost:.4f}")
        if ctx.errors:
            print(f"⚠️ {len(ctx.errors)} error(s) encountered:")
            for e in ctx.errors[:10]:
                print(f" - {e}")
    finally:
        print("✅ Pipeline run finished.")
- if __name__ == "__main__":
- asyncio.run(main())
|