import asyncio import json import os import sys import argparse from datetime import datetime from agents import Agent, Runner from lib.my_trace import set_trace from typing import Literal from pydantic import BaseModel, Field from lib.utils import read_file_as_string from lib.client import get_model MODEL_NAME = "google/gemini-2.5-flash" from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations from script.search.xiaohongshu_search import XiaohongshuSearch class RunContext(BaseModel): version: str = Field(..., description="当前运行的脚本版本(文件名)") input_files: dict[str, str] = Field(..., description="输入文件路径映射") q_with_context: str q_context: str q: str log_url: str log_dir: str # 步骤化日志 steps: list[dict] = Field(default_factory=list, description="执行步骤的详细记录") # 探索阶段记录(保留用于向后兼容) keywords: list[str] | None = Field(default=None, description="提取的关键词") exploration_levels: list[dict] = Field(default_factory=list, description="每一层的探索结果") level_analyses: list[dict] = Field(default_factory=list, description="每一层的主Agent分析") # 最终结果 final_candidates: list[str] | None = Field(default=None, description="最终选出的候选query") evaluation_results: list[dict] | None = Field(default=None, description="候选query的评估结果") optimization_result: dict | None = Field(default=None, description="最终优化结果对象") final_output: str | None = Field(default=None, description="最终输出结果(格式化文本)") # ============================================================================ # Agent 1: 关键词提取专家 # ============================================================================ keyword_extraction_instructions = """ 你是关键词提取专家。给定一个搜索问题(含上下文),提取出**最细粒度的关键概念**。 ## 提取原则 1. **细粒度优先**:拆分成最小的有意义单元 - 不要保留完整的长句 - 拆分成独立的、有搜索意义的词或短语 2. **保留核心维度**: - 地域/对象 - 时间 - 行为/意图:获取、教程、推荐、如何等 - 主题/领域 - 质量/属性 3. **去掉无意义的虚词**:的、吗、呢等 4. **保留领域专有词**:不要过度拆分专业术语 - 如果是常见的组合词,保持完整 ## 输出要求 输出关键词列表,按重要性排序(最核心的在前)。 """.strip() class KeywordList(BaseModel): """关键词列表""" keywords: list[str] = Field(..., description="提取的关键词,按重要性排序") reasoning: str = Field(..., description="提取理由") keyword_extractor = Agent[None]( name="关键词提取专家", instructions=keyword_extraction_instructions, model=get_model(MODEL_NAME), output_type=KeywordList, ) # ============================================================================ # Agent 2: 层级探索分析专家 # ============================================================================ level_analysis_instructions = """ 你是搜索空间探索分析专家。基于当前层级的探索结果,决定下一步行动。 ## 你的任务 分析当前已探索的词汇空间,判断: 1. **发现了什么有价值的信号?** 2. **是否已经可以评估候选了?** 3. **如果还不够,下一层应该探索什么组合?** ## 分析维度 ### 1. 信号识别(最重要) 看推荐词里**出现了什么主题**: **关键问题:** - 哪些推荐词**最接近原始需求**? - 哪些推荐词**揭示了有价值的方向**(即使不完全匹配)? - 哪些推荐词可以作为**下一层探索的桥梁**? - 系统对哪些概念理解得好?哪些理解偏了? ### 2. 组合策略 基于发现的信号,设计下一层探索: **组合类型:** a) **关键词直接组合** - 两个关键词组合成新query b) **利用推荐词作为桥梁**(重要!) - 发现某个推荐词很有价值 → 直接探索这个推荐词 - 或在推荐词基础上加其他关键词 c) **跨层级组合** - 结合多层发现的有价值推荐词 - 组合成更复杂的query ### 3. 停止条件 **何时可以评估候选?** 满足以下之一: - 推荐词中出现了**明确包含原始需求多个核心要素的query** - 已经探索到**足够复杂的组合**(3-4个关键词),且推荐词相关 - 探索了**3-4层**,信息已经足够丰富 **何时继续探索?** - 当前推荐词太泛,没有接近原始需求 - 发现了有价值的信号,但需要进一步组合验证 - 层数还少(< 3层) ## 输出要求 ### 1. key_findings 总结当前层发现的关键信息,包括: - 哪些推荐词最有价值? - 系统对哪些概念理解得好/不好? - 发现了什么意外的方向? ### 2. promising_signals 列出最有价值的推荐词(来自任何已探索的query),每个说明为什么有价值 ### 3. should_evaluate_now 是否已经可以开始评估候选了?true/false ### 4. candidates_to_evaluate 如果should_evaluate_now=true,列出应该评估的候选query - 可以是推荐词 - 可以是自己构造的组合 ### 5. next_combinations 如果should_evaluate_now=false,列出下一层应该探索的query组合 ### 6. reasoning 详细的推理过程 ## 重要原则 1. **不要过早评估**:至少探索2层,除非第一层就发现了完美匹配 2. **充分利用推荐词**:推荐词是系统给的提示,要善用 3. **保持探索方向的多样性**:不要只盯着一个方向 4. **识别死胡同**:如果某个方向的推荐词一直不相关,果断放弃 """.strip() class PromisingSignal(BaseModel): """有价值的推荐词信号""" query: str = Field(..., description="推荐词") from_level: int = Field(..., description="来自哪一层") reason: str = Field(..., description="为什么有价值") class LevelAnalysis(BaseModel): """层级分析结果""" key_findings: str = Field(..., description="当前层的关键发现") promising_signals: list[PromisingSignal] = Field(..., description="有价值的推荐词信号") should_evaluate_now: bool = Field(..., description="是否应该开始评估候选") candidates_to_evaluate: list[str] = Field(default_factory=list, description="如果should_evaluate_now=true,要评估的候选query列表") next_combinations: list[str] = Field(default_factory=list, description="如果should_evaluate_now=false,下一层要探索的query组合") reasoning: str = Field(..., description="详细的推理过程") level_analyzer = Agent[None]( name="层级探索分析专家", instructions=level_analysis_instructions, model=get_model(MODEL_NAME), output_type=LevelAnalysis, ) # ============================================================================ # Agent 3: 评估专家(简化版:意图匹配 + 相关性评分) # ============================================================================ eval_instructions = """ 你是搜索query评估专家。给定原始问题和推荐query,评估两个维度。 ## 评估目标 用这个推荐query搜索,能否找到满足原始需求的内容? ## 两层评分 ### 1. intent_match(意图匹配)= true/false 推荐query的**使用意图**是否与原问题一致? **核心问题:用户搜索这个推荐词,想做什么?** **判断标准:** - 原问题意图:找方法?找教程?找资源/素材?找工具?看作品? - 推荐词意图:如果用户搜索这个词,他的目的是什么? **示例:** - 原问题意图="找素材" - ✅ true: "素材下载"、"素材网站"、"免费素材"(都是获取素材) - ❌ false: "素材制作教程"、"如何制作素材"(意图变成学习了) - 原问题意图="学教程" - ✅ true: "教程视频"、"教学步骤"、"入门指南" - ❌ false: "成品展示"、"作品欣赏"(意图变成看作品了) **评分:** - true = 意图一致,搜索推荐词能达到原问题的目的 - false = 意图改变,搜索推荐词无法达到原问题的目的 ### 2. relevance_score(相关性)= 0-1 连续分数 推荐query在**主题、要素、属性**上与原问题的相关程度? **评估维度:** - 主题相关:核心主题是否匹配?(如:摄影、旅游、美食) - 要素覆盖:关键要素保留了多少?(如:地域、时间、对象、工具) - 属性匹配:质量、风格、特色等属性是否保留? **评分参考:** - 0.9-1.0 = 几乎完美匹配,所有核心要素都保留 - 0.7-0.8 = 高度相关,核心要素保留,少数次要要素缺失 - 0.5-0.6 = 中度相关,主题匹配但多个要素缺失 - 0.3-0.4 = 低度相关,只有部分主题相关 - 0-0.2 = 基本不相关 ## 评估策略 1. **先判断 intent_match**:意图不匹配直接 false,无论相关性多高 2. **再评估 relevance_score**:在意图匹配的前提下,计算相关性 ## 输出要求 - intent_match: true/false - relevance_score: 0-1 的浮点数 - reason: 详细的评估理由,需要说明: - 原问题的意图是什么 - 推荐词的意图是什么 - 为什么判断意图匹配/不匹配 - 相关性分数的依据(哪些要素保留/缺失) """.strip() class RelevanceEvaluation(BaseModel): """评估反馈模型 - 意图匹配 + 相关性""" intent_match: bool = Field(..., description="意图是否匹配") relevance_score: float = Field(..., description="相关性分数 0-1,分数越高越相关") reason: str = Field(..., description="评估理由,需说明意图判断和相关性依据") evaluator = Agent[None]( name="评估专家", instructions=eval_instructions, model=get_model(MODEL_NAME), output_type=RelevanceEvaluation, ) # ============================================================================ # Agent 4: 单个帖子需求满足度评估专家 # ============================================================================ note_evaluation_instructions = """ 你是帖子需求满足度评估专家。给定原始问题和一个搜索到的帖子(标题+描述),判断这个帖子能否满足用户的需求。 ## 你的任务 评估单个帖子的标题和描述,判断用户点开这个帖子后,能否找到满足原始需求的内容。 ## 评估维度 ### 1. 标题相关性(title_relevance)= 0-1 连续分数 **评估标准:** - 标题是否与原始问题的主题相关? - 标题是否包含原始问题的关键要素? - 标题是否表明内容能解决用户的问题? **评分参考:** - 0.9-1.0 = 标题高度相关,明确表明能解决问题 - 0.7-0.8 = 标题相关,包含核心要素 - 0.5-0.6 = 标题部分相关,有关联但不明确 - 0.3-0.4 = 标题相关性较低 - 0-0.2 = 标题基本不相关 ### 2. 内容预期(content_expectation)= 0-1 连续分数 **评估标准:** - 从描述看,内容是否可能包含用户需要的信息? - 描述是否展示了相关的要素或细节? - 描述的方向是否与用户需求一致? **评分参考:** - 0.9-1.0 = 描述明确表明内容高度符合需求 - 0.7-0.8 = 描述显示内容可能符合需求 - 0.5-0.6 = 描述有一定相关性,但不确定 - 0.3-0.4 = 描述相关性较低 - 0-0.2 = 描述基本不相关 ### 3. 需求满足度(need_satisfaction)= true/false **核心问题:用户点开这个帖子后,能否找到他需要的内容?** **判断标准:** - 综合标题和描述,内容是否大概率能满足需求? - 如果 title_relevance >= 0.7 且 content_expectation >= 0.6,一般判断为 true - 否则判断为 false ### 4. 综合置信度(confidence_score)= 0-1 连续分数 **计算方式:** - 可以是 title_relevance 和 content_expectation 的加权平均 - 标题权重通常更高(如 0.6 * title + 0.4 * content) ## 输出要求 - title_relevance: 0-1 的浮点数 - content_expectation: 0-1 的浮点数 - need_satisfaction: true/false - confidence_score: 0-1 的浮点数 - reason: 详细的评估理由,需要说明: - 标题与原问题的相关性分析 - 描述的内容预期分析 - 为什么判断能/不能满足需求 - 置信度分数的依据 ## 重要原则 1. **独立评估**:只评估这一个帖子,不考虑其他帖子 2. **用户视角**:问"我会点开这个帖子吗?点开后能找到答案吗?" 3. **标题优先**:标题是用户决定是否点击的关键 4. **保守判断**:不确定时,倾向于给较低的分数 """.strip() class NoteEvaluation(BaseModel): """单个帖子评估模型""" title_relevance: float = Field(..., description="标题相关性 0-1") content_expectation: float = Field(..., description="内容预期 0-1") need_satisfaction: bool = Field(..., description="是否满足需求") confidence_score: float = Field(..., description="综合置信度 0-1") reason: str = Field(..., description="详细的评估理由") note_evaluator = Agent[None]( name="帖子需求满足度评估专家", instructions=note_evaluation_instructions, model=get_model(MODEL_NAME), output_type=NoteEvaluation, ) # ============================================================================ # Agent 5: 答案生成专家 # ============================================================================ answer_generation_instructions = """ 你是答案生成专家。基于一组满足需求的帖子,为原始问题生成一个全面、准确、有价值的答案。 ## 你的任务 根据用户的原始问题和一组相关帖子(包含标题、描述、置信度评分),生成一个高质量的答案。 ## 输入信息 1. **原始问题**:用户提出的具体问题 2. **相关帖子列表**:每个帖子包含 - 序号(索引) - 标题 - 描述 - 置信度分数 ## 答案要求 ### 1. 内容要求 - **直接回答问题**:开门见山,第一段就给出核心答案 - **结构清晰**:使用标题、列表、分段等组织内容 - **综合多个来源**:整合多个帖子的信息,不要只依赖一个 - **信息准确**:基于帖子内容,不要编造信息 - **实用性**:提供可操作的建议或具体的信息 ### 2. 引用规范 - **必须标注来源**:每个关键信息都要标注帖子索引 - **引用格式**:使用 `[1]`、`[2]` 等标注帖子序号 - **多来源引用**:如果多个帖子支持同一观点,使用 `[1,2,3]` - **引用位置**:在相关句子或段落的末尾标注 ### 3. 置信度使用 - **优先高置信度**:优先引用置信度高的帖子 - **交叉验证**:如果多个帖子提到相同信息,可以提高可信度 - **标注不确定性**:如果信息来自低置信度帖子,适当标注 ### 4. 答案结构建议 ``` 【核心答案】 直接回答用户的问题,给出最核心的信息。[引用] 【详细说明】 1. 第一个方面/要点 [引用] - 具体内容 - 相关细节 2. 第二个方面/要点 [引用] - 具体内容 - 相关细节 【补充建议/注意事项】(可选) 其他有价值的信息或提醒。[引用] 【参考帖子】 列出所有引用的帖子编号和标题。 ``` ## 输出格式 { "answer": "生成的答案内容(Markdown格式)", "cited_note_indices": [1, 2, 3], # 引用的帖子序号列表 "confidence": 0.85, # 答案的整体置信度 (0-1) "summary": "一句话总结答案的核心内容" } ## 重要原则 1. **忠于原文**:不要添加帖子中没有的信息 2. **引用透明**:让用户知道每个信息来自哪个帖子 3. **综合性**:尽可能整合多个帖子的信息 4. **可读性**:使用清晰的Markdown格式 5. **质量优先**:如果帖子质量不够,可以说明信息有限 """.strip() class AnswerGeneration(BaseModel): """答案生成模型""" answer: str = Field(..., description="生成的答案内容(Markdown格式)") cited_note_indices: list[int] = Field(..., description="引用的帖子序号列表") confidence: float = Field(..., description="答案的整体置信度 0-1") summary: str = Field(..., description="一句话总结答案的核心内容") answer_generator = Agent[None]( name="答案生成专家", instructions=answer_generation_instructions, model=get_model(MODEL_NAME), output_type=AnswerGeneration, ) # ============================================================================ # 日志辅助函数 # ============================================================================ def add_step(context: RunContext, step_name: str, step_type: str, data: dict): """添加步骤记录""" step = { "step_number": len(context.steps) + 1, "step_name": step_name, "step_type": step_type, "timestamp": datetime.now().isoformat(), "data": data } context.steps.append(step) return step def process_note_data(note: dict) -> dict: """ 处理搜索接口返回的帖子数据,标准化为统一格式 Args: note: 搜索接口返回的原始帖子数据 Returns: 标准化后的帖子数据字典 """ note_card = note.get("note_card", {}) image_list = note_card.get("image_list", []) # 已在搜索API层预处理为URL字符串列表 interact_info = note_card.get("interact_info", {}) user_info = note_card.get("user", {}) return { "note_id": note.get("id", ""), "title": note_card.get("display_title", ""), "desc": note_card.get("desc", ""), "image_list": image_list, # 第一张就是封面,已在XiaohongshuSearch.search()中预处理为URL字符串列表 "interact_info": { "liked_count": interact_info.get("liked_count", 0), "collected_count": interact_info.get("collected_count", 0), "comment_count": interact_info.get("comment_count", 0), "shared_count": interact_info.get("shared_count", 0) }, "user": { "nickname": user_info.get("nickname", ""), "user_id": user_info.get("user_id", "") }, "type": note_card.get("type", "normal"), "note_url": f"https://www.xiaohongshu.com/explore/{note.get('id', '')}" } # ============================================================================ # 核心函数 # ============================================================================ async def extract_keywords(q: str, context: RunContext) -> KeywordList: """提取关键词""" print("\n[步骤 1] 正在提取关键词...") result = await Runner.run(keyword_extractor, q) keyword_list: KeywordList = result.final_output print(f"提取的关键词:{keyword_list.keywords}") print(f"提取理由:{keyword_list.reasoning}") # 记录步骤 add_step(context, "提取关键词", "keyword_extraction", { "input_question": q, "keywords": keyword_list.keywords, "reasoning": keyword_list.reasoning }) return keyword_list async def explore_level(queries: list[str], level_num: int, context: RunContext) -> dict: """探索一个层级(并发获取所有query的推荐词)""" step_num = len(context.steps) + 1 print(f"\n{'='*60}") print(f"[步骤 {step_num}] Level {level_num} 探索:{len(queries)} 个query") print(f"{'='*60}") xiaohongshu_api = XiaohongshuSearchRecommendations() # 并发获取所有推荐词 async def get_single_sug(query: str): print(f" 探索: {query}") suggestions = xiaohongshu_api.get_recommendations(keyword=query) print(f" → {len(suggestions) if suggestions else 0} 个推荐词") return { "query": query, "suggestions": suggestions or [] } results = await asyncio.gather(*[get_single_sug(q) for q in queries]) level_data = { "level": level_num, "timestamp": datetime.now().isoformat(), "queries": results } context.exploration_levels.append(level_data) # 记录步骤 add_step(context, f"Level {level_num} 探索", "level_exploration", { "level": level_num, "input_queries": queries, "query_count": len(queries), "results": results, "total_suggestions": sum(len(r['suggestions']) for r in results) }) return level_data async def analyze_level(level_data: dict, all_levels: list[dict], original_question: str, context: RunContext) -> LevelAnalysis: """分析当前层级,决定下一步""" step_num = len(context.steps) + 1 print(f"\n[步骤 {step_num}] 正在分析 Level {level_data['level']}...") # 构造输入 analysis_input = f""" <原始问题> {original_question} <已探索的所有层级> {json.dumps(all_levels, ensure_ascii=False, indent=2)} <当前层级> Level {level_data['level']} {json.dumps(level_data['queries'], ensure_ascii=False, indent=2)} 请分析当前探索状态,决定下一步行动。 """ result = await Runner.run(level_analyzer, analysis_input) analysis: LevelAnalysis = result.final_output print(f"\n分析结果:") print(f" 关键发现:{analysis.key_findings}") print(f" 有价值的信号:{len(analysis.promising_signals)} 个") print(f" 是否评估:{analysis.should_evaluate_now}") if analysis.should_evaluate_now: print(f" 候选query:{analysis.candidates_to_evaluate}") else: print(f" 下一层探索:{analysis.next_combinations}") # 保存分析结果 context.level_analyses.append({ "level": level_data['level'], "timestamp": datetime.now().isoformat(), "analysis": analysis.model_dump() }) # 记录步骤 add_step(context, f"Level {level_data['level']} 分析", "level_analysis", { "level": level_data['level'], "key_findings": analysis.key_findings, "promising_signals_count": len(analysis.promising_signals), "promising_signals": [s.model_dump() for s in analysis.promising_signals], "should_evaluate_now": analysis.should_evaluate_now, "candidates_to_evaluate": analysis.candidates_to_evaluate if analysis.should_evaluate_now else [], "next_combinations": analysis.next_combinations if not analysis.should_evaluate_now else [], "reasoning": analysis.reasoning }) return analysis async def evaluate_candidates(candidates: list[str], original_question: str, context: RunContext) -> list[dict]: """评估候选query(含实际搜索验证)""" step_num = len(context.steps) + 1 print(f"\n{'='*60}") print(f"[步骤 {step_num}] 评估 {len(candidates)} 个候选query") print(f"{'='*60}") xiaohongshu_api = XiaohongshuSearchRecommendations() xiaohongshu_search = XiaohongshuSearch() # 创建搜索结果保存目录 search_results_dir = os.path.join(context.log_dir, "search_results") os.makedirs(search_results_dir, exist_ok=True) async def evaluate_single_candidate(candidate: str, candidate_index: int): print(f"\n评估候选:{candidate}") # 为当前候选创建子目录 # 清理候选名称,移除不适合作为目录名的字符 safe_candidate_name = "".join(c if c.isalnum() or c in (' ', '_', '-') else '_' for c in candidate) candidate_dir = os.path.join(search_results_dir, f"candidate_{candidate_index+1}_{safe_candidate_name[:50]}") os.makedirs(candidate_dir, exist_ok=True) # 1. 获取推荐词 suggestions = xiaohongshu_api.get_recommendations(keyword=candidate) print(f" 获取到 {len(suggestions) if suggestions else 0} 个推荐词") if not suggestions: return { "candidate": candidate, "suggestions": [], "evaluations": [] } # 2. 评估每个推荐词(意图匹配 + 相关性) async def eval_single_sug(sug: str, sug_index: int): # 2.1 先进行意图和相关性评估 eval_input = f""" <原始问题> {original_question} <待评估的推荐query> {sug} 请评估该推荐query: 1. intent_match: 意图是否匹配(true/false) 2. relevance_score: 相关性分数(0-1) 3. reason: 详细的评估理由 """ result = await Runner.run(evaluator, eval_input) evaluation: RelevanceEvaluation = result.final_output eval_result = { "query": sug, "intent_match": evaluation.intent_match, "relevance_score": evaluation.relevance_score, "reason": evaluation.reason, } # 2.2 如果意图匹配且相关性足够高,进行实际搜索验证 if evaluation.intent_match and evaluation.relevance_score >= 0.7: print(f" → 合格候选,进行实际搜索验证: {sug}") try: search_result = xiaohongshu_search.search(keyword=sug) # 解析result字段(它是JSON字符串,需要先解析) result_str = search_result.get("result", "{}") if isinstance(result_str, str): result_data = json.loads(result_str) else: result_data = result_str # 格式化搜索结果:将result字段解析后再保存 formatted_search_result = { "success": search_result.get("success"), "result": result_data, # 保存解析后的数据 "tool_name": search_result.get("tool_name"), "call_type": search_result.get("call_type"), "query": sug, "timestamp": datetime.now().isoformat() } # 保存格式化后的搜索结果到文件 safe_sug_name = "".join(c if c.isalnum() or c in (' ', '_', '-') else '_' for c in sug) search_result_file = os.path.join(candidate_dir, f"sug_{sug_index+1}_{safe_sug_name[:30]}.json") with open(search_result_file, 'w', encoding='utf-8') as f: json.dump(formatted_search_result, f, ensure_ascii=False, indent=2) print(f" 搜索结果已保存: {os.path.basename(search_result_file)}") # 提取搜索结果的标题和描述 # 正确的数据路径: result.data.data[] notes = result_data.get("data", {}).get("data", []) if notes: print(f" 开始评估 {len(notes)} 个帖子...") # 对每个帖子进行独立评估 note_evaluations = [] for note_idx, note in enumerate(notes, 1): # 评估所有帖子 note_card = note.get("note_card", {}) title = note_card.get("display_title", "") desc = note_card.get("desc", "") note_id = note.get("id", "") # 构造评估输入 eval_input = f""" <原始问题> {original_question} <帖子信息> 标题: {title} 描述: {desc} 请评估这个帖子能否满足用户需求。 """ # 调用评估Agent eval_result_run = await Runner.run(note_evaluator, eval_input) note_eval: NoteEvaluation = eval_result_run.final_output note_evaluation_record = { "note_index": note_idx, "note_id": note_id, "title": title, "desc": desc, # 保存完整描述 "evaluation": { "title_relevance": note_eval.title_relevance, "content_expectation": note_eval.content_expectation, "need_satisfaction": note_eval.need_satisfaction, "confidence_score": note_eval.confidence_score, "reason": note_eval.reason } } note_evaluations.append(note_evaluation_record) # 简单打印进度 if note_idx % 3 == 0 or note_idx == len(notes): print(f" 已评估 {note_idx}/{len(notes)} 个帖子") # 统计满足需求的帖子数量 satisfied_count = sum(1 for ne in note_evaluations if ne["evaluation"]["need_satisfaction"]) avg_confidence = sum(ne["evaluation"]["confidence_score"] for ne in note_evaluations) / len(note_evaluations) if note_evaluations else 0 eval_result["search_verification"] = { "total_notes": len(notes), "evaluated_notes": len(note_evaluations), "satisfied_count": satisfied_count, "average_confidence": round(avg_confidence, 2), "note_evaluations": note_evaluations, "search_result_file": search_result_file } print(f" 评估完成: {satisfied_count}/{len(note_evaluations)} 个帖子满足需求, " f"平均置信度={avg_confidence:.2f}") else: eval_result["search_verification"] = { "total_notes": 0, "evaluated_notes": 0, "satisfied_count": 0, "average_confidence": 0.0, "note_evaluations": [], "search_result_file": search_result_file, "reason": "搜索无结果" } print(f" 搜索无结果") except Exception as e: print(f" 搜索验证出错: {e}") eval_result["search_verification"] = { "error": str(e) } return eval_result evaluations = await asyncio.gather(*[eval_single_sug(s, i) for i, s in enumerate(suggestions)]) return { "candidate": candidate, "suggestions": suggestions, "evaluations": evaluations } results = await asyncio.gather(*[evaluate_single_candidate(c, i) for i, c in enumerate(candidates)]) # 生成搜索结果汇总文件 summary_data = { "original_question": original_question, "timestamp": datetime.now().isoformat(), "total_candidates": len(candidates), "candidates": [] } for i, result in enumerate(results): candidate_summary = { "index": i + 1, "candidate": result["candidate"], "suggestions_count": len(result["suggestions"]), "verified_queries": [] } for eval_item in result.get("evaluations", []): if "search_verification" in eval_item and "search_result_file" in eval_item["search_verification"]: sv = eval_item["search_verification"] candidate_summary["verified_queries"].append({ "query": eval_item["query"], "intent_match": eval_item["intent_match"], "relevance_score": eval_item["relevance_score"], "verification": { "total_notes": sv.get("total_notes", 0), "evaluated_notes": sv.get("evaluated_notes", 0), "satisfied_count": sv.get("satisfied_count", 0), "average_confidence": sv.get("average_confidence", 0.0) }, "search_result_file": sv["search_result_file"] }) summary_data["candidates"].append(candidate_summary) # 保存汇总文件 summary_file = os.path.join(search_results_dir, "summary.json") with open(summary_file, 'w', encoding='utf-8') as f: json.dump(summary_data, f, ensure_ascii=False, indent=2) print(f"\n搜索结果汇总已保存: {summary_file}") context.evaluation_results = results # 构建详细的步骤记录数据 step_data = { "candidate_count": len(candidates), "candidates": candidates, "total_evaluations": sum(len(r['evaluations']) for r in results), "verified_queries": sum( 1 for r in results for e in r.get('evaluations', []) if 'search_verification' in e ), "search_results_dir": search_results_dir, "summary_file": summary_file, "detailed_results": [] } # 为每个候选记录详细信息 for result in results: candidate_detail = { "candidate": result["candidate"], "suggestions": result["suggestions"], "evaluations": [] } for eval_item in result.get("evaluations", []): eval_detail = { "query": eval_item["query"], "intent_match": eval_item["intent_match"], "relevance_score": eval_item["relevance_score"], "reason": eval_item["reason"] } # 如果有搜索验证,添加详细信息 if "search_verification" in eval_item: verification = eval_item["search_verification"] eval_detail["search_verification"] = { "performed": True, "total_notes": verification.get("total_notes", 0), "evaluated_notes": verification.get("evaluated_notes", 0), "satisfied_count": verification.get("satisfied_count", 0), "average_confidence": verification.get("average_confidence", 0.0), "search_result_file": verification.get("search_result_file"), "has_error": "error" in verification } if "error" in verification: eval_detail["search_verification"]["error"] = verification["error"] # 保存每个帖子的评估详情 if "note_evaluations" in verification: eval_detail["search_verification"]["note_evaluations"] = verification["note_evaluations"] else: eval_detail["search_verification"] = { "performed": False, "reason": "未达到搜索验证阈值(intent_match=False 或 relevance_score<0.7)" } candidate_detail["evaluations"].append(eval_detail) step_data["detailed_results"].append(candidate_detail) # 记录步骤 add_step(context, "评估候选query", "candidate_evaluation", step_data) return results # ============================================================================ # 新的独立步骤函数(方案A) # ============================================================================ async def step_evaluate_query_suggestions(candidates: list[str], original_question: str, context: RunContext) -> list[dict]: """ 步骤1: 评估候选query的推荐词 输入: - candidates: 候选query列表 - original_question: 原始问题 - context: 运行上下文 输出: - 每个候选的评估结果列表,包含: - candidate: 候选query - suggestions: 推荐词列表 - evaluations: 每个推荐词的意图匹配和相关性评分 """ step_num = len(context.steps) + 1 print(f"\n{'='*60}") print(f"[步骤 {step_num}] 评估 {len(candidates)} 个候选query的推荐词") print(f"{'='*60}") xiaohongshu_api = XiaohongshuSearchRecommendations() async def evaluate_single_candidate(candidate: str): print(f"\n评估候选:{candidate}") # 1. 获取推荐词 suggestions = xiaohongshu_api.get_recommendations(keyword=candidate) print(f" 获取到 {len(suggestions) if suggestions else 0} 个推荐词") if not suggestions: return { "candidate": candidate, "suggestions": [], "evaluations": [] } # 2. 评估每个推荐词(只做意图匹配和相关性评分) async def eval_single_sug(sug: str): eval_input = f""" <原始问题> {original_question} <待评估的推荐query> {sug} 请评估该推荐query: 1. intent_match: 意图是否匹配(true/false) 2. relevance_score: 相关性分数(0-1) 3. reason: 详细的评估理由 """ result = await Runner.run(evaluator, eval_input) evaluation: RelevanceEvaluation = result.final_output return { "query": sug, "intent_match": evaluation.intent_match, "relevance_score": evaluation.relevance_score, "reason": evaluation.reason } evaluations = await asyncio.gather(*[eval_single_sug(s) for s in suggestions]) return { "candidate": candidate, "suggestions": suggestions, "evaluations": evaluations } results = await asyncio.gather(*[evaluate_single_candidate(c) for c in candidates]) # 记录步骤 add_step(context, "评估候选query的推荐词", "query_suggestion_evaluation", { "candidate_count": len(candidates), "candidates": candidates, "results": results, "total_evaluations": sum(len(r['evaluations']) for r in results), "qualified_count": sum( 1 for r in results for e in r['evaluations'] if e['intent_match'] and e['relevance_score'] >= 0.7 ) }) return results def step_filter_qualified_queries(evaluation_results: list[dict], context: RunContext, min_relevance_score: float = 0.7) -> list[dict]: """ 步骤1.5: 筛选合格的推荐词 输入: - evaluation_results: 步骤1的评估结果 输出: - 合格的query列表,每个包含: - query: 推荐词 - from_candidate: 来源候选 - intent_match: 意图匹配 - relevance_score: 相关性分数 - reason: 评估理由 """ step_num = len(context.steps) + 1 print(f"\n{'='*60}") print(f"[步骤 {step_num}] 筛选合格的推荐词") print(f"{'='*60}") qualified_queries = [] all_queries = [] # 保存所有查询,包括不合格的 for result in evaluation_results: candidate = result["candidate"] for eval_item in result.get("evaluations", []): query_data = { "query": eval_item["query"], "from_candidate": candidate, "intent_match": eval_item["intent_match"], "relevance_score": eval_item["relevance_score"], "reason": eval_item["reason"] } # 判断是否合格 is_qualified = (eval_item['intent_match'] is True and eval_item['relevance_score'] >= min_relevance_score) query_data["is_qualified"] = is_qualified all_queries.append(query_data) if is_qualified: qualified_queries.append(query_data) # 按相关性分数降序排列 qualified_queries.sort(key=lambda x: x['relevance_score'], reverse=True) all_queries.sort(key=lambda x: x['relevance_score'], reverse=True) print(f"\n找到 {len(qualified_queries)} 个合格的推荐词 (共评估 {len(all_queries)} 个)") if qualified_queries: print(f"相关性分数范围: {qualified_queries[0]['relevance_score']:.2f} ~ {qualified_queries[-1]['relevance_score']:.2f}") print("\n合格的推荐词:") for idx, q in enumerate(qualified_queries[:5], 1): print(f" {idx}. {q['query']} (分数: {q['relevance_score']:.2f})") if len(qualified_queries) > 5: print(f" ... 还有 {len(qualified_queries) - 5} 个") # 记录步骤 - 保存所有查询数据 add_step(context, "筛选合格的推荐词", "filter_qualified_queries", { "input_evaluation_count": len(all_queries), "min_relevance_score": min_relevance_score, "qualified_count": len(qualified_queries), "qualified_queries": qualified_queries, "all_queries": all_queries # 新增:保存所有查询数据 }) return qualified_queries async def step_search_qualified_queries(qualified_queries: list[dict], context: RunContext) -> dict: """ 步骤2: 搜索合格的推荐词 输入: - qualified_queries: 步骤1.5筛选出的合格query列表,每个包含: - query: 推荐词 - from_candidate: 来源候选 - intent_match, relevance_score, reason 输出: - 搜索结果字典,包含: - searches: 每个query的搜索结果列表 - search_results_dir: 搜索结果保存目录 """ step_num = len(context.steps) + 1 print(f"\n{'='*60}") print(f"[步骤 {step_num}] 搜索 {len(qualified_queries)} 个合格的推荐词") print(f"{'='*60}") if not qualified_queries: add_step(context, "搜索合格的推荐词", "search_qualified_queries", { "qualified_count": 0, "searches": [] }) return {"searches": [], "search_results_dir": None} # 创建搜索结果保存目录 search_results_dir = os.path.join(context.log_dir, "search_results") os.makedirs(search_results_dir, exist_ok=True) xiaohongshu_search = XiaohongshuSearch() # 搜索每个合格的query async def search_single_query(query_info: dict, query_index: int): query = query_info['query'] print(f"\n搜索 [{query_index+1}/{len(qualified_queries)}]: {query}") try: # 执行搜索 search_result = xiaohongshu_search.search(keyword=query) # 解析result字段 result_str = search_result.get("result", "{}") if isinstance(result_str, str): result_data = json.loads(result_str) else: result_data = result_str # 格式化搜索结果 formatted_search_result = { "success": search_result.get("success"), "result": result_data, "tool_name": search_result.get("tool_name"), "call_type": search_result.get("call_type"), "query": query, "timestamp": datetime.now().isoformat() } # 保存到文件 safe_query_name = "".join(c if c.isalnum() or c in (' ', '_', '-') else '_' for c in query) query_dir = os.path.join(search_results_dir, f"query_{query_index+1}_{safe_query_name[:50]}") os.makedirs(query_dir, exist_ok=True) search_result_file = os.path.join(query_dir, "search_result.json") with open(search_result_file, 'w', encoding='utf-8') as f: json.dump(formatted_search_result, f, ensure_ascii=False, indent=2) # 提取帖子列表 notes = result_data.get("data", {}).get("data", []) print(f" → 搜索成功,获得 {len(notes)} 个帖子") # ⭐ 提取帖子摘要信息用于steps.json notes_summary = [process_note_data(note) for note in notes] return { "query": query, "from_candidate": query_info['from_candidate'], "intent_match": query_info['intent_match'], "relevance_score": query_info['relevance_score'], "reason": query_info['reason'], "search_result_file": search_result_file, "note_count": len(notes), "notes": notes, # 保存所有帖子用于评估 "notes_summary": notes_summary # ⭐ 保存到steps.json } except Exception as e: print(f" → 搜索失败: {e}") return { "query": query, "from_candidate": query_info['from_candidate'], "intent_match": query_info['intent_match'], "relevance_score": query_info['relevance_score'], "reason": query_info['reason'], "error": str(e), "note_count": 0, "notes": [] } search_results = await asyncio.gather(*[search_single_query(q, i) for i, q in enumerate(qualified_queries)]) # 记录步骤 add_step(context, "搜索合格的推荐词", "search_qualified_queries", { "qualified_count": len(qualified_queries), "search_results": [ { "query": sr['query'], "from_candidate": sr['from_candidate'], "note_count": sr['note_count'], "search_result_file": sr.get('search_result_file'), "has_error": 'error' in sr, "notes_summary": sr.get('notes_summary', []) # ⭐ 包含帖子摘要 } for sr in search_results ], "search_results_dir": search_results_dir }) return { "searches": search_results, "search_results_dir": search_results_dir } async def step_evaluate_search_notes(search_data: dict, original_question: str, context: RunContext) -> dict: """ 步骤3: 评估搜索到的帖子 输入: - search_data: 步骤2的搜索结果,包含: - searches: 搜索结果列表 - search_results_dir: 结果目录 输出: - 帖子评估结果字典,包含: - note_evaluations: 每个query的帖子评估列表 """ step_num = len(context.steps) + 1 print(f"\n{'='*60}") print(f"[步骤 {step_num}] 评估搜索到的帖子") print(f"{'='*60}") search_results = search_data['searches'] if not search_results: add_step(context, "评估搜索到的帖子", "evaluate_search_notes", { "query_count": 0, "total_notes": 0, "evaluated_notes": 0, "note_evaluations": [] }) return {"note_evaluations": []} # 对每个query的帖子进行评估 async def evaluate_query_notes(search_result: dict, query_index: int): query = search_result['query'] notes = search_result.get('notes', []) if not notes or 'error' in search_result: return { "query": query, "from_candidate": search_result['from_candidate'], "note_count": 0, "evaluated_notes": [], "satisfied_count": 0, "average_confidence": 0.0 } print(f"\n评估query [{query_index+1}]: {query} ({len(notes)} 个帖子)") # 评估每个帖子 note_evaluations = [] for note_idx, note in enumerate(notes, 1): # 使用标准化函数处理帖子数据 note_data = process_note_data(note) title = note_data["title"] desc = note_data["desc"] # 调用评估Agent eval_input = f""" <原始问题> {original_question} <帖子信息> 标题: {title} 描述: {desc} 请评估这个帖子能否满足用户需求。 """ eval_result_run = await Runner.run(note_evaluator, eval_input) note_eval: NoteEvaluation = eval_result_run.final_output # 合并标准化的帖子数据和评估结果 note_evaluations.append({ **note_data, # 包含所有标准化字段 "note_index": note_idx, "evaluation": { "title_relevance": note_eval.title_relevance, "content_expectation": note_eval.content_expectation, "need_satisfaction": note_eval.need_satisfaction, "confidence_score": note_eval.confidence_score, "reason": note_eval.reason } }) if note_idx % 3 == 0 or note_idx == len(notes): print(f" 已评估 {note_idx}/{len(notes)} 个帖子") # 统计 satisfied_count = sum(1 for ne in note_evaluations if ne["evaluation"]["need_satisfaction"]) avg_confidence = sum(ne["evaluation"]["confidence_score"] for ne in note_evaluations) / len(note_evaluations) if note_evaluations else 0 print(f" → 完成:{satisfied_count}/{len(note_evaluations)} 个帖子满足需求") return { "query": query, "from_candidate": search_result['from_candidate'], "note_count": len(notes), "evaluated_notes": note_evaluations, "satisfied_count": satisfied_count, "average_confidence": round(avg_confidence, 2) } # 并发评估所有query的帖子 all_evaluations = await asyncio.gather(*[evaluate_query_notes(sr, i) for i, sr in enumerate(search_results, 1)]) # 记录步骤 total_notes = sum(e['note_count'] for e in all_evaluations) total_satisfied = sum(e['satisfied_count'] for e in all_evaluations) add_step(context, "评估搜索到的帖子", "evaluate_search_notes", { "query_count": len(search_results), "total_notes": total_notes, "total_satisfied": total_satisfied, "note_evaluations": all_evaluations }) return {"note_evaluations": all_evaluations} def step_collect_satisfied_notes(note_evaluation_data: dict) -> list[dict]: """ 步骤4: 汇总所有满足需求的帖子 输入: - note_evaluation_data: 步骤3的帖子评估结果 输出: - 所有满足需求的帖子列表,按置信度降序排列 """ print(f"\n{'='*60}") print(f"汇总满足需求的帖子") print(f"{'='*60}") all_satisfied_notes = [] for query_eval in note_evaluation_data['note_evaluations']: for note in query_eval['evaluated_notes']: if note['evaluation']['need_satisfaction']: all_satisfied_notes.append({ "query": query_eval['query'], "from_candidate": query_eval['from_candidate'], "note_id": note['note_id'], "title": note['title'], "desc": note['desc'], # ⭐ 保留完整帖子信息 "image_list": note.get('image_list', []), "cover_image": note.get('cover_image', {}), "interact_info": note.get('interact_info', {}), "user": note.get('user', {}), "type": note.get('type', 'normal'), "note_url": note.get('note_url', ''), # 评估结果 "title_relevance": note['evaluation']['title_relevance'], "content_expectation": note['evaluation']['content_expectation'], "confidence_score": note['evaluation']['confidence_score'], "reason": note['evaluation']['reason'] }) # 按置信度降序排列 all_satisfied_notes.sort(key=lambda x: x['confidence_score'], reverse=True) print(f"\n共收集到 {len(all_satisfied_notes)} 个满足需求的帖子") if all_satisfied_notes: print(f"置信度范围: {all_satisfied_notes[0]['confidence_score']:.2f} ~ {all_satisfied_notes[-1]['confidence_score']:.2f}") return all_satisfied_notes async def step_generate_answer(satisfied_notes: list[dict], original_question: str, context: RunContext) -> dict: """ 步骤5: 基于满足需求的帖子生成答案 输入: - satisfied_notes: 步骤4收集的满足需求的帖子列表 - original_question: 原始问题 - context: 运行上下文 输出: - 生成的答案及相关信息 - answer: 答案内容(Markdown格式) - cited_note_indices: 引用的帖子索引 - confidence: 答案置信度 - summary: 答案摘要 - cited_notes: 被引用的帖子详情 """ step_num = len(context.steps) + 1 print(f"\n{'='*60}") print(f"[步骤 {step_num}] 基于 {len(satisfied_notes)} 个帖子生成答案") print(f"{'='*60}") if not satisfied_notes: print("\n⚠️ 没有满足需求的帖子,无法生成答案") result = { "answer": "抱歉,未找到能够回答该问题的相关内容。", "cited_note_indices": [], "confidence": 0.0, "summary": "无可用信息", "cited_notes": [] } add_step(context, "生成答案", "answer_generation", { "original_question": original_question, "input_notes_count": 0, "result": result }) return result # 构建Agent输入 notes_info = [] for idx, note in enumerate(satisfied_notes, 1): notes_info.append(f""" 【帖子 {idx}】 标题: {note['title']} 描述: {note['desc']} 置信度: {note['confidence_score']:.2f} """.strip()) agent_input = f""" <原始问题> {original_question} <相关帖子> {chr(10).join(notes_info)} 请基于以上帖子,为原始问题生成一个全面、准确的答案。 记得在答案中使用 [1], [2] 等标注引用的帖子序号。 """.strip() print(f"\n📝 调用答案生成Agent...") print(f" - 可用帖子: {len(satisfied_notes)} 个") print(f" - 平均置信度: {sum(n['confidence_score'] for n in satisfied_notes) / len(satisfied_notes):.2f}") # 调用Agent生成答案 result_run = await Runner.run(answer_generator, agent_input) answer_result: AnswerGeneration = result_run.final_output # 提取被引用的帖子详情 cited_notes = [] for idx in answer_result.cited_note_indices: if 1 <= idx <= len(satisfied_notes): note = satisfied_notes[idx - 1] cited_notes.append({ "index": idx, "note_id": note['note_id'], "title": note['title'], "desc": note['desc'], "confidence_score": note['confidence_score'], # ⭐ 完整帖子信息用于可视化 "image_list": note.get('image_list', []), "cover_image": note.get('cover_image', {}), "interact_info": note.get('interact_info', {}), "user": note.get('user', {}), "note_url": note.get('note_url', ''), "type": note.get('type', 'normal'), # ⭐ 评估详情 "title_relevance": note.get('title_relevance', 0), "content_expectation": note.get('content_expectation', 0), "reason": note.get('reason', '') }) result = { "answer": answer_result.answer, "cited_note_indices": answer_result.cited_note_indices, "confidence": answer_result.confidence, "summary": answer_result.summary, "cited_notes": cited_notes } # 打印结果 print(f"\n✅ 答案生成完成") print(f" - 引用帖子数: {len(answer_result.cited_note_indices)} 个") print(f" - 答案置信度: {answer_result.confidence:.2f}") print(f" - 答案摘要: {answer_result.summary}") # 记录步骤 add_step(context, "生成答案", "answer_generation", { "original_question": original_question, "input_notes_count": len(satisfied_notes), "result": result, "agent_input_preview": agent_input[:500] + "..." if len(agent_input) > 500 else agent_input }) return result def find_qualified_queries(evaluation_results: list[dict], min_relevance_score: float = 0.7) -> list[dict]: """ 查找所有合格的query(旧函数,保留兼容性) 筛选标准: 1. intent_match = True(必须满足) 2. relevance_score >= min_relevance_score 返回:按 relevance_score 降序排列 """ all_qualified = [] for result in evaluation_results: for eval_item in result.get("evaluations", []): if (eval_item['intent_match'] is True and eval_item['relevance_score'] >= min_relevance_score): all_qualified.append({ "from_candidate": result["candidate"], **eval_item }) # 按relevance_score降序排列 return sorted(all_qualified, key=lambda x: x['relevance_score'], reverse=True) # ============================================================================ # 主流程 # ============================================================================ async def progressive_exploration(context: RunContext, max_levels: int = 4) -> dict: """ 渐进式探索流程 - 使用独立步骤 流程: 1. 提取关键词 + 渐进式探索(复用旧流程) 2. 步骤1: 评估候选query的推荐词 3. 步骤2: 搜索合格的推荐词 4. 步骤3: 评估搜索到的帖子 5. 步骤4: 汇总满足需求的帖子 6. 步骤5: 生成答案 Args: context: 运行上下文 max_levels: 最大探索层数,默认4 返回格式: { "success": True/False, "final_answer": {...}, # 生成的答案 "satisfied_notes": [...], # 满足需求的帖子 "message": "..." } """ # ========== 阶段1:渐进式探索(复用旧流程找到候选query)========== # 1.1 提取关键词 keyword_result = await extract_keywords(context.q, context) context.keywords = keyword_result.keywords # 1.2 渐进式探索各层级 current_level = 1 candidates_to_evaluate = [] # Level 1:单个关键词 level_1_queries = context.keywords # 使用所有关键词 level_1_data = await explore_level(level_1_queries, current_level, context) analysis_1 = await analyze_level(level_1_data, context.exploration_levels, context.q, context) if analysis_1.should_evaluate_now: candidates_to_evaluate.extend(analysis_1.candidates_to_evaluate) # Level 2及以后:迭代探索 for level_num in range(2, max_levels + 1): prev_analysis: LevelAnalysis = context.level_analyses[-1]["analysis"] prev_analysis = LevelAnalysis(**prev_analysis) if not prev_analysis.next_combinations: print(f"\nLevel {level_num-1} 分析后无需继续探索") break level_data = await explore_level(prev_analysis.next_combinations, level_num, context) analysis = await analyze_level(level_data, context.exploration_levels, context.q, context) if analysis.should_evaluate_now: candidates_to_evaluate.extend(analysis.candidates_to_evaluate) if not candidates_to_evaluate: return { "success": False, "final_answer": None, "satisfied_notes": [], "message": "渐进式探索未找到候选query" } print(f"\n{'='*60}") print(f"渐进式探索完成,找到 {len(candidates_to_evaluate)} 个候选query") print(f"{'='*60}") # ========== 阶段2:新的独立步骤流程 ========== # 步骤1: 评估候选query的推荐词 evaluation_results = await step_evaluate_query_suggestions( candidates_to_evaluate, context.q, context ) # 步骤1.5: 筛选合格的推荐词 qualified_queries = step_filter_qualified_queries( evaluation_results, context, min_relevance_score=0.7 ) if not qualified_queries: return { "success": False, "final_answer": None, "satisfied_notes": [], "message": "没有合格的推荐词" } # 步骤2: 搜索合格的推荐词 search_results = await step_search_qualified_queries( qualified_queries, context ) if not search_results.get('searches'): return { "success": False, "final_answer": None, "satisfied_notes": [], "message": "搜索失败" } # 步骤3: 评估搜索到的帖子 note_evaluation_data = await step_evaluate_search_notes( search_results, context.q, context ) # 步骤4: 汇总满足需求的帖子 satisfied_notes = step_collect_satisfied_notes(note_evaluation_data) if not satisfied_notes: return { "success": False, "final_answer": None, "satisfied_notes": [], "message": "未找到满足需求的帖子" } # 步骤5: 生成答案 final_answer = await step_generate_answer( satisfied_notes, context.q, context ) # ========== 返回最终结果 ========== return { "success": True, "final_answer": final_answer, "satisfied_notes": satisfied_notes, "message": f"成功找到 {len(satisfied_notes)} 个满足需求的帖子,并生成答案" } # ============================================================================ # 输出格式化 # ============================================================================ def format_output(optimization_result: dict, context: RunContext) -> str: """ 格式化输出结果 - 用于独立步骤流程 包含: - 生成的答案 - 引用的帖子详情 - 满足需求的帖子统计 """ final_answer = optimization_result.get("final_answer") satisfied_notes = optimization_result.get("satisfied_notes", []) output = f"原始问题:{context.q}\n" output += f"提取的关键词:{', '.join(context.keywords or [])}\n" output += f"探索层数:{len(context.exploration_levels)}\n" output += f"找到满足需求的帖子:{len(satisfied_notes)} 个\n" output += "\n" + "="*60 + "\n" if final_answer: output += "【生成的答案】\n\n" output += final_answer.get("answer", "") output += "\n\n" + "="*60 + "\n" output += f"答案置信度:{final_answer.get('confidence', 0):.2f}\n" output += f"答案摘要:{final_answer.get('summary', '')}\n" output += f"引用帖子数:{len(final_answer.get('cited_note_indices', []))} 个\n" output += "\n" + "="*60 + "\n" output += "【引用的帖子详情】\n\n" for cited_note in final_answer.get("cited_notes", []): output += f"[{cited_note['index']}] {cited_note['title']}\n" output += f" 置信度: {cited_note['confidence_score']:.2f}\n" output += f" 描述: {cited_note['desc']}\n" output += f" note_id: {cited_note['note_id']}\n\n" else: output += "未能生成答案\n" return output # ============================================================================ # 主函数 # ============================================================================ async def main(input_dir: str, max_levels: int = 4, visualize: bool = False): """ 主函数 - 使用独立步骤流程(方案A) """ current_time, log_url = set_trace() # 从目录中读取固定文件名 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') q_context = read_file_as_string(input_context_file) q = read_file_as_string(input_q_file) q_with_context = f""" <需求上下文> {q_context} <当前问题> {q} """.strip() # 获取当前文件名作为版本 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 日志保存目录 log_dir = os.path.join(input_dir, "output", version_name, current_time) run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, q_with_context=q_with_context, q_context=q_context, q=q, log_dir=log_dir, log_url=log_url, ) # 执行渐进式探索 optimization_result = await progressive_exploration(run_context, max_levels=max_levels) # 格式化输出 final_output = format_output(optimization_result, run_context) print(f"\n{'='*60}") print("最终结果") print(f"{'='*60}") print(final_output) # 保存结果 run_context.optimization_result = optimization_result run_context.final_output = final_output # 记录最终输出步骤(新格式) final_answer = optimization_result.get("final_answer") satisfied_notes = optimization_result.get("satisfied_notes", []) add_step(run_context, "生成最终结果", "final_result", { "success": optimization_result["success"], "message": optimization_result["message"], "satisfied_notes_count": len(satisfied_notes), "final_answer": final_answer, "satisfied_notes_summary": [ { "note_id": note["note_id"], "title": note["title"], "confidence_score": note["confidence_score"] } for note in satisfied_notes # 保存所有满足条件的帖子摘要 ] if satisfied_notes else [], "final_output": final_output }) # 保存 RunContext 到 log_dir(不包含 steps,steps 单独保存) os.makedirs(run_context.log_dir, exist_ok=True) context_file_path = os.path.join(run_context.log_dir, "run_context.json") context_dict = run_context.model_dump() context_dict.pop('steps', None) # 移除 steps,避免数据冗余 with open(context_file_path, "w", encoding="utf-8") as f: json.dump(context_dict, f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") # 保存步骤化日志 steps_file_path = os.path.join(run_context.log_dir, "steps.json") with open(steps_file_path, "w", encoding="utf-8") as f: json.dump(run_context.steps, f, ensure_ascii=False, indent=2) print(f"Steps log saved to: {steps_file_path}") # 如果需要生成可视化 if visualize: import subprocess output_html = os.path.join(run_context.log_dir, "visualization.html") print(f"\n🎨 生成可视化HTML...") result = subprocess.run([ "python", "sug_v6_1_2_3.visualize.py", steps_file_path, "-o", output_html ]) if result.returncode == 0: print(f"✅ 可视化已生成: {output_html}") else: print(f"❌ 可视化生成失败") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.1.2.3 独立步骤+答案生成版") parser.add_argument( "--input-dir", type=str, default="input/简单扣图", help="输入目录路径,默认: input/简单扣图" ) parser.add_argument( "--max-levels", type=int, default=10, help="最大探索层数,默认: 10" ) parser.add_argument( "--visualize", action="store_true", default=True, help="运行完成后自动生成可视化HTML(默认开启)" ) parser.add_argument( "--no-visualize", action="store_false", dest="visualize", help="关闭自动生成可视化" ) parser.add_argument( "--visualize-only", action="store_true", help="只生成可视化,不运行搜索流程。自动查找input-dir下最新的输出目录" ) args = parser.parse_args() # 如果只是生成可视化 if args.visualize_only: import subprocess import glob # 获取版本名称 version_name = os.path.splitext(os.path.basename(__file__))[0] output_base = os.path.join(args.input_dir, "output", version_name) # 查找最新的输出目录 if not os.path.exists(output_base): print(f"❌ 找不到输出目录: {output_base}") sys.exit(1) # 获取所有日期目录 date_dirs = glob.glob(os.path.join(output_base, "*", "*")) if not date_dirs: print(f"❌ 在 {output_base} 中没有找到输出目录") sys.exit(1) # 按修改时间排序,获取最新的 latest_dir = max(date_dirs, key=os.path.getmtime) steps_json = os.path.join(latest_dir, "steps.json") if not os.path.exists(steps_json): print(f"❌ 找不到 steps.json: {steps_json}") sys.exit(1) output_html = os.path.join(latest_dir, "visualization.html") print(f"🎨 找到最新输出目录: {latest_dir}") print(f"🎨 生成可视化: {steps_json} -> {output_html}") result = subprocess.run([ "python", "sug_v6_1_2_3.visualize.py", steps_json, "-o", output_html ]) sys.exit(result.returncode) asyncio.run(main(args.input_dir, max_levels=args.max_levels, visualize=args.visualize))