""" 通用 prompt 执行脚本:输入若干文件 + 一段 prompt 字符串,调用 LLM 输出到 txt。 四种 mode(默认 claude-agent): - claude-agent → **Agent 模式**(默认):开放 Read/Grep/Glob 工具,LLM 自己按需读文件 适合处理超 context window 的大文件 — LLM 不会一次性读完整文件, 而是用工具分片读 / 关键词检索后再读 — 类似 claude.ai 网页处理大文件的方式 - openrouter → OpenRouter HTTP(一次性调用,文件全文塞 prompt,受 context window 限制) - claude-sdk → Claude Agent SDK / OAuth 单次调用(不开工具,与其它一次性 mode 等价) - anthropic → Anthropic 原生 API(一次性调用) 用法: # 1) 默认 claude-agent — 直接给文件路径,LLM 自己 Read / Grep python run_prompt.py --prompt "在这个 JSON 里找 X 相关的条目" --file large.json --output out.txt # 2) 强制走一次性调用(小文件、想要确定性输出时) python run_prompt.py --mode openrouter --prompt "总结以下文件" --file a.json --output out.txt # 3) Prompt 从 stdin Get-Content prompt.txt | python run_prompt.py --prompt - --file a.json 脚本通过探测 .git/pyproject.toml 自动定位项目根,可以放在仓库内任意位置。 """ import argparse import asyncio import json import os import sys from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, List, Optional # Windows 控制台 UTF-8 for _s in (sys.stdout, sys.stderr): try: _s.reconfigure(encoding="utf-8") except (AttributeError, OSError): pass # 智能探测项目根:沿父目录上爬,找到含 .git / pyproject.toml 的目录。 # 这样脚本无论放在 scratch/ 还是 examples/process_pipeline/scratch/ 都能正确工作。 def _find_project_root(start: Path) -> Path: p = start.resolve() for ancestor in [p, *p.parents]: if (ancestor / ".git").exists() or (ancestor / "pyproject.toml").exists(): return ancestor return start.resolve().parent PROJECT_ROOT = _find_project_root(Path(__file__)) SCRIPT_DIR = Path(__file__).resolve().parent # agent mode 的默认工作目录 sys.path.insert(0, str(PROJECT_ROOT)) from dotenv import load_dotenv load_dotenv(PROJECT_ROOT / ".env") # 所有输出都落在脚本目录下的 outputs/ 沙盒里。 # 把脚本/数据/输出物理隔离,方便 .gitignore 也方便定位结果。 OUTPUTS_DIR = SCRIPT_DIR / "outputs" def default_output_path(mode: str) -> Path: """不传 --output 时的默认输出文件:outputs/ 下带时间戳的 txt,避免覆盖。""" ts = datetime.now().strftime("%Y%m%d_%H%M%S") return OUTPUTS_DIR / f"result_{mode}_{ts}.txt" def resolve_user_output(rel_path: str) -> Path: """ 把用户的 --output 解析到 OUTPUTS_DIR 下。 - 必须是相对路径 - 解析后必须落在 OUTPUTS_DIR 之内(防 `..` 越界) """ p = Path(rel_path) if p.is_absolute(): raise SystemExit( f"ERROR: --output 必须是相对路径(相对 outputs/),不能是绝对路径: {rel_path!r}" ) target = (OUTPUTS_DIR / p).resolve() outputs_root = OUTPUTS_DIR.resolve() try: target.relative_to(outputs_root) except ValueError: raise SystemExit( f"ERROR: --output 解析后必须仍在 outputs/ 内,但 {rel_path!r} 越界到了 " f"{target}(不允许使用 '..' 跳出沙盒)" ) return target # ───────────────────────────── LLM mode factories ──────────────────────────── DEFAULT_MODELS = { "openrouter": "claude-sonnet-4-6", "claude-sdk": "claude-sonnet-4-6", "anthropic": "claude-sonnet-4-5", "claude-agent": "claude-sonnet-4-6", # Agent 模式:开放工具按需读文件 } # 每个模型族的输出 token 上限(来自 Anthropic/OpenAI 文档)。 # 按 substring 匹配模型名,第一个命中的为准;未命中走 _DEFAULT_MAX_OUTPUT。 _MAX_OUTPUT_TOKENS = [ # Claude 4.x — Anthropic 公布的输出上限 ("sonnet-4", 64000), ("opus-4", 32000), ("haiku-4", 8192), # Claude 3.5 / 3.7 ("sonnet-3-5", 8192), ("sonnet-3.5", 8192), ("haiku-3", 4096), ("opus-3", 4096), # OpenRouter 上 OpenAI / Google 常见模型 ("gpt-5", 16384), ("gpt-4", 16384), ("gemini", 8192), ("deepseek", 8192), ] _DEFAULT_MAX_OUTPUT = 8192 def resolve_max_output_tokens(model: str) -> int: """按模型名前缀匹配输出 token 上限,未匹配回退默认值。""" m = model.lower() for key, cap in _MAX_OUTPUT_TOKENS: if key in m: return cap return _DEFAULT_MAX_OUTPUT def build_llm_call(mode: str, model: str) -> Callable: """ 根据 mode 实例化 llm_call。三种一次性 mode 都返回相同契约的 async 函数: async (messages, model=..., **kwargs) -> {"content": str, "usage": {...}} (claude-agent 模式不走这里,单独分叉到 run_claude_agent_mode。) """ if mode == "openrouter": from agent.llm.openrouter import create_openrouter_llm_call return create_openrouter_llm_call(model=model) if mode == "claude-sdk": from agent.llm.claude_code_oauth import create_claude_code_oauth_llm_call return create_claude_code_oauth_llm_call(model=model) if mode == "anthropic": from agent.llm.claude import create_claude_llm_call return create_claude_llm_call(model=model) raise ValueError(f"Unknown mode: {mode!r}. Choose from {list(DEFAULT_MODELS)}") # ────────────────────────────── prompt assembly ────────────────────────────── def read_prompt(prompt_arg: Optional[str], prompt_file: Optional[str]) -> str: """从 --prompt / --prompt-file / stdin 三选一拿 prompt 字符串。""" if prompt_file: return Path(prompt_file).read_text(encoding="utf-8").strip() if prompt_arg == "-": return sys.stdin.read().strip() if prompt_arg: return prompt_arg raise SystemExit("ERROR: must provide --prompt TEXT, --prompt - (stdin), or --prompt-file PATH") def read_file_for_prompt(path: str) -> str: """读单个文件内容。大文件不做客户端预检 —— 信任 LLM 端的报错。""" p = Path(path) if not p.exists(): raise FileNotFoundError(f"File not found: {path}") return p.read_text(encoding="utf-8", errors="replace") def assemble_prompt(prompt: str, files: List[str]) -> str: """拼接最终 prompt:用户 prompt 在前,每个文件用 `=== file: ===` 分隔附在后面。""" if not files: return prompt blocks = [prompt.rstrip(), ""] for path in files: content = read_file_for_prompt(path) blocks.append(f"=== file: {path} ({len(content):,} chars) ===") blocks.append(content) blocks.append("") return "\n".join(blocks).rstrip() + "\n" # ─────────────────────────────── response handling ─────────────────────────── _TRUNCATION_REASONS = {"length", "max_tokens", "MAX_TOKENS"} def extract_text_and_usage(response: Dict[str, Any]) -> tuple: """从 llm_call 返回值抽 content / usage / finish_reason。三个一次性 mode 契约一致。""" content = response.get("content", "") if isinstance(content, list): parts = [] for block in content: if isinstance(block, dict): parts.append(block.get("text") or "") else: parts.append(str(block)) content = "".join(parts) elif not isinstance(content, str): content = str(content) usage = response.get("usage") or {} if hasattr(usage, "__dict__") and not isinstance(usage, dict): usage = {k: getattr(usage, k) for k in dir(usage) if not k.startswith("_") and not callable(getattr(usage, k))} in_tok = usage.get("input_tokens") or usage.get("prompt_tokens") or 0 out_tok = usage.get("output_tokens") or usage.get("completion_tokens") or 0 finish_reason = response.get("finish_reason") or response.get("stop_reason") return content, { "input_tokens": in_tok, "output_tokens": out_tok, "finish_reason": finish_reason, "raw": usage, } # ────────────────────────── claude-agent mode (tools) ─────────────────────── DEFAULT_AGENT_TOOLS = ["Read", "Grep", "Glob"] async def run_claude_agent_mode(args: argparse.Namespace, prompt: str, files: List[str]) -> int: """ Agent 模式:用 ClaudeSDKClient 开放工具,让 LLM 自己 Read/Grep 文件。 与其它三个 mode 的根本区别:不把文件全文塞 prompt,而是把"路径 + 工具能力"给 LLM。 适合处理超 context window 的大文件。 实现参考 agent/llm/claude_code_oauth.py,但关键区别: - allowed_tools 开放(而非 []) - max_turns > 1(而非 1) - cwd 设到脚本所在目录(让相对路径文件能直接被 Read) """ try: from claude_agent_sdk import ( AssistantMessage, ClaudeAgentOptions, ClaudeSDKClient, ClaudeSDKError, ResultMessage, TextBlock, ) except ImportError as e: print(f"!!! ERROR: claude_agent_sdk not installed: {e}", file=sys.stderr) print("!!! pip install claude-agent-sdk", file=sys.stderr) return 1 # 抹掉 API key 让 SDK 走 OAuth(复用 claude_code_oauth.py 的处理) override_env = { "ANTHROPIC_API_KEY": "", "ANTHROPIC_BASE_URL": "", "ANTHROPIC_AUTH_TOKEN": "", } # 构造 prompt:把文件路径作为引用而非内联全文 if files: abs_files = [str(Path(f).resolve()) for f in files] file_listing = "\n".join(f"- {p}" for p in abs_files) full_prompt = ( f"{prompt.rstrip()}\n\n" f"---\n" f"可用文件(用 Read/Grep/Glob 工具按需读取,**不要**一次性读完整文件):\n" f"{file_listing}\n\n" f"## 工具使用规则(重要 — 违反会导致 SDK 子进程 crash)\n\n" f"SDK 子进程的 stdin/stdout JSON 消息**硬上限是 1MB**,单次工具调用返回数据" f"超过该上限会让整个 agent 进程崩溃。所以:\n\n" f"- **Read**:单次 `limit` 不要超过 500 行;大文件请多次 Read 用 `offset` 翻页。\n" f"- **Grep**:必须显式设 `head_limit`(≤ 200);**永远不要**设 `head_limit=0`" f"(在 SDK 里等同于无限制)。如果只是想知道有没有命中,用 " f"`output_mode=\"files_with_matches\"` 或 `output_mode=\"count\"`。\n" f"- **Glob**:返回的文件列表自然受 head_limit 控制,但避免对大目录用 `**/*` 等过宽的 pattern。\n" f"- 工作策略:先小范围探测(结构、字段、行数),再有针对性地读局部 — 不要试图把全文倒进 context。\n" ) else: full_prompt = prompt allowed_tools = args.allowed_tools or DEFAULT_AGENT_TOOLS max_turns = args.max_turns stderr_lines: List[str] = [] def _capture_stderr(line: str) -> None: if line: stderr_lines.append(line) options = ClaudeAgentOptions( model=args.model, allowed_tools=allowed_tools, max_turns=max_turns, cwd=str(SCRIPT_DIR), # 工具的工作目录设为脚本所在目录 env=override_env, stderr=_capture_stderr, setting_sources=[], # 屏蔽用户级 ~/.claude/ 配置注入 ) print( f"[info] mode=claude-agent model={args.model} " f"allowed_tools={allowed_tools} max_turns={max_turns}", file=sys.stderr, ) print(f"[info] cwd={SCRIPT_DIR}", file=sys.stderr) print(f"[info] prompt: {len(full_prompt):,} chars files={len(files)}", file=sys.stderr) if args.show_prompt: print("─── assembled prompt ───", file=sys.stderr) print(full_prompt, file=sys.stderr) print("─── end prompt ───", file=sys.stderr) text_parts: List[str] = [] usage: Dict[str, Any] = {} is_error = False result_subtype: Optional[str] = None result_errors: List[str] = [] try: async with ClaudeSDKClient(options=options) as client: await client.query(full_prompt) async for msg in client.receive_response(): msg_type = type(msg).__name__ if isinstance(msg, AssistantMessage): for block in msg.content: if hasattr(block, "thinking"): continue # thinking 内容跳过 elif isinstance(block, TextBlock): text_parts.append(block.text) preview = block.text.replace("\n", " ")[:160] print(f"[agent text] {preview}", file=sys.stderr) elif hasattr(block, "name") and hasattr(block, "input"): tool_input_str = json.dumps( block.input, ensure_ascii=False )[:240] print( f"[agent tool_use] {block.name}({tool_input_str})", file=sys.stderr, ) else: print( f"[agent {type(block).__name__}] {block!r}"[:240], file=sys.stderr, ) elif isinstance(msg, ResultMessage): if msg.usage: usage = dict(msg.usage) is_error = msg.is_error result_subtype = msg.subtype result_errors = list(msg.errors or []) print( f"[info] agent done: turns={msg.num_turns} " f"duration={msg.duration_ms}ms " f"in={usage.get('input_tokens', 0)} " f"out={usage.get('output_tokens', 0)} " f"is_error={is_error}", file=sys.stderr, ) elif msg_type == "SystemMessage": subtype = getattr(msg, "subtype", "?") print(f"[agent system] subtype={subtype}", file=sys.stderr) except ClaudeSDKError as e: import traceback print("\n" + "!" * 78, file=sys.stderr) print(f"!!! ClaudeSDKError: {type(e).__name__}: {e}", file=sys.stderr) if stderr_lines: print("!!! CLI stderr (last 20 lines):", file=sys.stderr) for line in stderr_lines[-20:]: print(line, file=sys.stderr) print("!" * 78, file=sys.stderr) traceback.print_exc(file=sys.stderr) return 1 if is_error: print( f"\n!!! agent reported is_error=True subtype={result_subtype} " f"errors={result_errors}", file=sys.stderr, ) return 1 content = "".join(text_parts).strip() _write_output(content, args, mode="claude-agent") return 0 def _write_output(content: str, args: argparse.Namespace, mode: str) -> None: """统一输出处理:默认写 outputs/result__.txt,--stdout 才打 stdout。""" if args.stdout: print(content) return out_path = resolve_user_output(args.output) if args.output else default_output_path(mode) out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(content, encoding="utf-8") print( f"[info] written to {out_path} ({len(content)} chars)", file=sys.stderr, ) # ─────────────────────────────────── main ──────────────────────────────────── async def run(args: argparse.Namespace) -> int: """ Return codes: 0 — success 1 — LLM call or other runtime error (full traceback + server body printed) 2 — output truncated (hit max_tokens); partial result still written """ # Fail-fast:先校验 --output 合法性,免得 agent 跑 5 分钟才报路径错 if args.output and not args.stdout: resolve_user_output(args.output) # 不合法会 SystemExit OUTPUTS_DIR.mkdir(parents=True, exist_ok=True) prompt = read_prompt(args.prompt, args.prompt_file) # Agent 模式:分叉到 ClaudeSDKClient 路径(不走 build_llm_call) if args.mode == "claude-agent": return await run_claude_agent_mode(args, prompt, args.file or []) full_prompt = assemble_prompt(prompt, args.file or []) # max_tokens: 默认按 model 取该模型族的最大输出 token effective_max_tokens = args.max_tokens or resolve_max_output_tokens(args.model) print( f"[info] mode={args.mode} model={args.model} " f"max_tokens={effective_max_tokens}", file=sys.stderr, ) print( f"[info] prompt: {len(full_prompt):,} chars files={len(args.file or [])}", file=sys.stderr, ) if args.show_prompt: print("─── assembled prompt ───", file=sys.stderr) print(full_prompt, file=sys.stderr) print("─── end prompt ───", file=sys.stderr) llm_call = build_llm_call(args.mode, args.model) messages = [{"role": "user", "content": full_prompt}] call_kwargs: Dict[str, Any] = { "messages": messages, "model": args.model, "temperature": args.temperature, "max_tokens": effective_max_tokens, } try: response = await llm_call(**call_kwargs) except Exception as e: import traceback print("\n" + "!" * 78, file=sys.stderr) print(f"!!! LLM CALL FAILED: {type(e).__name__}: {e}", file=sys.stderr) # httpx HTTPStatusError 等会带 response.text — 服务端的错误 body 才有用 for attr in ("response", "body"): obj = getattr(e, attr, None) if obj is not None: try: text = obj.text if hasattr(obj, "text") else str(obj) status = getattr(obj, "status_code", None) if status is not None: print(f"!!! server HTTP {status} body:", file=sys.stderr) else: print(f"!!! server {attr}:", file=sys.stderr) print(text[:4000], file=sys.stderr) except Exception: pass print("!" * 78, file=sys.stderr) traceback.print_exc(file=sys.stderr) return 1 content, usage = extract_text_and_usage(response) finish_reason = usage["finish_reason"] print( f"[info] usage: in={usage['input_tokens']} out={usage['output_tokens']} " f"finish_reason={finish_reason!r}", file=sys.stderr, ) truncated = finish_reason in _TRUNCATION_REASONS if truncated: print("", file=sys.stderr) print("!" * 78, file=sys.stderr) print( f"!!! WARNING: OUTPUT TRUNCATED (finish_reason={finish_reason!r}, " f"output_tokens={usage['output_tokens']} reached max_tokens={effective_max_tokens})", file=sys.stderr, ) print( f"!!! 模型还想继续输出但被 max_tokens 截断了。要拿完整输出,请:", file=sys.stderr, ) print( f"!!! 1) 提高 --max-tokens(当前模型 {args.model} 的理论上限为 " f"{resolve_max_output_tokens(args.model)})", file=sys.stderr, ) print( f"!!! 2) 或缩小输入 / 拆分任务 / 让 prompt 要求更简洁的回答", file=sys.stderr, ) print("!" * 78, file=sys.stderr) print("", file=sys.stderr) _write_output(content, args, mode=args.mode) return 2 if truncated else 0 def build_parser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( description="把若干文件附在 prompt 后面发给 LLM,结果写入 txt 文件。", formatter_class=argparse.RawDescriptionHelpFormatter, ) p.add_argument( "--mode", default="claude-agent", choices=list(DEFAULT_MODELS), help="LLM 调用方式(默认 claude-agent):claude-agent | openrouter | claude-sdk | anthropic。" " claude-agent 开放 Read/Grep/Glob 工具让 LLM 自己按需读文件,能处理大文件;" " 其它三种是一次性调用,文件全文塞 prompt(受 context window 限制)。", ) p.add_argument( "--model", default=None, help=f"模型名。各 mode 默认值:{DEFAULT_MODELS}", ) p.add_argument("--prompt", help="prompt 字符串。传 '-' 从 stdin 读") p.add_argument("--prompt-file", help="从文件读 prompt(与 --prompt 二选一)") p.add_argument("--file", action="append", help="附加到 prompt 后的输入文件,可多次传") p.add_argument( "--output", help="输出文件路径,**只能是相对路径**,相对 <脚本目录>/outputs/ 解析;" " 绝对路径或 '..' 越界会被拒绝。" " 不传则自动写到 outputs/result__.txt。" " 用 --stdout 可强制走 stdout 而非文件。", ) p.add_argument( "--stdout", action="store_true", help="强制把结果打到 stdout 而不是文件(旧默认行为)", ) p.add_argument("--temperature", type=float, default=0.1) p.add_argument( "--max-tokens", type=int, default=None, help="最大输出 token 数。不传则按 model 自动取上限(sonnet-4→64K, opus-4→32K, " "haiku-4→8K, gpt-5→16K, 其他→8K)。仅对 openrouter/anthropic mode 有效。", ) p.add_argument("--show-prompt", action="store_true", help="把拼好的完整 prompt 也打到 stderr,方便调试") # ── claude-agent mode 专用参数 ── p.add_argument( "--allowed-tools", action="append", help="agent mode 允许使用的工具,可多次传。默认 Read/Grep/Glob。" " 可选:Read, Grep, Glob, Bash, Edit, Write, WebFetch, WebSearch", ) p.add_argument( "--max-turns", type=int, default=30, help="agent mode 最大对话轮数(默认 30)", ) return p def main(): """全局兜底:任何未捕获异常都打 traceback + 非零退码,避免出现孤儿 exit code。""" try: args = build_parser().parse_args() if args.model is None: args.model = DEFAULT_MODELS[args.mode] code = asyncio.run(run(args)) sys.exit(code) except KeyboardInterrupt: print("\n[info] interrupted by user (Ctrl+C)", file=sys.stderr) sys.exit(130) except SystemExit: raise except BaseException as e: import traceback print(f"\n!!! UNEXPECTED ERROR: {type(e).__name__}: {e}", file=sys.stderr) traceback.print_exc(file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()