|
|
@@ -0,0 +1,1908 @@
|
|
|
+import asyncio
|
|
|
+import json
|
|
|
+import os
|
|
|
+import sys
|
|
|
+import argparse
|
|
|
+from datetime import datetime
|
|
|
+
|
|
|
+from agents import Agent, Runner
|
|
|
+from lib.my_trace import set_trace
|
|
|
+from typing import Literal
|
|
|
+from pydantic import BaseModel, Field
|
|
|
+
|
|
|
+from lib.utils import read_file_as_string
|
|
|
+from lib.client import get_model
|
|
|
+MODEL_NAME = "google/gemini-2.5-flash"
|
|
|
+from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
|
|
|
+from script.search.xiaohongshu_search import XiaohongshuSearch
|
|
|
+
|
|
|
+
|
|
|
+class RunContext(BaseModel):
|
|
|
+ version: str = Field(..., description="当前运行的脚本版本(文件名)")
|
|
|
+ input_files: dict[str, str] = Field(..., description="输入文件路径映射")
|
|
|
+ q_with_context: str
|
|
|
+ q_context: str
|
|
|
+ q: str
|
|
|
+ log_url: str
|
|
|
+ log_dir: str
|
|
|
+
|
|
|
+ # 步骤化日志
|
|
|
+ steps: list[dict] = Field(default_factory=list, description="执行步骤的详细记录")
|
|
|
+
|
|
|
+ # 探索阶段记录(保留用于向后兼容)
|
|
|
+ keywords: list[str] | None = Field(default=None, description="提取的关键词")
|
|
|
+ exploration_levels: list[dict] = Field(default_factory=list, description="每一层的探索结果")
|
|
|
+ level_analyses: list[dict] = Field(default_factory=list, description="每一层的主Agent分析")
|
|
|
+
|
|
|
+ # 最终结果
|
|
|
+ final_candidates: list[str] | None = Field(default=None, description="最终选出的候选query")
|
|
|
+ evaluation_results: list[dict] | None = Field(default=None, description="候选query的评估结果")
|
|
|
+ optimization_result: dict | None = Field(default=None, description="最终优化结果对象")
|
|
|
+ final_output: str | None = Field(default=None, description="最终输出结果(格式化文本)")
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# Agent 1: 关键词提取专家
|
|
|
+# ============================================================================
|
|
|
+keyword_extraction_instructions = """
|
|
|
+你是关键词提取专家。给定一个搜索问题(含上下文),提取出**最细粒度的关键概念**。
|
|
|
+
|
|
|
+## 提取原则
|
|
|
+
|
|
|
+1. **细粒度优先**:拆分成最小的有意义单元
|
|
|
+ - 不要保留完整的长句
|
|
|
+ - 拆分成独立的、有搜索意义的词或短语
|
|
|
+
|
|
|
+2. **保留核心维度**:
|
|
|
+ - 地域/对象
|
|
|
+ - 时间
|
|
|
+ - 行为/意图:获取、教程、推荐、如何等
|
|
|
+ - 主题/领域
|
|
|
+ - 质量/属性
|
|
|
+
|
|
|
+3. **去掉无意义的虚词**:的、吗、呢等
|
|
|
+
|
|
|
+4. **保留领域专有词**:不要过度拆分专业术语
|
|
|
+ - 如果是常见的组合词,保持完整
|
|
|
+
|
|
|
+## 输出要求
|
|
|
+
|
|
|
+输出关键词列表,按重要性排序(最核心的在前)。
|
|
|
+""".strip()
|
|
|
+
|
|
|
+class KeywordList(BaseModel):
|
|
|
+ """关键词列表"""
|
|
|
+ keywords: list[str] = Field(..., description="提取的关键词,按重要性排序")
|
|
|
+ reasoning: str = Field(..., description="提取理由")
|
|
|
+
|
|
|
+keyword_extractor = Agent[None](
|
|
|
+ name="关键词提取专家",
|
|
|
+ instructions=keyword_extraction_instructions,
|
|
|
+ model=get_model(MODEL_NAME),
|
|
|
+ output_type=KeywordList,
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# Agent 2: 层级探索分析专家
|
|
|
+# ============================================================================
|
|
|
+level_analysis_instructions = """
|
|
|
+你是搜索空间探索分析专家。基于当前层级的探索结果,决定下一步行动。
|
|
|
+
|
|
|
+## 你的任务
|
|
|
+
|
|
|
+分析当前已探索的词汇空间,判断:
|
|
|
+1. **发现了什么有价值的信号?**
|
|
|
+2. **是否已经可以评估候选了?**
|
|
|
+3. **如果还不够,下一层应该探索什么组合?**
|
|
|
+
|
|
|
+## 分析维度
|
|
|
+
|
|
|
+### 1. 信号识别(最重要)
|
|
|
+
|
|
|
+看推荐词里**出现了什么主题**:
|
|
|
+
|
|
|
+**关键问题:**
|
|
|
+- 哪些推荐词**最接近原始需求**?
|
|
|
+- 哪些推荐词**揭示了有价值的方向**(即使不完全匹配)?
|
|
|
+- 哪些推荐词可以作为**下一层探索的桥梁**?
|
|
|
+- 系统对哪些概念理解得好?哪些理解偏了?
|
|
|
+
|
|
|
+### 2. 组合策略
|
|
|
+
|
|
|
+基于发现的信号,设计下一层探索:
|
|
|
+
|
|
|
+**组合类型:**
|
|
|
+
|
|
|
+a) **关键词直接组合**
|
|
|
+ - 两个关键词组合成新query
|
|
|
+
|
|
|
+b) **利用推荐词作为桥梁**(重要!)
|
|
|
+ - 发现某个推荐词很有价值 → 直接探索这个推荐词
|
|
|
+ - 或在推荐词基础上加其他关键词
|
|
|
+
|
|
|
+c) **跨层级组合**
|
|
|
+ - 结合多层发现的有价值推荐词
|
|
|
+ - 组合成更复杂的query
|
|
|
+
|
|
|
+### 3. 停止条件
|
|
|
+
|
|
|
+**何时可以评估候选?**
|
|
|
+
|
|
|
+满足以下之一:
|
|
|
+- 推荐词中出现了**明确包含原始需求多个核心要素的query**
|
|
|
+- 已经探索到**足够复杂的组合**(3-4个关键词),且推荐词相关
|
|
|
+- 探索了**3-4层**,信息已经足够丰富
|
|
|
+
|
|
|
+**何时继续探索?**
|
|
|
+- 当前推荐词太泛,没有接近原始需求
|
|
|
+- 发现了有价值的信号,但需要进一步组合验证
|
|
|
+- 层数还少(< 3层)
|
|
|
+
|
|
|
+## 输出要求
|
|
|
+
|
|
|
+### 1. key_findings
|
|
|
+总结当前层发现的关键信息,包括:
|
|
|
+- 哪些推荐词最有价值?
|
|
|
+- 系统对哪些概念理解得好/不好?
|
|
|
+- 发现了什么意外的方向?
|
|
|
+
|
|
|
+### 2. promising_signals
|
|
|
+列出最有价值的推荐词(来自任何已探索的query),每个说明为什么有价值
|
|
|
+
|
|
|
+### 3. should_evaluate_now
|
|
|
+是否已经可以开始评估候选了?true/false
|
|
|
+
|
|
|
+### 4. candidates_to_evaluate
|
|
|
+如果should_evaluate_now=true,列出应该评估的候选query
|
|
|
+- 可以是推荐词
|
|
|
+- 可以是自己构造的组合
|
|
|
+
|
|
|
+### 5. next_combinations
|
|
|
+如果should_evaluate_now=false,列出下一层应该探索的query组合
|
|
|
+
|
|
|
+### 6. reasoning
|
|
|
+详细的推理过程
|
|
|
+
|
|
|
+## 重要原则
|
|
|
+
|
|
|
+1. **不要过早评估**:至少探索2层,除非第一层就发现了完美匹配
|
|
|
+2. **充分利用推荐词**:推荐词是系统给的提示,要善用
|
|
|
+3. **保持探索方向的多样性**:不要只盯着一个方向
|
|
|
+4. **识别死胡同**:如果某个方向的推荐词一直不相关,果断放弃
|
|
|
+""".strip()
|
|
|
+
|
|
|
+class PromisingSignal(BaseModel):
|
|
|
+ """有价值的推荐词信号"""
|
|
|
+ query: str = Field(..., description="推荐词")
|
|
|
+ from_level: int = Field(..., description="来自哪一层")
|
|
|
+ reason: str = Field(..., description="为什么有价值")
|
|
|
+
|
|
|
+class LevelAnalysis(BaseModel):
|
|
|
+ """层级分析结果"""
|
|
|
+ key_findings: str = Field(..., description="当前层的关键发现")
|
|
|
+ promising_signals: list[PromisingSignal] = Field(..., description="有价值的推荐词信号")
|
|
|
+ should_evaluate_now: bool = Field(..., description="是否应该开始评估候选")
|
|
|
+ candidates_to_evaluate: list[str] = Field(default_factory=list, description="如果should_evaluate_now=true,要评估的候选query列表")
|
|
|
+ next_combinations: list[str] = Field(default_factory=list, description="如果should_evaluate_now=false,下一层要探索的query组合")
|
|
|
+ reasoning: str = Field(..., description="详细的推理过程")
|
|
|
+
|
|
|
+level_analyzer = Agent[None](
|
|
|
+ name="层级探索分析专家",
|
|
|
+ instructions=level_analysis_instructions,
|
|
|
+ model=get_model(MODEL_NAME),
|
|
|
+ output_type=LevelAnalysis,
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# Agent 3: 评估专家(简化版:意图匹配 + 相关性评分)
|
|
|
+# ============================================================================
|
|
|
+eval_instructions = """
|
|
|
+你是搜索query评估专家。给定原始问题和推荐query,评估两个维度。
|
|
|
+
|
|
|
+## 评估目标
|
|
|
+
|
|
|
+用这个推荐query搜索,能否找到满足原始需求的内容?
|
|
|
+
|
|
|
+## 两层评分
|
|
|
+
|
|
|
+### 1. intent_match(意图匹配)= true/false
|
|
|
+
|
|
|
+推荐query的**使用意图**是否与原问题一致?
|
|
|
+
|
|
|
+**核心问题:用户搜索这个推荐词,想做什么?**
|
|
|
+
|
|
|
+**判断标准:**
|
|
|
+- 原问题意图:找方法?找教程?找资源/素材?找工具?看作品?
|
|
|
+- 推荐词意图:如果用户搜索这个词,他的目的是什么?
|
|
|
+
|
|
|
+**示例:**
|
|
|
+- 原问题意图="找素材"
|
|
|
+ - ✅ true: "素材下载"、"素材网站"、"免费素材"(都是获取素材)
|
|
|
+ - ❌ false: "素材制作教程"、"如何制作素材"(意图变成学习了)
|
|
|
+
|
|
|
+- 原问题意图="学教程"
|
|
|
+ - ✅ true: "教程视频"、"教学步骤"、"入门指南"
|
|
|
+ - ❌ false: "成品展示"、"作品欣赏"(意图变成看作品了)
|
|
|
+
|
|
|
+**评分:**
|
|
|
+- true = 意图一致,搜索推荐词能达到原问题的目的
|
|
|
+- false = 意图改变,搜索推荐词无法达到原问题的目的
|
|
|
+
|
|
|
+### 2. relevance_score(相关性)= 0-1 连续分数
|
|
|
+
|
|
|
+推荐query在**主题、要素、属性**上与原问题的相关程度?
|
|
|
+
|
|
|
+**评估维度:**
|
|
|
+- 主题相关:核心主题是否匹配?(如:摄影、旅游、美食)
|
|
|
+- 要素覆盖:关键要素保留了多少?(如:地域、时间、对象、工具)
|
|
|
+- 属性匹配:质量、风格、特色等属性是否保留?
|
|
|
+
|
|
|
+**评分参考:**
|
|
|
+- 0.9-1.0 = 几乎完美匹配,所有核心要素都保留
|
|
|
+- 0.7-0.8 = 高度相关,核心要素保留,少数次要要素缺失
|
|
|
+- 0.5-0.6 = 中度相关,主题匹配但多个要素缺失
|
|
|
+- 0.3-0.4 = 低度相关,只有部分主题相关
|
|
|
+- 0-0.2 = 基本不相关
|
|
|
+
|
|
|
+## 评估策略
|
|
|
+
|
|
|
+1. **先判断 intent_match**:意图不匹配直接 false,无论相关性多高
|
|
|
+2. **再评估 relevance_score**:在意图匹配的前提下,计算相关性
|
|
|
+
|
|
|
+## 输出要求
|
|
|
+
|
|
|
+- intent_match: true/false
|
|
|
+- relevance_score: 0-1 的浮点数
|
|
|
+- reason: 详细的评估理由,需要说明:
|
|
|
+ - 原问题的意图是什么
|
|
|
+ - 推荐词的意图是什么
|
|
|
+ - 为什么判断意图匹配/不匹配
|
|
|
+ - 相关性分数的依据(哪些要素保留/缺失)
|
|
|
+""".strip()
|
|
|
+
|
|
|
+class RelevanceEvaluation(BaseModel):
|
|
|
+ """评估反馈模型 - 意图匹配 + 相关性"""
|
|
|
+ intent_match: bool = Field(..., description="意图是否匹配")
|
|
|
+ relevance_score: float = Field(..., description="相关性分数 0-1,分数越高越相关")
|
|
|
+ reason: str = Field(..., description="评估理由,需说明意图判断和相关性依据")
|
|
|
+
|
|
|
+evaluator = Agent[None](
|
|
|
+ name="评估专家",
|
|
|
+ instructions=eval_instructions,
|
|
|
+ model=get_model(MODEL_NAME),
|
|
|
+ output_type=RelevanceEvaluation,
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# Agent 4: 单个帖子需求满足度评估专家
|
|
|
+# ============================================================================
|
|
|
+note_evaluation_instructions = """
|
|
|
+你是帖子需求满足度评估专家。给定原始问题和一个搜索到的帖子(标题+描述),判断这个帖子能否满足用户的需求。
|
|
|
+
|
|
|
+## 你的任务
|
|
|
+
|
|
|
+评估单个帖子的标题和描述,判断用户点开这个帖子后,能否找到满足原始需求的内容。
|
|
|
+
|
|
|
+## 评估维度
|
|
|
+
|
|
|
+### 1. 标题相关性(title_relevance)= 0-1 连续分数
|
|
|
+
|
|
|
+**评估标准:**
|
|
|
+- 标题是否与原始问题的主题相关?
|
|
|
+- 标题是否包含原始问题的关键要素?
|
|
|
+- 标题是否表明内容能解决用户的问题?
|
|
|
+
|
|
|
+**评分参考:**
|
|
|
+- 0.9-1.0 = 标题高度相关,明确表明能解决问题
|
|
|
+- 0.7-0.8 = 标题相关,包含核心要素
|
|
|
+- 0.5-0.6 = 标题部分相关,有关联但不明确
|
|
|
+- 0.3-0.4 = 标题相关性较低
|
|
|
+- 0-0.2 = 标题基本不相关
|
|
|
+
|
|
|
+### 2. 内容预期(content_expectation)= 0-1 连续分数
|
|
|
+
|
|
|
+**评估标准:**
|
|
|
+- 从描述看,内容是否可能包含用户需要的信息?
|
|
|
+- 描述是否展示了相关的要素或细节?
|
|
|
+- 描述的方向是否与用户需求一致?
|
|
|
+
|
|
|
+**评分参考:**
|
|
|
+- 0.9-1.0 = 描述明确表明内容高度符合需求
|
|
|
+- 0.7-0.8 = 描述显示内容可能符合需求
|
|
|
+- 0.5-0.6 = 描述有一定相关性,但不确定
|
|
|
+- 0.3-0.4 = 描述相关性较低
|
|
|
+- 0-0.2 = 描述基本不相关
|
|
|
+
|
|
|
+### 3. 需求满足度(need_satisfaction)= true/false
|
|
|
+
|
|
|
+**核心问题:用户点开这个帖子后,能否找到他需要的内容?**
|
|
|
+
|
|
|
+**判断标准:**
|
|
|
+- 综合标题和描述,内容是否大概率能满足需求?
|
|
|
+- 如果 title_relevance >= 0.7 且 content_expectation >= 0.6,一般判断为 true
|
|
|
+- 否则判断为 false
|
|
|
+
|
|
|
+### 4. 综合置信度(confidence_score)= 0-1 连续分数
|
|
|
+
|
|
|
+**计算方式:**
|
|
|
+- 可以是 title_relevance 和 content_expectation 的加权平均
|
|
|
+- 标题权重通常更高(如 0.6 * title + 0.4 * content)
|
|
|
+
|
|
|
+## 输出要求
|
|
|
+
|
|
|
+- title_relevance: 0-1 的浮点数
|
|
|
+- content_expectation: 0-1 的浮点数
|
|
|
+- need_satisfaction: true/false
|
|
|
+- confidence_score: 0-1 的浮点数
|
|
|
+- reason: 详细的评估理由,需要说明:
|
|
|
+ - 标题与原问题的相关性分析
|
|
|
+ - 描述的内容预期分析
|
|
|
+ - 为什么判断能/不能满足需求
|
|
|
+ - 置信度分数的依据
|
|
|
+
|
|
|
+## 重要原则
|
|
|
+
|
|
|
+1. **独立评估**:只评估这一个帖子,不考虑其他帖子
|
|
|
+2. **用户视角**:问"我会点开这个帖子吗?点开后能找到答案吗?"
|
|
|
+3. **标题优先**:标题是用户决定是否点击的关键
|
|
|
+4. **保守判断**:不确定时,倾向于给较低的分数
|
|
|
+""".strip()
|
|
|
+
|
|
|
+class NoteEvaluation(BaseModel):
|
|
|
+ """单个帖子评估模型"""
|
|
|
+ title_relevance: float = Field(..., description="标题相关性 0-1")
|
|
|
+ content_expectation: float = Field(..., description="内容预期 0-1")
|
|
|
+ need_satisfaction: bool = Field(..., description="是否满足需求")
|
|
|
+ confidence_score: float = Field(..., description="综合置信度 0-1")
|
|
|
+ reason: str = Field(..., description="详细的评估理由")
|
|
|
+
|
|
|
+note_evaluator = Agent[None](
|
|
|
+ name="帖子需求满足度评估专家",
|
|
|
+ instructions=note_evaluation_instructions,
|
|
|
+ model=get_model(MODEL_NAME),
|
|
|
+ output_type=NoteEvaluation,
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# Agent 5: 答案生成专家
|
|
|
+# ============================================================================
|
|
|
+answer_generation_instructions = """
|
|
|
+你是答案生成专家。基于一组满足需求的帖子,为原始问题生成一个全面、准确、有价值的答案。
|
|
|
+
|
|
|
+## 你的任务
|
|
|
+
|
|
|
+根据用户的原始问题和一组相关帖子(包含标题、描述、置信度评分),生成一个高质量的答案。
|
|
|
+
|
|
|
+## 输入信息
|
|
|
+
|
|
|
+1. **原始问题**:用户提出的具体问题
|
|
|
+2. **相关帖子列表**:每个帖子包含
|
|
|
+ - 序号(索引)
|
|
|
+ - 标题
|
|
|
+ - 描述
|
|
|
+ - 置信度分数
|
|
|
+
|
|
|
+## 答案要求
|
|
|
+
|
|
|
+### 1. 内容要求
|
|
|
+
|
|
|
+- **直接回答问题**:开门见山,第一段就给出核心答案
|
|
|
+- **结构清晰**:使用标题、列表、分段等组织内容
|
|
|
+- **综合多个来源**:整合多个帖子的信息,不要只依赖一个
|
|
|
+- **信息准确**:基于帖子内容,不要编造信息
|
|
|
+- **实用性**:提供可操作的建议或具体的信息
|
|
|
+
|
|
|
+### 2. 引用规范
|
|
|
+
|
|
|
+- **必须标注来源**:每个关键信息都要标注帖子索引
|
|
|
+- **引用格式**:使用 `[1]`、`[2]` 等标注帖子序号
|
|
|
+- **多来源引用**:如果多个帖子支持同一观点,使用 `[1,2,3]`
|
|
|
+- **引用位置**:在相关句子或段落的末尾标注
|
|
|
+
|
|
|
+### 3. 置信度使用
|
|
|
+
|
|
|
+- **优先高置信度**:优先引用置信度高的帖子
|
|
|
+- **交叉验证**:如果多个帖子提到相同信息,可以提高可信度
|
|
|
+- **标注不确定性**:如果信息来自低置信度帖子,适当标注
|
|
|
+
|
|
|
+### 4. 答案结构建议
|
|
|
+
|
|
|
+```
|
|
|
+【核心答案】
|
|
|
+直接回答用户的问题,给出最核心的信息。[引用]
|
|
|
+
|
|
|
+【详细说明】
|
|
|
+1. 第一个方面/要点 [引用]
|
|
|
+ - 具体内容
|
|
|
+ - 相关细节
|
|
|
+
|
|
|
+2. 第二个方面/要点 [引用]
|
|
|
+ - 具体内容
|
|
|
+ - 相关细节
|
|
|
+
|
|
|
+【补充建议/注意事项】(可选)
|
|
|
+其他有价值的信息或提醒。[引用]
|
|
|
+
|
|
|
+【参考帖子】
|
|
|
+列出所有引用的帖子编号和标题。
|
|
|
+```
|
|
|
+
|
|
|
+## 输出格式
|
|
|
+
|
|
|
+{
|
|
|
+ "answer": "生成的答案内容(Markdown格式)",
|
|
|
+ "cited_note_indices": [1, 2, 3], # 引用的帖子序号列表
|
|
|
+ "confidence": 0.85, # 答案的整体置信度 (0-1)
|
|
|
+ "summary": "一句话总结答案的核心内容"
|
|
|
+}
|
|
|
+
|
|
|
+## 重要原则
|
|
|
+
|
|
|
+1. **忠于原文**:不要添加帖子中没有的信息
|
|
|
+2. **引用透明**:让用户知道每个信息来自哪个帖子
|
|
|
+3. **综合性**:尽可能整合多个帖子的信息
|
|
|
+4. **可读性**:使用清晰的Markdown格式
|
|
|
+5. **质量优先**:如果帖子质量不够,可以说明信息有限
|
|
|
+""".strip()
|
|
|
+
|
|
|
+class AnswerGeneration(BaseModel):
|
|
|
+ """答案生成模型"""
|
|
|
+ answer: str = Field(..., description="生成的答案内容(Markdown格式)")
|
|
|
+ cited_note_indices: list[int] = Field(..., description="引用的帖子序号列表")
|
|
|
+ confidence: float = Field(..., description="答案的整体置信度 0-1")
|
|
|
+ summary: str = Field(..., description="一句话总结答案的核心内容")
|
|
|
+
|
|
|
+answer_generator = Agent[None](
|
|
|
+ name="答案生成专家",
|
|
|
+ instructions=answer_generation_instructions,
|
|
|
+ model=get_model(MODEL_NAME),
|
|
|
+ output_type=AnswerGeneration,
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 日志辅助函数
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
+def add_step(context: RunContext, step_name: str, step_type: str, data: dict):
|
|
|
+ """添加步骤记录"""
|
|
|
+ step = {
|
|
|
+ "step_number": len(context.steps) + 1,
|
|
|
+ "step_name": step_name,
|
|
|
+ "step_type": step_type,
|
|
|
+ "timestamp": datetime.now().isoformat(),
|
|
|
+ "data": data
|
|
|
+ }
|
|
|
+ context.steps.append(step)
|
|
|
+ return step
|
|
|
+
|
|
|
+
|
|
|
+def process_note_data(note: dict) -> dict:
|
|
|
+ """
|
|
|
+ 处理搜索接口返回的帖子数据,标准化为统一格式
|
|
|
+
|
|
|
+ Args:
|
|
|
+ note: 搜索接口返回的原始帖子数据
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 标准化后的帖子数据字典
|
|
|
+ """
|
|
|
+ note_card = note.get("note_card", {})
|
|
|
+ image_list = note_card.get("image_list", []) # 已在搜索API层预处理为URL字符串列表
|
|
|
+ interact_info = note_card.get("interact_info", {})
|
|
|
+ user_info = note_card.get("user", {})
|
|
|
+
|
|
|
+ return {
|
|
|
+ "note_id": note.get("id", ""),
|
|
|
+ "title": note_card.get("display_title", ""),
|
|
|
+ "desc": note_card.get("desc", ""),
|
|
|
+ "image_list": image_list, # 第一张就是封面,已在XiaohongshuSearch.search()中预处理为URL字符串列表
|
|
|
+ "interact_info": {
|
|
|
+ "liked_count": interact_info.get("liked_count", 0),
|
|
|
+ "collected_count": interact_info.get("collected_count", 0),
|
|
|
+ "comment_count": interact_info.get("comment_count", 0),
|
|
|
+ "shared_count": interact_info.get("shared_count", 0)
|
|
|
+ },
|
|
|
+ "user": {
|
|
|
+ "nickname": user_info.get("nickname", ""),
|
|
|
+ "user_id": user_info.get("user_id", "")
|
|
|
+ },
|
|
|
+ "type": note_card.get("type", "normal"),
|
|
|
+ "note_url": f"https://www.xiaohongshu.com/explore/{note.get('id', '')}"
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 核心函数
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
+async def extract_keywords(q: str, context: RunContext) -> KeywordList:
|
|
|
+ """提取关键词"""
|
|
|
+ print("\n[步骤 1] 正在提取关键词...")
|
|
|
+ result = await Runner.run(keyword_extractor, q)
|
|
|
+ keyword_list: KeywordList = result.final_output
|
|
|
+ print(f"提取的关键词:{keyword_list.keywords}")
|
|
|
+ print(f"提取理由:{keyword_list.reasoning}")
|
|
|
+
|
|
|
+ # 记录步骤
|
|
|
+ add_step(context, "提取关键词", "keyword_extraction", {
|
|
|
+ "input_question": q,
|
|
|
+ "keywords": keyword_list.keywords,
|
|
|
+ "reasoning": keyword_list.reasoning
|
|
|
+ })
|
|
|
+
|
|
|
+ return keyword_list
|
|
|
+
|
|
|
+
|
|
|
+async def explore_level(queries: list[str], level_num: int, context: RunContext) -> dict:
|
|
|
+ """探索一个层级(并发获取所有query的推荐词)"""
|
|
|
+ step_num = len(context.steps) + 1
|
|
|
+ print(f"\n{'='*60}")
|
|
|
+ print(f"[步骤 {step_num}] Level {level_num} 探索:{len(queries)} 个query")
|
|
|
+ print(f"{'='*60}")
|
|
|
+
|
|
|
+ xiaohongshu_api = XiaohongshuSearchRecommendations()
|
|
|
+
|
|
|
+ # 并发获取所有推荐词
|
|
|
+ async def get_single_sug(query: str):
|
|
|
+ print(f" 探索: {query}")
|
|
|
+ suggestions = xiaohongshu_api.get_recommendations(keyword=query)
|
|
|
+ print(f" → {len(suggestions) if suggestions else 0} 个推荐词")
|
|
|
+ return {
|
|
|
+ "query": query,
|
|
|
+ "suggestions": suggestions or []
|
|
|
+ }
|
|
|
+
|
|
|
+ results = await asyncio.gather(*[get_single_sug(q) for q in queries])
|
|
|
+
|
|
|
+ level_data = {
|
|
|
+ "level": level_num,
|
|
|
+ "timestamp": datetime.now().isoformat(),
|
|
|
+ "queries": results
|
|
|
+ }
|
|
|
+
|
|
|
+ context.exploration_levels.append(level_data)
|
|
|
+
|
|
|
+ # 记录步骤
|
|
|
+ add_step(context, f"Level {level_num} 探索", "level_exploration", {
|
|
|
+ "level": level_num,
|
|
|
+ "input_queries": queries,
|
|
|
+ "query_count": len(queries),
|
|
|
+ "results": results,
|
|
|
+ "total_suggestions": sum(len(r['suggestions']) for r in results)
|
|
|
+ })
|
|
|
+
|
|
|
+ return level_data
|
|
|
+
|
|
|
+
|
|
|
+async def analyze_level(level_data: dict, all_levels: list[dict], original_question: str, context: RunContext) -> LevelAnalysis:
|
|
|
+ """分析当前层级,决定下一步"""
|
|
|
+ step_num = len(context.steps) + 1
|
|
|
+ print(f"\n[步骤 {step_num}] 正在分析 Level {level_data['level']}...")
|
|
|
+
|
|
|
+ # 构造输入
|
|
|
+ analysis_input = f"""
|
|
|
+<原始问题>
|
|
|
+{original_question}
|
|
|
+</原始问题>
|
|
|
+
|
|
|
+<已探索的所有层级>
|
|
|
+{json.dumps(all_levels, ensure_ascii=False, indent=2)}
|
|
|
+</已探索的所有层级>
|
|
|
+
|
|
|
+<当前层级>
|
|
|
+Level {level_data['level']}
|
|
|
+{json.dumps(level_data['queries'], ensure_ascii=False, indent=2)}
|
|
|
+</当前层级>
|
|
|
+
|
|
|
+请分析当前探索状态,决定下一步行动。
|
|
|
+"""
|
|
|
+
|
|
|
+ result = await Runner.run(level_analyzer, analysis_input)
|
|
|
+ analysis: LevelAnalysis = result.final_output
|
|
|
+
|
|
|
+ print(f"\n分析结果:")
|
|
|
+ print(f" 关键发现:{analysis.key_findings}")
|
|
|
+ print(f" 有价值的信号:{len(analysis.promising_signals)} 个")
|
|
|
+ print(f" 是否评估:{analysis.should_evaluate_now}")
|
|
|
+
|
|
|
+ if analysis.should_evaluate_now:
|
|
|
+ print(f" 候选query:{analysis.candidates_to_evaluate}")
|
|
|
+ else:
|
|
|
+ print(f" 下一层探索:{analysis.next_combinations}")
|
|
|
+
|
|
|
+ # 保存分析结果
|
|
|
+ context.level_analyses.append({
|
|
|
+ "level": level_data['level'],
|
|
|
+ "timestamp": datetime.now().isoformat(),
|
|
|
+ "analysis": analysis.model_dump()
|
|
|
+ })
|
|
|
+
|
|
|
+ # 记录步骤
|
|
|
+ add_step(context, f"Level {level_data['level']} 分析", "level_analysis", {
|
|
|
+ "level": level_data['level'],
|
|
|
+ "key_findings": analysis.key_findings,
|
|
|
+ "promising_signals_count": len(analysis.promising_signals),
|
|
|
+ "promising_signals": [s.model_dump() for s in analysis.promising_signals],
|
|
|
+ "should_evaluate_now": analysis.should_evaluate_now,
|
|
|
+ "candidates_to_evaluate": analysis.candidates_to_evaluate if analysis.should_evaluate_now else [],
|
|
|
+ "next_combinations": analysis.next_combinations if not analysis.should_evaluate_now else [],
|
|
|
+ "reasoning": analysis.reasoning
|
|
|
+ })
|
|
|
+
|
|
|
+ return analysis
|
|
|
+
|
|
|
+
|
|
|
+async def evaluate_candidates(candidates: list[str], original_question: str, context: RunContext) -> list[dict]:
|
|
|
+ """评估候选query(含实际搜索验证)"""
|
|
|
+ step_num = len(context.steps) + 1
|
|
|
+ print(f"\n{'='*60}")
|
|
|
+ print(f"[步骤 {step_num}] 评估 {len(candidates)} 个候选query")
|
|
|
+ print(f"{'='*60}")
|
|
|
+
|
|
|
+ xiaohongshu_api = XiaohongshuSearchRecommendations()
|
|
|
+ xiaohongshu_search = XiaohongshuSearch()
|
|
|
+
|
|
|
+ # 创建搜索结果保存目录
|
|
|
+ search_results_dir = os.path.join(context.log_dir, "search_results")
|
|
|
+ os.makedirs(search_results_dir, exist_ok=True)
|
|
|
+
|
|
|
+ async def evaluate_single_candidate(candidate: str, candidate_index: int):
|
|
|
+ print(f"\n评估候选:{candidate}")
|
|
|
+
|
|
|
+ # 为当前候选创建子目录
|
|
|
+ # 清理候选名称,移除不适合作为目录名的字符
|
|
|
+ safe_candidate_name = "".join(c if c.isalnum() or c in (' ', '_', '-') else '_' for c in candidate)
|
|
|
+ candidate_dir = os.path.join(search_results_dir, f"candidate_{candidate_index+1}_{safe_candidate_name[:50]}")
|
|
|
+ os.makedirs(candidate_dir, exist_ok=True)
|
|
|
+
|
|
|
+ # 1. 获取推荐词
|
|
|
+ suggestions = xiaohongshu_api.get_recommendations(keyword=candidate)
|
|
|
+ print(f" 获取到 {len(suggestions) if suggestions else 0} 个推荐词")
|
|
|
+
|
|
|
+ if not suggestions:
|
|
|
+ return {
|
|
|
+ "candidate": candidate,
|
|
|
+ "suggestions": [],
|
|
|
+ "evaluations": []
|
|
|
+ }
|
|
|
+
|
|
|
+ # 2. 评估每个推荐词(意图匹配 + 相关性)
|
|
|
+ async def eval_single_sug(sug: str, sug_index: int):
|
|
|
+ # 2.1 先进行意图和相关性评估
|
|
|
+ eval_input = f"""
|
|
|
+<原始问题>
|
|
|
+{original_question}
|
|
|
+</原始问题>
|
|
|
+
|
|
|
+<待评估的推荐query>
|
|
|
+{sug}
|
|
|
+</待评估的推荐query>
|
|
|
+
|
|
|
+请评估该推荐query:
|
|
|
+1. intent_match: 意图是否匹配(true/false)
|
|
|
+2. relevance_score: 相关性分数(0-1)
|
|
|
+3. reason: 详细的评估理由
|
|
|
+"""
|
|
|
+ result = await Runner.run(evaluator, eval_input)
|
|
|
+ evaluation: RelevanceEvaluation = result.final_output
|
|
|
+
|
|
|
+ eval_result = {
|
|
|
+ "query": sug,
|
|
|
+ "intent_match": evaluation.intent_match,
|
|
|
+ "relevance_score": evaluation.relevance_score,
|
|
|
+ "reason": evaluation.reason,
|
|
|
+ }
|
|
|
+
|
|
|
+ # 2.2 如果意图匹配且相关性足够高,进行实际搜索验证
|
|
|
+ if evaluation.intent_match and evaluation.relevance_score >= 0.7:
|
|
|
+ print(f" → 合格候选,进行实际搜索验证: {sug}")
|
|
|
+ try:
|
|
|
+ search_result = xiaohongshu_search.search(keyword=sug)
|
|
|
+
|
|
|
+ # 解析result字段(它是JSON字符串,需要先解析)
|
|
|
+ result_str = search_result.get("result", "{}")
|
|
|
+ if isinstance(result_str, str):
|
|
|
+ result_data = json.loads(result_str)
|
|
|
+ else:
|
|
|
+ result_data = result_str
|
|
|
+
|
|
|
+ # 格式化搜索结果:将result字段解析后再保存
|
|
|
+ formatted_search_result = {
|
|
|
+ "success": search_result.get("success"),
|
|
|
+ "result": result_data, # 保存解析后的数据
|
|
|
+ "tool_name": search_result.get("tool_name"),
|
|
|
+ "call_type": search_result.get("call_type"),
|
|
|
+ "query": sug,
|
|
|
+ "timestamp": datetime.now().isoformat()
|
|
|
+ }
|
|
|
+
|
|
|
+ # 保存格式化后的搜索结果到文件
|
|
|
+ safe_sug_name = "".join(c if c.isalnum() or c in (' ', '_', '-') else '_' for c in sug)
|
|
|
+ search_result_file = os.path.join(candidate_dir, f"sug_{sug_index+1}_{safe_sug_name[:30]}.json")
|
|
|
+ with open(search_result_file, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(formatted_search_result, f, ensure_ascii=False, indent=2)
|
|
|
+ print(f" 搜索结果已保存: {os.path.basename(search_result_file)}")
|
|
|
+
|
|
|
+ # 提取搜索结果的标题和描述
|
|
|
+ # 正确的数据路径: result.data.data[]
|
|
|
+ notes = result_data.get("data", {}).get("data", [])
|
|
|
+ if notes:
|
|
|
+ print(f" 开始评估 {len(notes)} 个帖子...")
|
|
|
+
|
|
|
+ # 对每个帖子进行独立评估
|
|
|
+ note_evaluations = []
|
|
|
+ for note_idx, note in enumerate(notes, 1): # 评估所有帖子
|
|
|
+ note_card = note.get("note_card", {})
|
|
|
+ title = note_card.get("display_title", "")
|
|
|
+ desc = note_card.get("desc", "")
|
|
|
+ note_id = note.get("id", "")
|
|
|
+
|
|
|
+ # 构造评估输入
|
|
|
+ eval_input = f"""
|
|
|
+<原始问题>
|
|
|
+{original_question}
|
|
|
+</原始问题>
|
|
|
+
|
|
|
+<帖子信息>
|
|
|
+标题: {title}
|
|
|
+描述: {desc}
|
|
|
+</帖子信息>
|
|
|
+
|
|
|
+请评估这个帖子能否满足用户需求。
|
|
|
+"""
|
|
|
+ # 调用评估Agent
|
|
|
+ eval_result_run = await Runner.run(note_evaluator, eval_input)
|
|
|
+ note_eval: NoteEvaluation = eval_result_run.final_output
|
|
|
+
|
|
|
+ note_evaluation_record = {
|
|
|
+ "note_index": note_idx,
|
|
|
+ "note_id": note_id,
|
|
|
+ "title": title,
|
|
|
+ "desc": desc, # 保存完整描述
|
|
|
+ "evaluation": {
|
|
|
+ "title_relevance": note_eval.title_relevance,
|
|
|
+ "content_expectation": note_eval.content_expectation,
|
|
|
+ "need_satisfaction": note_eval.need_satisfaction,
|
|
|
+ "confidence_score": note_eval.confidence_score,
|
|
|
+ "reason": note_eval.reason
|
|
|
+ }
|
|
|
+ }
|
|
|
+ note_evaluations.append(note_evaluation_record)
|
|
|
+
|
|
|
+ # 简单打印进度
|
|
|
+ if note_idx % 3 == 0 or note_idx == len(notes):
|
|
|
+ print(f" 已评估 {note_idx}/{len(notes)} 个帖子")
|
|
|
+
|
|
|
+ # 统计满足需求的帖子数量
|
|
|
+ satisfied_count = sum(1 for ne in note_evaluations if ne["evaluation"]["need_satisfaction"])
|
|
|
+ avg_confidence = sum(ne["evaluation"]["confidence_score"] for ne in note_evaluations) / len(note_evaluations) if note_evaluations else 0
|
|
|
+
|
|
|
+ eval_result["search_verification"] = {
|
|
|
+ "total_notes": len(notes),
|
|
|
+ "evaluated_notes": len(note_evaluations),
|
|
|
+ "satisfied_count": satisfied_count,
|
|
|
+ "average_confidence": round(avg_confidence, 2),
|
|
|
+ "note_evaluations": note_evaluations,
|
|
|
+ "search_result_file": search_result_file
|
|
|
+ }
|
|
|
+
|
|
|
+ print(f" 评估完成: {satisfied_count}/{len(note_evaluations)} 个帖子满足需求, "
|
|
|
+ f"平均置信度={avg_confidence:.2f}")
|
|
|
+ else:
|
|
|
+ eval_result["search_verification"] = {
|
|
|
+ "total_notes": 0,
|
|
|
+ "evaluated_notes": 0,
|
|
|
+ "satisfied_count": 0,
|
|
|
+ "average_confidence": 0.0,
|
|
|
+ "note_evaluations": [],
|
|
|
+ "search_result_file": search_result_file,
|
|
|
+ "reason": "搜索无结果"
|
|
|
+ }
|
|
|
+ print(f" 搜索无结果")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f" 搜索验证出错: {e}")
|
|
|
+ eval_result["search_verification"] = {
|
|
|
+ "error": str(e)
|
|
|
+ }
|
|
|
+
|
|
|
+ return eval_result
|
|
|
+
|
|
|
+ evaluations = await asyncio.gather(*[eval_single_sug(s, i) for i, s in enumerate(suggestions)])
|
|
|
+
|
|
|
+ return {
|
|
|
+ "candidate": candidate,
|
|
|
+ "suggestions": suggestions,
|
|
|
+ "evaluations": evaluations
|
|
|
+ }
|
|
|
+
|
|
|
+ results = await asyncio.gather(*[evaluate_single_candidate(c, i) for i, c in enumerate(candidates)])
|
|
|
+
|
|
|
+ # 生成搜索结果汇总文件
|
|
|
+ summary_data = {
|
|
|
+ "original_question": original_question,
|
|
|
+ "timestamp": datetime.now().isoformat(),
|
|
|
+ "total_candidates": len(candidates),
|
|
|
+ "candidates": []
|
|
|
+ }
|
|
|
+
|
|
|
+ for i, result in enumerate(results):
|
|
|
+ candidate_summary = {
|
|
|
+ "index": i + 1,
|
|
|
+ "candidate": result["candidate"],
|
|
|
+ "suggestions_count": len(result["suggestions"]),
|
|
|
+ "verified_queries": []
|
|
|
+ }
|
|
|
+
|
|
|
+ for eval_item in result.get("evaluations", []):
|
|
|
+ if "search_verification" in eval_item and "search_result_file" in eval_item["search_verification"]:
|
|
|
+ sv = eval_item["search_verification"]
|
|
|
+ candidate_summary["verified_queries"].append({
|
|
|
+ "query": eval_item["query"],
|
|
|
+ "intent_match": eval_item["intent_match"],
|
|
|
+ "relevance_score": eval_item["relevance_score"],
|
|
|
+ "verification": {
|
|
|
+ "total_notes": sv.get("total_notes", 0),
|
|
|
+ "evaluated_notes": sv.get("evaluated_notes", 0),
|
|
|
+ "satisfied_count": sv.get("satisfied_count", 0),
|
|
|
+ "average_confidence": sv.get("average_confidence", 0.0)
|
|
|
+ },
|
|
|
+ "search_result_file": sv["search_result_file"]
|
|
|
+ })
|
|
|
+
|
|
|
+ summary_data["candidates"].append(candidate_summary)
|
|
|
+
|
|
|
+ # 保存汇总文件
|
|
|
+ summary_file = os.path.join(search_results_dir, "summary.json")
|
|
|
+ with open(summary_file, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(summary_data, f, ensure_ascii=False, indent=2)
|
|
|
+ print(f"\n搜索结果汇总已保存: {summary_file}")
|
|
|
+
|
|
|
+ context.evaluation_results = results
|
|
|
+
|
|
|
+ # 构建详细的步骤记录数据
|
|
|
+ step_data = {
|
|
|
+ "candidate_count": len(candidates),
|
|
|
+ "candidates": candidates,
|
|
|
+ "total_evaluations": sum(len(r['evaluations']) for r in results),
|
|
|
+ "verified_queries": sum(
|
|
|
+ 1 for r in results
|
|
|
+ for e in r.get('evaluations', [])
|
|
|
+ if 'search_verification' in e
|
|
|
+ ),
|
|
|
+ "search_results_dir": search_results_dir,
|
|
|
+ "summary_file": summary_file,
|
|
|
+ "detailed_results": []
|
|
|
+ }
|
|
|
+
|
|
|
+ # 为每个候选记录详细信息
|
|
|
+ for result in results:
|
|
|
+ candidate_detail = {
|
|
|
+ "candidate": result["candidate"],
|
|
|
+ "suggestions": result["suggestions"],
|
|
|
+ "evaluations": []
|
|
|
+ }
|
|
|
+
|
|
|
+ for eval_item in result.get("evaluations", []):
|
|
|
+ eval_detail = {
|
|
|
+ "query": eval_item["query"],
|
|
|
+ "intent_match": eval_item["intent_match"],
|
|
|
+ "relevance_score": eval_item["relevance_score"],
|
|
|
+ "reason": eval_item["reason"]
|
|
|
+ }
|
|
|
+
|
|
|
+ # 如果有搜索验证,添加详细信息
|
|
|
+ if "search_verification" in eval_item:
|
|
|
+ verification = eval_item["search_verification"]
|
|
|
+ eval_detail["search_verification"] = {
|
|
|
+ "performed": True,
|
|
|
+ "total_notes": verification.get("total_notes", 0),
|
|
|
+ "evaluated_notes": verification.get("evaluated_notes", 0),
|
|
|
+ "satisfied_count": verification.get("satisfied_count", 0),
|
|
|
+ "average_confidence": verification.get("average_confidence", 0.0),
|
|
|
+ "search_result_file": verification.get("search_result_file"),
|
|
|
+ "has_error": "error" in verification
|
|
|
+ }
|
|
|
+ if "error" in verification:
|
|
|
+ eval_detail["search_verification"]["error"] = verification["error"]
|
|
|
+
|
|
|
+ # 保存每个帖子的评估详情
|
|
|
+ if "note_evaluations" in verification:
|
|
|
+ eval_detail["search_verification"]["note_evaluations"] = verification["note_evaluations"]
|
|
|
+ else:
|
|
|
+ eval_detail["search_verification"] = {
|
|
|
+ "performed": False,
|
|
|
+ "reason": "未达到搜索验证阈值(intent_match=False 或 relevance_score<0.7)"
|
|
|
+ }
|
|
|
+
|
|
|
+ candidate_detail["evaluations"].append(eval_detail)
|
|
|
+
|
|
|
+ step_data["detailed_results"].append(candidate_detail)
|
|
|
+
|
|
|
+ # 记录步骤
|
|
|
+ add_step(context, "评估候选query", "candidate_evaluation", step_data)
|
|
|
+
|
|
|
+ return results
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 新的独立步骤函数(方案A)
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
+async def step_evaluate_query_suggestions(candidates: list[str], original_question: str, context: RunContext) -> list[dict]:
|
|
|
+ """
|
|
|
+ 步骤1: 评估候选query的推荐词
|
|
|
+
|
|
|
+ 输入:
|
|
|
+ - candidates: 候选query列表
|
|
|
+ - original_question: 原始问题
|
|
|
+ - context: 运行上下文
|
|
|
+
|
|
|
+ 输出:
|
|
|
+ - 每个候选的评估结果列表,包含:
|
|
|
+ - candidate: 候选query
|
|
|
+ - suggestions: 推荐词列表
|
|
|
+ - evaluations: 每个推荐词的意图匹配和相关性评分
|
|
|
+ """
|
|
|
+ step_num = len(context.steps) + 1
|
|
|
+ print(f"\n{'='*60}")
|
|
|
+ print(f"[步骤 {step_num}] 评估 {len(candidates)} 个候选query的推荐词")
|
|
|
+ print(f"{'='*60}")
|
|
|
+
|
|
|
+ xiaohongshu_api = XiaohongshuSearchRecommendations()
|
|
|
+
|
|
|
+ async def evaluate_single_candidate(candidate: str):
|
|
|
+ print(f"\n评估候选:{candidate}")
|
|
|
+
|
|
|
+ # 1. 获取推荐词
|
|
|
+ suggestions = xiaohongshu_api.get_recommendations(keyword=candidate)
|
|
|
+ print(f" 获取到 {len(suggestions) if suggestions else 0} 个推荐词")
|
|
|
+
|
|
|
+ if not suggestions:
|
|
|
+ return {
|
|
|
+ "candidate": candidate,
|
|
|
+ "suggestions": [],
|
|
|
+ "evaluations": []
|
|
|
+ }
|
|
|
+
|
|
|
+ # 2. 评估每个推荐词(只做意图匹配和相关性评分)
|
|
|
+ async def eval_single_sug(sug: str):
|
|
|
+ eval_input = f"""
|
|
|
+<原始问题>
|
|
|
+{original_question}
|
|
|
+</原始问题>
|
|
|
+
|
|
|
+<待评估的推荐query>
|
|
|
+{sug}
|
|
|
+</待评估的推荐query>
|
|
|
+
|
|
|
+请评估该推荐query:
|
|
|
+1. intent_match: 意图是否匹配(true/false)
|
|
|
+2. relevance_score: 相关性分数(0-1)
|
|
|
+3. reason: 详细的评估理由
|
|
|
+"""
|
|
|
+ result = await Runner.run(evaluator, eval_input)
|
|
|
+ evaluation: RelevanceEvaluation = result.final_output
|
|
|
+
|
|
|
+ return {
|
|
|
+ "query": sug,
|
|
|
+ "intent_match": evaluation.intent_match,
|
|
|
+ "relevance_score": evaluation.relevance_score,
|
|
|
+ "reason": evaluation.reason
|
|
|
+ }
|
|
|
+
|
|
|
+ evaluations = await asyncio.gather(*[eval_single_sug(s) for s in suggestions])
|
|
|
+
|
|
|
+ return {
|
|
|
+ "candidate": candidate,
|
|
|
+ "suggestions": suggestions,
|
|
|
+ "evaluations": evaluations
|
|
|
+ }
|
|
|
+
|
|
|
+ results = await asyncio.gather(*[evaluate_single_candidate(c) for c in candidates])
|
|
|
+
|
|
|
+ # 记录步骤
|
|
|
+ add_step(context, "评估候选query的推荐词", "query_suggestion_evaluation", {
|
|
|
+ "candidate_count": len(candidates),
|
|
|
+ "candidates": candidates,
|
|
|
+ "results": results,
|
|
|
+ "total_evaluations": sum(len(r['evaluations']) for r in results),
|
|
|
+ "qualified_count": sum(
|
|
|
+ 1 for r in results
|
|
|
+ for e in r['evaluations']
|
|
|
+ if e['intent_match'] and e['relevance_score'] >= 0.7
|
|
|
+ )
|
|
|
+ })
|
|
|
+
|
|
|
+ return results
|
|
|
+
|
|
|
+
|
|
|
+def step_filter_qualified_queries(evaluation_results: list[dict], context: RunContext, min_relevance_score: float = 0.7) -> list[dict]:
|
|
|
+ """
|
|
|
+ 步骤1.5: 筛选合格的推荐词
|
|
|
+
|
|
|
+ 输入:
|
|
|
+ - evaluation_results: 步骤1的评估结果
|
|
|
+
|
|
|
+ 输出:
|
|
|
+ - 合格的query列表,每个包含:
|
|
|
+ - query: 推荐词
|
|
|
+ - from_candidate: 来源候选
|
|
|
+ - intent_match: 意图匹配
|
|
|
+ - relevance_score: 相关性分数
|
|
|
+ - reason: 评估理由
|
|
|
+ """
|
|
|
+ step_num = len(context.steps) + 1
|
|
|
+ print(f"\n{'='*60}")
|
|
|
+ print(f"[步骤 {step_num}] 筛选合格的推荐词")
|
|
|
+ print(f"{'='*60}")
|
|
|
+
|
|
|
+ qualified_queries = []
|
|
|
+ all_queries = [] # 保存所有查询,包括不合格的
|
|
|
+
|
|
|
+ for result in evaluation_results:
|
|
|
+ candidate = result["candidate"]
|
|
|
+ for eval_item in result.get("evaluations", []):
|
|
|
+ query_data = {
|
|
|
+ "query": eval_item["query"],
|
|
|
+ "from_candidate": candidate,
|
|
|
+ "intent_match": eval_item["intent_match"],
|
|
|
+ "relevance_score": eval_item["relevance_score"],
|
|
|
+ "reason": eval_item["reason"]
|
|
|
+ }
|
|
|
+
|
|
|
+ # 判断是否合格
|
|
|
+ is_qualified = (eval_item['intent_match'] is True
|
|
|
+ and eval_item['relevance_score'] >= min_relevance_score)
|
|
|
+ query_data["is_qualified"] = is_qualified
|
|
|
+
|
|
|
+ all_queries.append(query_data)
|
|
|
+ if is_qualified:
|
|
|
+ qualified_queries.append(query_data)
|
|
|
+
|
|
|
+ # 按相关性分数降序排列
|
|
|
+ qualified_queries.sort(key=lambda x: x['relevance_score'], reverse=True)
|
|
|
+ all_queries.sort(key=lambda x: x['relevance_score'], reverse=True)
|
|
|
+
|
|
|
+ print(f"\n找到 {len(qualified_queries)} 个合格的推荐词 (共评估 {len(all_queries)} 个)")
|
|
|
+ if qualified_queries:
|
|
|
+ print(f"相关性分数范围: {qualified_queries[0]['relevance_score']:.2f} ~ {qualified_queries[-1]['relevance_score']:.2f}")
|
|
|
+ print("\n合格的推荐词:")
|
|
|
+ for idx, q in enumerate(qualified_queries[:5], 1):
|
|
|
+ print(f" {idx}. {q['query']} (分数: {q['relevance_score']:.2f})")
|
|
|
+ if len(qualified_queries) > 5:
|
|
|
+ print(f" ... 还有 {len(qualified_queries) - 5} 个")
|
|
|
+
|
|
|
+ # 记录步骤 - 保存所有查询数据
|
|
|
+ add_step(context, "筛选合格的推荐词", "filter_qualified_queries", {
|
|
|
+ "input_evaluation_count": len(all_queries),
|
|
|
+ "min_relevance_score": min_relevance_score,
|
|
|
+ "qualified_count": len(qualified_queries),
|
|
|
+ "qualified_queries": qualified_queries,
|
|
|
+ "all_queries": all_queries # 新增:保存所有查询数据
|
|
|
+ })
|
|
|
+
|
|
|
+ return qualified_queries
|
|
|
+
|
|
|
+
|
|
|
+async def step_search_qualified_queries(qualified_queries: list[dict], context: RunContext) -> dict:
|
|
|
+ """
|
|
|
+ 步骤2: 搜索合格的推荐词
|
|
|
+
|
|
|
+ 输入:
|
|
|
+ - qualified_queries: 步骤1.5筛选出的合格query列表,每个包含:
|
|
|
+ - query: 推荐词
|
|
|
+ - from_candidate: 来源候选
|
|
|
+ - intent_match, relevance_score, reason
|
|
|
+
|
|
|
+ 输出:
|
|
|
+ - 搜索结果字典,包含:
|
|
|
+ - searches: 每个query的搜索结果列表
|
|
|
+ - search_results_dir: 搜索结果保存目录
|
|
|
+ """
|
|
|
+ step_num = len(context.steps) + 1
|
|
|
+ print(f"\n{'='*60}")
|
|
|
+ print(f"[步骤 {step_num}] 搜索 {len(qualified_queries)} 个合格的推荐词")
|
|
|
+ print(f"{'='*60}")
|
|
|
+
|
|
|
+ if not qualified_queries:
|
|
|
+ add_step(context, "搜索合格的推荐词", "search_qualified_queries", {
|
|
|
+ "qualified_count": 0,
|
|
|
+ "searches": []
|
|
|
+ })
|
|
|
+ return {"searches": [], "search_results_dir": None}
|
|
|
+
|
|
|
+ # 创建搜索结果保存目录
|
|
|
+ search_results_dir = os.path.join(context.log_dir, "search_results")
|
|
|
+ os.makedirs(search_results_dir, exist_ok=True)
|
|
|
+
|
|
|
+ xiaohongshu_search = XiaohongshuSearch()
|
|
|
+
|
|
|
+ # 搜索每个合格的query
|
|
|
+ async def search_single_query(query_info: dict, query_index: int):
|
|
|
+ query = query_info['query']
|
|
|
+ print(f"\n搜索 [{query_index+1}/{len(qualified_queries)}]: {query}")
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 执行搜索
|
|
|
+ search_result = xiaohongshu_search.search(keyword=query)
|
|
|
+
|
|
|
+ # 解析result字段
|
|
|
+ result_str = search_result.get("result", "{}")
|
|
|
+ if isinstance(result_str, str):
|
|
|
+ result_data = json.loads(result_str)
|
|
|
+ else:
|
|
|
+ result_data = result_str
|
|
|
+
|
|
|
+ # 格式化搜索结果
|
|
|
+ formatted_search_result = {
|
|
|
+ "success": search_result.get("success"),
|
|
|
+ "result": result_data,
|
|
|
+ "tool_name": search_result.get("tool_name"),
|
|
|
+ "call_type": search_result.get("call_type"),
|
|
|
+ "query": query,
|
|
|
+ "timestamp": datetime.now().isoformat()
|
|
|
+ }
|
|
|
+
|
|
|
+ # 保存到文件
|
|
|
+ safe_query_name = "".join(c if c.isalnum() or c in (' ', '_', '-') else '_' for c in query)
|
|
|
+ query_dir = os.path.join(search_results_dir, f"query_{query_index+1}_{safe_query_name[:50]}")
|
|
|
+ os.makedirs(query_dir, exist_ok=True)
|
|
|
+
|
|
|
+ search_result_file = os.path.join(query_dir, "search_result.json")
|
|
|
+ with open(search_result_file, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(formatted_search_result, f, ensure_ascii=False, indent=2)
|
|
|
+
|
|
|
+ # 提取帖子列表
|
|
|
+ notes = result_data.get("data", {}).get("data", [])
|
|
|
+
|
|
|
+ print(f" → 搜索成功,获得 {len(notes)} 个帖子")
|
|
|
+
|
|
|
+ # ⭐ 提取帖子摘要信息用于steps.json
|
|
|
+ notes_summary = [process_note_data(note) for note in notes]
|
|
|
+
|
|
|
+ return {
|
|
|
+ "query": query,
|
|
|
+ "from_candidate": query_info['from_candidate'],
|
|
|
+ "intent_match": query_info['intent_match'],
|
|
|
+ "relevance_score": query_info['relevance_score'],
|
|
|
+ "reason": query_info['reason'],
|
|
|
+ "search_result_file": search_result_file,
|
|
|
+ "note_count": len(notes),
|
|
|
+ "notes": notes, # 保存所有帖子用于评估
|
|
|
+ "notes_summary": notes_summary # ⭐ 保存到steps.json
|
|
|
+ }
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f" → 搜索失败: {e}")
|
|
|
+ return {
|
|
|
+ "query": query,
|
|
|
+ "from_candidate": query_info['from_candidate'],
|
|
|
+ "intent_match": query_info['intent_match'],
|
|
|
+ "relevance_score": query_info['relevance_score'],
|
|
|
+ "reason": query_info['reason'],
|
|
|
+ "error": str(e),
|
|
|
+ "note_count": 0,
|
|
|
+ "notes": []
|
|
|
+ }
|
|
|
+
|
|
|
+ search_results = await asyncio.gather(*[search_single_query(q, i) for i, q in enumerate(qualified_queries)])
|
|
|
+
|
|
|
+ # 记录步骤
|
|
|
+ add_step(context, "搜索合格的推荐词", "search_qualified_queries", {
|
|
|
+ "qualified_count": len(qualified_queries),
|
|
|
+ "search_results": [
|
|
|
+ {
|
|
|
+ "query": sr['query'],
|
|
|
+ "from_candidate": sr['from_candidate'],
|
|
|
+ "note_count": sr['note_count'],
|
|
|
+ "search_result_file": sr.get('search_result_file'),
|
|
|
+ "has_error": 'error' in sr,
|
|
|
+ "notes_summary": sr.get('notes_summary', []) # ⭐ 包含帖子摘要
|
|
|
+ }
|
|
|
+ for sr in search_results
|
|
|
+ ],
|
|
|
+ "search_results_dir": search_results_dir
|
|
|
+ })
|
|
|
+
|
|
|
+ return {
|
|
|
+ "searches": search_results,
|
|
|
+ "search_results_dir": search_results_dir
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+async def step_evaluate_search_notes(search_data: dict, original_question: str, context: RunContext) -> dict:
|
|
|
+ """
|
|
|
+ 步骤3: 评估搜索到的帖子
|
|
|
+
|
|
|
+ 输入:
|
|
|
+ - search_data: 步骤2的搜索结果,包含:
|
|
|
+ - searches: 搜索结果列表
|
|
|
+ - search_results_dir: 结果目录
|
|
|
+
|
|
|
+ 输出:
|
|
|
+ - 帖子评估结果字典,包含:
|
|
|
+ - note_evaluations: 每个query的帖子评估列表
|
|
|
+ """
|
|
|
+ step_num = len(context.steps) + 1
|
|
|
+ print(f"\n{'='*60}")
|
|
|
+ print(f"[步骤 {step_num}] 评估搜索到的帖子")
|
|
|
+ print(f"{'='*60}")
|
|
|
+
|
|
|
+ search_results = search_data['searches']
|
|
|
+
|
|
|
+ if not search_results:
|
|
|
+ add_step(context, "评估搜索到的帖子", "evaluate_search_notes", {
|
|
|
+ "query_count": 0,
|
|
|
+ "total_notes": 0,
|
|
|
+ "evaluated_notes": 0,
|
|
|
+ "note_evaluations": []
|
|
|
+ })
|
|
|
+ return {"note_evaluations": []}
|
|
|
+
|
|
|
+ # 对每个query的帖子进行评估
|
|
|
+ async def evaluate_query_notes(search_result: dict, query_index: int):
|
|
|
+ query = search_result['query']
|
|
|
+ notes = search_result.get('notes', [])
|
|
|
+
|
|
|
+ if not notes or 'error' in search_result:
|
|
|
+ return {
|
|
|
+ "query": query,
|
|
|
+ "from_candidate": search_result['from_candidate'],
|
|
|
+ "note_count": 0,
|
|
|
+ "evaluated_notes": [],
|
|
|
+ "satisfied_count": 0,
|
|
|
+ "average_confidence": 0.0
|
|
|
+ }
|
|
|
+
|
|
|
+ print(f"\n评估query [{query_index+1}]: {query} ({len(notes)} 个帖子)")
|
|
|
+
|
|
|
+ # 评估每个帖子
|
|
|
+ note_evaluations = []
|
|
|
+ for note_idx, note in enumerate(notes, 1):
|
|
|
+ # 使用标准化函数处理帖子数据
|
|
|
+ note_data = process_note_data(note)
|
|
|
+ title = note_data["title"]
|
|
|
+ desc = note_data["desc"]
|
|
|
+
|
|
|
+ # 调用评估Agent
|
|
|
+ eval_input = f"""
|
|
|
+<原始问题>
|
|
|
+{original_question}
|
|
|
+</原始问题>
|
|
|
+
|
|
|
+<帖子信息>
|
|
|
+标题: {title}
|
|
|
+描述: {desc}
|
|
|
+</帖子信息>
|
|
|
+
|
|
|
+请评估这个帖子能否满足用户需求。
|
|
|
+"""
|
|
|
+ eval_result_run = await Runner.run(note_evaluator, eval_input)
|
|
|
+ note_eval: NoteEvaluation = eval_result_run.final_output
|
|
|
+
|
|
|
+ # 合并标准化的帖子数据和评估结果
|
|
|
+ note_evaluations.append({
|
|
|
+ **note_data, # 包含所有标准化字段
|
|
|
+ "note_index": note_idx,
|
|
|
+ "evaluation": {
|
|
|
+ "title_relevance": note_eval.title_relevance,
|
|
|
+ "content_expectation": note_eval.content_expectation,
|
|
|
+ "need_satisfaction": note_eval.need_satisfaction,
|
|
|
+ "confidence_score": note_eval.confidence_score,
|
|
|
+ "reason": note_eval.reason
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ if note_idx % 3 == 0 or note_idx == len(notes):
|
|
|
+ print(f" 已评估 {note_idx}/{len(notes)} 个帖子")
|
|
|
+
|
|
|
+ # 统计
|
|
|
+ satisfied_count = sum(1 for ne in note_evaluations if ne["evaluation"]["need_satisfaction"])
|
|
|
+ avg_confidence = sum(ne["evaluation"]["confidence_score"] for ne in note_evaluations) / len(note_evaluations) if note_evaluations else 0
|
|
|
+
|
|
|
+ print(f" → 完成:{satisfied_count}/{len(note_evaluations)} 个帖子满足需求")
|
|
|
+
|
|
|
+ return {
|
|
|
+ "query": query,
|
|
|
+ "from_candidate": search_result['from_candidate'],
|
|
|
+ "note_count": len(notes),
|
|
|
+ "evaluated_notes": note_evaluations,
|
|
|
+ "satisfied_count": satisfied_count,
|
|
|
+ "average_confidence": round(avg_confidence, 2)
|
|
|
+ }
|
|
|
+
|
|
|
+ # 并发评估所有query的帖子
|
|
|
+ all_evaluations = await asyncio.gather(*[evaluate_query_notes(sr, i) for i, sr in enumerate(search_results, 1)])
|
|
|
+
|
|
|
+ # 记录步骤
|
|
|
+ total_notes = sum(e['note_count'] for e in all_evaluations)
|
|
|
+ total_satisfied = sum(e['satisfied_count'] for e in all_evaluations)
|
|
|
+
|
|
|
+ add_step(context, "评估搜索到的帖子", "evaluate_search_notes", {
|
|
|
+ "query_count": len(search_results),
|
|
|
+ "total_notes": total_notes,
|
|
|
+ "total_satisfied": total_satisfied,
|
|
|
+ "note_evaluations": all_evaluations
|
|
|
+ })
|
|
|
+
|
|
|
+ return {"note_evaluations": all_evaluations}
|
|
|
+
|
|
|
+
|
|
|
+def step_collect_satisfied_notes(note_evaluation_data: dict) -> list[dict]:
|
|
|
+ """
|
|
|
+ 步骤4: 汇总所有满足需求的帖子
|
|
|
+
|
|
|
+ 输入:
|
|
|
+ - note_evaluation_data: 步骤3的帖子评估结果
|
|
|
+
|
|
|
+ 输出:
|
|
|
+ - 所有满足需求的帖子列表,按置信度降序排列
|
|
|
+ """
|
|
|
+ print(f"\n{'='*60}")
|
|
|
+ print(f"汇总满足需求的帖子")
|
|
|
+ print(f"{'='*60}")
|
|
|
+
|
|
|
+ all_satisfied_notes = []
|
|
|
+
|
|
|
+ for query_eval in note_evaluation_data['note_evaluations']:
|
|
|
+ for note in query_eval['evaluated_notes']:
|
|
|
+ if note['evaluation']['need_satisfaction']:
|
|
|
+ all_satisfied_notes.append({
|
|
|
+ "query": query_eval['query'],
|
|
|
+ "from_candidate": query_eval['from_candidate'],
|
|
|
+ "note_id": note['note_id'],
|
|
|
+ "title": note['title'],
|
|
|
+ "desc": note['desc'],
|
|
|
+ # ⭐ 保留完整帖子信息
|
|
|
+ "image_list": note.get('image_list', []),
|
|
|
+ "cover_image": note.get('cover_image', {}),
|
|
|
+ "interact_info": note.get('interact_info', {}),
|
|
|
+ "user": note.get('user', {}),
|
|
|
+ "type": note.get('type', 'normal'),
|
|
|
+ "note_url": note.get('note_url', ''),
|
|
|
+ # 评估结果
|
|
|
+ "title_relevance": note['evaluation']['title_relevance'],
|
|
|
+ "content_expectation": note['evaluation']['content_expectation'],
|
|
|
+ "confidence_score": note['evaluation']['confidence_score'],
|
|
|
+ "reason": note['evaluation']['reason']
|
|
|
+ })
|
|
|
+
|
|
|
+ # 按置信度降序排列
|
|
|
+ all_satisfied_notes.sort(key=lambda x: x['confidence_score'], reverse=True)
|
|
|
+
|
|
|
+ print(f"\n共收集到 {len(all_satisfied_notes)} 个满足需求的帖子")
|
|
|
+ if all_satisfied_notes:
|
|
|
+ print(f"置信度范围: {all_satisfied_notes[0]['confidence_score']:.2f} ~ {all_satisfied_notes[-1]['confidence_score']:.2f}")
|
|
|
+
|
|
|
+ return all_satisfied_notes
|
|
|
+
|
|
|
+
|
|
|
+async def step_generate_answer(satisfied_notes: list[dict], original_question: str, context: RunContext) -> dict:
|
|
|
+ """
|
|
|
+ 步骤5: 基于满足需求的帖子生成答案
|
|
|
+
|
|
|
+ 输入:
|
|
|
+ - satisfied_notes: 步骤4收集的满足需求的帖子列表
|
|
|
+ - original_question: 原始问题
|
|
|
+ - context: 运行上下文
|
|
|
+
|
|
|
+ 输出:
|
|
|
+ - 生成的答案及相关信息
|
|
|
+ - answer: 答案内容(Markdown格式)
|
|
|
+ - cited_note_indices: 引用的帖子索引
|
|
|
+ - confidence: 答案置信度
|
|
|
+ - summary: 答案摘要
|
|
|
+ - cited_notes: 被引用的帖子详情
|
|
|
+ """
|
|
|
+ step_num = len(context.steps) + 1
|
|
|
+ print(f"\n{'='*60}")
|
|
|
+ print(f"[步骤 {step_num}] 基于 {len(satisfied_notes)} 个帖子生成答案")
|
|
|
+ print(f"{'='*60}")
|
|
|
+
|
|
|
+ if not satisfied_notes:
|
|
|
+ print("\n⚠️ 没有满足需求的帖子,无法生成答案")
|
|
|
+ result = {
|
|
|
+ "answer": "抱歉,未找到能够回答该问题的相关内容。",
|
|
|
+ "cited_note_indices": [],
|
|
|
+ "confidence": 0.0,
|
|
|
+ "summary": "无可用信息",
|
|
|
+ "cited_notes": []
|
|
|
+ }
|
|
|
+
|
|
|
+ add_step(context, "生成答案", "answer_generation", {
|
|
|
+ "original_question": original_question,
|
|
|
+ "input_notes_count": 0,
|
|
|
+ "result": result
|
|
|
+ })
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+ # 构建Agent输入
|
|
|
+ notes_info = []
|
|
|
+ for idx, note in enumerate(satisfied_notes, 1):
|
|
|
+ notes_info.append(f"""
|
|
|
+【帖子 {idx}】
|
|
|
+标题: {note['title']}
|
|
|
+描述: {note['desc']}
|
|
|
+置信度: {note['confidence_score']:.2f}
|
|
|
+""".strip())
|
|
|
+
|
|
|
+ agent_input = f"""
|
|
|
+<原始问题>
|
|
|
+{original_question}
|
|
|
+</原始问题>
|
|
|
+
|
|
|
+<相关帖子>
|
|
|
+{chr(10).join(notes_info)}
|
|
|
+</相关帖子>
|
|
|
+
|
|
|
+请基于以上帖子,为原始问题生成一个全面、准确的答案。
|
|
|
+记得在答案中使用 [1], [2] 等标注引用的帖子序号。
|
|
|
+""".strip()
|
|
|
+
|
|
|
+ print(f"\n📝 调用答案生成Agent...")
|
|
|
+ print(f" - 可用帖子: {len(satisfied_notes)} 个")
|
|
|
+ print(f" - 平均置信度: {sum(n['confidence_score'] for n in satisfied_notes) / len(satisfied_notes):.2f}")
|
|
|
+
|
|
|
+ # 调用Agent生成答案
|
|
|
+ result_run = await Runner.run(answer_generator, agent_input)
|
|
|
+ answer_result: AnswerGeneration = result_run.final_output
|
|
|
+
|
|
|
+ # 提取被引用的帖子详情
|
|
|
+ cited_notes = []
|
|
|
+ for idx in answer_result.cited_note_indices:
|
|
|
+ if 1 <= idx <= len(satisfied_notes):
|
|
|
+ note = satisfied_notes[idx - 1]
|
|
|
+ cited_notes.append({
|
|
|
+ "index": idx,
|
|
|
+ "note_id": note['note_id'],
|
|
|
+ "title": note['title'],
|
|
|
+ "desc": note['desc'],
|
|
|
+ "confidence_score": note['confidence_score'],
|
|
|
+ # ⭐ 完整帖子信息用于可视化
|
|
|
+ "image_list": note.get('image_list', []),
|
|
|
+ "cover_image": note.get('cover_image', {}),
|
|
|
+ "interact_info": note.get('interact_info', {}),
|
|
|
+ "user": note.get('user', {}),
|
|
|
+ "note_url": note.get('note_url', ''),
|
|
|
+ "type": note.get('type', 'normal'),
|
|
|
+ # ⭐ 评估详情
|
|
|
+ "title_relevance": note.get('title_relevance', 0),
|
|
|
+ "content_expectation": note.get('content_expectation', 0),
|
|
|
+ "reason": note.get('reason', '')
|
|
|
+ })
|
|
|
+
|
|
|
+ result = {
|
|
|
+ "answer": answer_result.answer,
|
|
|
+ "cited_note_indices": answer_result.cited_note_indices,
|
|
|
+ "confidence": answer_result.confidence,
|
|
|
+ "summary": answer_result.summary,
|
|
|
+ "cited_notes": cited_notes
|
|
|
+ }
|
|
|
+
|
|
|
+ # 打印结果
|
|
|
+ print(f"\n✅ 答案生成完成")
|
|
|
+ print(f" - 引用帖子数: {len(answer_result.cited_note_indices)} 个")
|
|
|
+ print(f" - 答案置信度: {answer_result.confidence:.2f}")
|
|
|
+ print(f" - 答案摘要: {answer_result.summary}")
|
|
|
+
|
|
|
+ # 记录步骤
|
|
|
+ add_step(context, "生成答案", "answer_generation", {
|
|
|
+ "original_question": original_question,
|
|
|
+ "input_notes_count": len(satisfied_notes),
|
|
|
+ "result": result,
|
|
|
+ "agent_input_preview": agent_input[:500] + "..." if len(agent_input) > 500 else agent_input
|
|
|
+ })
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+def find_qualified_queries(evaluation_results: list[dict], min_relevance_score: float = 0.7) -> list[dict]:
|
|
|
+ """
|
|
|
+ 查找所有合格的query(旧函数,保留兼容性)
|
|
|
+
|
|
|
+ 筛选标准:
|
|
|
+ 1. intent_match = True(必须满足)
|
|
|
+ 2. relevance_score >= min_relevance_score
|
|
|
+
|
|
|
+ 返回:按 relevance_score 降序排列
|
|
|
+ """
|
|
|
+ all_qualified = []
|
|
|
+
|
|
|
+ for result in evaluation_results:
|
|
|
+ for eval_item in result.get("evaluations", []):
|
|
|
+ if (eval_item['intent_match'] is True
|
|
|
+ and eval_item['relevance_score'] >= min_relevance_score):
|
|
|
+ all_qualified.append({
|
|
|
+ "from_candidate": result["candidate"],
|
|
|
+ **eval_item
|
|
|
+ })
|
|
|
+
|
|
|
+ # 按relevance_score降序排列
|
|
|
+ return sorted(all_qualified, key=lambda x: x['relevance_score'], reverse=True)
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 主流程
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
+async def progressive_exploration(context: RunContext, max_levels: int = 4) -> dict:
|
|
|
+ """
|
|
|
+ 渐进式探索流程 - 使用独立步骤
|
|
|
+
|
|
|
+ 流程:
|
|
|
+ 1. 提取关键词 + 渐进式探索(复用旧流程)
|
|
|
+ 2. 步骤1: 评估候选query的推荐词
|
|
|
+ 3. 步骤2: 搜索合格的推荐词
|
|
|
+ 4. 步骤3: 评估搜索到的帖子
|
|
|
+ 5. 步骤4: 汇总满足需求的帖子
|
|
|
+ 6. 步骤5: 生成答案
|
|
|
+
|
|
|
+ Args:
|
|
|
+ context: 运行上下文
|
|
|
+ max_levels: 最大探索层数,默认4
|
|
|
+
|
|
|
+ 返回格式:
|
|
|
+ {
|
|
|
+ "success": True/False,
|
|
|
+ "final_answer": {...}, # 生成的答案
|
|
|
+ "satisfied_notes": [...], # 满足需求的帖子
|
|
|
+ "message": "..."
|
|
|
+ }
|
|
|
+ """
|
|
|
+
|
|
|
+ # ========== 阶段1:渐进式探索(复用旧流程找到候选query)==========
|
|
|
+
|
|
|
+ # 1.1 提取关键词
|
|
|
+ keyword_result = await extract_keywords(context.q, context)
|
|
|
+ context.keywords = keyword_result.keywords
|
|
|
+
|
|
|
+ # 1.2 渐进式探索各层级
|
|
|
+ current_level = 1
|
|
|
+ candidates_to_evaluate = []
|
|
|
+
|
|
|
+ # Level 1:单个关键词
|
|
|
+ level_1_queries = context.keywords # 使用所有关键词
|
|
|
+ level_1_data = await explore_level(level_1_queries, current_level, context)
|
|
|
+ analysis_1 = await analyze_level(level_1_data, context.exploration_levels, context.q, context)
|
|
|
+
|
|
|
+ if analysis_1.should_evaluate_now:
|
|
|
+ candidates_to_evaluate.extend(analysis_1.candidates_to_evaluate)
|
|
|
+
|
|
|
+ # Level 2及以后:迭代探索
|
|
|
+ for level_num in range(2, max_levels + 1):
|
|
|
+ prev_analysis: LevelAnalysis = context.level_analyses[-1]["analysis"]
|
|
|
+ prev_analysis = LevelAnalysis(**prev_analysis)
|
|
|
+
|
|
|
+ if not prev_analysis.next_combinations:
|
|
|
+ print(f"\nLevel {level_num-1} 分析后无需继续探索")
|
|
|
+ break
|
|
|
+
|
|
|
+ level_data = await explore_level(prev_analysis.next_combinations, level_num, context)
|
|
|
+ analysis = await analyze_level(level_data, context.exploration_levels, context.q, context)
|
|
|
+
|
|
|
+ if analysis.should_evaluate_now:
|
|
|
+ candidates_to_evaluate.extend(analysis.candidates_to_evaluate)
|
|
|
+
|
|
|
+ if not candidates_to_evaluate:
|
|
|
+ return {
|
|
|
+ "success": False,
|
|
|
+ "final_answer": None,
|
|
|
+ "satisfied_notes": [],
|
|
|
+ "message": "渐进式探索未找到候选query"
|
|
|
+ }
|
|
|
+
|
|
|
+ print(f"\n{'='*60}")
|
|
|
+ print(f"渐进式探索完成,找到 {len(candidates_to_evaluate)} 个候选query")
|
|
|
+ print(f"{'='*60}")
|
|
|
+
|
|
|
+ # ========== 阶段2:新的独立步骤流程 ==========
|
|
|
+
|
|
|
+ # 步骤1: 评估候选query的推荐词
|
|
|
+ evaluation_results = await step_evaluate_query_suggestions(
|
|
|
+ candidates_to_evaluate,
|
|
|
+ context.q,
|
|
|
+ context
|
|
|
+ )
|
|
|
+
|
|
|
+ # 步骤1.5: 筛选合格的推荐词
|
|
|
+ qualified_queries = step_filter_qualified_queries(
|
|
|
+ evaluation_results,
|
|
|
+ context,
|
|
|
+ min_relevance_score=0.7
|
|
|
+ )
|
|
|
+
|
|
|
+ if not qualified_queries:
|
|
|
+ return {
|
|
|
+ "success": False,
|
|
|
+ "final_answer": None,
|
|
|
+ "satisfied_notes": [],
|
|
|
+ "message": "没有合格的推荐词"
|
|
|
+ }
|
|
|
+
|
|
|
+ # 步骤2: 搜索合格的推荐词
|
|
|
+ search_results = await step_search_qualified_queries(
|
|
|
+ qualified_queries,
|
|
|
+ context
|
|
|
+ )
|
|
|
+
|
|
|
+ if not search_results.get('searches'):
|
|
|
+ return {
|
|
|
+ "success": False,
|
|
|
+ "final_answer": None,
|
|
|
+ "satisfied_notes": [],
|
|
|
+ "message": "搜索失败"
|
|
|
+ }
|
|
|
+
|
|
|
+ # 步骤3: 评估搜索到的帖子
|
|
|
+ note_evaluation_data = await step_evaluate_search_notes(
|
|
|
+ search_results,
|
|
|
+ context.q,
|
|
|
+ context
|
|
|
+ )
|
|
|
+
|
|
|
+ # 步骤4: 汇总满足需求的帖子
|
|
|
+ satisfied_notes = step_collect_satisfied_notes(note_evaluation_data)
|
|
|
+
|
|
|
+ if not satisfied_notes:
|
|
|
+ return {
|
|
|
+ "success": False,
|
|
|
+ "final_answer": None,
|
|
|
+ "satisfied_notes": [],
|
|
|
+ "message": "未找到满足需求的帖子"
|
|
|
+ }
|
|
|
+
|
|
|
+ # 步骤5: 生成答案
|
|
|
+ final_answer = await step_generate_answer(
|
|
|
+ satisfied_notes,
|
|
|
+ context.q,
|
|
|
+ context
|
|
|
+ )
|
|
|
+
|
|
|
+ # ========== 返回最终结果 ==========
|
|
|
+
|
|
|
+ return {
|
|
|
+ "success": True,
|
|
|
+ "final_answer": final_answer,
|
|
|
+ "satisfied_notes": satisfied_notes,
|
|
|
+ "message": f"成功找到 {len(satisfied_notes)} 个满足需求的帖子,并生成答案"
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 输出格式化
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
+def format_output(optimization_result: dict, context: RunContext) -> str:
|
|
|
+ """
|
|
|
+ 格式化输出结果 - 用于独立步骤流程
|
|
|
+
|
|
|
+ 包含:
|
|
|
+ - 生成的答案
|
|
|
+ - 引用的帖子详情
|
|
|
+ - 满足需求的帖子统计
|
|
|
+ """
|
|
|
+ final_answer = optimization_result.get("final_answer")
|
|
|
+ satisfied_notes = optimization_result.get("satisfied_notes", [])
|
|
|
+
|
|
|
+ output = f"原始问题:{context.q}\n"
|
|
|
+ output += f"提取的关键词:{', '.join(context.keywords or [])}\n"
|
|
|
+ output += f"探索层数:{len(context.exploration_levels)}\n"
|
|
|
+ output += f"找到满足需求的帖子:{len(satisfied_notes)} 个\n"
|
|
|
+ output += "\n" + "="*60 + "\n"
|
|
|
+
|
|
|
+ if final_answer:
|
|
|
+ output += "【生成的答案】\n\n"
|
|
|
+ output += final_answer.get("answer", "")
|
|
|
+ output += "\n\n" + "="*60 + "\n"
|
|
|
+
|
|
|
+ output += f"答案置信度:{final_answer.get('confidence', 0):.2f}\n"
|
|
|
+ output += f"答案摘要:{final_answer.get('summary', '')}\n"
|
|
|
+ output += f"引用帖子数:{len(final_answer.get('cited_note_indices', []))} 个\n"
|
|
|
+ output += "\n" + "="*60 + "\n"
|
|
|
+
|
|
|
+ output += "【引用的帖子详情】\n\n"
|
|
|
+ for cited_note in final_answer.get("cited_notes", []):
|
|
|
+ output += f"[{cited_note['index']}] {cited_note['title']}\n"
|
|
|
+ output += f" 置信度: {cited_note['confidence_score']:.2f}\n"
|
|
|
+ output += f" 描述: {cited_note['desc']}\n"
|
|
|
+ output += f" note_id: {cited_note['note_id']}\n\n"
|
|
|
+ else:
|
|
|
+ output += "未能生成答案\n"
|
|
|
+
|
|
|
+ return output
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================================
|
|
|
+# 主函数
|
|
|
+# ============================================================================
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+async def main(input_dir: str, max_levels: int = 4, visualize: bool = False):
|
|
|
+ """
|
|
|
+ 主函数 - 使用独立步骤流程(方案A)
|
|
|
+ """
|
|
|
+ current_time, log_url = set_trace()
|
|
|
+
|
|
|
+ # 从目录中读取固定文件名
|
|
|
+ input_context_file = os.path.join(input_dir, 'context.md')
|
|
|
+ input_q_file = os.path.join(input_dir, 'q.md')
|
|
|
+
|
|
|
+ q_context = read_file_as_string(input_context_file)
|
|
|
+ q = read_file_as_string(input_q_file)
|
|
|
+ q_with_context = f"""
|
|
|
+<需求上下文>
|
|
|
+{q_context}
|
|
|
+</需求上下文>
|
|
|
+<当前问题>
|
|
|
+{q}
|
|
|
+</当前问题>
|
|
|
+""".strip()
|
|
|
+
|
|
|
+ # 获取当前文件名作为版本
|
|
|
+ version = os.path.basename(__file__)
|
|
|
+ version_name = os.path.splitext(version)[0]
|
|
|
+
|
|
|
+ # 日志保存目录
|
|
|
+ log_dir = os.path.join(input_dir, "output", version_name, current_time)
|
|
|
+
|
|
|
+ run_context = RunContext(
|
|
|
+ version=version,
|
|
|
+ input_files={
|
|
|
+ "input_dir": input_dir,
|
|
|
+ "context_file": input_context_file,
|
|
|
+ "q_file": input_q_file,
|
|
|
+ },
|
|
|
+ q_with_context=q_with_context,
|
|
|
+ q_context=q_context,
|
|
|
+ q=q,
|
|
|
+ log_dir=log_dir,
|
|
|
+ log_url=log_url,
|
|
|
+ )
|
|
|
+
|
|
|
+ # 执行渐进式探索
|
|
|
+ optimization_result = await progressive_exploration(run_context, max_levels=max_levels)
|
|
|
+
|
|
|
+ # 格式化输出
|
|
|
+ final_output = format_output(optimization_result, run_context)
|
|
|
+ print(f"\n{'='*60}")
|
|
|
+ print("最终结果")
|
|
|
+ print(f"{'='*60}")
|
|
|
+ print(final_output)
|
|
|
+
|
|
|
+ # 保存结果
|
|
|
+ run_context.optimization_result = optimization_result
|
|
|
+ run_context.final_output = final_output
|
|
|
+
|
|
|
+ # 记录最终输出步骤(新格式)
|
|
|
+ final_answer = optimization_result.get("final_answer")
|
|
|
+ satisfied_notes = optimization_result.get("satisfied_notes", [])
|
|
|
+
|
|
|
+ add_step(run_context, "生成最终结果", "final_result", {
|
|
|
+ "success": optimization_result["success"],
|
|
|
+ "message": optimization_result["message"],
|
|
|
+ "satisfied_notes_count": len(satisfied_notes),
|
|
|
+ "final_answer": final_answer,
|
|
|
+ "satisfied_notes_summary": [
|
|
|
+ {
|
|
|
+ "note_id": note["note_id"],
|
|
|
+ "title": note["title"],
|
|
|
+ "confidence_score": note["confidence_score"]
|
|
|
+ }
|
|
|
+ for note in satisfied_notes # 保存所有满足条件的帖子摘要
|
|
|
+ ] if satisfied_notes else [],
|
|
|
+ "final_output": final_output
|
|
|
+ })
|
|
|
+
|
|
|
+ # 保存 RunContext 到 log_dir(不包含 steps,steps 单独保存)
|
|
|
+ os.makedirs(run_context.log_dir, exist_ok=True)
|
|
|
+ context_file_path = os.path.join(run_context.log_dir, "run_context.json")
|
|
|
+ context_dict = run_context.model_dump()
|
|
|
+ context_dict.pop('steps', None) # 移除 steps,避免数据冗余
|
|
|
+ with open(context_file_path, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(context_dict, f, ensure_ascii=False, indent=2)
|
|
|
+ print(f"\nRunContext saved to: {context_file_path}")
|
|
|
+
|
|
|
+ # 保存步骤化日志
|
|
|
+ steps_file_path = os.path.join(run_context.log_dir, "steps.json")
|
|
|
+ with open(steps_file_path, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(run_context.steps, f, ensure_ascii=False, indent=2)
|
|
|
+ print(f"Steps log saved to: {steps_file_path}")
|
|
|
+
|
|
|
+ # 如果需要生成可视化
|
|
|
+ if visualize:
|
|
|
+ import subprocess
|
|
|
+ output_html = os.path.join(run_context.log_dir, "visualization.html")
|
|
|
+ print(f"\n🎨 生成可视化HTML...")
|
|
|
+ result = subprocess.run([
|
|
|
+ "python", "sug_v6_1_2_3.visualize.py",
|
|
|
+ steps_file_path,
|
|
|
+ "-o", output_html
|
|
|
+ ])
|
|
|
+ if result.returncode == 0:
|
|
|
+ print(f"✅ 可视化已生成: {output_html}")
|
|
|
+ else:
|
|
|
+ print(f"❌ 可视化生成失败")
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.1.2.3 独立步骤+答案生成版")
|
|
|
+ parser.add_argument(
|
|
|
+ "--input-dir",
|
|
|
+ type=str,
|
|
|
+ default="input/简单扣图",
|
|
|
+ help="输入目录路径,默认: input/简单扣图"
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--max-levels",
|
|
|
+ type=int,
|
|
|
+ default=10,
|
|
|
+ help="最大探索层数,默认: 10"
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--visualize",
|
|
|
+ action="store_true",
|
|
|
+ default=True,
|
|
|
+ help="运行完成后自动生成可视化HTML(默认开启)"
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--no-visualize",
|
|
|
+ action="store_false",
|
|
|
+ dest="visualize",
|
|
|
+ help="关闭自动生成可视化"
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--visualize-only",
|
|
|
+ action="store_true",
|
|
|
+ help="只生成可视化,不运行搜索流程。自动查找input-dir下最新的输出目录"
|
|
|
+ )
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ # 如果只是生成可视化
|
|
|
+ if args.visualize_only:
|
|
|
+ import subprocess
|
|
|
+ import glob
|
|
|
+
|
|
|
+ # 获取版本名称
|
|
|
+ version_name = os.path.splitext(os.path.basename(__file__))[0]
|
|
|
+ output_base = os.path.join(args.input_dir, "output", version_name)
|
|
|
+
|
|
|
+ # 查找最新的输出目录
|
|
|
+ if not os.path.exists(output_base):
|
|
|
+ print(f"❌ 找不到输出目录: {output_base}")
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ # 获取所有日期目录
|
|
|
+ date_dirs = glob.glob(os.path.join(output_base, "*", "*"))
|
|
|
+ if not date_dirs:
|
|
|
+ print(f"❌ 在 {output_base} 中没有找到输出目录")
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ # 按修改时间排序,获取最新的
|
|
|
+ latest_dir = max(date_dirs, key=os.path.getmtime)
|
|
|
+ steps_json = os.path.join(latest_dir, "steps.json")
|
|
|
+
|
|
|
+ if not os.path.exists(steps_json):
|
|
|
+ print(f"❌ 找不到 steps.json: {steps_json}")
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ output_html = os.path.join(latest_dir, "visualization.html")
|
|
|
+ print(f"🎨 找到最新输出目录: {latest_dir}")
|
|
|
+ print(f"🎨 生成可视化: {steps_json} -> {output_html}")
|
|
|
+
|
|
|
+ result = subprocess.run([
|
|
|
+ "python", "sug_v6_1_2_3.visualize.py",
|
|
|
+ steps_json,
|
|
|
+ "-o", output_html
|
|
|
+ ])
|
|
|
+ sys.exit(result.returncode)
|
|
|
+
|
|
|
+ asyncio.run(main(args.input_dir, max_levels=args.max_levels, visualize=args.visualize))
|