import asyncio import json import os import argparse from datetime import datetime from agents import Agent, Runner from lib.my_trace import set_trace from typing import Literal from pydantic import BaseModel, Field from lib.utils import read_file_as_string from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations class RunContext(BaseModel): version: str = Field(..., description="当前运行的脚本版本(文件名)") input_files: dict[str, str] = Field(..., description="输入文件路径映射") q_with_context: str q_context: str q: str log_url: str log_dir: str question_annotation: str | None = Field(default=None, description="问题的标注结果") operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史") final_output: str | None = Field(default=None, description="最终输出结果") # ============================================================================ # Agent 1: 问题标注专家 # ============================================================================ question_annotation_instructions = """ 你是搜索需求分析专家。给定问题(含需求背景),在原文上标注三层:本质、硬性、软性。 ## 判断标准 **[本质]** - 问题的核心意图 - 如何获取、教程、推荐、作品、测评等 **[硬]** - 客观事实性约束(可明确验证、非主观判断) - 能明确区分类别的:地域、时间、对象、工具、操作类型 - 特征:改变后得到完全不同类别的结果 **[软]** - 主观判断性修饰(因人而异、程度性的) - 需要主观评价的:质量、速度、美观、特色、程度 - 特征:改变后仍是同类结果,只是满足程度不同 ## 输出格式 词语[本质-描述]、词语[硬-描述]、词语[软-描述] ## 注意 - 只输出标注后的字符串 - 结合需求背景判断意图 """.strip() question_annotator = Agent[None]( name="问题标注专家", instructions=question_annotation_instructions, ) # ============================================================================ # Agent 2: 评估专家 # ============================================================================ eval_instructions = """ 你是搜索query评估专家。给定原始问题标注(三层)和推荐query,评估三个分数。 ## 评估目标 用这个推荐query搜索,能否找到满足原始需求的内容? ## 三层评分 ### 1. essence_score(本质/意图)= 0 或 1 推荐query的本质/意图是否与原问题一致? **原问题标注中的[本质-XXX]对应推荐词要求:** - 找方法/如何获取 → 推荐词**必须明确包含方法/途径类词汇** - ✅ "...网站推荐"、"如何获取..."、"...获取途径"、"...方法" - ❌ "...下载"、"...素材"(直接找内容,不是找方法) - 教程/学习 → 推荐词应该是教程/教学 - 作品/欣赏 → 推荐词应该是作品展示 - 工具/推荐 → 推荐词应该是工具推荐 **评分原则:** - 1 = 本质一致,推荐词**明确表达**相同意图 - 0 = 本质改变或**不够明确**(宁可严格,不可放松) ### 2. hard_score(硬性约束)= 0 或 1 在本质一致的前提下,是否满足所有硬性约束? **原问题标注中的[硬-XXX]:**地域、时间、对象、质量、工具等 **评分:** - 1 = 所有硬性约束都满足 - 0 = 任一硬性约束不满足 ### 3. soft_score(软性修饰)= 0-1 软性修饰词保留了多少? **评分参考:** - 1.0 = 完整保留 - 0.7-0.9 = 保留核心 - 0.4-0.6 = 部分丢失 - 0-0.3 = 大量丢失 ## 注意 - essence=0 直接拒绝,不管hard/soft多高 - essence=1, hard=0 也要拒绝 - essence=1, hard=1 才看soft_score """.strip() class EvaluationFeedback(BaseModel): """评估反馈模型 - 三层评分""" essence_score: Literal[0, 1] = Field(..., description="本质/意图匹配度,0或1") hard_score: Literal[0, 1] = Field(..., description="硬性约束匹配度,0或1") soft_score: float = Field(..., description="软性修饰完整度,0-1") reason: str = Field(..., description="评估理由") evaluator = Agent[None]( name="评估专家", instructions=eval_instructions, output_type=EvaluationFeedback, ) # ============================================================================ # Agent 3: 修改策略生成专家 # ============================================================================ strategy_instructions = """ 你是query修改策略专家。**模拟人在搜索引擎中的真实搜索行为**,基于反馈动态调整query。 ## 核心思路:搜索是探索过程,不是直达过程 **关键认知:** 1. **中间query不需要满足原始需求** - 它是探索工具,可以偏离原需求 2. **推荐词是最宝贵的反馈信号** - 告诉你系统理解成什么了,有什么内容 3. **每一步query都有明确的探索目的** - 不是盲目改词,而是试探和引导 4. **最终目标:找到满足需求的推荐词** - 不是让query本身满足需求 ## 人的真实搜索过程 **搜索的本质**:通过多步探索,利用推荐词作为桥梁,逐步引导系统 **典型模式**: 第1步:直接尝试 - 目的:看系统能否直接理解 - 结果:空列表或essence=0 - essence=0的推荐词:告诉你系统理解成什么了 第2步:降低要求,简化query - 目的:让系统有响应,看它在基础层面有什么 - 推荐词虽然essence=0,但揭示了系统在某个主题有内容 - **关键**:选一个最有潜力的推荐词 第3步:基于推荐词,往目标方向引导 - 目的:利用推荐词作为桥梁,加上目标方向的词 - 推荐词还是essence=0,但主题在变化(接近目标) - **渐进式**:不求一步到位,每步都有进展 第4步:继续引导或换角度 - 如果推荐词主题不变 → 换角度 - 如果推荐词主题在接近 → 继续引导 最终:找到essence=1的推荐词 **关键原则**: 1. essence_score是评估推荐词的,不是评估中间query的 2. essence=0的推荐词也有价值,它揭示了系统的理解方向 3. 每一步都有明确的探索目的,看目的是否达成 4. 通过推荐词的主题变化,判断是否在接近目标 ## 输入信息 - 原始问题标注(三层):本质、硬性约束、软性修饰 - 历史尝试记录:所有轮次的query、推荐词、评估结果 - 当前query和推荐词评估 ## 分析步骤 ### 第一步:理解当前推荐词的信号 **核心问题:推荐词告诉我什么信息?** **重要提醒:essence_score是评估推荐词是否满足原始需求的最终目标** - essence_score=1: 推荐词满足原需求的本质 - essence_score=0: 推荐词不满足原需求的本质 - **但中间query的目的可能不是满足原需求**,所以essence_score只是参考 1. **系统理解层面**(看推荐词的主题): - 空列表 → 系统完全不理解当前query - 有推荐词 → 系统理解成了什么主题? - 旅游?教程?素材?工具?品种介绍? - 这些主题是否有助于往目标方向引导? 2. **内容可用性层面**(看推荐词的价值): - **即使推荐词essence=0,也可能是很好的探索起点** - 例如:推荐词"川西旅游攻略"虽然essence=0,但揭示了系统认识"川西" - 哪些推荐词最有潜力作为下一步的桥梁? 3. **探索目的验证**: - 当前query的探索目的是什么?达到了吗? - 例如:目的是"看系统对川西有什么" → 达到了(有推荐词) - 下一步要验证/探索什么? ### 第二步:回顾历史,识别规律 **什么是真正的死胡同?** - ❌ 不是essence=0就是死胡同 - ✅ 死胡同:连续多次尝试,**推荐词的主题完全不变** - 例如:连续3轮都是"取名/品种介绍" - 例如:连续3轮都是"旅游攻略" **什么是有进展?** - ✅ 推荐词的主题在变化(旅游→摄影→作品→素材) - ✅ 即使essence=0,也说明在接近目标 - ✅ 说明系统的理解在被引导 **避免同义词打转**: - ❌ "素材"→"照片"→"图片"→"资源" - 这些同义词不会改变系统理解,纯粹浪费轮次 - ✅ 应该从推荐词中选一个作为桥梁,或换完全不同的角度 ### 第三步:选择策略类型(带着明确的探索目的) **refine_current(微调当前query)** - 适用:推荐词方向对了,需要微调让它更精确 - 探索目的:在正确方向上精细化 - 动作:加词/减词/换词/调整顺序 **use_recommendation(选推荐词作为新起点)** ⭐ 最重要策略 - 适用:推荐词虽然essence=0,但**揭示了系统在这个方向有内容** - 探索目的:利用推荐词这个客观信号,引导系统往目标方向 - **核心思维**:推荐词是系统给你的提示,告诉你"我有这个" - 动作: - 选一个最有潜力的推荐词作为base_query - 在它基础上加目标方向的词 - **这个新query可能不满足原需求,但目的是探索和引导** **change_approach(换完全不同的角度)** - 适用:当前方向是死路(多次尝试推荐词主题不变) - 探索目的:跳出当前框架,从另一个角度切入 - 动作:换一种完全不同的表述方式 **relax_constraints(放宽约束)** - 适用:query太复杂,系统不理解(返回空列表) - 探索目的:先让系统有响应,看它在最基础层面有什么 - 动作:去掉限定词,保留核心概念 ## 输出要求 ### 1. reasoning(推理过程) 必须包含三部分,**核心是目标导向**: - **当前位置评估**(相对于最终目标): - 当前推荐词反映了系统在什么主题空间? - 离原始需求多远?(完全偏离/部分相关/已经接近) - **不要只看essence_score**: - essence_score=0不代表没用,可能是必经之路 - 关键看推荐词能否帮助靠近目标 - **历史进展判断**: - 推荐词主题是否在变化?(变化=进展,不变=死胡同) - 在靠近目标,还是原地打转? - 哪些尝试让系统理解发生了改变? - **下一步探索规划**: - **这一步的作用**(必须明确!): * 离目标远?→ 先让系统理解某个关键词/概念 * 部分相关?→ 选推荐词作为桥梁,引导往目标方向 * 已经接近?→ 微调细节,精确匹配 - 为什么选这个base_query? - 这个修改如何让我们靠近目标? - **重要**:中间query不需要满足原需求,只要能靠近目标 ### 2. strategy_type 从4种策略中选择:refine_current, use_recommendation, change_approach, relax_constraints ### 3. base_query **关键**:可以选择历史中的query,也可以选择历史推荐词 - 如果选历史query:base_query_source = "history_query" - 如果选历史推荐词:base_query_source = "history_recommendation" ### 4. base_query_source 说明base_query的来源 ### 5. modification_action **重要:一次只做一个核心动作** - 不要列举多个动作 - 只描述最核心的那一个修改 - 例如:"选择推荐词'川西旅游'作为新起点" - 例如:"去掉'如何获取'改为直接搜内容" - 例如:"加上'AI生成'转向生成方向" ### 6. new_query 最终的新query ## 重要原则 1. **推荐词是最宝贵的反馈** - 充分利用推荐词这个客观信号 - 即使essence=0的推荐词,也揭示了系统在这个方向有什么 - **优先考虑use_recommendation策略** - 选一个推荐词作为起点 2. **中间query可以偏离原需求** - 每一步都有明确的探索目的 - 不要纠结"这个query不满足原需求" - 关键是:这个query能不能帮你往正确方向引导系统 3. **识别死胡同,及时换方向** - 如果多次尝试推荐词主题不变 → 换方向 - 如果推荐词越来越偏 → 回退到之前的某个好的起点 4. **保持推理简洁** - 抓住关键信息 - 明确说出探索目的 - 不要重复啰嗦 """.strip() class ModificationStrategy(BaseModel): """修改策略模型 - 模拟人的搜索调整过程""" reasoning: str = Field(..., description="推理过程:1)当前推荐词分析:系统理解成什么了?2)历史尝试总结:哪些方向有效/无效?3)下一步策略:为什么这样调整?") strategy_type: Literal[ "refine_current", # 微调当前query(加词/减词/换词/换顺序) "use_recommendation", # 选择推荐词作为新起点,在它基础上修改 "change_approach", # 换完全不同的表述角度 "relax_constraints" # 放宽约束,去掉部分限定词 ] = Field(..., description="策略类型") base_query: str = Field(..., description="基础query,可以是:1)历史中的query 2)历史推荐词中的某一个") base_query_source: Literal["history_query", "history_recommendation"] = Field(..., description="base_query的来源") modification_action: str = Field(..., description="核心修改动作(只一个),如:'选择推荐词作为新起点' 或 '去掉方法类词改为直接搜内容' 或 '加上AI生成转向生成方向'") new_query: str = Field(..., description="修改后的新query") strategy_generator = Agent[None]( name="策略生成专家", instructions=strategy_instructions, output_type=ModificationStrategy, ) # ============================================================================ # 核心函数 # ============================================================================ async def annotate_question(q_with_context: str) -> str: """标注问题(三层)""" print("\n正在标注问题...") result = await Runner.run(question_annotator, q_with_context) annotation = str(result.final_output) print(f"问题标注完成:{annotation}") return annotation async def get_suggestions_with_eval(query: str, annotation: str, context: RunContext) -> list[dict]: """获取推荐词并评估""" print(f"\n正在获取推荐词:{query}") # 1. 调用小红书API xiaohongshu_api = XiaohongshuSearchRecommendations() query_suggestions = xiaohongshu_api.get_recommendations(keyword=query) print(f"获取到 {len(query_suggestions) if query_suggestions else 0} 个推荐词:{query_suggestions}") if not query_suggestions: # 记录到历史 context.operations_history.append({ "operation_type": "get_query_suggestions", "timestamp": datetime.now().isoformat(), "query": query, "suggestions": [], "evaluations": "未返回任何推荐词", }) return [] # 2. 并发评估所有推荐词 async def evaluate_single_query(q_sug: str): eval_input = f""" <需求背景> {context.q_context if context.q_context else "无"} <原始问题> {context.q} <原始问题标注(三层)> {annotation} <待评估的推荐query> {q_sug} 请评估该推荐query的三个分数: 1. essence_score: 本质/意图是否一致(0或1) 2. hard_score: 硬性约束是否满足(0或1) 3. soft_score: 软性修饰保留程度(0-1) 4. reason: 详细的评估理由 """ evaluator_result = await Runner.run(evaluator, eval_input) result: EvaluationFeedback = evaluator_result.final_output return { "query": q_sug, "essence_score": result.essence_score, "hard_score": result.hard_score, "soft_score": result.soft_score, "reason": result.reason, } evaluations = await asyncio.gather(*[evaluate_single_query(q_sug) for q_sug in query_suggestions]) # 3. 记录到历史 context.operations_history.append({ "operation_type": "get_query_suggestions", "timestamp": datetime.now().isoformat(), "query": query, "suggestions": query_suggestions, "evaluations": evaluations, }) return evaluations async def generate_modification_strategy( annotation: str, context: RunContext ) -> ModificationStrategy: """生成修改策略""" print("\n正在生成修改策略...") # 整理历史尝试记录 - 完整保留推荐词和评估结果 history_records = [] round_num = 0 for op in context.operations_history: if op["operation_type"] == "get_query_suggestions": round_num += 1 record = { "round": round_num, "query": op["query"], "suggestions": op["suggestions"], "evaluations": op["evaluations"] } history_records.append(record) elif op["operation_type"] == "modify_query": # 修改操作也记录,但不增加轮数 history_records.append({ "operation": "modify_query", "strategy_type": op.get("strategy_type", op.get("modification_type")), # 兼容旧字段 "base_query": op.get("base_query"), "base_query_source": op.get("base_query_source"), "modification_action": op.get("modification_action", op.get("modification_actions", [])), # 兼容旧版本 "original_query": op["original_query"], "new_query": op["new_query"], "reasoning": op["reasoning"] }) # 格式化历史记录为JSON history_json = json.dumps(history_records, ensure_ascii=False, indent=2) strategy_input = f""" <需求背景> {context.q_context if context.q_context else "无"} <原始问题> {context.q} <原始问题标注(三层)> {annotation} <历史尝试记录(完整)> {history_json} 请基于所有历史尝试,生成下一步的query修改策略。 **说明**:历史记录中最后一条就是当前轮的query和推荐词评估结果。 **核心思路**:每一步都要明确 "当前在哪 → 离目标多远 → 下一步做什么能靠近目标" 重点分析: 1. **评估当前位置**(相对于最终目标): - 当前推荐词反映了什么?系统在哪个主题空间? - 离原始需求的距离:完全偏离?部分相关?已经接近? - **不要只看essence_score**:essence=0不代表没用,可能是通往目标的必经之路 2. **判断历史进展**: - 推荐词主题是否在变化?(变化=有进展,不变=死胡同) - 是在靠近目标,还是在原地打转? - 哪个方向的query让系统理解发生了改变? 3. **规划下一步探索**: - **这一步query的作用是什么**?(必须明确!) * 如果离目标很远:需要先让系统理解某个关键词/概念 * 如果部分相关:选一个推荐词作为桥梁,在它基础上引导 * 如果已经接近:微调细节,精确匹配需求 - **记住**:中间query不需要满足原需求,只要能让我们往目标靠近 """ result = await Runner.run(strategy_generator, strategy_input) strategy: ModificationStrategy = result.final_output return strategy def find_qualified_queries(evaluations: list[dict], min_soft_score: float = 0.7) -> list[dict]: """查找所有合格的query,按soft_score降序排列""" qualified = [ e for e in evaluations if e['essence_score'] == 1 and e['hard_score'] == 1 and e['soft_score'] >= min_soft_score ] # 按soft_score降序排列 return sorted(qualified, key=lambda x: x['soft_score'], reverse=True) # ============================================================================ # 主流程(代码控制) # ============================================================================ async def optimize_query(context: RunContext, max_rounds: int = 20) -> dict: """ 主优化流程 - 由代码控制 Args: context: 运行上下文 max_rounds: 最大迭代轮数,默认20 返回格式: { "success": True/False, "result": {...} or None, "message": "..." } """ # 1. 标注问题(仅一次) annotation = await annotate_question(context.q_with_context) context.question_annotation = annotation # 2. 迭代优化 current_query = context.q for round_num in range(1, max_rounds + 1): print(f"\n{'='*60}") print(f"第 {round_num} 轮:{'使用原始问题' if round_num == 1 else '使用修改后的query'}") print(f"当前query: {current_query}") print(f"{'='*60}") # 获取推荐词并评估 evaluations = await get_suggestions_with_eval(current_query, annotation, context) if evaluations: # 检查是否找到合格query qualified_queries = find_qualified_queries(evaluations, min_soft_score=0.7) if qualified_queries: return { "success": True, "results": qualified_queries, "message": f"第{round_num}轮找到{len(qualified_queries)}个合格query" } # 如果是最后一轮,不再生成策略 if round_num == max_rounds: break # 生成修改策略 print(f"\n--- 生成修改策略 ---") strategy = await generate_modification_strategy(annotation, context) print(f"\n修改策略:") print(f" 推理过程:{strategy.reasoning}") print(f" 策略类型:{strategy.strategy_type}") print(f" 基础query:{strategy.base_query} (来源: {strategy.base_query_source})") print(f" 修改动作:{strategy.modification_action}") print(f" 新query:{strategy.new_query}") # 记录修改 context.operations_history.append({ "operation_type": "modify_query", "timestamp": datetime.now().isoformat(), "reasoning": strategy.reasoning, "strategy_type": strategy.strategy_type, "base_query": strategy.base_query, "base_query_source": strategy.base_query_source, "modification_action": strategy.modification_action, "original_query": current_query, "new_query": strategy.new_query, }) # 更新当前query current_query = strategy.new_query # 所有轮次后仍未找到,从所有历史评估中降低标准查找 print(f"\n{'='*60}") print(f"{max_rounds}轮后未找到最优query,降低标准(soft_score >= 0.5)") print(f"{'='*60}") # 收集所有历史轮次的评估结果 all_evaluations = [] for op in context.operations_history: if op["operation_type"] == "get_query_suggestions" and op["evaluations"]: all_evaluations.extend(op["evaluations"]) if not all_evaluations: return { "success": False, "results": [], "message": "所有轮次均未返回推荐词" } # 降级查找:soft_score >= 0.5 acceptable_queries = find_qualified_queries(all_evaluations, min_soft_score=0.5) if acceptable_queries: return { "success": True, "results": acceptable_queries, "message": f"{max_rounds}轮后找到{len(acceptable_queries)}个可接受query(soft_score >= 0.5)" } # 完全失败:找出最接近的(essence=1, hard=1) essence_hard_ok = [ e for e in all_evaluations if e['essence_score'] == 1 and e['hard_score'] == 1 ] if essence_hard_ok: # 返回所有满足essence和hard的,按soft_score降序 closest_queries = sorted(essence_hard_ok, key=lambda x: x['soft_score'], reverse=True) return { "success": False, "results": closest_queries, "message": f"未找到合格query,但有{len(closest_queries)}个接近的推荐词(essence=1, hard=1)" } return { "success": False, "results": [], "message": "未找到任何满足本质和硬性约束的推荐词" } # ============================================================================ # 输出格式化 # ============================================================================ def format_output(optimization_result: dict, context: RunContext) -> str: """格式化输出结果""" results = optimization_result.get("results", []) if optimization_result["success"] and results: output = f"原始问题:{context.q}\n" output += f"状态:{optimization_result['message']}\n\n" output += "合格的推荐query(按soft_score降序):\n" for i, result in enumerate(results, 1): output += f"\n{i}. {result['query']}\n" output += f" - 本质匹配度:{result['essence_score']} (1=本质一致)\n" output += f" - 硬性约束匹配度:{result['hard_score']} (1=所有约束满足)\n" output += f" - 软性修饰完整度:{result['soft_score']:.2f} (0-1)\n" output += f" - 评估理由:{result['reason']}\n" return output.strip() else: output = f"原始问题:{context.q}\n" output += f"结果:未找到合格推荐query\n" output += f"原因:{optimization_result['message']}\n" if results: output += "\n最接近的推荐词(按soft_score降序):\n" for i, result in enumerate(results[:3], 1): # 只显示前3个 output += f"\n{i}. {result['query']}\n" output += f" - essence_score: {result['essence_score']}\n" output += f" - hard_score: {result['hard_score']}\n" output += f" - soft_score: {result['soft_score']:.2f}\n" output += f" - reason: {result['reason']}\n" output += "\n建议:尝试简化问题或调整需求描述" return output.strip() # ============================================================================ # 主函数 # ============================================================================ async def main(input_dir: str, max_rounds: int = 20): current_time, log_url = set_trace() # 从目录中读取固定文件名 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') q_context = read_file_as_string(input_context_file) q = read_file_as_string(input_q_file) q_with_context = f""" <需求上下文> {q_context} <当前问题> {q} """.strip() # 获取当前文件名作为版本 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 日志保存目录 log_dir = os.path.join(input_dir, "output", version_name, current_time) run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, q_with_context=q_with_context, q_context=q_context, q=q, log_dir=log_dir, log_url=log_url, ) # 执行优化流程(代码控制) optimization_result = await optimize_query(run_context, max_rounds=max_rounds) # 格式化输出 final_output = format_output(optimization_result, run_context) print(f"\n{'='*60}") print("最终结果") print(f"{'='*60}") print(final_output) # 保存结果 run_context.final_output = final_output # 保存 RunContext 到 log_dir os.makedirs(run_context.log_dir, exist_ok=True) context_file_path = os.path.join(run_context.log_dir, "run_context.json") with open(context_file_path, "w", encoding="utf-8") as f: json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具") parser.add_argument( "--input-dir", type=str, default="input/简单扣图", help="输入目录路径,默认: input/简单扣图" ) parser.add_argument( "--max-rounds", type=int, default=20, help="最大迭代轮数,默认: 20" ) args = parser.parse_args() asyncio.run(main(args.input_dir, max_rounds=args.max_rounds))