|
|
@@ -0,0 +1,349 @@
|
|
|
+import asyncio
|
|
|
+import json
|
|
|
+import os
|
|
|
+import argparse
|
|
|
+from datetime import datetime
|
|
|
+
|
|
|
+from agents import Agent, Runner, function_tool
|
|
|
+from lib.my_trace import set_trace
|
|
|
+from typing import Literal
|
|
|
+from dataclasses import dataclass
|
|
|
+from pydantic import BaseModel, Field
|
|
|
+
|
|
|
+
|
|
|
+from lib.utils import read_file_as_string
|
|
|
+from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
|
|
|
+
|
|
|
+from agents import Agent, RunContextWrapper, Runner, function_tool
|
|
|
+
|
|
|
+from pydantic import BaseModel, Field
|
|
|
+
|
|
|
+
|
|
|
+class RunContext(BaseModel):
|
|
|
+ version: str = Field(..., description="当前运行的脚本版本(文件名)")
|
|
|
+ input_files: dict[str, str] = Field(..., description="输入文件路径映射,如 {'context_file': '...', 'q_file': '...'}")
|
|
|
+ q_with_context: str
|
|
|
+ q_context: str
|
|
|
+ q: str
|
|
|
+ log_url: str
|
|
|
+ log_dir: str
|
|
|
+ # 中间数据记录 - 按时间顺序记录所有操作
|
|
|
+ operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史,包括 get_query_suggestions 和 modify_query")
|
|
|
+ # 最终输出结果
|
|
|
+ final_output: str | None = Field(default=None, description="Agent的最终输出结果")
|
|
|
+
|
|
|
+
|
|
|
+eval_insrtuctions = """
|
|
|
+# 角色定义
|
|
|
+
|
|
|
+你是一个 **专业的语言专家和语义相关性评判专家**。
|
|
|
+
|
|
|
+你的任务是:判断我给你的平台sug词条,其与 <原始 query 问题> 和 <需求上下文> **共同形成的综合意图** 的相关度满足度百分比。
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+# 输入信息
|
|
|
+
|
|
|
+你将接收到以下输入:
|
|
|
+1. <需求上下文>:文档的解构结果或语义说明,用于理解 query 所针对的核心任务与场景。
|
|
|
+2. <原始 query 问题>:用户的初始查询问题。
|
|
|
+3. <平台sug词条>:模型或算法推荐的 平台sug词条,其中包含待评估的词条。
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+# 工作流程
|
|
|
+
|
|
|
+## 第一步:理解综合意图
|
|
|
+
|
|
|
+- 仔细阅读 <需求上下文> 与 <原始 query 问题>;
|
|
|
+- 明确 <原始 query 问题> 的核心意图、主题焦点和任务目标;
|
|
|
+- 提炼 <需求上下文> 所描述的核心任务、场景、概念和语义范围;
|
|
|
+- **将 <原始 query 问题> 和 <需求上下文> 融合成一个单一的、完整的“综合意图”**,作为后续评估的唯一基准。这个综合意图代表了用户在特定场景下最核心、最全面的需求。
|
|
|
+
|
|
|
+## 第二步:逐一评估 Sug 词条的综合相关度满足度
|
|
|
+
|
|
|
+对于 我给你的sug词条:
|
|
|
+
|
|
|
+1. **评估其与“综合意图”的相关度满足度:**
|
|
|
+ - 将当前 sug 词条的语义与第一步中确定的“综合意图”(<原始 query 问题> 和 <需求上下文> 的结合体)进行比较。
|
|
|
+ - 全面评估该 sug 词条在主题、目标动作、语义范围、场景匹配、核心概念覆盖等各方面与“综合意图”的匹配和满足程度。
|
|
|
+ - 给出 0-1 之间的浮点数,代表其“相关度满足度”。数值比越高,表示该 sug 词条越能充分且精准地满足“综合意图”。
|
|
|
+
|
|
|
+**评估标准一致性要求:在评估 sug 词条时,请务必保持你对“相关度满足度”的判断标准和百分比给定的逻辑完全一致和稳定。**
|
|
|
+
|
|
|
+**相关性依据要求:** 简短叙述相关性依据,说明主要匹配点或差异点,以及该 sug 词条如何满足或未能完全满足“综合意图”的哪些方面。
|
|
|
+"""
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class EvaluationFeedback:
|
|
|
+ reason: str=Field(..., description="简明扼要的理由")
|
|
|
+ score: float=Field(..., description="评估结果,1表示等价,0表示不等价,中间值表示部分等价")
|
|
|
+
|
|
|
+evaluator = Agent[None](
|
|
|
+ name="评估专家",
|
|
|
+ instructions=eval_insrtuctions,
|
|
|
+ output_type=EvaluationFeedback,
|
|
|
+)
|
|
|
+
|
|
|
+@function_tool
|
|
|
+async def get_query_suggestions(wrapper: RunContextWrapper[RunContext], query: str):
|
|
|
+ """Fetch search recommendations from Xiaohongshu."""
|
|
|
+ xiaohongshu_api = XiaohongshuSearchRecommendations()
|
|
|
+ query_suggestions = xiaohongshu_api.get_recommendations(keyword=query)
|
|
|
+ print(query_suggestions)
|
|
|
+ async def evaluate_single_query(q_sug: str, q_with_context: str):
|
|
|
+ """Evaluate a single query suggestion."""
|
|
|
+ eval_input = f"""
|
|
|
+{q_with_context}
|
|
|
+<待评估的推荐query>
|
|
|
+{q_sug}
|
|
|
+</待评估的推荐query>
|
|
|
+ """
|
|
|
+ evaluator_result = await Runner.run(evaluator, eval_input)
|
|
|
+ result: EvaluationFeedback = evaluator_result.final_output
|
|
|
+ return {
|
|
|
+ "query": q_sug,
|
|
|
+ "score": result.score,
|
|
|
+ "reason": result.reason,
|
|
|
+ }
|
|
|
+
|
|
|
+ # 并发执行所有评估任务
|
|
|
+ q_with_context = wrapper.context.q_with_context
|
|
|
+ res = []
|
|
|
+ if query_suggestions:
|
|
|
+ res = await asyncio.gather(*[evaluate_single_query(q_sug, q_with_context) for q_sug in query_suggestions])
|
|
|
+ else:
|
|
|
+ res = '未返回任何推荐词'
|
|
|
+
|
|
|
+ # 记录到 RunContext
|
|
|
+ wrapper.context.operations_history.append({
|
|
|
+ "operation_type": "get_query_suggestions",
|
|
|
+ "timestamp": datetime.now().isoformat(),
|
|
|
+ "query": query,
|
|
|
+ "suggestions": query_suggestions,
|
|
|
+ "evaluations": res,
|
|
|
+ })
|
|
|
+
|
|
|
+ return res
|
|
|
+
|
|
|
+@function_tool
|
|
|
+def modify_query(wrapper: RunContextWrapper[RunContext], original_query: str, operation_type: str, new_query: str, reason: str):
|
|
|
+ """
|
|
|
+ Modify the search query with a specific operation.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ original_query: The original query before modification
|
|
|
+ operation_type: Type of modification - must be one of: "简化", "扩展", "替换", "组合"
|
|
|
+ new_query: The modified query after applying the operation
|
|
|
+ reason: Detailed explanation of why this modification was made and what insight from previous suggestions led to this change
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ A dict containing the modification record and the new query to use for next search
|
|
|
+ """
|
|
|
+ operation_types = ["简化", "扩展", "替换", "组合"]
|
|
|
+ if operation_type not in operation_types:
|
|
|
+ return {
|
|
|
+ "status": "error",
|
|
|
+ "message": f"Invalid operation_type. Must be one of: {', '.join(operation_types)}"
|
|
|
+ }
|
|
|
+
|
|
|
+ modification_record = {
|
|
|
+ "original_query": original_query,
|
|
|
+ "operation_type": operation_type,
|
|
|
+ "new_query": new_query,
|
|
|
+ "reason": reason,
|
|
|
+ }
|
|
|
+
|
|
|
+ # 记录到 RunContext
|
|
|
+ wrapper.context.operations_history.append({
|
|
|
+ "operation_type": "modify_query",
|
|
|
+ "timestamp": datetime.now().isoformat(),
|
|
|
+ "modification_type": operation_type,
|
|
|
+ "original_query": original_query,
|
|
|
+ "new_query": new_query,
|
|
|
+ "reason": reason,
|
|
|
+ })
|
|
|
+
|
|
|
+ return {
|
|
|
+ "status": "success",
|
|
|
+ "modification_record": modification_record,
|
|
|
+ "new_query": new_query,
|
|
|
+ "message": f"Query modified successfully. Use '{new_query}' for the next search."
|
|
|
+ }
|
|
|
+
|
|
|
+insrtuctions = """
|
|
|
+你是一个专业的搜索query优化专家,擅长通过动态探索找到最符合用户搜索习惯的query。
|
|
|
+
|
|
|
+## 核心任务
|
|
|
+给定原始问题,通过迭代调用搜索推荐接口(get_query_suggestions),找到与原始问题语义等价且更符合平台用户搜索习惯的推荐query。
|
|
|
+
|
|
|
+## 重要说明
|
|
|
+- **你不需要自己评估query的等价性**
|
|
|
+- get_query_suggestions 函数内部已集成评估子agent,会自动对每个推荐词进行评估
|
|
|
+- 返回结果包含:query(推荐词)、score(评分,1表示等价,0表示不等价)、reason(评估理由)
|
|
|
+- **你的职责是分析评估结果,做出决策和策略调整**
|
|
|
+
|
|
|
+## 防止幻觉 - 关键原则
|
|
|
+- **严禁编造数据**:只能基于 get_query_suggestions 实际返回的结果进行分析
|
|
|
+- **空结果处理**:如果返回的列表为空([]),必须明确说明"未返回任何推荐词"
|
|
|
+- **不要猜测**:在 modify_query 的 reason 中,不能引用不存在的推荐词或评分
|
|
|
+- **如实记录**:每次分析都要如实反映实际返回的数据
|
|
|
+
|
|
|
+## 工作流程
|
|
|
+
|
|
|
+### 1. 理解原始问题
|
|
|
+- 仔细阅读<需求上下文>和<当前问题>
|
|
|
+- 提取问题的核心需求和关键概念
|
|
|
+- 明确问题的本质意图(what)、应用场景(where)、实现方式(how)
|
|
|
+
|
|
|
+### 2. 动态探索策略
|
|
|
+
|
|
|
+**第一轮尝试:**
|
|
|
+- 使用原始问题直接调用 get_query_suggestions(query="原始问题")
|
|
|
+- **检查返回结果**:
|
|
|
+ - 如果返回空列表 []:说明"该query未返回任何推荐词",需要简化或替换query
|
|
|
+ - 如果有推荐词:查看每个推荐词的 score 和 reason
|
|
|
+- **做出判断**:是否有 score >= 0.8 的高分推荐词?
|
|
|
+
|
|
|
+**后续迭代:**
|
|
|
+如果没有高分推荐词(或返回空列表),必须先调用 modify_query 记录修改,然后再次搜索:
|
|
|
+
|
|
|
+**工具使用流程:**
|
|
|
+1. **分析评估反馈**(必须基于实际返回的数据):
|
|
|
+ - **情况A - 返回空列表**:
|
|
|
+ * 在 reason 中说明:"第X轮未返回任何推荐词,可能是query过于复杂或生僻"
|
|
|
+ * 不能编造任何推荐词或评分
|
|
|
+ - **情况B - 有推荐词但无高分**:
|
|
|
+ * 哪些推荐词得分较高?具体是多少分?评估理由是什么?
|
|
|
+ * 哪些推荐词偏离了原问题?如何偏离的?
|
|
|
+ * 推荐词整体趋势是什么?(过于泛化/具体化/领域偏移等)
|
|
|
+
|
|
|
+2. **决策修改策略**:基于实际评估反馈,调用 modify_query(original_query, operation_type, new_query, reason)
|
|
|
+ - reason 必须引用具体的数据,不能编造
|
|
|
+
|
|
|
+3. 使用返回的 new_query 调用 get_query_suggestions
|
|
|
+
|
|
|
+4. 分析新的评估结果,如果仍不满足,重复步骤1-3
|
|
|
+
|
|
|
+**四种操作类型(operation_type):**
|
|
|
+- **简化**:删除冗余词汇,提取核心关键词(当推荐词过于发散时)
|
|
|
+- **扩展**:添加限定词或场景描述(当推荐词过于泛化时)
|
|
|
+- **替换**:使用同义词、行业术语或口语化表达(当推荐词偏离核心时)
|
|
|
+- **组合**:调整关键词顺序或组合方式(当推荐词结构不合理时)
|
|
|
+
|
|
|
+**每次修改的reason必须包含:**
|
|
|
+- 上一轮评估结果的关键发现(引用具体的score和reason)
|
|
|
+- 基于评估反馈,为什么这样修改
|
|
|
+- 预期这次修改会带来什么改进
|
|
|
+
|
|
|
+### 3. 决策标准
|
|
|
+- **score >= 0.8**:认为该推荐词与原问题等价,可以作为最终结果
|
|
|
+- **0.5 <= score < 0.8**:部分等价,分析reason看是否可接受
|
|
|
+- **score < 0.5**:不等价,需要继续优化
|
|
|
+
|
|
|
+### 4. 迭代终止条件
|
|
|
+- **成功终止**:找到至少一个 score >= 0.8 的推荐query
|
|
|
+- **尝试上限**:最多迭代5轮,避免无限循环
|
|
|
+- **无推荐词**:推荐接口返回空列表或错误
|
|
|
+
|
|
|
+### 5. 输出要求
|
|
|
+
|
|
|
+**成功找到等价query时,输出格式:**
|
|
|
+```
|
|
|
+原始问题:[原问题]
|
|
|
+优化后的query:[最终找到的等价推荐query]
|
|
|
+评分:[score]
|
|
|
+```
|
|
|
+
|
|
|
+**未找到等价query时,输出格式:**
|
|
|
+```
|
|
|
+原始问题:[原问题]
|
|
|
+结果:未找到完全等价的推荐query
|
|
|
+建议:[简要建议,如:直接使用原问题搜索 或 使用最接近的推荐词]
|
|
|
+```
|
|
|
+
|
|
|
+## 注意事项
|
|
|
+- **第一轮必须使用原始问题**:直接调用 get_query_suggestions(query="原始问题")
|
|
|
+- **后续修改必须调用 modify_query**:不能直接用新query调用 get_query_suggestions
|
|
|
+- **重点关注评估结果**:每次都要仔细分析返回的 score 和 reason
|
|
|
+- **基于数据决策**:修改策略必须基于评估反馈,不能凭空猜测
|
|
|
+- **引用具体评分**:在分析和决策时,引用具体的score数值和reason内容
|
|
|
+- **优先选择高分推荐词**:score >= 0.8 即可认为等价
|
|
|
+- **严禁编造数据**:
|
|
|
+ * 如果返回空列表,必须在 reason 中明确说明"未返回任何推荐词"
|
|
|
+ * 不能引用不存在的推荐词、评分或评估理由
|
|
|
+ * 每次 modify_query 的 reason 必须基于上一轮实际返回的结果
|
|
|
+""".strip()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+async def main(input_dir: str):
|
|
|
+ current_time, log_url = set_trace()
|
|
|
+
|
|
|
+ # 从目录中读取固定文件名
|
|
|
+ input_context_file = os.path.join(input_dir, 'context.md')
|
|
|
+ input_q_file = os.path.join(input_dir, 'q.md')
|
|
|
+
|
|
|
+ q_context = read_file_as_string(input_context_file)
|
|
|
+ q = read_file_as_string(input_q_file)
|
|
|
+ q_with_context = f"""
|
|
|
+<需求上下文>
|
|
|
+{q_context}
|
|
|
+</需求上下文>
|
|
|
+<当前问题>
|
|
|
+{q}
|
|
|
+</当前问题>
|
|
|
+""".strip()
|
|
|
+
|
|
|
+ # 获取当前文件名作为版本
|
|
|
+ version = os.path.basename(__file__)
|
|
|
+ version_name = os.path.splitext(version)[0] # 去掉 .py 后缀
|
|
|
+
|
|
|
+ # 日志保存到输入目录的 output/版本/时间戳 目录下
|
|
|
+ log_dir = os.path.join(input_dir, "output", version_name, current_time)
|
|
|
+
|
|
|
+ run_context = RunContext(
|
|
|
+ version=version,
|
|
|
+ input_files={
|
|
|
+ "input_dir": input_dir,
|
|
|
+ "context_file": input_context_file,
|
|
|
+ "q_file": input_q_file,
|
|
|
+ },
|
|
|
+ q_with_context=q_with_context,
|
|
|
+ q_context=q_context,
|
|
|
+ q=q,
|
|
|
+ log_dir=log_dir,
|
|
|
+ log_url=log_url,
|
|
|
+ )
|
|
|
+ agent = Agent[RunContext](
|
|
|
+ name="Query Optimization Agent",
|
|
|
+ instructions=insrtuctions,
|
|
|
+ tools=[get_query_suggestions, modify_query],
|
|
|
+
|
|
|
+ )
|
|
|
+ result = await Runner.run(agent, input=q_with_context, context = run_context,)
|
|
|
+ print(result.final_output)
|
|
|
+
|
|
|
+ # 保存最终输出到 RunContext
|
|
|
+ run_context.final_output = str(result.final_output)
|
|
|
+
|
|
|
+ # 保存 RunContext 到 log_dir
|
|
|
+ os.makedirs(run_context.log_dir, exist_ok=True)
|
|
|
+ context_file_path = os.path.join(run_context.log_dir, "run_context.json")
|
|
|
+ with open(context_file_path, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
|
|
|
+ print(f"\nRunContext saved to: {context_file_path}")
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ parser = argparse.ArgumentParser(description="搜索query优化工具")
|
|
|
+ parser.add_argument(
|
|
|
+ "--input-dir",
|
|
|
+ type=str,
|
|
|
+ default="input/简单扣图",
|
|
|
+ help="输入目录路径,默认: input/简单扣图"
|
|
|
+ )
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ asyncio.run(main(args.input_dir))
|