import asyncio import json import os import argparse from datetime import datetime from agents import Agent, Runner, function_tool from lib.my_trace import set_trace from typing import Literal from dataclasses import dataclass from pydantic import BaseModel, Field from lib.utils import read_file_as_string from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations from agents import Agent, RunContextWrapper, Runner, function_tool from pydantic import BaseModel, Field class RunContext(BaseModel): version: str = Field(..., description="当前运行的脚本版本(文件名)") input_files: dict[str, str] = Field(..., description="输入文件路径映射,如 {'context_file': '...', 'q_file': '...'}") q_with_context: str q_context: str q: str log_url: str log_dir: str # 轮数计数器 current_round: int = Field(default=0, description="当前迭代轮数") max_rounds: int = Field(default=100, description="最大迭代轮数") # 中间数据记录 - 按时间顺序记录所有操作 operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史,包括 get_query_suggestions 和 modify_query") # 最终输出结果 final_output: str | None = Field(default=None, description="Agent的最终输出结果") eval_insrtuctions = """ 你是一个专业的评估专家,负责评估给定的搜索query是否满足原始问题和需求,给出出评分和简明扼要的理由。 """ @dataclass class EvaluationFeedback: reason: str=Field(..., description="简明扼要的理由") score: float=Field(..., description="评估结果,1表示等价,0表示不等价,中间值表示部分等价") evaluator = Agent[None]( name="评估专家", instructions=eval_insrtuctions, output_type=EvaluationFeedback, ) @function_tool async def get_query_suggestions(wrapper: RunContextWrapper[RunContext], query: str): """Fetch search recommendations from Xiaohongshu.""" # 递增轮数 wrapper.context.current_round += 1 current_round = wrapper.context.current_round max_rounds = wrapper.context.max_rounds xiaohongshu_api = XiaohongshuSearchRecommendations() query_suggestions = xiaohongshu_api.get_recommendations(keyword=query) print(query_suggestions) async def evaluate_single_query(q_sug: str, q_with_context: str): """Evaluate a single query suggestion.""" eval_input = f""" {q_with_context} <待评估的推荐query> {q_sug} """ evaluator_result = await Runner.run(evaluator, eval_input) result: EvaluationFeedback = evaluator_result.final_output return { "query": q_sug, "score": result.score, "reason": result.reason, } # 并发执行所有评估任务 q_with_context = wrapper.context.q_with_context evaluations = [] if query_suggestions: evaluations = await asyncio.gather(*[evaluate_single_query(q_sug, q_with_context) for q_sug in query_suggestions]) else: evaluations = '未返回任何推荐词' # 判断是否有满足要求的推荐词(score >= 0.8) has_qualified = False best_score = 0.0 best_query = None if isinstance(evaluations, list) and len(evaluations) > 0: for eval_item in evaluations: if isinstance(eval_item, dict) and eval_item.get('score', 0) >= 0.8: has_qualified = True if eval_item.get('score', 0) > best_score: best_score = eval_item['score'] best_query = eval_item['query'] # 决定是否需要继续优化 should_continue = not has_qualified and current_round < max_rounds # 构建明确的指示信息 if has_qualified: action = f"找到满足要求的推荐词!最佳推荐:'{best_query}'(评分:{best_score}),可以结束优化。" elif current_round >= max_rounds: action = "已达到最大轮数限制,必须停止优化。" else: action = "未找到满足要求的推荐词(score >= 0.8),请继续优化。" # 记录到 RunContext wrapper.context.operations_history.append({ "operation_type": "get_query_suggestions", "timestamp": datetime.now().isoformat(), "round": current_round, "query": query, "suggestions": query_suggestions, "evaluations": evaluations, "has_qualified": has_qualified, "should_continue": should_continue, }) # 返回结果,包含轮数信息和明确的行动指示 return { "current_round": current_round, "max_rounds": max_rounds, "query": query, "evaluations": evaluations, "has_qualified": has_qualified, "should_continue": should_continue, "best_result": {"query": best_query, "score": best_score} if has_qualified else None, "action": action, "message": f"第 {current_round}/{max_rounds} 轮搜索完成。{action}" } @function_tool def modify_query(wrapper: RunContextWrapper[RunContext], original_query: str, operation_type: str, new_query: str, reason: str): """ Modify the search query with a specific operation. Args: original_query: The original query before modification operation_type: Type of modification - must be one of: "简化", "扩展", "替换", "组合" new_query: The modified query after applying the operation reason: Detailed explanation of why this modification was made and what insight from previous suggestions led to this change Returns: A dict containing the modification record and the new query to use for next search """ operation_types = ["简化", "扩展", "替换", "组合"] if operation_type not in operation_types: return { "status": "error", "message": f"Invalid operation_type. Must be one of: {', '.join(operation_types)}" } modification_record = { "original_query": original_query, "operation_type": operation_type, "new_query": new_query, "reason": reason, } # 记录到 RunContext wrapper.context.operations_history.append({ "operation_type": "modify_query", "timestamp": datetime.now().isoformat(), "modification_type": operation_type, "original_query": original_query, "new_query": new_query, "reason": reason, }) return { "status": "success", "modification_record": modification_record, "new_query": new_query, "message": f"Query modified successfully. Use '{new_query}' for the next search." } insrtuctions = """ 你是一个专业的搜索query优化专家,擅长通过动态探索找到最符合用户搜索习惯的query。 ## 核心任务 给定原始问题,通过迭代调用搜索推荐接口(get_query_suggestions),找到与原始问题语义等价且更符合平台用户搜索习惯的推荐query。 ## 重要说明 - **你不需要自己评估query的等价性,也不需要自己判断是否继续优化** - get_query_suggestions 函数内部已集成评估子agent,会自动完成评估和判断 - **返回结果结构**: - current_round: 当前迭代轮数 - max_rounds: 最大允许轮数(100轮) - query: 本次搜索使用的query - evaluations: 评估结果列表(仅供参考) - **has_qualified**: 是否找到满足要求的推荐词(score >= 0.8) - **should_continue**: 是否应该继续优化(true表示继续,false表示停止) - **best_result**: 如果找到满足要求的推荐词,这里会显示最佳结果 - **action**: 明确的行动指示,告诉你接下来该做什么 - message: 完整的提示信息 - **你的职责是遵循函数返回的指示**: - 如果 should_continue = true,分析evaluations并调用 modify_query 继续优化 - 如果 should_continue = false 且 has_qualified = true,输出找到的最佳推荐词 - 如果 should_continue = false 且 has_qualified = false,说明已达到最大轮数但未找到满意结果 ## 防止幻觉 - 关键原则 - **严禁编造数据**:只能基于 get_query_suggestions 实际返回的结果进行分析 - **空结果处理**:如果返回的列表为空([]),必须明确说明"未返回任何推荐词" - **不要猜测**:在 modify_query 的 reason 中,不能引用不存在的推荐词或评分 - **如实记录**:每次分析都要如实反映实际返回的数据 ## 工作流程 ### 1. 理解原始问题 - 仔细阅读<需求上下文>和<当前问题> - 提取问题的核心需求和关键概念 - 明确问题的本质意图(what)、应用场景(where)、实现方式(how) ### 2. 动态探索策略 **第一轮尝试:** - 使用原始问题直接调用 get_query_suggestions(query="原始问题") - **查看返回结果的关键字段**: - **action**: 明确告诉你接下来该做什么 - **should_continue**: 是否需要继续优化 - **has_qualified**: 是否已找到满足要求的推荐词 - **best_result**: 如果找到了,这里会显示最佳推荐词和评分 - **遵循指示行动**:直接按照 action 字段的指示执行 **后续迭代:** 如果 should_continue = true,必须先调用 modify_query 记录修改,然后再次调用 get_query_suggestions: **工具使用流程:** 1. **分析评估反馈**(必须基于实际返回的数据): - **情况A - 返回空列表**: * 在 reason 中说明:"第X轮未返回任何推荐词,可能是query过于复杂或生僻" * 不能编造任何推荐词或评分 - **情况B - 有推荐词但无高分**: * 哪些推荐词得分较高?具体是多少分?评估理由是什么? * 哪些推荐词偏离了原问题?如何偏离的? * 推荐词整体趋势是什么?(过于泛化/具体化/领域偏移等) 2. **决策修改策略**:基于实际评估反馈,调用 modify_query(original_query, operation_type, new_query, reason) - reason 必须引用具体的数据,不能编造 3. 使用返回的 new_query 调用 get_query_suggestions 4. 分析新的评估结果,如果仍不满足,重复步骤1-3 **四种操作类型(operation_type):** - **简化**:删除冗余词汇,提取核心关键词(当推荐词过于发散时) - **扩展**:添加限定词或场景描述(当推荐词过于泛化时) - **替换**:使用同义词、行业术语或口语化表达(当推荐词偏离核心时) - **组合**:调整关键词顺序或组合方式(当推荐词结构不合理时) **每次修改的reason必须包含:** - 上一轮评估结果的关键发现(引用具体的score和reason) - 基于评估反馈,为什么这样修改 - 预期这次修改会带来什么改进 ### 3. 决策标准(由函数自动判断,你只需遵循) - 函数内部已自动按照 score >= 0.8 的标准判断是否找到满意的推荐词 - 你只需要查看 has_qualified 和 should_continue 字段即可 - 不需要自己遍历 evaluations 来判断分数 ### 4. 迭代终止条件(由函数自动判断) - 函数会自动判断何时应该停止或继续 - **should_continue = false** 时必须停止: - 情况1: has_qualified = true(找到了满意的推荐词) - 情况2: current_round >= max_rounds(达到最大轮数) - **should_continue = true** 时应该继续优化 ### 5. 输出要求 **成功找到等价query时,输出格式:** ``` 原始问题:[原问题] 优化后的query:[最终找到的等价推荐query] 评分:[score] ``` **未找到等价query时,输出格式:** ``` 原始问题:[原问题] 结果:未找到完全等价的推荐query 建议:[简要建议,如:直接使用原问题搜索 或 使用最接近的推荐词] ``` ## 注意事项 - **第一轮必须使用原始问题**:直接调用 get_query_suggestions(query="原始问题") - **后续修改必须调用 modify_query**:不能直接用新query调用 get_query_suggestions - **遵循函数返回的指示**: - 重点关注 **should_continue** 和 **action** 字段 - 不需要自己判断是否继续优化,函数已经帮你判断好了 - 如果 should_continue = true,就继续优化;如果 false,就停止 - **分析 evaluations 用于优化策略**:虽然不需要自己判断是否满足要求,但需要分析 evaluations 来决定如何修改query - **基于数据决策**:修改策略必须基于评估反馈,不能凭空猜测 - **引用具体评分**:在 modify_query 的 reason 中,引用具体的score数值和reason内容 - **坚持优化**:只要 should_continue = true,就应该继续尝试不同策略 - **严禁编造数据**: * 如果返回空列表,必须在 reason 中明确说明"未返回任何推荐词" * 不能引用不存在的推荐词、评分或评估理由 * 每次 modify_query 的 reason 必须基于上一轮实际返回的结果 """.strip() async def main(input_dir: str): current_time, log_url = set_trace() # 从目录中读取固定文件名 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') q_context = read_file_as_string(input_context_file) q = read_file_as_string(input_q_file) q_with_context = f""" <需求上下文> {q_context} <当前问题> {q} """.strip() # 获取当前文件名作为版本 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 去掉 .py 后缀 # 日志保存到输入目录的 output/版本/时间戳 目录下 log_dir = os.path.join(input_dir, "output", version_name, current_time) run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, q_with_context=q_with_context, q_context=q_context, q=q, log_dir=log_dir, log_url=log_url, ) agent = Agent[RunContext]( name="Query Optimization Agent", instructions=insrtuctions, tools=[get_query_suggestions, modify_query], ) result = await Runner.run(agent, input=q_with_context, context = run_context, max_turns=200) print(result.final_output) # 保存最终输出到 RunContext run_context.final_output = str(result.final_output) # 保存 RunContext 到 log_dir os.makedirs(run_context.log_dir, exist_ok=True) context_file_path = os.path.join(run_context.log_dir, "run_context.json") with open(context_file_path, "w", encoding="utf-8") as f: json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具") parser.add_argument( "--input-dir", type=str, default="input/简单扣图", help="输入目录路径,默认: input/简单扣图" ) args = parser.parse_args() asyncio.run(main(args.input_dir))