import asyncio import json import os import argparse from datetime import datetime from agents import Agent, Runner from lib.my_trace import set_trace from typing import Literal from pydantic import BaseModel, Field from lib.utils import read_file_as_string from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations class RunContext(BaseModel): version: str = Field(..., description="当前运行的脚本版本(文件名)") input_files: dict[str, str] = Field(..., description="输入文件路径映射") q_with_context: str q_context: str q: str log_url: str log_dir: str # 探索阶段记录 keywords: list[str] | None = Field(default=None, description="提取的关键词") exploration_levels: list[dict] = Field(default_factory=list, description="每一层的探索结果") level_analyses: list[dict] = Field(default_factory=list, description="每一层的主Agent分析") # 最终结果 final_candidates: list[str] | None = Field(default=None, description="最终选出的候选query") evaluation_results: list[dict] | None = Field(default=None, description="候选query的评估结果") optimization_result: dict | None = Field(default=None, description="最终优化结果对象") final_output: str | None = Field(default=None, description="最终输出结果(格式化文本)") # ============================================================================ # Agent 1: 关键词提取专家 # ============================================================================ keyword_extraction_instructions = """ 你是关键词提取专家。给定一个搜索问题(含上下文),提取出**最细粒度的关键概念**。 ## 提取原则 1. **细粒度优先**:拆分成最小的有意义单元 - 不要保留完整的长句 - 拆分成独立的、有搜索意义的词或短语 2. **保留核心维度**: - 地域/对象 - 时间 - 行为/意图:获取、教程、推荐、如何等 - 主题/领域 - 质量/属性 3. **去掉无意义的虚词**:的、吗、呢等 4. **保留领域专有词**:不要过度拆分专业术语 - 如果是常见的组合词,保持完整 ## 输出要求 输出关键词列表,按重要性排序(最核心的在前)。 """.strip() class KeywordList(BaseModel): """关键词列表""" keywords: list[str] = Field(..., description="提取的关键词,按重要性排序") reasoning: str = Field(..., description="提取理由") keyword_extractor = Agent[None]( name="关键词提取专家", instructions=keyword_extraction_instructions, output_type=KeywordList, ) # ============================================================================ # Agent 2: 层级探索分析专家 # ============================================================================ level_analysis_instructions = """ 你是搜索空间探索分析专家。基于当前层级的探索结果,决定下一步行动。 ## 你的任务 分析当前已探索的词汇空间,判断: 1. **发现了什么有价值的信号?** 2. **是否已经可以评估候选了?** 3. **如果还不够,下一层应该探索什么组合?** ## 分析维度 ### 1. 信号识别(最重要) 看推荐词里**出现了什么主题**: **关键问题:** - 哪些推荐词**最接近原始需求**? - 哪些推荐词**揭示了有价值的方向**(即使不完全匹配)? - 哪些推荐词可以作为**下一层探索的桥梁**? - 系统对哪些概念理解得好?哪些理解偏了? ### 2. 组合策略 基于发现的信号,设计下一层探索: **组合类型:** a) **关键词直接组合** - 两个关键词组合成新query b) **利用推荐词作为桥梁**(重要!) - 发现某个推荐词很有价值 → 直接探索这个推荐词 - 或在推荐词基础上加其他关键词 c) **跨层级组合** - 结合多层发现的有价值推荐词 - 组合成更复杂的query ### 3. 停止条件 **何时可以评估候选?** 满足以下之一: - 推荐词中出现了**明确包含原始需求多个核心要素的query** - 已经探索到**足够复杂的组合**(3-4个关键词),且推荐词相关 - 探索了**3-4层**,信息已经足够丰富 **何时继续探索?** - 当前推荐词太泛,没有接近原始需求 - 发现了有价值的信号,但需要进一步组合验证 - 层数还少(< 3层) ## 输出要求 ### 1. key_findings 总结当前层发现的关键信息,包括: - 哪些推荐词最有价值? - 系统对哪些概念理解得好/不好? - 发现了什么意外的方向? ### 2. promising_signals 列出最有价值的推荐词(来自任何已探索的query),每个说明为什么有价值 ### 3. should_evaluate_now 是否已经可以开始评估候选了?true/false ### 4. candidates_to_evaluate 如果should_evaluate_now=true,列出应该评估的候选query - 可以是推荐词 - 可以是自己构造的组合 ### 5. next_combinations 如果should_evaluate_now=false,列出下一层应该探索的query组合 ### 6. reasoning 详细的推理过程 ## 重要原则 1. **不要过早评估**:至少探索2层,除非第一层就发现了完美匹配 2. **充分利用推荐词**:推荐词是系统给的提示,要善用 3. **保持探索方向的多样性**:不要只盯着一个方向 4. **识别死胡同**:如果某个方向的推荐词一直不相关,果断放弃 """.strip() class PromisingSignal(BaseModel): """有价值的推荐词信号""" query: str = Field(..., description="推荐词") from_level: int = Field(..., description="来自哪一层") reason: str = Field(..., description="为什么有价值") class LevelAnalysis(BaseModel): """层级分析结果""" key_findings: str = Field(..., description="当前层的关键发现") promising_signals: list[PromisingSignal] = Field(..., description="有价值的推荐词信号") should_evaluate_now: bool = Field(..., description="是否应该开始评估候选") candidates_to_evaluate: list[str] = Field(default_factory=list, description="如果should_evaluate_now=true,要评估的候选query列表") next_combinations: list[str] = Field(default_factory=list, description="如果should_evaluate_now=false,下一层要探索的query组合") reasoning: str = Field(..., description="详细的推理过程") level_analyzer = Agent[None]( name="层级探索分析专家", instructions=level_analysis_instructions, output_type=LevelAnalysis, ) # ============================================================================ # Agent 3: 评估专家(复用v5_3的评估逻辑) # ============================================================================ eval_instructions = """ 你是搜索query评估专家。给定原始问题和推荐query,评估三个分数。 ## 评估目标 用这个推荐query搜索,能否找到满足原始需求的内容? ## 三层评分 ### 1. essence_score(本质/意图)= 0 或 1 推荐query的本质/意图是否与原问题一致? **判断标准:** - 原问题的核心意图是什么?(找方法、找教程、找作品、找工具、找资源等) - 推荐词是否明确表达了相同的意图? **评分原则:** - 1 = 本质一致,推荐词**明确表达**相同意图 - 0 = 本质改变或**不够明确** ### 2. hard_score(硬性约束)= 0 或 1 在本质一致的前提下,是否满足所有硬性约束? **硬性约束**:地域、时间、对象、工具等客观可验证的限定 **评分:** - 1 = 所有硬性约束都满足 - 0 = 任一硬性约束不满足 ### 3. soft_score(软性修饰)= 0-1 软性修饰词(质量、特色、美观等主观评价)保留了多少? **评分参考:** - 1.0 = 完整保留 - 0.7-0.9 = 保留核心 - 0.4-0.6 = 部分丢失 - 0-0.3 = 大量丢失 ## 注意 - essence=0 直接拒绝,不管hard/soft多高 - essence=1, hard=0 也要拒绝 - essence=1, hard=1 才看soft_score """.strip() class EvaluationFeedback(BaseModel): """评估反馈模型 - 三层评分""" essence_score: Literal[0, 1] = Field(..., description="本质/意图匹配度,0或1") hard_score: Literal[0, 1] = Field(..., description="硬性约束匹配度,0或1") soft_score: float = Field(..., description="软性修饰完整度,0-1") reason: str = Field(..., description="评估理由") evaluator = Agent[None]( name="评估专家", instructions=eval_instructions, output_type=EvaluationFeedback, ) # ============================================================================ # 核心函数 # ============================================================================ async def extract_keywords(q: str) -> KeywordList: """提取关键词""" print("\n正在提取关键词...") result = await Runner.run(keyword_extractor, q) keyword_list: KeywordList = result.final_output print(f"提取的关键词:{keyword_list.keywords}") print(f"提取理由:{keyword_list.reasoning}") return keyword_list async def explore_level(queries: list[str], level_num: int, context: RunContext) -> dict: """探索一个层级(并发获取所有query的推荐词)""" print(f"\n{'='*60}") print(f"Level {level_num} 探索:{len(queries)} 个query") print(f"{'='*60}") xiaohongshu_api = XiaohongshuSearchRecommendations() # 并发获取所有推荐词 async def get_single_sug(query: str): print(f" 探索: {query}") suggestions = xiaohongshu_api.get_recommendations(keyword=query) print(f" → {len(suggestions) if suggestions else 0} 个推荐词") return { "query": query, "suggestions": suggestions or [] } results = await asyncio.gather(*[get_single_sug(q) for q in queries]) level_data = { "level": level_num, "timestamp": datetime.now().isoformat(), "queries": results } context.exploration_levels.append(level_data) return level_data async def analyze_level(level_data: dict, all_levels: list[dict], original_question: str, context: RunContext) -> LevelAnalysis: """分析当前层级,决定下一步""" print(f"\n正在分析 Level {level_data['level']}...") # 构造输入 analysis_input = f""" <原始问题> {original_question} <已探索的所有层级> {json.dumps(all_levels, ensure_ascii=False, indent=2)} <当前层级> Level {level_data['level']} {json.dumps(level_data['queries'], ensure_ascii=False, indent=2)} 请分析当前探索状态,决定下一步行动。 """ result = await Runner.run(level_analyzer, analysis_input) analysis: LevelAnalysis = result.final_output print(f"\n分析结果:") print(f" 关键发现:{analysis.key_findings}") print(f" 有价值的信号:{len(analysis.promising_signals)} 个") print(f" 是否评估:{analysis.should_evaluate_now}") if analysis.should_evaluate_now: print(f" 候选query:{analysis.candidates_to_evaluate}") else: print(f" 下一层探索:{analysis.next_combinations}") # 保存分析结果 context.level_analyses.append({ "level": level_data['level'], "timestamp": datetime.now().isoformat(), "analysis": analysis.model_dump() }) return analysis async def evaluate_candidates(candidates: list[str], original_question: str, context: RunContext) -> list[dict]: """评估候选query""" print(f"\n{'='*60}") print(f"评估 {len(candidates)} 个候选query") print(f"{'='*60}") xiaohongshu_api = XiaohongshuSearchRecommendations() async def evaluate_single_candidate(candidate: str): print(f"\n评估候选:{candidate}") # 1. 获取推荐词 suggestions = xiaohongshu_api.get_recommendations(keyword=candidate) print(f" 获取到 {len(suggestions) if suggestions else 0} 个推荐词") if not suggestions: return { "candidate": candidate, "suggestions": [], "evaluations": [] } # 2. 评估每个推荐词 async def eval_single_sug(sug: str): eval_input = f""" <原始问题> {original_question} <待评估的推荐query> {sug} 请评估该推荐query的三个分数: 1. essence_score: 本质/意图是否一致(0或1) 2. hard_score: 硬性约束是否满足(0或1) 3. soft_score: 软性修饰保留程度(0-1) 4. reason: 详细的评估理由 """ result = await Runner.run(evaluator, eval_input) evaluation: EvaluationFeedback = result.final_output return { "query": sug, "essence_score": evaluation.essence_score, "hard_score": evaluation.hard_score, "soft_score": evaluation.soft_score, "reason": evaluation.reason, } evaluations = await asyncio.gather(*[eval_single_sug(s) for s in suggestions]) return { "candidate": candidate, "suggestions": suggestions, "evaluations": evaluations } results = await asyncio.gather(*[evaluate_single_candidate(c) for c in candidates]) context.evaluation_results = results return results def find_qualified_queries(evaluation_results: list[dict], min_soft_score: float = 0.7) -> list[dict]: """查找所有合格的query""" all_qualified = [] for result in evaluation_results: for eval_item in result.get("evaluations", []): if (eval_item['essence_score'] == 1 and eval_item['hard_score'] == 1 and eval_item['soft_score'] >= min_soft_score): all_qualified.append({ "from_candidate": result["candidate"], **eval_item }) # 按soft_score降序排列 return sorted(all_qualified, key=lambda x: x['soft_score'], reverse=True) # ============================================================================ # 主流程 # ============================================================================ async def progressive_exploration(context: RunContext, max_levels: int = 4) -> dict: """ 渐进式广度探索流程 Args: context: 运行上下文 max_levels: 最大探索层数,默认4 返回格式: { "success": True/False, "results": [...], "message": "..." } """ # 阶段1:提取关键词(从原始问题提取) keyword_result = await extract_keywords(context.q) context.keywords = keyword_result.keywords # 阶段2:渐进式探索 current_level = 1 # Level 1:单个关键词 level_1_queries = context.keywords[:7] # 限制最多7个关键词 level_1_data = await explore_level(level_1_queries, current_level, context) # 分析Level 1 analysis_1 = await analyze_level(level_1_data, context.exploration_levels, context.q, context) if analysis_1.should_evaluate_now: # 直接评估 eval_results = await evaluate_candidates(analysis_1.candidates_to_evaluate, context.q, context) qualified = find_qualified_queries(eval_results, min_soft_score=0.7) if qualified: return { "success": True, "results": qualified, "message": f"Level 1 即找到 {len(qualified)} 个合格query" } # Level 2 及以后:迭代探索 for level_num in range(2, max_levels + 1): # 获取上一层的分析结果 prev_analysis: LevelAnalysis = context.level_analyses[-1]["analysis"] prev_analysis = LevelAnalysis(**prev_analysis) # 转回对象 if not prev_analysis.next_combinations: print(f"\nLevel {level_num-1} 分析后无需继续探索") break # 探索当前层 level_data = await explore_level(prev_analysis.next_combinations, level_num, context) # 分析当前层 analysis = await analyze_level(level_data, context.exploration_levels, context.q, context) if analysis.should_evaluate_now: # 评估候选 eval_results = await evaluate_candidates(analysis.candidates_to_evaluate, context.q, context) qualified = find_qualified_queries(eval_results, min_soft_score=0.7) if qualified: return { "success": True, "results": qualified, "message": f"Level {level_num} 找到 {len(qualified)} 个合格query" } # 所有层探索完,降低标准 print(f"\n{'='*60}") print(f"探索完 {max_levels} 层,降低标准(soft_score >= 0.5)") print(f"{'='*60}") if context.evaluation_results: acceptable = find_qualified_queries(context.evaluation_results, min_soft_score=0.5) if acceptable: return { "success": True, "results": acceptable, "message": f"找到 {len(acceptable)} 个可接受query(soft_score >= 0.5)" } # 完全失败 return { "success": False, "results": [], "message": "探索完所有层级,未找到合格的推荐词" } # ============================================================================ # 输出格式化 # ============================================================================ def format_output(optimization_result: dict, context: RunContext) -> str: """格式化输出结果""" results = optimization_result.get("results", []) output = f"原始问题:{context.q}\n" output += f"提取的关键词:{', '.join(context.keywords or [])}\n" output += f"探索层数:{len(context.exploration_levels)}\n" output += f"状态:{optimization_result['message']}\n\n" if optimization_result["success"] and results: output += "合格的推荐query(按soft_score降序):\n" for i, result in enumerate(results, 1): output += f"\n{i}. {result['query']}\n" output += f" - 来自候选:{result['from_candidate']}\n" output += f" - 本质匹配度:{result['essence_score']} (1=本质一致)\n" output += f" - 硬性约束匹配度:{result['hard_score']} (1=所有约束满足)\n" output += f" - 软性修饰完整度:{result['soft_score']:.2f} (0-1)\n" output += f" - 评估理由:{result['reason']}\n" else: output += "结果:未找到合格推荐query\n" if context.level_analyses: last_analysis = context.level_analyses[-1]["analysis"] output += f"\n最后一层分析:\n{last_analysis.get('key_findings', 'N/A')}\n" return output.strip() # ============================================================================ # 主函数 # ============================================================================ async def main(input_dir: str, max_levels: int = 4): current_time, log_url = set_trace() # 从目录中读取固定文件名 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') q_context = read_file_as_string(input_context_file) q = read_file_as_string(input_q_file) q_with_context = f""" <需求上下文> {q_context} <当前问题> {q} """.strip() # 获取当前文件名作为版本 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 日志保存目录 log_dir = os.path.join(input_dir, "output", version_name, current_time) run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, q_with_context=q_with_context, q_context=q_context, q=q, log_dir=log_dir, log_url=log_url, ) # 执行渐进式探索 optimization_result = await progressive_exploration(run_context, max_levels=max_levels) # 格式化输出 final_output = format_output(optimization_result, run_context) print(f"\n{'='*60}") print("最终结果") print(f"{'='*60}") print(final_output) # 保存结果 run_context.optimization_result = optimization_result run_context.final_output = final_output # 保存 RunContext 到 log_dir os.makedirs(run_context.log_dir, exist_ok=True) context_file_path = os.path.join(run_context.log_dir, "run_context.json") with open(context_file_path, "w", encoding="utf-8") as f: json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具 - 渐进式广度探索版") parser.add_argument( "--input-dir", type=str, default="input/简单扣图", help="输入目录路径,默认: input/简单扣图" ) parser.add_argument( "--max-levels", type=int, default=4, help="最大探索层数,默认: 4" ) args = parser.parse_args() asyncio.run(main(args.input_dir, max_levels=args.max_levels))