import asyncio import json import os import argparse from datetime import datetime from agents import Agent, Runner from lib.my_trace import set_trace from typing import Literal from pydantic import BaseModel, Field from lib.utils import read_file_as_string from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations class RunContext(BaseModel): version: str = Field(..., description="当前运行的脚本版本(文件名)") input_files: dict[str, str] = Field(..., description="输入文件路径映射") q_with_context: str q_context: str q: str log_url: str log_dir: str question_annotation: str | None = Field(default=None, description="问题的标注结果") operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史") final_output: str | None = Field(default=None, description="最终输出结果") # ============================================================================ # Agent 1: 问题标注专家 # ============================================================================ question_annotation_instructions = """ 你是搜索需求分析专家。给定问题(含需求背景),在原文上标注三层:本质、硬性、软性。 ## 三层结构 **[本质]** - 问题的核心意图,改变后是完全不同的问题 - 如何获取、教程、推荐、作品、测评等 **[硬]** - 在本质意图下,必须满足的约束 - 地域、时间、对象、质量要求等 **[软]** - 可有可无的修饰 - 能体现、特色、快速、简单等 ## 输出格式 词语[本质-描述]、词语[硬-描述]、词语[软-描述] ## 示例 输入:如何获取能体现川西秋季特色的高质量风光摄影素材? 输出:如何获取[本质-找方法] 川西[硬-地域] 秋季[硬-季节] 高质量[硬-质量] 风光摄影素材[硬-对象] 能体现[软-修饰] 特色[软-修饰] 输入:PS抠图教程 输出:PS[硬-工具] 抠图[硬-需求] 教程[本质-学习] 输入:川西秋季风光摄影作品 输出:川西[硬-地域] 秋季[硬-季节] 风光摄影[硬-对象] 作品[本质-欣赏] ## 注意 - 只输出标注后的字符串 - 结合需求背景判断意图 """.strip() question_annotator = Agent[None]( name="问题标注专家", instructions=question_annotation_instructions, ) # ============================================================================ # Agent 2: 评估专家 # ============================================================================ eval_instructions = """ 你是搜索query评估专家。给定原始问题标注(三层)和推荐query,评估三个分数。 ## 评估目标 用这个推荐query搜索,能否找到满足原始需求的内容? ## 三层评分 ### 1. essence_score(本质/意图)= 0 或 1 推荐query的本质/意图是否与原问题一致? **原问题标注中的[本质-XXX]:** - 找方法/如何获取 → 推荐词应该是方法/获取途径 - 教程/学习 → 推荐词应该是教程/教学 - 作品/欣赏 → 推荐词应该是作品展示 - 工具/推荐 → 推荐词应该是工具推荐 **评分:** - 1 = 本质一致 - 0 = 本质改变(完全答非所问) ### 2. hard_score(硬性约束)= 0 或 1 在本质一致的前提下,是否满足所有硬性约束? **原问题标注中的[硬-XXX]:**地域、时间、对象、质量、工具等 **评分:** - 1 = 所有硬性约束都满足 - 0 = 任一硬性约束不满足 ### 3. soft_score(软性修饰)= 0-1 软性修饰词保留了多少? **评分参考:** - 1.0 = 完整保留 - 0.7-0.9 = 保留核心 - 0.4-0.6 = 部分丢失 - 0-0.3 = 大量丢失 ## 示例 **原问题标注:** 如何获取[本质-找方法] 川西[硬-地域] 秋季[硬-季节] 高质量[硬-质量] 风光摄影素材[硬-对象] 能体现[软-修饰] 特色[软-修饰] **推荐query1:** 川西秋季风光摄影素材视频 - essence_score=0(找方法→找素材本身,本质变了) - hard_score=1(地域、季节、对象都符合) - soft_score=0.5(丢失"高质量") - reason: 本质改变,用户要找获取素材的方法,推荐词是素材内容本身 **推荐query2:** 川西秋季风光摄影素材网站推荐 - essence_score=1(找方法→推荐网站,本质一致) - hard_score=1(所有硬性约束满足) - soft_score=0.8(保留核心,"高质量"未明确但推荐通常筛选过) - reason: 本质一致,硬性约束满足,软性略有丢失但可接受 ## 注意 - essence=0 直接拒绝,不管hard/soft多高 - essence=1, hard=0 也要拒绝 - essence=1, hard=1 才看soft_score """.strip() class EvaluationFeedback(BaseModel): """评估反馈模型 - 三层评分""" essence_score: Literal[0, 1] = Field(..., description="本质/意图匹配度,0或1") hard_score: Literal[0, 1] = Field(..., description="硬性约束匹配度,0或1") soft_score: float = Field(..., description="软性修饰完整度,0-1") reason: str = Field(..., description="评估理由") evaluator = Agent[None]( name="评估专家", instructions=eval_instructions, output_type=EvaluationFeedback, ) # ============================================================================ # Agent 3: 修改策略生成专家 # ============================================================================ strategy_instructions = """ 你是query修改策略专家。基于所有历史尝试,生成下一步的query修改策略。 ## 输入 - 原始问题标注(三层) - 当前query - 当前轮推荐词及其评估结果(essence_score, hard_score, soft_score, reason) - **历史尝试记录**:之前所有轮次的query、修改策略、评估结果 ## 分析维度 ### 0. 首先查看历史尝试(关键!) - 之前用过哪些query?结果如何? - 哪些操作类型已经尝试过?(简化、扩展、替换、组合) - 哪些方向是死路?(多次简化仍返回空/essence=0) - 是否有改进趋势?(从空列表→有推荐词,从essence=0→essence=1) - **避免重复无效的策略** ### 1. 检查当前轮是否有推荐词 - 如果返回空列表:query可能过于复杂或生僻 - 结合历史:之前是否也返回空列表?如果多次空列表,需要大幅调整 ### 2. 分析 essence_score - 如果全是 essence_score=0:本质方向错了 - 结合历史:之前essence情况如何?是否曾经有essence=1的结果? ### 3. 分析 hard_score(仅看 essence_score=1 的) - 如果全是 hard_score=0:查看reason,哪些硬性约束不满足? - 结合历史:哪些约束反复不满足? ### 4. 分析 soft_score(仅看 essence=1 且 hard=1 的) - 如果 soft_score < 0.7:查看reason,丢失了哪些信息? - 结合历史:soft_score的变化趋势? ## 四种操作类型 **简化**:删除冗余词汇,提取核心关键词 - 适用场景:query过于复杂,返回空列表或推荐词发散 - 注意:如果历史中已多次简化仍无效,应尝试其他操作 **扩展**:添加限定词或场景描述 - 适用场景:hard_score低,需要补充约束信息 - 注意:查看历史中哪些约束信息被丢失 **替换**:使用同义词、行业术语或口语化表达 - 适用场景:essence_score低,核心概念理解偏差 - 注意:如果历史显示简化无效,替换可能是突破口 **组合**:调整关键词顺序或组合方式 - 适用场景:结构不合理,需要调整重点 - 注意:基于历史中哪些关键词组合更有效 ## 输出要求 必须输出以下字段: - operation_type: "简化" | "扩展" | "替换" | "组合" - new_query: 修改后的新query - reason: 详细理由,必须包括: 1. **历史尝试总结**:之前尝试了哪些query和策略?结果如何? 2. **当前轮评估发现**:引用具体的分数和理由 3. **为什么这样修改**:结合历史经验说明 4. **预期改进**:基于历史趋势的预期 ## 注意 - **优先考虑历史经验,避免重复失败的策略** - 理由必须基于实际数据,不能编造 - 如果返回空列表,必须在reason中说明,并结合历史判断原因 """.strip() class ModificationStrategy(BaseModel): """修改策略模型""" operation_type: Literal["简化", "扩展", "替换", "组合"] = Field(..., description="操作类型") new_query: str = Field(..., description="修改后的新query") reason: str = Field(..., description="修改理由") strategy_generator = Agent[None]( name="策略生成专家", instructions=strategy_instructions, output_type=ModificationStrategy, ) # ============================================================================ # 核心函数 # ============================================================================ async def annotate_question(q_with_context: str) -> str: """标注问题(三层)""" print("\n正在标注问题...") result = await Runner.run(question_annotator, q_with_context) annotation = str(result.final_output) print(f"问题标注完成:{annotation}") return annotation async def get_suggestions_with_eval(query: str, annotation: str, context: RunContext) -> list[dict]: """获取推荐词并评估""" print(f"\n正在获取推荐词:{query}") # 1. 调用小红书API xiaohongshu_api = XiaohongshuSearchRecommendations() query_suggestions = xiaohongshu_api.get_recommendations(keyword=query) print(f"获取到 {len(query_suggestions) if query_suggestions else 0} 个推荐词:{query_suggestions}") if not query_suggestions: # 记录到历史 context.operations_history.append({ "operation_type": "get_query_suggestions", "timestamp": datetime.now().isoformat(), "query": query, "suggestions": [], "evaluations": "未返回任何推荐词", }) return [] # 2. 并发评估所有推荐词 async def evaluate_single_query(q_sug: str): eval_input = f""" <原始问题标注(三层)> {annotation} <待评估的推荐query> {q_sug} 请评估该推荐query的三个分数: 1. essence_score: 本质/意图是否一致(0或1) 2. hard_score: 硬性约束是否满足(0或1) 3. soft_score: 软性修饰保留程度(0-1) 4. reason: 详细的评估理由 """ evaluator_result = await Runner.run(evaluator, eval_input) result: EvaluationFeedback = evaluator_result.final_output return { "query": q_sug, "essence_score": result.essence_score, "hard_score": result.hard_score, "soft_score": result.soft_score, "reason": result.reason, } evaluations = await asyncio.gather(*[evaluate_single_query(q_sug) for q_sug in query_suggestions]) # 3. 记录到历史 context.operations_history.append({ "operation_type": "get_query_suggestions", "timestamp": datetime.now().isoformat(), "query": query, "suggestions": query_suggestions, "evaluations": evaluations, }) return evaluations async def generate_modification_strategy( current_query: str, evaluations: list[dict], annotation: str, context: RunContext ) -> ModificationStrategy: """生成修改策略""" print("\n正在生成修改策略...") # 整理历史尝试记录 - 完整保留推荐词和评估结果 history_records = [] round_num = 0 for op in context.operations_history: if op["operation_type"] == "get_query_suggestions": round_num += 1 record = { "round": round_num, "query": op["query"], "suggestions": op["suggestions"], "evaluations": op["evaluations"] } history_records.append(record) elif op["operation_type"] == "modify_query": # 修改操作也记录,但不增加轮数 history_records.append({ "operation": "modify_query", "modification_type": op["modification_type"], "original_query": op["original_query"], "new_query": op["new_query"], "reason": op["reason"] }) # 格式化历史记录为JSON history_json = json.dumps(history_records, ensure_ascii=False, indent=2) strategy_input = f""" <原始问题标注(三层)> {annotation} <历史尝试记录(完整)> {history_json} <当前query> {current_query} <当前轮推荐词评估结果> {json.dumps(evaluations, ensure_ascii=False, indent=2) if evaluations else "空列表"} 请基于所有历史尝试和当前评估结果,生成下一步的query修改策略。 重点关注: 1. **查看历史中每个推荐词的详细评估**: - 哪些推荐词的essence_score=1?它们的表述有什么特点? - 哪些推荐词的hard_score=1?它们保留了哪些约束? - 每个推荐词的reason中提到了什么问题? 2. **避免重复失败的方向**: - 如果某个方向多次返回essence=0,避免继续 - 如果某个操作类型反复无效,尝试其他类型 3. **识别有效的表述方式**: - 历史中是否有essence=1的推荐词?学习它们的表述 - 哪些关键词组合更容易被正确理解? 4. **基于趋势做决策**: - 是否有改进趋势(空列表→有推荐词,essence=0→essence=1)? - 还是陷入死胡同(多次尝试无改善)? """ result = await Runner.run(strategy_generator, strategy_input) strategy: ModificationStrategy = result.final_output return strategy def find_best_qualified_query(evaluations: list[dict], min_soft_score: float = 0.7) -> dict | None: """查找最佳合格query""" qualified = [ e for e in evaluations if e['essence_score'] == 1 and e['hard_score'] == 1 and e['soft_score'] >= min_soft_score ] if qualified: return max(qualified, key=lambda x: x['soft_score']) return None # ============================================================================ # 主流程(代码控制) # ============================================================================ async def optimize_query(context: RunContext, max_rounds: int = 20) -> dict: """ 主优化流程 - 由代码控制 Args: context: 运行上下文 max_rounds: 最大迭代轮数,默认20 返回格式: { "success": True/False, "result": {...} or None, "message": "..." } """ # 1. 标注问题(仅一次) annotation = await annotate_question(context.q_with_context) context.question_annotation = annotation # 2. 迭代优化 current_query = context.q for round_num in range(1, max_rounds + 1): print(f"\n{'='*60}") print(f"第 {round_num} 轮:{'使用原始问题' if round_num == 1 else '使用修改后的query'}") print(f"当前query: {current_query}") print(f"{'='*60}") # 获取推荐词并评估 evaluations = await get_suggestions_with_eval(current_query, annotation, context) if evaluations: # 检查是否找到合格query best = find_best_qualified_query(evaluations, min_soft_score=0.7) if best: return { "success": True, "result": best, "message": f"第{round_num}轮找到合格query" } # 如果是最后一轮,不再生成策略 if round_num == max_rounds: break # 生成修改策略 print(f"\n--- 生成修改策略 ---") strategy = await generate_modification_strategy(current_query, evaluations, annotation, context) print(f"\n修改策略:") print(f" 操作类型:{strategy.operation_type}") print(f" 原query:{current_query}") print(f" 新query:{strategy.new_query}") print(f" 理由:{strategy.reason}") # 记录修改 context.operations_history.append({ "operation_type": "modify_query", "timestamp": datetime.now().isoformat(), "modification_type": strategy.operation_type, "original_query": current_query, "new_query": strategy.new_query, "reason": strategy.reason, }) # 更新当前query current_query = strategy.new_query # 所有轮次后仍未找到,降低标准查找 print(f"\n{'='*60}") print(f"{max_rounds}轮后未找到最优query,降低标准(soft_score >= 0.5)") print(f"{'='*60}") best_acceptable = find_best_qualified_query(evaluations, min_soft_score=0.5) if best_acceptable: return { "success": True, "result": best_acceptable, "message": f"{max_rounds}轮后找到可接受query(soft_score >= 0.5)" } # 完全失败:找出最接近的 essence_hard_ok = [ e for e in evaluations if e['essence_score'] == 1 and e['hard_score'] == 1 ] if essence_hard_ok: closest = max(essence_hard_ok, key=lambda x: x['soft_score']) return { "success": False, "result": closest, "message": f"未找到合格query,最接近的soft_score={closest['soft_score']}" } return { "success": False, "result": None, "message": "未找到任何满足本质和硬性约束的推荐词" } # ============================================================================ # 输出格式化 # ============================================================================ def format_output(optimization_result: dict, context: RunContext) -> str: """格式化输出结果""" if optimization_result["success"]: result = optimization_result["result"] return f""" 原始问题:{context.q} 优化后的query:{result['query']} 本质匹配度:{result['essence_score']} (1=本质一致) 硬性约束匹配度:{result['hard_score']} (1=所有约束满足) 软性修饰完整度:{result['soft_score']} (0-1) 评估理由:{result['reason']} 状态:{optimization_result['message']} """.strip() else: output = f""" 原始问题:{context.q} 结果:未找到合格推荐query 原因:{optimization_result['message']} """ if optimization_result["result"]: result = optimization_result["result"] output += f""" 最接近的推荐词:{result['query']} - essence_score: {result['essence_score']} - hard_score: {result['hard_score']} - soft_score: {result['soft_score']} - reason: {result['reason']} """ output += "\n建议:尝试简化问题或调整需求描述" return output.strip() # ============================================================================ # 主函数 # ============================================================================ async def main(input_dir: str, max_rounds: int = 20): current_time, log_url = set_trace() # 从目录中读取固定文件名 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') q_context = read_file_as_string(input_context_file) q = read_file_as_string(input_q_file) q_with_context = f""" <需求上下文> {q_context} <当前问题> {q} """.strip() # 获取当前文件名作为版本 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 日志保存目录 log_dir = os.path.join(input_dir, "output", version_name, current_time) run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, q_with_context=q_with_context, q_context=q_context, q=q, log_dir=log_dir, log_url=log_url, ) # 执行优化流程(代码控制) optimization_result = await optimize_query(run_context, max_rounds=max_rounds) # 格式化输出 final_output = format_output(optimization_result, run_context) print(f"\n{'='*60}") print("最终结果") print(f"{'='*60}") print(final_output) # 保存结果 run_context.final_output = final_output # 保存 RunContext 到 log_dir os.makedirs(run_context.log_dir, exist_ok=True) context_file_path = os.path.join(run_context.log_dir, "run_context.json") with open(context_file_path, "w", encoding="utf-8") as f: json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具") parser.add_argument( "--input-dir", type=str, default="input/简单扣图", help="输入目录路径,默认: input/简单扣图" ) parser.add_argument( "--max-rounds", type=int, default=20, help="最大迭代轮数,默认: 20" ) args = parser.parse_args() asyncio.run(main(args.input_dir, max_rounds=args.max_rounds))