""" Decode-Workflow Step(旁路):用 DecodeProcessAgent (LangChain + Gemini) 提取工序 跟现有 `extract_workflow.py` 并存: - 不替换 case.json 里的 workflow_groups(那个仍由 extract_workflow.py 产出) - 输出落到 output//decode_workflows/.json + .html - 旁路存储 — 让你能对比 DecodeProcessAgent vs 当前 extract_workflow 的产出质量 使用方式(通过 run_pipeline.py): python run_pipeline.py --index 107 --only-step decode-workflow 依赖(首次用需要装): pip install langchain langchain-google-genai .env 中加 GOOGLE_API_KEY=... """ import asyncio import json import os import sys from pathlib import Path from typing import Any, Dict, List, Optional # 默认模型(DecodeProcessAgent 的初始配置) DEFAULT_DECODE_MODEL = os.getenv("DECODE_WORKFLOW_MODEL", "google_genai:gemini-3-flash-preview") # 并发 case 数(注意:新版 DecodeProcessAgent.run_batch 已改为同进程顺序执行, # 值 >1 会被忽略并打 warning。env var 保留为了向后兼容,默认值改为 1。 # 想恢复真并发需先把 WorkflowContext 改成 ContextVar 隔离) DEFAULT_DECODE_CONCURRENCY = int(os.getenv("DECODE_WORKFLOW_CONCURRENCY", "1")) # 是否把 decode 输出合并回 case.json 的 workflow_groups[0].workflow.steps(默认开 — 用户的要求) # 0 时只走旁路,不动 case.json DEFAULT_MERGE_TO_CASE = os.getenv("DECODE_MERGE_TO_CASE", "1") not in ("0", "false", "False", "") # 每个 case 最多传给 agent 的图片张数 — 只对 base64 模式生效(防 Gemini 1M context 爆炸); # URL 模式不限(LangChain → Google File API 走 image tokens 不会爆) DEFAULT_MAX_IMAGES_PER_CASE = int(os.getenv("DECODE_MAX_IMAGES_PER_CASE", "50")) def _resolve_local_images(case: Dict[str, Any], case_file: Path, max_images: int = DEFAULT_MAX_IMAGES_PER_CASE) -> Optional[List[str]]: """ 可选:用本地图片(raw_cases/images//)转 base64 data URI 替代远端 URL。 通过环境变量 DECODE_USE_LOCAL_IMAGES=1 opt-in 启用(默认走 URL,因为 base64 会让 agent 每轮 invoke 重传 user message 累积 → 撞 Gemini 1M context)。 返回 None 表示本地没有图片目录或目录为空,调用方应 fallback 到 case.images URL 列表。 """ raw = case.get("_raw") or {} case_id = raw.get("case_id") or raw.get("channel_content_id") if not case_id: return None images_dir = case_file.parent / "raw_cases" / "images" / case_id if not images_dir.is_dir(): return None img_exts = {".jpg", ".jpeg", ".png", ".webp", ".gif"} all_img_files = sorted(p for p in images_dir.iterdir() if p.suffix.lower() in img_exts) if not all_img_files: return None truncated = len(all_img_files) > max_images img_files = all_img_files[:max_images] import base64 import mimetypes data_uris: List[str] = [] for p in img_files: try: mime = mimetypes.guess_type(p.name)[0] or "image/jpeg" b64 = base64.b64encode(p.read_bytes()).decode("ascii") data_uris.append(f"data:{mime};base64,{b64}") except Exception as e: print(f" [decode-workflow] WARN: failed to read local image {p.name}: {e}", flush=True) if truncated: print( f" [decode-workflow] truncated images: kept first {len(img_files)}/{len(all_img_files)} " f"(防 Gemini 1M context 爆炸;通过环境变量 DECODE_MAX_IMAGES_PER_CASE 调整)", flush=True, ) return data_uris or None def _build_decode_input(case: Dict[str, Any], case_file: Optional[Path] = None) -> Dict[str, Any]: """ 从 case_item 构造 DecodeProcessAgent 期望的输入 JSON。 images 字段**默认用 case.images 里的远端 URL** —— LangChain 收到 URL 后会 download → bytes → 上传给 Gemini File API,按 image tokens 算(一张 ~258 tokens), 不会撞 1M context。 **本地 base64 模式是 opt-in**(DECODE_USE_LOCAL_IMAGES=1),因为 base64 data URI 在 LangChain 内部可能当 text 塞进 history,每轮 invoke 累积会撞 Gemini 1M context(实测 14 张图 9 个 step 必爆)。 """ raw = case.get("_raw") or {} images = case.get("images") or [] use_local = os.getenv("DECODE_USE_LOCAL_IMAGES", "0") not in ("0", "false", "False", "") if use_local and case_file is not None: local = _resolve_local_images(case, case_file) if local: print( f" [decode-workflow] case index={case.get('index')}: " 
f"using {len(local)} local images (base64, opt-in via DECODE_USE_LOCAL_IMAGES)", flush=True, ) images = local return { "channel_content_id": raw.get("channel_content_id") or raw.get("case_id") or f"case_{case.get('index', 0)}", "title": case.get("title") or "", "body_text": case.get("body") or case.get("body_text") or "", "images": images, } def _ensure_decode_agent_importable() -> Path: """确保 decode_workflow_agent 目录在 sys.path 内(它的 import 是相对的)。""" decode_dir = Path(__file__).resolve().parent / "decode_workflow_agent" if not decode_dir.exists(): raise FileNotFoundError(f"decode_workflow_agent 目录不存在: {decode_dir}") if str(decode_dir) not in sys.path: sys.path.insert(0, str(decode_dir)) return decode_dir def _setup_gemini_env() -> None: """ 兼容多种 Gemini API key 命名,把它们 alias 到 langchain-google-genai 期望的 GOOGLE_API_KEY。 langchain-google-genai 默认只读 GOOGLE_API_KEY;用户 .env 里可能是 GEMINI_API_KEY 等命名。 同时检测 GEMINI_API_BASE — 若设了,警告"langchain-google-genai 不支持 base URL 覆盖"。 """ if not os.environ.get("GOOGLE_API_KEY"): for alt in ("GEMINI_API_KEY", "gemini_api_key", "GOOGLE_GENAI_API_KEY"): val = os.environ.get(alt) if val: os.environ["GOOGLE_API_KEY"] = val print(f"[decode-workflow] aliased {alt} → GOOGLE_API_KEY", flush=True) break else: print( "[decode-workflow] ⚠ no Gemini API key found in env " "(tried GOOGLE_API_KEY / GEMINI_API_KEY / GOOGLE_GENAI_API_KEY). " "Gemini calls will likely fail. Set GOOGLE_API_KEY= in .env", flush=True, ) if os.environ.get("GEMINI_API_BASE"): print( "[decode-workflow] ⚠⚠⚠ GEMINI_API_BASE is set — but langchain-google-genai uses\n" " Google's official SDK and does NOT honor base URL overrides; all calls go to\n" " generativelanguage.googleapis.com regardless.\n" " If your GEMINI_API_KEY is for a 3rd-party proxy (OneAPI / yescode / OpenRouter):\n" " → DecodeProcessAgent will get 401/404 from Google's real endpoint.\n" " → Consider using OpenRouter Gemini via the main pipeline's --model gemini instead.\n" " If your key is from Google AI Studio (https://aistudio.google.com): you can ignore this.", flush=True, ) def _merge_decode_into_case( case_file: Path, target_case_index: int, decode_workflow: Dict[str, Any], ) -> bool: """ 把 decode 输出原样存到 case.cases[i].decode_workflow 顶层字段(跟 workflow_groups 平级)。 设计选择:不动 workflow_groups / capability,让旧字段保留作对照;decode 输出作为新的 独立字段存在,下游需要的话单独读 case.decode_workflow.steps。 Returns: True 成功合并,False 没找到目标 case(不报错,只警告) """ with open(case_file, "r", encoding="utf-8") as f: case_data = json.load(f) target = None for case in case_data.get("cases", []): if case.get("index") == target_case_index: target = case break if not target: print( f" [decode-workflow] merge SKIPPED: case index={target_case_index} not in case.json", flush=True, ) return False # 只塞 steps + 元信息(不塞 source — source.images 可能含 base64,几 MB 级,污染 case.json) target["decode_workflow"] = { "channel_content_id": (decode_workflow or {}).get("channel_content_id"), "steps": (decode_workflow or {}).get("steps", []) or [], "summary": (decode_workflow or {}).get("summary", ""), "status": (decode_workflow or {}).get("status", ""), } with open(case_file, "w", encoding="utf-8") as f: json.dump(case_data, f, ensure_ascii=False, indent=2) step_count = len(target["decode_workflow"]["steps"]) status = target["decode_workflow"]["status"] print( f" [decode-workflow] merged into case.json: " f"case index={target_case_index} → cases[].decode_workflow " f"({step_count} steps, status={status!r}, source stripped)", flush=True, ) return True # 跨 case 共享的 case.json 写锁(防多进程/并发场景下的损坏) _case_write_lock = 


async def _merge_decode_into_case_async(case_file: Path, target_case_index: int, decode_workflow: Dict[str, Any]) -> bool:
    """Async wrapper around _merge_decode_into_case that serializes writes to case.json."""
    async with _case_write_lock:
        return _merge_decode_into_case(case_file, target_case_index, decode_workflow)


async def extract_decode_workflow(
    case_file: Path,
    llm_call: Any = None,  # kept for signature compatibility; unused (DecodeProcessAgent runs its own LangChain chain)
    model: str = DEFAULT_DECODE_MODEL,
    max_concurrent: int = DEFAULT_DECODE_CONCURRENCY,
    case_index: Optional[int] = None,  # single-case mode: only run the case whose case.index == case_index
    merge_to_case: Optional[bool] = None,  # merge output back into case.json; None reads env DECODE_MERGE_TO_CASE
    skip_existing: bool = False,  # True = skip when decode_workflows/case_<index>.json already exists; default overwrites everything
) -> Dict[str, Any]:
    """
    Top-level entry point: run DecodeProcessAgent on every case in case.json, writing output to
    case_dir/decode_workflows/.

    If case_index is given, only that case is processed; otherwise all cases are.

    If merge_to_case=True, each case's decode output is stored under cases[i].decode_workflow in
    case.json after it finishes (workflow_groups is left untouched).

    Returns:
        Stats dict: total, succeeded, skipped, failed, merged, total_cost
    """
    case_file = Path(case_file)
    if not case_file.exists():
        raise FileNotFoundError(f"case.json not found: {case_file}")
    if merge_to_case is None:
        merge_to_case = DEFAULT_MERGE_TO_CASE
    print(f"[decode-workflow] merge_to_case = {merge_to_case}", flush=True)

    # Pre-run snapshot: if we are going to merge into case.json, take a history backup first so
    # the merge can be rolled back if the result is unsatisfactory.
    if merge_to_case:
        try:
            from examples.process_pipeline.script.case_history import snapshot_case_file
            snap = snapshot_case_file(case_file, step="decode_workflow_merge")
            if snap:
                print(f"[decode-workflow] snapshot saved → {snap.name}", flush=True)
        except Exception as e:
            print(f"[decode-workflow] snapshot SKIPPED: {type(e).__name__}: {e}", flush=True)

    # Output directory: <case.json's directory>/decode_workflows/
    output_dir = case_file.parent / "decode_workflows"
    output_dir.mkdir(parents=True, exist_ok=True)
    inputs_dir = output_dir / "_inputs"  # input files fed to the agent (used for auditing + the skip check)
    inputs_dir.mkdir(parents=True, exist_ok=True)
    # Point DecodeProcessAgent at the output directory via env var; it writes each output there.
    os.environ["DECODE_OUTPUT_DIR"] = str(output_dir.resolve())
    print(f"[decode-workflow] output_dir = {output_dir}", flush=True)

    # Accept GEMINI_API_KEY and similar names from .env + detect GEMINI_API_BASE proxy setups.
    _setup_gemini_env()

    # Lazy import: langchain is never imported unless this step actually runs.
    _ensure_decode_agent_importable()
    try:
        from DecodeProcessAgent import DecodeProcessAgent  # noqa: I001
    except ImportError as e:
        raise RuntimeError(
            f"Failed to import DecodeProcessAgent. Make sure the dependencies are installed:\n"
            f"  pip install langchain langchain-google-genai\n"
            f"and that GOOGLE_API_KEY=... is configured in .env\n"
            f"Original error: {type(e).__name__}: {e}"
        ) from e

    # Read case.json
    with open(case_file, "r", encoding="utf-8") as f:
        case_data = json.load(f)
    cases = case_data.get("cases") or []
    if not cases:
        print("[decode-workflow] no cases to process", flush=True)
        return {"total": 0, "succeeded": 0, "skipped": 0, "failed": 0, "total_cost": 0.0}

    # Single-case filter
    if case_index is not None:
        target = [c for c in cases if c.get("index") == case_index]
        if not target:
            available = [c.get("index") for c in cases]
            raise ValueError(
                f"case index={case_index} not found in {case_file}. Available indices: {available}"
            )
        print(f"[decode-workflow] filtered to single case index={case_index}", flush=True)
        cases = target

    # Write each case as a DecodeProcessAgent input file (under _inputs/).
    # File names are case_<index>.json; DecodeProcessAgent names its output after the input stem,
    # so the final decode_workflows/case_<index>.json + .html follow the same naming.
    print(f"[decode-workflow] preparing {len(cases)} input files in {inputs_dir}", flush=True)
    prepared_paths: List[tuple] = []  # [(input_path, case_index), ...]
    for case in cases:
        decode_input = _build_decode_input(case, case_file=case_file)
        if not decode_input["images"]:
            # Skip cases without images; DecodeProcessAgent would raise on them.
            print(f" [decode-workflow] skip case index={case.get('index')}: no images", flush=True)
            continue
        idx = case.get("index", 0)
        input_path = inputs_dir / f"case_{idx}.json"
        input_path.write_text(json.dumps(decode_input, ensure_ascii=False, indent=2), encoding="utf-8")
        prepared_paths.append((input_path, idx))
    if not prepared_paths:
        print("[decode-workflow] no eligible cases (all skipped — no images)", flush=True)
        return {"total": len(cases), "succeeded": 0, "skipped": len(cases), "failed": 0, "total_cost": 0.0}

    agent = DecodeProcessAgent(model_name=model)
    merged_count = 0

    # Single-case mode: call agent.run() directly instead of run_batch (avoids picking up stale
    # files left behind in _inputs/).
    if case_index is not None or len(prepared_paths) == 1:
        target_path, target_idx = prepared_paths[0]
        print(f"[decode-workflow] single-case mode: run {target_path.name}", flush=True)
        try:
            result = await agent.run(str(target_path))
            # Merge back into case.json
            if merge_to_case and result.get("workflow"):
                try:
                    ok = await _merge_decode_into_case_async(case_file, target_idx, result["workflow"])
                    if ok:
                        merged_count = 1
                except Exception as e:
                    print(f" [decode-workflow] merge FAILED: {type(e).__name__}: {e}", flush=True)
            return {
                "total": 1,
                "succeeded": 1,
                "skipped": 0,
                "failed": 0,
                "merged": merged_count,
                "total_input_tokens": result.get("input_tokens", 0),
                "total_output_tokens": result.get("output_tokens", 0),
                "total_cost": result.get("cost_usd", 0.0),
                "output_dir": str(output_dir),
            }
        except Exception as e:
            print(f" [decode-workflow] FAILED: {type(e).__name__}: {e}", flush=True)
            # Fallback: the agent failed, but WorkflowContext may already have written steps to
            # disk incrementally. Try reading case_<index>.json from disk and do a partial merge
            # (even if status='in_progress').
            partial_merged = 0
            if merge_to_case:
                disk_path = output_dir / f"case_{target_idx}.json"
                if disk_path.exists():
                    try:
                        with open(disk_path, "r", encoding="utf-8") as f:
                            partial_workflow = json.load(f)
                        ok = await _merge_decode_into_case_async(case_file, target_idx, partial_workflow)
                        if ok:
                            partial_merged = 1
                            print(
                                f" [decode-workflow] ⚠ partial merge: case index={target_idx} status="
                                f"{partial_workflow.get('status')!r} (partial progress written before the agent failed)",
                                flush=True,
                            )
                    except Exception as me:
                        print(
                            f" [decode-workflow] partial merge attempt FAILED: {type(me).__name__}: {me}",
                            flush=True,
                        )
            return {
                "total": 1,
                "succeeded": 0,
                "skipped": 0,
                "failed": 1,
                "merged": partial_merged,
                "total_input_tokens": 0,
                "total_output_tokens": 0,
                "total_cost": 0.0,
                "output_dir": str(output_dir),
                "error": f"{type(e).__name__}: {e}",
            }

    # Batch mode: run_batch processes the whole _inputs/ directory (sequential in this process,
    # with built-in retry + skip_existing). Wrapped in try/except so that even if run_batch dies
    # midway (network issue / SDK bug), the side-channel files already written still get merged.
    summary = {"succeeded": [], "skipped": [], "failed": [], "total_input_tokens": 0, "total_output_tokens": 0, "total_cost_usd": 0.0}
    run_batch_error: Optional[Exception] = None
    try:
        summary = await agent.run_batch(
            input_dir=str(inputs_dir),
            skip_existing=skip_existing,
            concurrency=max_concurrent,
        )
    except Exception as e:
        run_batch_error = e
        print(
f"\n[decode-workflow] ⚠ run_batch 中途异常: {type(e).__name__}: {e}\n" f" → 仍会尝试 merge 已生成的旁路文件到 case.json", flush=True, ) # 合并每个成功 case 的输出回 case.json — run_batch 返回的 summary 不含 workflow 数据, # 要从磁盘上的 output 文件读。这里**不依赖 summary**,直接扫已存在的 case_.json, # 这样 run_batch 中途崩了也能救回已完成的部分。 if merge_to_case: for path, idx in prepared_paths: decode_out_path = output_dir / f"case_{idx}.json" if not decode_out_path.exists(): continue # 该 case 跑失败了或还没到 try: with open(decode_out_path, "r", encoding="utf-8") as f: decode_workflow = json.load(f) ok = await _merge_decode_into_case_async(case_file, idx, decode_workflow) if ok: merged_count += 1 except Exception as e: print( f" [decode-workflow] merge case index={idx} FAILED: {type(e).__name__}: {e}", flush=True, ) result = { "total": len(cases), "succeeded": len(summary.get("succeeded", [])), "skipped": len(summary.get("skipped", [])), "failed": len(summary.get("failed", [])), "merged": merged_count, "total_input_tokens": summary.get("total_input_tokens", 0), "total_output_tokens": summary.get("total_output_tokens", 0), "total_cost": summary.get("total_cost_usd", 0.0), "output_dir": str(output_dir), } if run_batch_error is not None: result["run_batch_error"] = f"{type(run_batch_error).__name__}: {run_batch_error}" return result if __name__ == "__main__": import argparse p = argparse.ArgumentParser(description="DecodeProcessAgent 旁路 workflow 提取") p.add_argument("--case-file", required=True, help="case.json 路径") p.add_argument("--model", default=DEFAULT_DECODE_MODEL) p.add_argument("--concurrency", type=int, default=DEFAULT_DECODE_CONCURRENCY) args = p.parse_args() result = asyncio.run(extract_decode_workflow( case_file=Path(args.case_file), model=args.model, max_concurrent=args.concurrency, )) print(json.dumps(result, ensure_ascii=False, indent=2))