import asyncio import json import os import argparse from datetime import datetime from agents import Agent, Runner, function_tool from lib.my_trace import set_trace from typing import Literal from dataclasses import dataclass from pydantic import BaseModel, Field from lib.utils import read_file_as_string from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations from agents import Agent, RunContextWrapper, Runner, function_tool from pydantic import BaseModel, Field class RunContext(BaseModel): version: str = Field(..., description="当前运行的脚本版本(文件名)") input_files: dict[str, str] = Field(..., description="输入文件路径映射,如 {'context_file': '...', 'q_file': '...'}") q_with_context: str q_context: str q: str log_url: str log_dir: str # 中间数据记录 - 按时间顺序记录所有操作 operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史,包括 get_query_suggestions 和 modify_query") # 最终输出结果 final_output: str | None = Field(default=None, description="Agent的最终输出结果") eval_insrtuctions = """ 你是一个专业的评估专家,负责评估给定的搜索query是否满足原始问题和需求,给出出评分和简明扼要的理由。 """ @dataclass class EvaluationFeedback: reason: str=Field(..., description="简明扼要的理由") score: float=Field(..., description="评估结果,1表示等价,0表示不等价,中间值表示部分等价") evaluator = Agent[None]( name="评估专家", instructions=eval_insrtuctions, output_type=EvaluationFeedback, ) @function_tool async def get_query_suggestions(wrapper: RunContextWrapper[RunContext], query: str): """Fetch search recommendations from Xiaohongshu.""" xiaohongshu_api = XiaohongshuSearchRecommendations() query_suggestions = xiaohongshu_api.get_recommendations(keyword=query) # 记录到 RunContext wrapper.context.operations_history.append({ "operation_type": "get_query_suggestions", "timestamp": datetime.now().isoformat(), "query": query, "suggestions": query_suggestions, }) if query_suggestions: return query_suggestions else: res = '未返回任何推荐词' return res @function_tool def modify_query(wrapper: RunContextWrapper[RunContext], original_query: str, operation_type: str, new_query: str, reason: str): """ Modify the search query with a specific operation. Args: original_query: The original query before modification operation_type: Type of modification - must be one of: "简化", "扩展", "替换", "组合" new_query: The modified query after applying the operation reason: Detailed explanation of why this modification was made and what insight from previous suggestions led to this change Returns: A dict containing the modification record and the new query to use for next search """ operation_types = ["简化", "扩展", "替换", "组合"] if operation_type not in operation_types: return { "status": "error", "message": f"Invalid operation_type. Must be one of: {', '.join(operation_types)}" } modification_record = { "original_query": original_query, "operation_type": operation_type, "new_query": new_query, "reason": reason, } # 记录到 RunContext wrapper.context.operations_history.append({ "operation_type": "modify_query", "timestamp": datetime.now().isoformat(), "modification_type": operation_type, "original_query": original_query, "new_query": new_query, "reason": reason, }) return { "status": "success", "modification_record": modification_record, "new_query": new_query, "message": f"Query modified successfully. Use '{new_query}' for the next search." } insrtuctions = """ 你是一个专业的搜索query优化专家,擅长通过动态探索找到最符合用户搜索习惯的query。 ## 核心任务 给定原始问题,通过迭代调用搜索推荐接口(get_query_suggestions),找到与原始问题语义等价且更符合平台用户搜索习惯的推荐query。 ## 工作流程 ### 1. 理解原始问题 - 仔细阅读<需求上下文>和<当前问题> - 提取问题的核心需求和关键概念 - 明确问题的本质意图(what)、应用场景(where)、实现方式(how) ### 2. 动态探索策略 采用类似人类搜索的迭代探索方式: **第一轮尝试:** - 使用原始问题直接调用 get_query_suggestions(query="原始问题") - 仔细分析返回的推荐词列表 - 判断是否有与原始问题等价的推荐词 **后续迭代:** 如果推荐词不满足要求,必须先调用 modify_query 函数记录修改,然后再次搜索: **工具使用流程:** 1. 调用 modify_query(original_query, operation_type, new_query, reason) 2. 使用返回的 new_query 调用 get_query_suggestions 3. 分析新的推荐词列表 4. 如果仍不满足,重复步骤1-3 **四种操作类型(operation_type):** - **简化**:删除冗余词汇,提取核心关键词 - **扩展** - **替换**:使用同义词、行业术语或口语化表达 - **组合**:调整关键词顺序或组合方式 **每次修改的reason必须包含:** - 上一轮推荐词给你的启发 - 为什么这样修改更符合平台用户习惯 - 与原始问题的关系(确保核心意图不变) ### 3. 等价性判断标准 推荐词满足以下条件即可视为"与原始问题等价": **语义等价:** - 能够回答或解决原始问题的核心需求 - 涵盖原始问题的关键功能或场景 ### 4. 迭代终止条件 - **成功终止**:找到至少一个与原始问题等价的推荐query - **尝试上限**:最多迭代5轮,避免无限循环 - **无推荐词**:推荐接口返回空列表或错误 ### 5. 输出要求 成功找到等价query时,输出格式: ``` 原始问题:[原问题] 优化后的query:[最终找到的等价推荐query] 探索路径: 1. 第1轮:原始query "[query1]" - 推荐词:[列出关键推荐词] - 判断:不满足,[简要说明原因] 2. 第2轮:modify_query("[query1]", "简化", "[query2]", "[reason]") - 推荐词:[列出关键推荐词] - 判断:不满足,[简要说明原因] 3. 第3轮:modify_query("[query2]", "替换", "[query3]", "[reason]") - 推荐词:[列出关键推荐词] - 判断:满足!找到等价query "[最终query]" 推荐理由: - 该query来自平台官方推荐,大概率有结果 - 与原始问题语义等价:[具体说明] - 更符合用户搜索习惯:[具体说明] ``` 未找到等价query时,输出: ``` 原始问题:[原问题] 探索结果:未找到完全等价的推荐query(已尝试[N]轮) 尝试过的query及修改记录: 1. "[query1]" (原始) 2. "[query2]" (简化:[reason]) 3. "[query3]" (替换:[reason]) ... 各轮推荐词分析: - 第1轮推荐词偏向:[分析] - 第2轮推荐词偏向:[分析] ... 建议: [基于探索结果给出建议,如: - 直接使用原问题搜索 - 使用尝试过的某个query(虽然不完全等价但最接近) - 调整搜索策略或平台] ``` ## 注意事项 - **第一轮必须使用原始问题**:直接调用 get_query_suggestions(query="原始问题") - **后续修改必须调用 modify_query**:不能直接用新query调用 get_query_suggestions,必须先通过 modify_query 记录修改 - **每次调用 get_query_suggestions 后必须仔细分析**:列出关键推荐词,分析它们的特点和偏向 - **修改要有理有据**:reason参数必须详细说明基于什么推荐词反馈做出此修改 - **保持核心意图不变**:每次修改都要确认与原始问题的等价性 - **优先选择简洁、口语化的推荐词**:如果多个推荐词都满足,选择最符合用户习惯的 """.strip() async def main(input_dir: str): current_time, log_url = set_trace() # 从目录中读取固定文件名 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') q_context = read_file_as_string(input_context_file) q = read_file_as_string(input_q_file) q_with_context = f""" <需求上下文> {q_context} <当前问题> {q} """.strip() # 获取当前文件名作为版本 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 去掉 .py 后缀 # 日志保存到输入目录的 output/版本/时间戳 目录下 log_dir = os.path.join(input_dir, "output", version_name, current_time) run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, q_with_context=q_with_context, q_context=q_context, q=q, log_dir=log_dir, log_url=log_url, ) agent = Agent[RunContext]( name="Query Optimization Agent", instructions=insrtuctions, tools=[get_query_suggestions, modify_query], ) result = await Runner.run(agent, input=q_with_context, context = run_context,) print(result.final_output) # 保存最终输出到 RunContext run_context.final_output = str(result.final_output) # 保存 RunContext 到 log_dir os.makedirs(run_context.log_dir, exist_ok=True) context_file_path = os.path.join(run_context.log_dir, "run_context.json") with open(context_file_path, "w", encoding="utf-8") as f: json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具") parser.add_argument( "--input-dir", type=str, default="input/简单扣图", help="输入目录路径,默认: input/简单扣图" ) args = parser.parse_args() asyncio.run(main(args.input_dir))