| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- import asyncio
- import json
- import logging
- import sys
- from pathlib import Path
- from typing import Dict, Any, Optional
- # 确保项目路径可用
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from agent.core.runner import AgentRunner
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_qwen_llm_call
- from agent.llm.prompts import SimplePrompt
- logger = logging.getLogger(__name__)
- # 文件保存 trace 映射关系,持久化续跑
- TRACE_MAP_FILE = Path(".cache/research_trace_map.json")
- def _load_trace_map() -> Dict[str, str]:
- if TRACE_MAP_FILE.exists():
- return json.loads(TRACE_MAP_FILE.read_text(encoding="utf-8"))
- return {}
def _save_trace_map(mapping: Dict[str, str]):
    """Persist the caller→research trace-id mapping as pretty-printed JSON."""
    TRACE_MAP_FILE.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(mapping, indent=2, ensure_ascii=False)
    TRACE_MAP_FILE.write_text(payload, encoding="utf-8")
def get_research_trace_id(caller_trace_id: str) -> Optional[str]:
    """Look up the Research trace_id previously recorded for a caller trace_id.

    Returns None when no caller id is given or no mapping exists yet.
    """
    if not caller_trace_id:
        return None
    return _load_trace_map().get(caller_trace_id)
def set_research_trace_id(caller_trace_id: str, research_trace_id: str):
    """Record the caller→research trace-id association.

    A no-op when caller_trace_id is empty, matching the lookup side.
    """
    if not caller_trace_id:
        return
    current = _load_trace_map()
    current[caller_trace_id] = research_trace_id
    _save_trace_map(current)
# ===== Singleton Runner state =====
# Lazily-created module singletons; populated by _ensure_initialized().
_runner: Optional[AgentRunner] = None  # shared AgentRunner instance
_prompt_messages = None  # system messages built from the prompt file (list once initialized)
_initialized = False  # guard so initialization runs only once
def _ensure_initialized():
    """Lazily build the singleton AgentRunner and prompt messages (first call only).

    Safe to call repeatedly. The `_initialized` flag is set only after the
    runner and prompt have been constructed successfully, so a failed first
    attempt raises to the caller and is retried on the next call instead of
    being permanently skipped with `_runner` left as None.
    """
    global _runner, _prompt_messages, _initialized
    if _initialized:
        return

    # Optional per-agent skills directory living next to this module.
    skills_dir = Path(__file__).parent / "skills"

    _runner = AgentRunner(
        trace_store=FileSystemTraceStore(base_path=".trace"),
        # Default model; may be overridden below by the prompt file's meta.
        llm_call=create_qwen_llm_call(model="qwen3.5-plus"),
        skills_dir=str(skills_dir) if skills_dir.exists() else None,
        debug=True,
        logger_name="agents.research",
    )

    prompt_path = Path(__file__).parent / "research_agent.prompt"
    if prompt_path.exists():
        prompt = SimplePrompt(prompt_path)
        _prompt_messages = prompt.build_messages()

        # The prompt file's meta block may pin a specific model; honor it.
        if getattr(prompt, "meta", None) and prompt.meta.get("model"):
            model_name = prompt.meta["model"]
            _runner.llm_call = create_qwen_llm_call(model=model_name)
    else:
        _prompt_messages = []
        logger.warning("Research prompt 文件不存在: %s", prompt_path)

    # Mark initialized only after everything above succeeded (see docstring).
    _initialized = True
    logger.info("✓ Research Agent 已初始化")
# ===== Core entry point =====
async def research(query: str, caller_trace_id: str = "") -> Dict[str, Any]:
    """Run the Research Agent on *query* and return its findings.

    Args:
        query: The research topic or question to investigate.
        caller_trace_id: The caller's trace_id. When a mapping for it exists
            (see get_research_trace_id), the previous research trace is
            resumed instead of starting a new one.

    Returns:
        {"response": str, "source_ids": [str], "sources": [dict]}
        NOTE(review): source_ids/sources are currently always empty — the
        agent's collected sources are not surfaced here yet.
    """
    _ensure_initialized()
    # Best-effort init of the cloud headless browser (the deployment needs
    # isolated per-run sessions); failures are logged and tolerated.
    try:
        from agent.tools.builtin.browser import init_browser_session
        await init_browser_session(browser_type="cloud")
    except Exception as e:
        logger.warning(f"Failed to init cloud browser: {e}")
    # Find an existing research trace for this caller (resume) or start new.
    research_trace_id = get_research_trace_id(caller_trace_id)
    from agent.core.runner import RunConfig
    config = RunConfig(
        model="qwen3.5-plus",
        temperature=0.3,
        max_iterations=200,
        tool_groups=["core", "content", "browser"],
        skills=["planning", "research", "browser"],
    )
    config.trace_id = research_trace_id  # None = new trace, value = resume
    # Build the message list; the prompt messages are only prepended on a
    # fresh trace — a resumed trace is assumed to already contain them.
    content = f"[RESEARCH TASK] {query}"
    if research_trace_id is None:
        messages = _prompt_messages + [{"role": "user", "content": content}]
    else:
        messages = [{"role": "user", "content": content}]
    # Drive the agent; remember the trace id and the LAST assistant text seen
    # (earlier assistant messages are overwritten, keeping the final answer).
    response_text = ""
    actual_trace_id = None
    async for item in _runner.run(
        messages=messages,
        config=config,
    ):
        if isinstance(item, Trace):
            actual_trace_id = item.trace_id
        elif isinstance(item, Message):
            if item.role == "assistant":
                msg_content = item.content
                if isinstance(msg_content, dict):
                    # assumes dict-shaped content carries its text under
                    # the "text" key — TODO confirm against Message schema
                    text = msg_content.get("text", "")
                    if text:
                        response_text = text
                elif isinstance(msg_content, str) and msg_content:
                    response_text = msg_content
    # Persist the caller→research mapping so later calls resume this trace.
    if actual_trace_id and caller_trace_id:
        set_research_trace_id(caller_trace_id, actual_trace_id)
    return {
        "response": response_text,
        "source_ids": [],
        "sources": [],
    }
|