sug_v3.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. import asyncio
  2. import json
  3. import os
  4. import argparse
  5. from datetime import datetime
  6. from agents import Agent, Runner, function_tool
  7. from lib.my_trace import set_trace
  8. from typing import Literal
  9. from dataclasses import dataclass
  10. from pydantic import BaseModel, Field
  11. from lib.utils import read_file_as_string
  12. from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
  13. from agents import Agent, RunContextWrapper, Runner, function_tool
  14. from pydantic import BaseModel, Field
  15. class RunContext(BaseModel):
  16. version: str = Field(..., description="当前运行的脚本版本(文件名)")
  17. input_files: dict[str, str] = Field(..., description="输入文件路径映射,如 {'context_file': '...', 'q_file': '...'}")
  18. q_with_context: str
  19. q_context: str
  20. q: str
  21. log_url: str
  22. log_dir: str
  23. # 轮数计数器
  24. current_round: int = Field(default=0, description="当前迭代轮数")
  25. max_rounds: int = Field(default=100, description="最大迭代轮数")
  26. # 中间数据记录 - 按时间顺序记录所有操作
  27. operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史,包括 get_query_suggestions 和 modify_query")
  28. # 最终输出结果
  29. final_output: str | None = Field(default=None, description="Agent的最终输出结果")
  30. eval_insrtuctions = """
  31. 你是一个专业的评估专家,负责评估给定的搜索query是否满足原始问题和需求,给出出评分和简明扼要的理由。
  32. """
  33. @dataclass
  34. class EvaluationFeedback:
  35. reason: str=Field(..., description="简明扼要的理由")
  36. score: float=Field(..., description="评估结果,1表示等价,0表示不等价,中间值表示部分等价")
  37. evaluator = Agent[None](
  38. name="评估专家",
  39. instructions=eval_insrtuctions,
  40. output_type=EvaluationFeedback,
  41. )
  42. @function_tool
  43. async def get_query_suggestions(wrapper: RunContextWrapper[RunContext], query: str):
  44. """Fetch search recommendations from Xiaohongshu."""
  45. # 递增轮数
  46. wrapper.context.current_round += 1
  47. current_round = wrapper.context.current_round
  48. max_rounds = wrapper.context.max_rounds
  49. xiaohongshu_api = XiaohongshuSearchRecommendations()
  50. query_suggestions = xiaohongshu_api.get_recommendations(keyword=query)
  51. print(query_suggestions)
  52. async def evaluate_single_query(q_sug: str, q_with_context: str):
  53. """Evaluate a single query suggestion."""
  54. eval_input = f"""
  55. {q_with_context}
  56. <待评估的推荐query>
  57. {q_sug}
  58. </待评估的推荐query>
  59. """
  60. evaluator_result = await Runner.run(evaluator, eval_input)
  61. result: EvaluationFeedback = evaluator_result.final_output
  62. return {
  63. "query": q_sug,
  64. "score": result.score,
  65. "reason": result.reason,
  66. }
  67. # 并发执行所有评估任务
  68. q_with_context = wrapper.context.q_with_context
  69. evaluations = []
  70. if query_suggestions:
  71. evaluations = await asyncio.gather(*[evaluate_single_query(q_sug, q_with_context) for q_sug in query_suggestions])
  72. else:
  73. evaluations = '未返回任何推荐词'
  74. # 判断是否有满足要求的推荐词(score >= 0.8)
  75. has_qualified = False
  76. best_score = 0.0
  77. best_query = None
  78. if isinstance(evaluations, list) and len(evaluations) > 0:
  79. for eval_item in evaluations:
  80. if isinstance(eval_item, dict) and eval_item.get('score', 0) >= 0.8:
  81. has_qualified = True
  82. if eval_item.get('score', 0) > best_score:
  83. best_score = eval_item['score']
  84. best_query = eval_item['query']
  85. # 决定是否需要继续优化
  86. should_continue = not has_qualified and current_round < max_rounds
  87. # 构建明确的指示信息
  88. if has_qualified:
  89. action = f"找到满足要求的推荐词!最佳推荐:'{best_query}'(评分:{best_score}),可以结束优化。"
  90. elif current_round >= max_rounds:
  91. action = "已达到最大轮数限制,必须停止优化。"
  92. else:
  93. action = "未找到满足要求的推荐词(score >= 0.8),请继续优化。"
  94. # 记录到 RunContext
  95. wrapper.context.operations_history.append({
  96. "operation_type": "get_query_suggestions",
  97. "timestamp": datetime.now().isoformat(),
  98. "round": current_round,
  99. "query": query,
  100. "suggestions": query_suggestions,
  101. "evaluations": evaluations,
  102. "has_qualified": has_qualified,
  103. "should_continue": should_continue,
  104. })
  105. # 返回结果,包含轮数信息和明确的行动指示
  106. return {
  107. "current_round": current_round,
  108. "max_rounds": max_rounds,
  109. "query": query,
  110. "evaluations": evaluations,
  111. "has_qualified": has_qualified,
  112. "should_continue": should_continue,
  113. "best_result": {"query": best_query, "score": best_score} if has_qualified else None,
  114. "action": action,
  115. "message": f"第 {current_round}/{max_rounds} 轮搜索完成。{action}"
  116. }
  117. @function_tool
  118. def modify_query(wrapper: RunContextWrapper[RunContext], original_query: str, operation_type: str, new_query: str, reason: str):
  119. """
  120. Modify the search query with a specific operation.
  121. Args:
  122. original_query: The original query before modification
  123. operation_type: Type of modification - must be one of: "简化", "扩展", "替换", "组合"
  124. new_query: The modified query after applying the operation
  125. reason: Detailed explanation of why this modification was made and what insight from previous suggestions led to this change
  126. Returns:
  127. A dict containing the modification record and the new query to use for next search
  128. """
  129. operation_types = ["简化", "扩展", "替换", "组合"]
  130. if operation_type not in operation_types:
  131. return {
  132. "status": "error",
  133. "message": f"Invalid operation_type. Must be one of: {', '.join(operation_types)}"
  134. }
  135. modification_record = {
  136. "original_query": original_query,
  137. "operation_type": operation_type,
  138. "new_query": new_query,
  139. "reason": reason,
  140. }
  141. # 记录到 RunContext
  142. wrapper.context.operations_history.append({
  143. "operation_type": "modify_query",
  144. "timestamp": datetime.now().isoformat(),
  145. "modification_type": operation_type,
  146. "original_query": original_query,
  147. "new_query": new_query,
  148. "reason": reason,
  149. })
  150. return {
  151. "status": "success",
  152. "modification_record": modification_record,
  153. "new_query": new_query,
  154. "message": f"Query modified successfully. Use '{new_query}' for the next search."
  155. }
  156. insrtuctions = """
  157. 你是一个专业的搜索query优化专家,擅长通过动态探索找到最符合用户搜索习惯的query。
  158. ## 核心任务
  159. 给定原始问题,通过迭代调用搜索推荐接口(get_query_suggestions),找到与原始问题语义等价且更符合平台用户搜索习惯的推荐query。
  160. ## 重要说明
  161. - **你不需要自己评估query的等价性,也不需要自己判断是否继续优化**
  162. - get_query_suggestions 函数内部已集成评估子agent,会自动完成评估和判断
  163. - **返回结果结构**:
  164. - current_round: 当前迭代轮数
  165. - max_rounds: 最大允许轮数(100轮)
  166. - query: 本次搜索使用的query
  167. - evaluations: 评估结果列表(仅供参考)
  168. - **has_qualified**: 是否找到满足要求的推荐词(score >= 0.8)
  169. - **should_continue**: 是否应该继续优化(true表示继续,false表示停止)
  170. - **best_result**: 如果找到满足要求的推荐词,这里会显示最佳结果
  171. - **action**: 明确的行动指示,告诉你接下来该做什么
  172. - message: 完整的提示信息
  173. - **你的职责是遵循函数返回的指示**:
  174. - 如果 should_continue = true,分析evaluations并调用 modify_query 继续优化
  175. - 如果 should_continue = false 且 has_qualified = true,输出找到的最佳推荐词
  176. - 如果 should_continue = false 且 has_qualified = false,说明已达到最大轮数但未找到满意结果
  177. ## 防止幻觉 - 关键原则
  178. - **严禁编造数据**:只能基于 get_query_suggestions 实际返回的结果进行分析
  179. - **空结果处理**:如果返回的列表为空([]),必须明确说明"未返回任何推荐词"
  180. - **不要猜测**:在 modify_query 的 reason 中,不能引用不存在的推荐词或评分
  181. - **如实记录**:每次分析都要如实反映实际返回的数据
  182. ## 工作流程
  183. ### 1. 理解原始问题
  184. - 仔细阅读<需求上下文>和<当前问题>
  185. - 提取问题的核心需求和关键概念
  186. - 明确问题的本质意图(what)、应用场景(where)、实现方式(how)
  187. ### 2. 动态探索策略
  188. **第一轮尝试:**
  189. - 使用原始问题直接调用 get_query_suggestions(query="原始问题")
  190. - **查看返回结果的关键字段**:
  191. - **action**: 明确告诉你接下来该做什么
  192. - **should_continue**: 是否需要继续优化
  193. - **has_qualified**: 是否已找到满足要求的推荐词
  194. - **best_result**: 如果找到了,这里会显示最佳推荐词和评分
  195. - **遵循指示行动**:直接按照 action 字段的指示执行
  196. **后续迭代:**
  197. 如果 should_continue = true,必须先调用 modify_query 记录修改,然后再次调用 get_query_suggestions:
  198. **工具使用流程:**
  199. 1. **分析评估反馈**(必须基于实际返回的数据):
  200. - **情况A - 返回空列表**:
  201. * 在 reason 中说明:"第X轮未返回任何推荐词,可能是query过于复杂或生僻"
  202. * 不能编造任何推荐词或评分
  203. - **情况B - 有推荐词但无高分**:
  204. * 哪些推荐词得分较高?具体是多少分?评估理由是什么?
  205. * 哪些推荐词偏离了原问题?如何偏离的?
  206. * 推荐词整体趋势是什么?(过于泛化/具体化/领域偏移等)
  207. 2. **决策修改策略**:基于实际评估反馈,调用 modify_query(original_query, operation_type, new_query, reason)
  208. - reason 必须引用具体的数据,不能编造
  209. 3. 使用返回的 new_query 调用 get_query_suggestions
  210. 4. 分析新的评估结果,如果仍不满足,重复步骤1-3
  211. **四种操作类型(operation_type):**
  212. - **简化**:删除冗余词汇,提取核心关键词(当推荐词过于发散时)
  213. - **扩展**:添加限定词或场景描述(当推荐词过于泛化时)
  214. - **替换**:使用同义词、行业术语或口语化表达(当推荐词偏离核心时)
  215. - **组合**:调整关键词顺序或组合方式(当推荐词结构不合理时)
  216. **每次修改的reason必须包含:**
  217. - 上一轮评估结果的关键发现(引用具体的score和reason)
  218. - 基于评估反馈,为什么这样修改
  219. - 预期这次修改会带来什么改进
  220. ### 3. 决策标准(由函数自动判断,你只需遵循)
  221. - 函数内部已自动按照 score >= 0.8 的标准判断是否找到满意的推荐词
  222. - 你只需要查看 has_qualified 和 should_continue 字段即可
  223. - 不需要自己遍历 evaluations 来判断分数
  224. ### 4. 迭代终止条件(由函数自动判断)
  225. - 函数会自动判断何时应该停止或继续
  226. - **should_continue = false** 时必须停止:
  227. - 情况1: has_qualified = true(找到了满意的推荐词)
  228. - 情况2: current_round >= max_rounds(达到最大轮数)
  229. - **should_continue = true** 时应该继续优化
  230. ### 5. 输出要求
  231. **成功找到等价query时,输出格式:**
  232. ```
  233. 原始问题:[原问题]
  234. 优化后的query:[最终找到的等价推荐query]
  235. 评分:[score]
  236. ```
  237. **未找到等价query时,输出格式:**
  238. ```
  239. 原始问题:[原问题]
  240. 结果:未找到完全等价的推荐query
  241. 建议:[简要建议,如:直接使用原问题搜索 或 使用最接近的推荐词]
  242. ```
  243. ## 注意事项
  244. - **第一轮必须使用原始问题**:直接调用 get_query_suggestions(query="原始问题")
  245. - **后续修改必须调用 modify_query**:不能直接用新query调用 get_query_suggestions
  246. - **遵循函数返回的指示**:
  247. - 重点关注 **should_continue** 和 **action** 字段
  248. - 不需要自己判断是否继续优化,函数已经帮你判断好了
  249. - 如果 should_continue = true,就继续优化;如果 false,就停止
  250. - **分析 evaluations 用于优化策略**:虽然不需要自己判断是否满足要求,但需要分析 evaluations 来决定如何修改query
  251. - **基于数据决策**:修改策略必须基于评估反馈,不能凭空猜测
  252. - **引用具体评分**:在 modify_query 的 reason 中,引用具体的score数值和reason内容
  253. - **坚持优化**:只要 should_continue = true,就应该继续尝试不同策略
  254. - **严禁编造数据**:
  255. * 如果返回空列表,必须在 reason 中明确说明"未返回任何推荐词"
  256. * 不能引用不存在的推荐词、评分或评估理由
  257. * 每次 modify_query 的 reason 必须基于上一轮实际返回的结果
  258. """.strip()
  259. async def main(input_dir: str):
  260. current_time, log_url = set_trace()
  261. # 从目录中读取固定文件名
  262. input_context_file = os.path.join(input_dir, 'context.md')
  263. input_q_file = os.path.join(input_dir, 'q.md')
  264. q_context = read_file_as_string(input_context_file)
  265. q = read_file_as_string(input_q_file)
  266. q_with_context = f"""
  267. <需求上下文>
  268. {q_context}
  269. </需求上下文>
  270. <当前问题>
  271. {q}
  272. </当前问题>
  273. """.strip()
  274. # 获取当前文件名作为版本
  275. version = os.path.basename(__file__)
  276. version_name = os.path.splitext(version)[0] # 去掉 .py 后缀
  277. # 日志保存到输入目录的 output/版本/时间戳 目录下
  278. log_dir = os.path.join(input_dir, "output", version_name, current_time)
  279. run_context = RunContext(
  280. version=version,
  281. input_files={
  282. "input_dir": input_dir,
  283. "context_file": input_context_file,
  284. "q_file": input_q_file,
  285. },
  286. q_with_context=q_with_context,
  287. q_context=q_context,
  288. q=q,
  289. log_dir=log_dir,
  290. log_url=log_url,
  291. )
  292. agent = Agent[RunContext](
  293. name="Query Optimization Agent",
  294. instructions=insrtuctions,
  295. tools=[get_query_suggestions, modify_query],
  296. )
  297. result = await Runner.run(agent, input=q_with_context, context = run_context, max_turns=200)
  298. print(result.final_output)
  299. # 保存最终输出到 RunContext
  300. run_context.final_output = str(result.final_output)
  301. # 保存 RunContext 到 log_dir
  302. os.makedirs(run_context.log_dir, exist_ok=True)
  303. context_file_path = os.path.join(run_context.log_dir, "run_context.json")
  304. with open(context_file_path, "w", encoding="utf-8") as f:
  305. json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
  306. print(f"\nRunContext saved to: {context_file_path}")
  307. if __name__ == "__main__":
  308. parser = argparse.ArgumentParser(description="搜索query优化工具")
  309. parser.add_argument(
  310. "--input-dir",
  311. type=str,
  312. default="input/简单扣图",
  313. help="输入目录路径,默认: input/简单扣图"
  314. )
  315. args = parser.parse_args()
  316. asyncio.run(main(args.input_dir))