| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492 |
- """
- 示例(流程对齐版)
- 参考 examples/research/run.py:
- 1. 使用框架 InteractiveController 统一交互流程
- 2. 使用 config.py 管理运行参数
- 3. 保留 create 场景特有的 prompt 注入与详细消息打印
- """
- import argparse
- import asyncio
- import copy
- import json
- import os
- import sys
- from pathlib import Path
- from typing import Any
- # Clash Verge TUN 模式兼容:禁止 httpx/urllib 自动检测系统 HTTP 代理
- os.environ.setdefault("no_proxy", "*")
- # 添加项目根目录到 Python 路径
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.cli import InteractiveController
- from agent.core.presets import AgentPreset, register_preset
- from agent.core.runner import AgentRunner
- from agent.llm import create_openrouter_llm_call
- from agent.llm.prompts import SimplePrompt
- from agent.trace import FileSystemTraceStore, Message, Trace
- from agent.utils import setup_logging
- from examples.create.html import trace_to_html
- # 导入项目配置
- from config import DEBUG, LOG_FILE, LOG_LEVEL, RUN_CONFIG, SKILLS_DIR, TRACE_STORE_PATH
def _format_json(obj: Any, indent: int = 2) -> str:
    """Render *obj* as pretty-printed JSON text.

    Non-ASCII characters are written verbatim instead of being escaped.

    Args:
        obj: The value to serialize.
        indent: Number of spaces used for each nesting level.

    Returns:
        The JSON text, or ``str(obj)`` when ``json.dumps`` raises
        ``TypeError`` or ``ValueError`` for the value.
    """
    try:
        rendered = json.dumps(obj, ensure_ascii=False, indent=indent)
    except (TypeError, ValueError):
        # Best-effort display helper: fall back to the plain string form.
        rendered = str(obj)
    return rendered
def _print_message_details(message: Message):
    """Print a full, human-readable dump of one trace message to stdout.

    The dump starts with a banner (sequence number and upper-cased role),
    followed by optional goal / parent / tool-call identifiers, a
    role-specific rendering of ``message.content``, and finally token
    usage, cost and duration when those fields are set.

    Args:
        message: The message to print. Its ``role`` selects the layout:
            ``user``, ``assistant``, ``tool`` or ``system``; any other
            role prints only the banner and the usage footer.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    def section(title):
        # A section is a title line followed by a light separator.
        print(title)
        print(light_rule)

    def dump(value):
        # Strings are printed verbatim; everything else goes through JSON.
        if isinstance(value, str):
            print(value)
        else:
            print(_format_json(value))

    # --- banner -----------------------------------------------------------
    print("\n" + heavy_rule)
    print(f"[Message #{message.sequence}] {message.role.upper()}")
    print(heavy_rule)

    if message.goal_id:
        print(f"Goal ID: {message.goal_id}")
    if message.parent_sequence is not None:
        print(f"Parent Sequence: {message.parent_sequence}")
    if message.tool_call_id:
        print(f"Tool Call ID: {message.tool_call_id}")

    # --- role-specific body -------------------------------------------------
    role = message.role
    if role == "user":
        section("\n[输入内容]")
        dump(message.content)
    elif role == "assistant":
        payload = message.content
        if isinstance(payload, dict):
            reply = payload.get("text", "")
            calls = payload.get("tool_calls")
            if reply:
                section("\n[LLM 文本回复]")
                print(reply)
            if calls:
                section(f"\n[工具调用] (共 {len(calls)} 个)")
                for position, call in enumerate(calls, 1):
                    spec = call.get("function", {})
                    call_name = spec.get("name", "unknown")
                    call_id = call.get("id", "unknown")
                    raw_args = spec.get("arguments", {})
                    print(f"\n工具 #{position}: {call_name}")
                    print(f" Call ID: {call_id}")
                    print(" 参数:")
                    if not isinstance(raw_args, str):
                        print(_format_json(raw_args, indent=4))
                        continue
                    # String arguments are expected to be JSON; show them
                    # raw when they do not parse.
                    try:
                        decoded = json.loads(raw_args)
                    except json.JSONDecodeError:
                        print(f" {raw_args}")
                    else:
                        print(_format_json(decoded, indent=4))
        elif isinstance(payload, str):
            section("\n[LLM 文本回复]")
            print(payload)
        else:
            section("\n[内容]")
            print(_format_json(payload))
        if message.finish_reason:
            print(f"\n完成原因: {message.finish_reason}")
    elif role == "tool":
        payload = message.content
        section("\n[工具执行结果]")
        if not isinstance(payload, dict):
            print("(无内容)" if payload is None else str(payload))
        else:
            source_tool = payload.get("tool_name", "unknown")
            # Without a "result" key the whole payload is shown as the result.
            outcome = payload.get("result", payload)
            print(f"工具名称: {source_tool}")
            print("\n返回结果:")
            if isinstance(outcome, str):
                print(outcome)
            elif isinstance(outcome, list):
                for position, part in enumerate(outcome, 1):
                    is_image = isinstance(part, dict) and part.get("type") == "image_url"
                    if is_image:
                        # Image parts are summarized instead of printed.
                        print(f" [{position}] 图片 (base64, 已省略显示)")
                    else:
                        print(f" [{position}] {part}")
            else:
                print(_format_json(outcome))
    elif role == "system":
        section("\n[系统提示]")
        dump(message.content)

    # --- usage footer -------------------------------------------------------
    if not (message.prompt_tokens is None and message.completion_tokens is None):
        section("\n[Token 使用]")
        token_fields = (
            ("输入", "prompt_tokens"),
            ("输出", "completion_tokens"),
            ("推理", "reasoning_tokens"),
            ("缓存创建", "cache_creation_tokens"),
            ("缓存读取", "cache_read_tokens"),
        )
        for label, field_name in token_fields:
            amount = getattr(message, field_name)
            if amount is not None:
                print(f" {label} Tokens: {amount:,}")
        # The total is shown only when it is truthy (a zero total is hidden).
        if message.tokens:
            print(f" 总计 Tokens: {message.tokens:,}")

    if message.cost is not None:
        print(f"\n[成本] ${message.cost:.6f}")
    if message.duration_ms is not None:
        print(f"[执行时间] {message.duration_ms}ms")
    print(heavy_rule + "\n")
def _apply_prompt_placeholders(base_dir: Path, prompt: SimplePrompt):
    """Inject the contents of the PRD markdown files into the prompt.

    For each of ``system.md``, ``create_process.md``, ``input.md`` and
    ``output.md`` under ``base_dir / "PRD"``, the file text replaces the
    matching ``{name}`` placeholder in ``prompt._messages`` (the first two
    target the ``"system"`` message, the last two the ``"user"`` message).
    The prompt is modified in place.

    A missing file always produces a warning on stdout. A successful
    replacement or a missing placeholder is reported for every file except
    ``system.md``, which is handled silently.

    Args:
        base_dir: Directory that contains the ``PRD`` folder.
        prompt: Prompt object whose ``_messages`` mapping is updated.
    """
    prd_dir = base_dir / "PRD"
    # (file stem == placeholder name, target message role, report outcome?)
    substitutions = (
        ("system", "system", False),
        ("create_process", "system", True),
        ("input", "user", True),
        ("output", "user", True),
    )
    for stem, role, announce in substitutions:
        source = prd_dir / f"{stem}.md"
        if not source.exists():
            print(f" - 警告: {stem}.md 文件不存在: {source}")
            continue

        replacement = source.read_text(encoding="utf-8")
        token = "{" + stem + "}"
        templates = prompt._messages
        if role in templates and token in templates[role]:
            templates[role] = templates[role].replace(token, replacement)
            if announce:
                print(f" - 已替换 {stem}.md 内容到 prompt")
        elif announce:
            print(f" - 警告: prompt 中未找到 {token} 占位符")
async def main():
    """Run the "create" example agent with interactive pause/resume control.

    Steps performed, in order:
      * parse ``--trace`` (optional ID of an existing trace to resume);
      * set up logging and register presets from ``presets.json`` if present;
      * load ``create.prompt`` and inject the PRD files into its placeholders;
      * build an ``AgentRunner`` and an ``InteractiveController``;
      * loop over ``runner.run(...)``, printing every yielded item and
        reacting to ``pause`` / ``quit`` commands read from stdin;
      * export the trace to ``messages.html``; write the last tool-free
        assistant text of the most recent run (if any) to
        ``output_1/result.txt``.

    Exits the process with status 1 when ``--trace`` names a trace the store
    does not know.
    """
    parser = argparse.ArgumentParser(description="任务 (Agent 模式 + 交互增强)")
    parser.add_argument(
        "--trace",
        type=str,
        default=None,
        help="已有的 Trace ID,用于恢复继续执行(不指定则新建)",
    )
    args = parser.parse_args()

    # All example assets live next to this script.
    base_dir = Path(__file__).parent
    prompt_path = base_dir / "create.prompt"
    output_dir = base_dir / "output_1"
    output_dir.mkdir(exist_ok=True)

    setup_logging(level=LOG_LEVEL, file=LOG_FILE)

    # Register project-level presets, if a presets.json is shipped.
    print("2. 加载 presets...")
    presets_path = base_dir / "presets.json"
    if presets_path.exists():
        with open(presets_path, "r", encoding="utf-8") as f:
            project_presets = json.load(f)
        for name, cfg in project_presets.items():
            register_preset(name, AgentPreset(**cfg))
        print(f" - 已加载项目 presets: {list(project_presets.keys())}")

    # Load the prompt template and fill its placeholders from the PRD files.
    print("3. 加载 prompt...")
    prompt = SimplePrompt(prompt_path)
    _apply_prompt_placeholders(base_dir, prompt)

    # Echo the final prompt so the operator can verify the substitution.
    print("\n替换后的 prompt:")
    print("=" * 60)
    print("System:")
    print("-" * 60)
    print(prompt._messages.get("system", ""))
    print("=" * 60)
    if "user" in prompt._messages:
        print("\nUser:")
        print("-" * 60)
        print(prompt._messages["user"])
        print("=" * 60)
    print()

    print("4. 构建任务消息...")
    messages = prompt.build_messages()

    print("5. 创建 Agent Runner...")
    print(" - 加载自定义工具: topic_search")
    # Imported only for its side effects (presumably tool registration) — hence the noqa.
    import examples.create.tool # noqa: F401

    # Model precedence: the prompt's own config wins over RUN_CONFIG.
    # A config model without a provider prefix is prefixed with "anthropic/".
    model_from_prompt = prompt.config.get("model")
    model_from_config = RUN_CONFIG.model
    default_model = f"anthropic/{model_from_config}" if "/" not in model_from_config else model_from_config
    model = model_from_prompt or default_model

    # A relative SKILLS_DIR is resolved against this script's directory.
    skills_dir = str((base_dir / SKILLS_DIR).resolve()) if not Path(SKILLS_DIR).is_absolute() else SKILLS_DIR
    print(f" - Skills 目录: {skills_dir}")
    print(f" - 模型: {model}")

    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
    runner = AgentRunner(
        trace_store=store,
        llm_call=create_openrouter_llm_call(model=model),
        skills_dir=skills_dir,
        debug=DEBUG,
    )
    interactive = InteractiveController(
        runner=runner,
        store=store,
        enable_stdin_check=True,
    )

    task_name = RUN_CONFIG.name or base_dir.name
    print("=" * 60)
    print(task_name)
    print("=" * 60)
    print("💡 交互提示:")
    print(" - 执行过程中输入 'p' 或 'pause' 暂停并进入交互模式")
    print(" - 执行过程中输入 'q' 或 'quit' 停止执行")
    print("=" * 60)
    print()

    # Resuming requires the trace to exist in the store; otherwise abort.
    resume_trace_id = args.trace
    if resume_trace_id:
        existing_trace = await store.get_trace(resume_trace_id)
        if not existing_trace:
            print(f"\n错误: Trace 不存在: {resume_trace_id}")
            sys.exit(1)
        print(f"恢复已有 Trace: {resume_trace_id[:8]}...")
        print(f" - 状态: {existing_trace.status}")
        print(f" - 消息数: {existing_trace.total_messages}")
    else:
        print("启动新 Agent...")
    print()

    # Loop state shared between the run loop and the final reporting below.
    final_response = ""
    current_trace_id = resume_trace_id
    current_sequence = 0
    should_exit = False

    try:
        # Work on a copy so the module-level RUN_CONFIG is never mutated.
        run_config = copy.deepcopy(RUN_CONFIG)
        run_config.model = model
        # Prompt-level settings override the configured defaults.
        run_config.temperature = float(prompt.config.get("temperature", run_config.temperature))
        run_config.max_iterations = int(prompt.config.get("max_iterations", run_config.max_iterations))

        if resume_trace_id:
            # None marks "resuming, no new messages decided yet" (checked below).
            initial_messages = None
            run_config.trace_id = resume_trace_id
        else:
            initial_messages = messages
            run_config.name = "社交媒体内容解构、建构、评估任务"

        # Each iteration is one runner.run() pass; menus decide whether to loop.
        while not should_exit:
            if current_trace_id:
                run_config.trace_id = current_trace_id
            # Reset per pass: only the latest pass contributes the final answer.
            final_response = ""

            # Resume without new messages: a finished trace goes straight to
            # the menu instead of being run again.
            if current_trace_id and initial_messages is None:
                check_trace = await store.get_trace(current_trace_id)
                if check_trace and check_trace.status in ("completed", "failed"):
                    if check_trace.status == "completed":
                        print("\n[Trace] ✅ 已完成")
                        print(f" - Total messages: {check_trace.total_messages}")
                        print(f" - Total cost: ${check_trace.total_cost:.4f}")
                    else:
                        print(f"\n[Trace] ❌ 已失败: {check_trace.error_message}")
                    current_sequence = check_trace.head_sequence
                    menu_result = await interactive.show_menu(current_trace_id, current_sequence)
                    if menu_result["action"] == "stop":
                        break
                    if menu_result["action"] == "continue":
                        new_messages = menu_result.get("messages", [])
                        if new_messages:
                            initial_messages = new_messages
                            run_config.after_sequence = menu_result.get("after_sequence")
                        else:
                            initial_messages = []
                            run_config.after_sequence = None
                        continue
                    # Any other menu action ends the session.
                    break
                # Trace is not finished: continue it without adding messages.
                initial_messages = []

            print(f"{'▶️ 开始执行...' if not current_trace_id else '▶️ 继续执行...'}")
            # Set when the pass is interrupted via the pause menu.
            paused = False
            try:
                async for item in runner.run(messages=initial_messages, config=run_config):
                    # Poll stdin once per yielded item for a user command.
                    cmd = interactive.check_stdin()
                    if cmd == "pause":
                        print("\n⏸️ 正在暂停执行...")
                        if current_trace_id:
                            await runner.stop(current_trace_id)
                        # Short wait after requesting the stop, before the menu is shown.
                        await asyncio.sleep(0.5)
                        menu_result = await interactive.show_menu(current_trace_id, current_sequence)
                        if menu_result["action"] == "stop":
                            should_exit = True
                            paused = True
                            break
                        if menu_result["action"] == "continue":
                            new_messages = menu_result.get("messages", [])
                            if new_messages:
                                initial_messages = new_messages
                                after_seq = menu_result.get("after_sequence")
                                # Keep the previous after_sequence when the menu gives none.
                                if after_seq is not None:
                                    run_config.after_sequence = after_seq
                            else:
                                initial_messages = []
                                run_config.after_sequence = None
                            paused = True
                            break
                    elif cmd == "quit":
                        print("\n🛑 用户请求停止...")
                        if current_trace_id:
                            await runner.stop(current_trace_id)
                        should_exit = True
                        break

                    if isinstance(item, Trace):
                        # Remember the trace ID so later passes and the export can use it.
                        current_trace_id = item.trace_id
                        if item.status == "running":
                            print(f"[Trace] 开始: {item.trace_id[:8]}...")
                        elif item.status == "completed":
                            print("\n[Trace] ✅ 完成")
                            print(f" - Total messages: {item.total_messages}")
                            print(f" - Total tokens: {item.total_tokens}")
                            print(f" - Total cost: ${item.total_cost:.4f}")
                        elif item.status == "failed":
                            print(f"\n[Trace] ❌ 失败: {item.error_message}")
                        elif item.status == "stopped":
                            print("\n[Trace] ⏸️ 已停止")
                    elif isinstance(item, Message):
                        current_sequence = item.sequence
                        _print_message_details(item)
                        if item.role == "assistant":
                            content = item.content
                            if isinstance(content, dict):
                                text = content.get("text", "")
                                tool_calls = content.get("tool_calls")
                                # An assistant text without tool calls is kept as the
                                # candidate final answer (the last one wins).
                                if text and not tool_calls:
                                    final_response = text
            except Exception as e:
                # A failed pass is reported but does not end the session;
                # control falls through to the menu below.
                print(f"\n执行出错: {e}")
                import traceback
                traceback.print_exc()

            # The pause menu already decided what to do next.
            if paused:
                if should_exit:
                    break
                continue
            if should_exit:
                break

            # The pass ended on its own: let the user decide how to proceed.
            if current_trace_id:
                menu_result = await interactive.show_menu(current_trace_id, current_sequence)
                if menu_result["action"] == "stop":
                    break
                if menu_result["action"] == "continue":
                    new_messages = menu_result.get("messages", [])
                    if new_messages:
                        initial_messages = new_messages
                        run_config.after_sequence = menu_result.get("after_sequence")
                    else:
                        initial_messages = []
                        run_config.after_sequence = None
                    continue
            break
    except KeyboardInterrupt:
        print("\n\n用户中断 (Ctrl+C)")
        if current_trace_id:
            await runner.stop(current_trace_id)
    finally:
        # Always try to export the trace; a failed export is only reported.
        if current_trace_id:
            try:
                html_path = store.base_path / current_trace_id / "messages.html"
                await trace_to_html(current_trace_id, html_path, base_path=str(store.base_path))
                print(f"\n✓ Messages 可视化已保存: {html_path}")
            except Exception as e:
                print(f"\n⚠ 生成 HTML 失败: {e}")

    # Show and persist the final answer, if the last pass produced one.
    if final_response:
        print()
        print("=" * 60)
        print("Agent 响应:")
        print("=" * 60)
        print(final_response)
        print("=" * 60)
        print()
        output_file = output_dir / "result.txt"
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(final_response)
        print(f"✓ 结果已保存到: {output_file}")
        print()

    # Tell the user where the trace can be inspected.
    if current_trace_id:
        html_path = store.base_path / current_trace_id / "messages.html"
        print("=" * 60)
        print("可视化:")
        print("=" * 60)
        print(f"1. 本地 HTML: {html_path}")
        print()
        print("2. API Server:")
        print(" python3 api_server.py")
        print(" http://localhost:8000/api/traces")
        print()
        print(f"3. Trace ID: {current_trace_id}")
        print("=" * 60)
# Script entry point: run the async main() coroutine when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
|