- """
- Decode-Workflow Step(旁路):用 DecodeProcessAgent (LangChain + Gemini) 提取工序
- 跟现有 `extract_workflow.py` 并存:
- - 不替换 case.json 里的 workflow_groups(那个仍由 extract_workflow.py 产出)
- - 输出落到 output/<NNN>/decode_workflows/<case_id>.json + <case_id>.html
- - 旁路存储 — 让你能对比 DecodeProcessAgent vs 当前 extract_workflow 的产出质量
- 使用方式(通过 run_pipeline.py):
- python run_pipeline.py --index 107 --only-step decode-workflow
- 依赖(首次用需要装):
- pip install langchain langchain-google-genai
- .env 中加 GOOGLE_API_KEY=...
- """

import asyncio
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

# Default model (DecodeProcessAgent's initial configuration)
DEFAULT_DECODE_MODEL = os.getenv("DECODE_WORKFLOW_MODEL", "google_genai:gemini-3-flash-preview")

# Number of cases to run concurrently. Note: the current DecodeProcessAgent.run_batch
# executes sequentially in-process; values > 1 are ignored with a warning. The env var
# is kept for backward compatibility, with its default lowered to 1. Restoring real
# concurrency would first require isolating WorkflowContext behind a ContextVar.
DEFAULT_DECODE_CONCURRENCY = int(os.getenv("DECODE_WORKFLOW_CONCURRENCY", "1"))

# Whether to merge decode output back into case.json as cases[i].decode_workflow
# (default on, as requested). Set to 0 to run the side channel only and leave case.json untouched.
DEFAULT_MERGE_TO_CASE = os.getenv("DECODE_MERGE_TO_CASE", "1") not in ("0", "false", "False", "")

# Max images handed to the agent per case; applies to base64 mode only (guards against
# blowing Gemini's 1M-token context). URL mode is unlimited: LangChain uploads via the
# Google File API and images are billed as image tokens, which cannot blow the context.
DEFAULT_MAX_IMAGES_PER_CASE = int(os.getenv("DECODE_MAX_IMAGES_PER_CASE", "50"))
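
# A sketch of a matching .env, collecting every knob this module reads (values shown
# are illustrative, not required settings):
#
#   GOOGLE_API_KEY=<your_key>               # what langchain-google-genai reads
#   DECODE_WORKFLOW_MODEL=google_genai:gemini-3-flash-preview
#   DECODE_WORKFLOW_CONCURRENCY=1           # values > 1 are ignored (sequential run_batch)
#   DECODE_MERGE_TO_CASE=1                  # 0 = side channel only, don't touch case.json
#   DECODE_MAX_IMAGES_PER_CASE=50           # base64 mode only
#   DECODE_USE_LOCAL_IMAGES=0               # 1 = opt in to local base64 images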


def _resolve_local_images(case: Dict[str, Any], case_file: Path, max_images: int = DEFAULT_MAX_IMAGES_PER_CASE) -> Optional[List[str]]:
    """
    Optional: use local images (raw_cases/images/<case_id>/) converted to base64 data URIs
    instead of the remote URLs. Opt in via the env var DECODE_USE_LOCAL_IMAGES=1. URL mode
    is the default because in base64 mode the agent re-sends the user message on every
    invoke, and the accumulation hits Gemini's 1M-token context.

    Returns None when the local image directory is missing or empty; the caller should
    fall back to the case.images URL list.
    """
    raw = case.get("_raw") or {}
    case_id = raw.get("case_id") or raw.get("channel_content_id")
    if not case_id:
        return None
    images_dir = case_file.parent / "raw_cases" / "images" / case_id
    if not images_dir.is_dir():
        return None
    img_exts = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
    all_img_files = sorted(p for p in images_dir.iterdir() if p.suffix.lower() in img_exts)
    if not all_img_files:
        return None
    truncated = len(all_img_files) > max_images
    img_files = all_img_files[:max_images]
    import base64
    import mimetypes
    data_uris: List[str] = []
    for p in img_files:
        try:
            mime = mimetypes.guess_type(p.name)[0] or "image/jpeg"
            b64 = base64.b64encode(p.read_bytes()).decode("ascii")
            data_uris.append(f"data:{mime};base64,{b64}")
        except Exception as e:
            print(f" [decode-workflow] WARN: failed to read local image {p.name}: {e}", flush=True)
    if truncated:
        print(
            f" [decode-workflow] truncated images: kept first {len(img_files)}/{len(all_img_files)} "
            f"(guards Gemini's 1M context; tune via env var DECODE_MAX_IMAGES_PER_CASE)",
            flush=True,
        )
    return data_uris or None
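
# Shape of a successful return value, for reference (payloads truncated; real URIs
# carry the full base64-encoded image bytes):
#   ["data:image/jpeg;base64,/9j/4AAQ...", "data:image/png;base64,iVBORw0..."]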


def _build_decode_input(case: Dict[str, Any], case_file: Optional[Path] = None) -> Dict[str, Any]:
    """
    Build the input JSON that DecodeProcessAgent expects from a case_item.

    The images field **defaults to the remote URLs in case.images**: given URLs, LangChain
    downloads the bytes and uploads them to the Gemini File API, so they are billed as
    image tokens (roughly 258 tokens per image) and never hit the 1M-token context.

    **Local base64 mode is opt-in** (DECODE_USE_LOCAL_IMAGES=1), because LangChain may
    stuff base64 data URIs into the history as text; they accumulate on every invoke and
    hit Gemini's 1M-token context (empirically, 14 images across 9 steps always blows up).
    """
    raw = case.get("_raw") or {}
    images = case.get("images") or []
    use_local = os.getenv("DECODE_USE_LOCAL_IMAGES", "0") not in ("0", "false", "False", "")
    if use_local and case_file is not None:
        local = _resolve_local_images(case, case_file)
        if local:
            print(
                f" [decode-workflow] case index={case.get('index')}: "
                f"using {len(local)} local images (base64, opt-in via DECODE_USE_LOCAL_IMAGES)",
                flush=True,
            )
            images = local
    return {
        "channel_content_id": raw.get("channel_content_id") or raw.get("case_id") or f"case_{case.get('index', 0)}",
        "title": case.get("title") or "",
        "body_text": case.get("body") or case.get("body_text") or "",
        "images": images,
    }
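
# Example of the dict this builds in the default URL mode (all values hypothetical;
# the schema is exactly what the return statement above assembles):
#   {
#       "channel_content_id": "7241001",
#       "title": "...",
#       "body_text": "...",
#       "images": ["https://cdn.example.com/img_1.jpg", "https://cdn.example.com/img_2.jpg"]
#   }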


def _ensure_decode_agent_importable() -> Path:
    """Make sure the decode_workflow_agent directory is on sys.path (its imports are relative)."""
    decode_dir = Path(__file__).resolve().parent / "decode_workflow_agent"
    if not decode_dir.exists():
        raise FileNotFoundError(f"decode_workflow_agent directory not found: {decode_dir}")
    if str(decode_dir) not in sys.path:
        sys.path.insert(0, str(decode_dir))
    return decode_dir


def _setup_gemini_env() -> None:
    """
    Accept several Gemini API key spellings and alias them onto the GOOGLE_API_KEY that
    langchain-google-genai expects; it reads only GOOGLE_API_KEY by default, while a
    user's .env may carry GEMINI_API_KEY or another variant.

    Also detect GEMINI_API_BASE; if it is set, warn that langchain-google-genai does not
    support base URL overrides.
    """
    if not os.environ.get("GOOGLE_API_KEY"):
        for alt in ("GEMINI_API_KEY", "gemini_api_key", "GOOGLE_GENAI_API_KEY"):
            val = os.environ.get(alt)
            if val:
                os.environ["GOOGLE_API_KEY"] = val
                print(f"[decode-workflow] aliased {alt} → GOOGLE_API_KEY", flush=True)
                break
        else:
            print(
                "[decode-workflow] ⚠ no Gemini API key found in env "
                "(tried GOOGLE_API_KEY / GEMINI_API_KEY / GOOGLE_GENAI_API_KEY). "
                "Gemini calls will likely fail. Set GOOGLE_API_KEY=<your_key> in .env",
                flush=True,
            )
    if os.environ.get("GEMINI_API_BASE"):
        print(
            "[decode-workflow] ⚠⚠⚠ GEMINI_API_BASE is set — but langchain-google-genai uses\n"
            " Google's official SDK and does NOT honor base URL overrides; all calls go to\n"
            " generativelanguage.googleapis.com regardless.\n"
            " If your GEMINI_API_KEY is for a 3rd-party proxy (OneAPI / yescode / OpenRouter):\n"
            " → DecodeProcessAgent will get 401/404 from Google's real endpoint.\n"
            " → Consider using OpenRouter Gemini via the main pipeline's --model gemini instead.\n"
            " If your key is from Google AI Studio (https://aistudio.google.com): you can ignore this.",
            flush=True,
        )


def _merge_decode_into_case(
    case_file: Path,
    target_case_index: int,
    decode_workflow: Dict[str, Any],
) -> bool:
    """
    Store the decode output in the top-level case.cases[i].decode_workflow field
    (a sibling of workflow_groups).

    Design choice: leave workflow_groups / capability untouched so the old fields stay
    around for comparison; the decode output lives in its own independent field, and
    downstream consumers read case.decode_workflow.steps separately if they need it.

    Returns:
        True if merged; False if the target case was not found (warns instead of raising).
    """
    with open(case_file, "r", encoding="utf-8") as f:
        case_data = json.load(f)
    target = None
    for case in case_data.get("cases", []):
        if case.get("index") == target_case_index:
            target = case
            break
    if not target:
        print(
            f" [decode-workflow] merge SKIPPED: case index={target_case_index} not in case.json",
            flush=True,
        )
        return False
    # Store only steps + metadata. Deliberately drop source: source.images may carry
    # base64 payloads several MB in size that would pollute case.json.
    target["decode_workflow"] = {
        "channel_content_id": (decode_workflow or {}).get("channel_content_id"),
        "steps": (decode_workflow or {}).get("steps", []) or [],
        "summary": (decode_workflow or {}).get("summary", ""),
        "status": (decode_workflow or {}).get("status", ""),
    }
    with open(case_file, "w", encoding="utf-8") as f:
        json.dump(case_data, f, ensure_ascii=False, indent=2)
    step_count = len(target["decode_workflow"]["steps"])
    status = target["decode_workflow"]["status"]
    print(
        f" [decode-workflow] merged into case.json: "
        f"case index={target_case_index} → cases[].decode_workflow "
        f"({step_count} steps, status={status!r}, source stripped)",
        flush=True,
    )
    return True
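
# After a successful merge, cases[i] carries a decode_workflow block shaped roughly
# like this (a sketch; step contents depend on DecodeProcessAgent's output schema):
#   "decode_workflow": {
#       "channel_content_id": "7241001",
#       "steps": [...],
#       "summary": "...",
#       "status": "..."    # e.g. 'in_progress' when only a partial merge succeeded
#   }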


# Write lock for case.json shared across cases. Serializes concurrent writers inside
# this process (an asyncio.Lock does not protect against other processes).
_case_write_lock = asyncio.Lock()


async def _merge_decode_into_case_async(case_file: Path, target_case_index: int, decode_workflow: Dict[str, Any]) -> bool:
    """Async wrapper around _merge_decode_into_case that serializes writes to case.json."""
    async with _case_write_lock:
        return _merge_decode_into_case(case_file, target_case_index, decode_workflow)


async def extract_decode_workflow(
    case_file: Path,
    llm_call: Any = None,  # kept for signature compatibility, unused (DecodeProcessAgent runs its own LangChain stack)
    model: str = DEFAULT_DECODE_MODEL,
    max_concurrent: int = DEFAULT_DECODE_CONCURRENCY,
    case_index: Optional[int] = None,  # single-case mode: only run the case with case.index == case_index
    merge_to_case: Optional[bool] = None,  # merge output back into case.json; None reads env DECODE_MERGE_TO_CASE
    skip_existing: bool = False,  # True = skip when decode_workflows/case_<idx>.json exists; default overwrites everything
) -> Dict[str, Any]:
    """
    Top-level entry point: run DecodeProcessAgent on every case in case.json, writing
    output to case_dir/decode_workflows/.

    When case_index is given, only that one case is processed; otherwise all cases are.

    When merge_to_case=True, each finished case's decode output is stored back into
    case.json as cases[i].decode_workflow (see _merge_decode_into_case; workflow_groups
    is left untouched).

    Returns:
        Stats dict: total, succeeded, skipped, failed, merged, total_cost
    """
    case_file = Path(case_file)
    if not case_file.exists():
        raise FileNotFoundError(f"case.json not found: {case_file}")
    if merge_to_case is None:
        merge_to_case = DEFAULT_MERGE_TO_CASE
    print(f"[decode-workflow] merge_to_case = {merge_to_case}", flush=True)
    # Pre-run snapshot: when merging back into case.json, keep a history backup first
    # (so the merge can be rolled back if the result disappoints).
    if merge_to_case:
        try:
            from examples.process_pipeline.script.case_history import snapshot_case_file
            snap = snapshot_case_file(case_file, step="decode_workflow_merge")
            if snap:
                print(f"[decode-workflow] snapshot saved → {snap.name}", flush=True)
        except Exception as e:
            print(f"[decode-workflow] snapshot SKIPPED: {type(e).__name__}: {e}", flush=True)
    # Output directory: output/<NNN>/decode_workflows/
    output_dir = case_file.parent / "decode_workflows"
    output_dir.mkdir(parents=True, exist_ok=True)
    inputs_dir = output_dir / "_inputs"  # persisted agent input files (used for audit + the skip check)
    inputs_dir.mkdir(parents=True, exist_ok=True)
    # Point DecodeProcessAgent at the output directory via env var; it drops each output there.
    os.environ["DECODE_OUTPUT_DIR"] = str(output_dir.resolve())
    print(f"[decode-workflow] output_dir = {output_dir}", flush=True)
    # Accept GEMINI_API_KEY and friends from .env + detect the GEMINI_API_BASE proxy case.
    _setup_gemini_env()
    # Lazy import: langchain is never imported unless this step is used.
    _ensure_decode_agent_importable()
    try:
        from DecodeProcessAgent import DecodeProcessAgent  # noqa: I001
    except ImportError as e:
        raise RuntimeError(
            f"Could not import DecodeProcessAgent. Make sure the dependencies are installed:\n"
            f" pip install langchain langchain-google-genai\n"
            f"and that GOOGLE_API_KEY=... is configured in .env\n"
            f"Original error: {type(e).__name__}: {e}"
        ) from e
    # Read case.json
    with open(case_file, "r", encoding="utf-8") as f:
        case_data = json.load(f)
    cases = case_data.get("cases") or []
    if not cases:
        print("[decode-workflow] no cases to process", flush=True)
        return {"total": 0, "succeeded": 0, "skipped": 0, "failed": 0, "merged": 0, "total_cost": 0.0}
    # Single-case filter
    if case_index is not None:
        target = [c for c in cases if c.get("index") == case_index]
        if not target:
            available = [c.get("index") for c in cases]
            raise ValueError(
                f"case index={case_index} not found in {case_file}. Available indices: {available}"
            )
        print(f"[decode-workflow] filtered to single case index={case_index}", flush=True)
        cases = target
    # Write each case out as a DecodeProcessAgent input file (under _inputs/).
    # Files are named case_<index>.json; DecodeProcessAgent names its output after the
    # input stem, so decode_workflows/case_<idx>.json + .html follow the same naming.
    print(f"[decode-workflow] preparing {len(cases)} input files in {inputs_dir}", flush=True)
    prepared_paths: List[tuple] = []  # [(input_path, case_index), ...]
    for case in cases:
        decode_input = _build_decode_input(case, case_file=case_file)
        if not decode_input["images"]:
            # Skip cases without images; DecodeProcessAgent would raise on them.
            print(f" [decode-workflow] skip case index={case.get('index')}: no images", flush=True)
            continue
        idx = case.get("index", 0)
        input_path = inputs_dir / f"case_{idx}.json"
        input_path.write_text(json.dumps(decode_input, ensure_ascii=False, indent=2), encoding="utf-8")
        prepared_paths.append((input_path, idx))
    if not prepared_paths:
        print("[decode-workflow] no eligible cases (all skipped: no images)", flush=True)
        return {"total": len(cases), "succeeded": 0, "skipped": len(cases), "failed": 0, "merged": 0, "total_cost": 0.0}
    agent = DecodeProcessAgent(model_name=model)
    merged_count = 0
    # Single-case mode: call agent.run() directly rather than run_batch (which would also
    # pick up stale leftover files in _inputs/).
    if case_index is not None or len(prepared_paths) == 1:
        target_path, target_idx = prepared_paths[0]
        print(f"[decode-workflow] single-case mode: run {target_path.name}", flush=True)
        try:
            result = await agent.run(str(target_path))
            # Merge back into case.json
            if merge_to_case and result.get("workflow"):
                try:
                    ok = await _merge_decode_into_case_async(case_file, target_idx, result["workflow"])
                    if ok:
                        merged_count = 1
                except Exception as e:
                    print(f" [decode-workflow] merge FAILED: {type(e).__name__}: {e}", flush=True)
            return {
                "total": 1,
                "succeeded": 1,
                "skipped": 0,
                "failed": 0,
                "merged": merged_count,
                "total_input_tokens": result.get("input_tokens", 0),
                "total_output_tokens": result.get("output_tokens", 0),
                "total_cost": result.get("cost_usd", 0.0),
                "output_dir": str(output_dir),
            }
        except Exception as e:
            print(f" [decode-workflow] FAILED: {type(e).__name__}: {e}", flush=True)
            # Fallback: the agent raised, but WorkflowContext may already have written
            # steps to disk incrementally. Try reading case_<idx>.json from disk for a
            # partial merge (merged even when status='in_progress').
            partial_merged = 0
            if merge_to_case:
                disk_path = output_dir / f"case_{target_idx}.json"
                if disk_path.exists():
                    try:
                        with open(disk_path, "r", encoding="utf-8") as f:
                            partial_workflow = json.load(f)
                        ok = await _merge_decode_into_case_async(case_file, target_idx, partial_workflow)
                        if ok:
                            partial_merged = 1
                            print(
                                f" [decode-workflow] ⚠ partial merge: case index={target_idx} status="
                                f"{partial_workflow.get('status')!r} (progress from before the agent failed is preserved)",
                                flush=True,
                            )
                    except Exception as me:
                        print(
                            f" [decode-workflow] partial merge attempt FAILED: {type(me).__name__}: {me}",
                            flush=True,
                        )
            return {
                "total": 1, "succeeded": 0, "skipped": 0, "failed": 1, "merged": partial_merged,
                "total_input_tokens": 0, "total_output_tokens": 0,
                "total_cost": 0.0, "output_dir": str(output_dir),
                "error": f"{type(e).__name__}: {e}",
            }
    # Batch mode: run_batch processes the whole _inputs/ directory (sequential in-process
    # execution + built-in retry + skip_existing). Wrapped in try/except so that even if
    # run_batch dies midway (network issues / SDK bugs), the side-channel files already
    # produced still get merged below.
    summary = {"succeeded": [], "skipped": [], "failed": [], "total_input_tokens": 0, "total_output_tokens": 0, "total_cost_usd": 0.0}
    run_batch_error: Optional[Exception] = None
    try:
        summary = await agent.run_batch(
            input_dir=str(inputs_dir),
            skip_existing=skip_existing,
            concurrency=max_concurrent,
        )
    except Exception as e:
        run_batch_error = e
        print(
            f"\n[decode-workflow] ⚠ run_batch raised midway: {type(e).__name__}: {e}\n"
            f" → still merging the side-channel files already produced into case.json",
            flush=True,
        )
    # Merge each successful case's output back into case.json. The summary returned by
    # run_batch carries no workflow data, so read the output files from disk. Deliberately
    # **not relying on summary**: scan for existing case_<idx>.json files directly, so
    # completed cases are salvaged even when run_batch crashed midway.
    if merge_to_case:
        for path, idx in prepared_paths:
            decode_out_path = output_dir / f"case_{idx}.json"
            if not decode_out_path.exists():
                continue  # this case failed or was never reached
            try:
                with open(decode_out_path, "r", encoding="utf-8") as f:
                    decode_workflow = json.load(f)
                ok = await _merge_decode_into_case_async(case_file, idx, decode_workflow)
                if ok:
                    merged_count += 1
            except Exception as e:
                print(
                    f" [decode-workflow] merge case index={idx} FAILED: {type(e).__name__}: {e}",
                    flush=True,
                )
    result = {
        "total": len(cases),
        "succeeded": len(summary.get("succeeded", [])),
        "skipped": len(summary.get("skipped", [])),
        "failed": len(summary.get("failed", [])),
        "merged": merged_count,
        "total_input_tokens": summary.get("total_input_tokens", 0),
        "total_output_tokens": summary.get("total_output_tokens", 0),
        "total_cost": summary.get("total_cost_usd", 0.0),
        "output_dir": str(output_dir),
    }
    if run_batch_error is not None:
        result["run_batch_error"] = f"{type(run_batch_error).__name__}: {run_batch_error}"
    return result
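
# Example batch-mode return value (numbers illustrative):
#   {
#       "total": 12, "succeeded": 10, "skipped": 1, "failed": 1, "merged": 10,
#       "total_input_tokens": 812345, "total_output_tokens": 45678,
#       "total_cost": 0.42, "output_dir": "output/107/decode_workflows"
#   }
# plus a "run_batch_error" key when run_batch raised midway.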
- if __name__ == "__main__":
- import argparse
- p = argparse.ArgumentParser(description="DecodeProcessAgent 旁路 workflow 提取")
- p.add_argument("--case-file", required=True, help="case.json 路径")
- p.add_argument("--model", default=DEFAULT_DECODE_MODEL)
- p.add_argument("--concurrency", type=int, default=DEFAULT_DECODE_CONCURRENCY)
- args = p.parse_args()
- result = asyncio.run(extract_decode_workflow(
- case_file=Path(args.case_file),
- model=args.model,
- max_concurrent=args.concurrency,
- ))
- print(json.dumps(result, ensure_ascii=False, indent=2))
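
# Direct invocation, bypassing run_pipeline.py (the module filename here is assumed):
#   python extract_decode_workflow.py --case-file output/107/case.json \
#       --model google_genai:gemini-3-flash-preview --concurrency 1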