sug_v2_raw_v1.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. import asyncio
  2. import json
  3. import os
  4. import argparse
  5. from datetime import datetime
  6. from agents import Agent, Runner, function_tool
  7. from lib.my_trace import set_trace
  8. from typing import Literal
  9. from dataclasses import dataclass
  10. from pydantic import BaseModel, Field
  11. from lib.utils import read_file_as_string
  12. from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
  13. from agents import Agent, RunContextWrapper, Runner, function_tool
  14. from pydantic import BaseModel, Field
  15. class RunContext(BaseModel):
  16. version: str = Field(..., description="当前运行的脚本版本(文件名)")
  17. input_files: dict[str, str] = Field(..., description="输入文件路径映射,如 {'context_file': '...', 'q_file': '...'}")
  18. q_with_context: str
  19. q_context: str
  20. q: str
  21. log_url: str
  22. log_dir: str
  23. # 中间数据记录 - 按时间顺序记录所有操作
  24. operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史,包括 get_query_suggestions 和 modify_query")
  25. # 最终输出结果
  26. final_output: str | None = Field(default=None, description="Agent的最终输出结果")
  27. eval_insrtuctions = """
  28. 你是一个专业的评估专家,负责评估给定的搜索query是否满足原始问题和需求,给出出评分和简明扼要的理由。
  29. """
  30. @dataclass
  31. class EvaluationFeedback:
  32. reason: str=Field(..., description="简明扼要的理由")
  33. score: float=Field(..., description="评估结果,1表示等价,0表示不等价,中间值表示部分等价")
  34. evaluator = Agent[None](
  35. name="评估专家",
  36. instructions=eval_insrtuctions,
  37. output_type=EvaluationFeedback,
  38. )
  39. @function_tool
  40. async def get_query_suggestions(wrapper: RunContextWrapper[RunContext], query: str):
  41. """Fetch search recommendations from Xiaohongshu."""
  42. xiaohongshu_api = XiaohongshuSearchRecommendations()
  43. query_suggestions = xiaohongshu_api.get_recommendations(keyword=query)
  44. # 记录到 RunContext
  45. wrapper.context.operations_history.append({
  46. "operation_type": "get_query_suggestions",
  47. "timestamp": datetime.now().isoformat(),
  48. "query": query,
  49. "suggestions": query_suggestions,
  50. })
  51. if query_suggestions:
  52. return query_suggestions
  53. else:
  54. res = '未返回任何推荐词'
  55. return res
  56. @function_tool
  57. def modify_query(wrapper: RunContextWrapper[RunContext], original_query: str, operation_type: str, new_query: str, reason: str):
  58. """
  59. Modify the search query with a specific operation.
  60. Args:
  61. original_query: The original query before modification
  62. operation_type: Type of modification - must be one of: "简化", "扩展", "替换", "组合"
  63. new_query: The modified query after applying the operation
  64. reason: Detailed explanation of why this modification was made and what insight from previous suggestions led to this change
  65. Returns:
  66. A dict containing the modification record and the new query to use for next search
  67. """
  68. operation_types = ["简化", "扩展", "替换", "组合"]
  69. if operation_type not in operation_types:
  70. return {
  71. "status": "error",
  72. "message": f"Invalid operation_type. Must be one of: {', '.join(operation_types)}"
  73. }
  74. modification_record = {
  75. "original_query": original_query,
  76. "operation_type": operation_type,
  77. "new_query": new_query,
  78. "reason": reason,
  79. }
  80. # 记录到 RunContext
  81. wrapper.context.operations_history.append({
  82. "operation_type": "modify_query",
  83. "timestamp": datetime.now().isoformat(),
  84. "modification_type": operation_type,
  85. "original_query": original_query,
  86. "new_query": new_query,
  87. "reason": reason,
  88. })
  89. return {
  90. "status": "success",
  91. "modification_record": modification_record,
  92. "new_query": new_query,
  93. "message": f"Query modified successfully. Use '{new_query}' for the next search."
  94. }
  95. insrtuctions = """
  96. 你是一个专业的搜索query优化专家,擅长通过动态探索找到最符合用户搜索习惯的query。
  97. ## 核心任务
  98. 给定原始问题,通过迭代调用搜索推荐接口(get_query_suggestions),找到与原始问题语义等价且更符合平台用户搜索习惯的推荐query。
  99. ## 工作流程
  100. ### 1. 理解原始问题
  101. - 仔细阅读<需求上下文>和<当前问题>
  102. - 提取问题的核心需求和关键概念
  103. - 明确问题的本质意图(what)、应用场景(where)、实现方式(how)
  104. ### 2. 动态探索策略
  105. 采用类似人类搜索的迭代探索方式:
  106. **第一轮尝试:**
  107. - 使用原始问题直接调用 get_query_suggestions(query="原始问题")
  108. - 仔细分析返回的推荐词列表
  109. - 判断是否有与原始问题等价的推荐词
  110. **后续迭代:**
  111. 如果推荐词不满足要求,必须先调用 modify_query 函数记录修改,然后再次搜索:
  112. **工具使用流程:**
  113. 1. 调用 modify_query(original_query, operation_type, new_query, reason)
  114. 2. 使用返回的 new_query 调用 get_query_suggestions
  115. 3. 分析新的推荐词列表
  116. 4. 如果仍不满足,重复步骤1-3
  117. **四种操作类型(operation_type):**
  118. - **简化**:删除冗余词汇,提取核心关键词
  119. - **扩展**
  120. - **替换**:使用同义词、行业术语或口语化表达
  121. - **组合**:调整关键词顺序或组合方式
  122. **每次修改的reason必须包含:**
  123. - 上一轮推荐词给你的启发
  124. - 为什么这样修改更符合平台用户习惯
  125. - 与原始问题的关系(确保核心意图不变)
  126. ### 3. 等价性判断标准
  127. 推荐词满足以下条件即可视为"与原始问题等价":
  128. **语义等价:**
  129. - 能够回答或解决原始问题的核心需求
  130. - 涵盖原始问题的关键功能或场景
  131. ### 4. 迭代终止条件
  132. - **成功终止**:找到至少一个与原始问题等价的推荐query
  133. - **尝试上限**:最多迭代5轮,避免无限循环
  134. - **无推荐词**:推荐接口返回空列表或错误
  135. ### 5. 输出要求
  136. 成功找到等价query时,输出格式:
  137. ```
  138. 原始问题:[原问题]
  139. 优化后的query:[最终找到的等价推荐query]
  140. 探索路径:
  141. 1. 第1轮:原始query "[query1]"
  142. - 推荐词:[列出关键推荐词]
  143. - 判断:不满足,[简要说明原因]
  144. 2. 第2轮:modify_query("[query1]", "简化", "[query2]", "[reason]")
  145. - 推荐词:[列出关键推荐词]
  146. - 判断:不满足,[简要说明原因]
  147. 3. 第3轮:modify_query("[query2]", "替换", "[query3]", "[reason]")
  148. - 推荐词:[列出关键推荐词]
  149. - 判断:满足!找到等价query "[最终query]"
  150. 推荐理由:
  151. - 该query来自平台官方推荐,大概率有结果
  152. - 与原始问题语义等价:[具体说明]
  153. - 更符合用户搜索习惯:[具体说明]
  154. ```
  155. 未找到等价query时,输出:
  156. ```
  157. 原始问题:[原问题]
  158. 探索结果:未找到完全等价的推荐query(已尝试[N]轮)
  159. 尝试过的query及修改记录:
  160. 1. "[query1]" (原始)
  161. 2. "[query2]" (简化:[reason])
  162. 3. "[query3]" (替换:[reason])
  163. ...
  164. 各轮推荐词分析:
  165. - 第1轮推荐词偏向:[分析]
  166. - 第2轮推荐词偏向:[分析]
  167. ...
  168. 建议:
  169. [基于探索结果给出建议,如:
  170. - 直接使用原问题搜索
  171. - 使用尝试过的某个query(虽然不完全等价但最接近)
  172. - 调整搜索策略或平台]
  173. ```
  174. ## 注意事项
  175. - **第一轮必须使用原始问题**:直接调用 get_query_suggestions(query="原始问题")
  176. - **后续修改必须调用 modify_query**:不能直接用新query调用 get_query_suggestions,必须先通过 modify_query 记录修改
  177. - **每次调用 get_query_suggestions 后必须仔细分析**:列出关键推荐词,分析它们的特点和偏向
  178. - **修改要有理有据**:reason参数必须详细说明基于什么推荐词反馈做出此修改
  179. - **保持核心意图不变**:每次修改都要确认与原始问题的等价性
  180. - **优先选择简洁、口语化的推荐词**:如果多个推荐词都满足,选择最符合用户习惯的
  181. """.strip()
  182. async def main(input_dir: str):
  183. current_time, log_url = set_trace()
  184. # 从目录中读取固定文件名
  185. input_context_file = os.path.join(input_dir, 'context.md')
  186. input_q_file = os.path.join(input_dir, 'q.md')
  187. q_context = read_file_as_string(input_context_file)
  188. q = read_file_as_string(input_q_file)
  189. q_with_context = f"""
  190. <需求上下文>
  191. {q_context}
  192. </需求上下文>
  193. <当前问题>
  194. {q}
  195. </当前问题>
  196. """.strip()
  197. # 获取当前文件名作为版本
  198. version = os.path.basename(__file__)
  199. version_name = os.path.splitext(version)[0] # 去掉 .py 后缀
  200. # 日志保存到输入目录的 output/版本/时间戳 目录下
  201. log_dir = os.path.join(input_dir, "output", version_name, current_time)
  202. run_context = RunContext(
  203. version=version,
  204. input_files={
  205. "input_dir": input_dir,
  206. "context_file": input_context_file,
  207. "q_file": input_q_file,
  208. },
  209. q_with_context=q_with_context,
  210. q_context=q_context,
  211. q=q,
  212. log_dir=log_dir,
  213. log_url=log_url,
  214. )
  215. agent = Agent[RunContext](
  216. name="Query Optimization Agent",
  217. instructions=insrtuctions,
  218. tools=[get_query_suggestions, modify_query],
  219. )
  220. result = await Runner.run(agent, input=q_with_context, context = run_context,)
  221. print(result.final_output)
  222. # 保存最终输出到 RunContext
  223. run_context.final_output = str(result.final_output)
  224. # 保存 RunContext 到 log_dir
  225. os.makedirs(run_context.log_dir, exist_ok=True)
  226. context_file_path = os.path.join(run_context.log_dir, "run_context.json")
  227. with open(context_file_path, "w", encoding="utf-8") as f:
  228. json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
  229. print(f"\nRunContext saved to: {context_file_path}")
  230. if __name__ == "__main__":
  231. parser = argparse.ArgumentParser(description="搜索query优化工具")
  232. parser.add_argument(
  233. "--input-dir",
  234. type=str,
  235. default="input/简单扣图",
  236. help="输入目录路径,默认: input/简单扣图"
  237. )
  238. args = parser.parse_args()
  239. asyncio.run(main(args.input_dir))