""" 新搜索测试 - 测试 Research Agent 与 Knowledge Manager 的 IM 通信 """ import argparse import os import sys import asyncio from pathlib import Path os.environ.setdefault("no_proxy", "*") sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from dotenv import load_dotenv load_dotenv() from agent.llm.prompts import SimplePrompt from agent.core.runner import AgentRunner, RunConfig from agent.trace import FileSystemTraceStore, Trace, Message from agent.llm import create_qwen_llm_call from agent.cli import InteractiveController from agent.utils import setup_logging from agent.tools.builtin.browser.baseClass import init_browser_session, kill_browser_session from config import ( RUN_CONFIG, SKILLS_DIR, TRACE_STORE_PATH, DEBUG, LOG_LEVEL, LOG_FILE, BROWSER_TYPE, HEADLESS, OUTPUT_DIR, IM_ENABLED, IM_CONTACT_ID, IM_SERVER_URL, IM_WINDOW_MODE, IM_NOTIFY_INTERVAL, KNOWLEDGE_MANAGER_ENABLED, KNOWLEDGE_MANAGER_CONTACT_ID, ) async def main(): parser = argparse.ArgumentParser(description="新搜索测试(KM 通信)") parser.add_argument("--trace", type=str, default=None, help="恢复 Trace ID") args = parser.parse_args() base_dir = Path(__file__).parent project_root = base_dir.parent.parent prompt_path = base_dir / "new_search.prompt" output_dir = project_root / OUTPUT_DIR output_dir.mkdir(parents=True, exist_ok=True) # 1. 日志 setup_logging(level=LOG_LEVEL, file=LOG_FILE) # 2. Prompt print("1. 加载 prompt...") prompt = SimplePrompt(prompt_path) messages = prompt.build_messages(output_dir=str(output_dir)) # 3. 浏览器 print("2. 初始化浏览器...") await init_browser_session(browser_type=BROWSER_TYPE, headless=HEADLESS, url="https://www.google.com/", profile_name="") print(" ✅ 浏览器就绪\n") # 4. IM Client + Knowledge Manager km_task = None if IM_ENABLED: from agent.tools.builtin.im.chat import im_setup, im_open_window print("3. 初始化 IM Client...") print(f" - 身份: {IM_CONTACT_ID}, 服务器: {IM_SERVER_URL}") result = await im_setup( contact_id=IM_CONTACT_ID, server_url=IM_SERVER_URL, notify_interval=IM_NOTIFY_INTERVAL, ) print(f" ✅ {result.output}") if IM_WINDOW_MODE: window_result = await im_open_window(contact_id=IM_CONTACT_ID) print(f" ✅ {window_result.output}\n") if KNOWLEDGE_MANAGER_ENABLED: print("4. 启动 Knowledge Manager...") print(f" - Contact ID: {KNOWLEDGE_MANAGER_CONTACT_ID}") try: sys.path.insert(0, str(Path(__file__).parent.parent.parent / "knowhub")) from agents.knowledge_manager import start_knowledge_manager km_task = asyncio.create_task(start_knowledge_manager( contact_id=KNOWLEDGE_MANAGER_CONTACT_ID, server_url=IM_SERVER_URL, chat_id="main" )) # 等待一下让 KM 连接完成 await asyncio.sleep(2) print(f" ✅ Knowledge Manager 已启动\n") except Exception as e: print(f" ⚠️ 启动失败: {e}\n") # 5. Agent Runner print("5. 创建 Agent Runner...") prompt_model = prompt.config.get("model", None) model_for_llm = prompt_model or RUN_CONFIG.model print(f" - 模型: {model_for_llm}") store = FileSystemTraceStore(base_path=TRACE_STORE_PATH) runner = AgentRunner( trace_store=store, llm_call=create_qwen_llm_call(model=model_for_llm), skills_dir=SKILLS_DIR, debug=DEBUG, logger_name="agents.research_agent" ) interactive = InteractiveController(runner=runner, store=store, enable_stdin_check=True) runner.stdin_check = interactive.check_stdin # 6. 执行 task_name = RUN_CONFIG.name or base_dir.name print("=" * 60) print(f"{task_name}") print("=" * 60) print("💡 输入 'p' 暂停,'q' 退出") print("=" * 60) print() run_config = RUN_CONFIG current_trace_id = args.trace current_sequence = 0 # 注入 IM 配置到 context(用于周期性通知检查) if IM_ENABLED: run_config.context["im_config"] = { "contact_id": IM_CONTACT_ID, "chat_id": "main" } if current_trace_id: run_config.trace_id = current_trace_id initial_messages = None else: initial_messages = messages try: async for item in runner.run(messages=initial_messages, config=run_config): cmd = interactive.check_stdin() if cmd == 'quit': print("\n🛑 停止...") if current_trace_id: await runner.stop(current_trace_id) break elif cmd == 'pause': print("\n⏸️ 暂停...") if current_trace_id: await runner.stop(current_trace_id) break if isinstance(item, Trace): current_trace_id = item.trace_id if item.status == "running": print(f"[Trace] 开始: {item.trace_id[:8]}...") elif item.status == "completed": print(f"\n[Trace] ✅ 完成 (消息: {item.total_messages}, 费用: ${item.total_cost:.4f})") elif item.status == "failed": print(f"\n[Trace] ❌ 失败: {item.error_message}") elif isinstance(item, Message): current_sequence = item.sequence if item.role == "assistant": content = item.content if isinstance(content, dict): text = content.get("text", "") tool_calls = content.get("tool_calls") if text and not tool_calls: print(f"\n[Response] {text}") elif text: preview = text[:150] + "..." if len(text) > 150 else text print(f"[Assistant] {preview}") elif item.role == "tool": content = item.content tool_name = content.get("tool_name", "unknown") if isinstance(content, dict) else "unknown" desc = item.description or "" if desc and desc != tool_name: desc = desc[:80] print(f"[Tool] ✅ {tool_name}: {desc}...") else: print(f"[Tool] ✅ {tool_name}") except KeyboardInterrupt: print("\n\n用户中断 (Ctrl+C)") if current_trace_id: await runner.stop(current_trace_id) finally: if km_task and not km_task.done(): print("正在关闭 Knowledge Manager...") km_task.cancel() try: await km_task except asyncio.CancelledError: pass try: await kill_browser_session() except Exception: pass if current_trace_id: print(f"\nTrace ID: {current_trace_id}") if __name__ == "__main__": asyncio.run(main())