| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- """
- reflect 工具 — 轻量反思
- 用普通模型快速评估当前搜索结果,输出简短思路和下一步建议。
- 继承调用者的完整对话历史作为上下文。
- """
- import logging
- import os
- from typing import Any, Dict, List, Optional
- from agent.llm.qwen import qwen_llm_call
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
- logger = logging.getLogger(__name__)
- # 反思模型配置(普通模式,非 thinking)
- REFLECT_MODEL = os.getenv("REFLECT_MODEL", "qwen-plus")
- REFLECT_SYSTEM_PROMPT = """你是调研助手。根据对话历史和当前搜索结果,简要回答:
- 1. 本轮搜到了什么有价值的信息?缺了什么?
- 2. 下一轮搜什么?给出 1-3 个具体 query 词
- 3. 是否需要换搜索渠道或角度?
- 要求:直接输出思路,不要分析框架,不要长篇大论。3-5 句话即可。"""
- REFLECT_USER_TEMPLATE = """需求:{question}
- 本轮搜索结果:
- {findings}
- 简要反思,给出下一步 query。"""
- async def _fetch_caller_history(context: Optional[Dict[str, Any]]) -> List[Dict]:
- """从 context 中获取调用者的历史消息队列"""
- if not context:
- return []
- store = context.get("store")
- trace_id = context.get("trace_id")
- if not store or not trace_id:
- return []
- try:
- trace = await store.get_trace(trace_id)
- if not trace:
- return []
- messages = await store.get_main_path_messages(
- trace_id, trace.head_sequence
- )
- # 转为 LLM 消息格式
- history = []
- for msg in messages:
- llm_dict = msg.to_llm_dict()
- if llm_dict:
- history.append(llm_dict)
- logger.info(f"reflect: 获取到 {len(history)} 条历史消息")
- return history
- except Exception as e:
- logger.warning(f"reflect: 获取历史消息失败: {e}")
- return []
- @tool(
- description="轻量反思:快速评估本轮搜索结果,输出简短思路和下一步 query 建议",
- hidden_params=["context"],
- )
- async def reflect(
- question: str,
- findings: str,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 对本轮搜索结果进行轻量反思,给出下一步搜索思路。
- Args:
- question: 原始调研问题/需求描述
- findings: 本轮搜索结果摘要
- """
- # 获取调用者的历史消息
- # context 可能是 ToolContextImpl(有 .context 属性)或直接是 dict
- if context is None:
- caller_context = None
- elif isinstance(context, dict):
- caller_context = context
- else:
- caller_context = getattr(context, 'context', None)
- caller_history = await _fetch_caller_history(caller_context)
- # 构建消息:system + 调用者历史 + 反思请求
- messages = [{"role": "system", "content": REFLECT_SYSTEM_PROMPT}]
- if caller_history:
- messages.extend(caller_history)
- messages.append({
- "role": "user",
- "content": REFLECT_USER_TEMPLATE.format(
- question=question,
- findings=findings,
- ),
- })
- try:
- result = await qwen_llm_call(
- messages=messages,
- model=REFLECT_MODEL,
- temperature=0.2,
- )
- content = result["content"]
- msg_count = len(caller_history)
- cost = result.get("cost", 0.0)
- reasoning_tokens = result.get("reasoning_tokens", 0)
- return ToolResult(
- title=f"反思完成 (model: {REFLECT_MODEL}, 继承 {msg_count} 条历史)",
- output=content,
- tool_usage={
- "model": REFLECT_MODEL,
- "prompt_tokens": result.get("prompt_tokens", 0),
- "completion_tokens": result.get("completion_tokens", 0),
- "reasoning_tokens": reasoning_tokens,
- "cost": cost,
- },
- )
- except Exception as e:
- return ToolResult(
- title="reflect 失败",
- output=f"调用反思模型出错: {str(e)}",
- )
|