sug_v2.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. import asyncio
  2. import json
  3. import os
  4. from datetime import datetime
  5. from agents import Agent, Runner, function_tool
  6. from lib.my_trace import set_trace
  7. from typing import Literal
  8. from dataclasses import dataclass
  9. from pydantic import BaseModel, Field
  10. from lib.utils import read_file_as_string
  11. from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
  12. from agents import Agent, RunContextWrapper, Runner, function_tool
  13. from pydantic import BaseModel, Field
  14. class RunContext(BaseModel):
  15. version: str = Field(..., description="当前运行的脚本版本(文件名)")
  16. input_files: dict[str, str] = Field(..., description="输入文件路径映射,如 {'context_file': '...', 'q_file': '...'}")
  17. q_with_context: str
  18. q_context: str
  19. q: str
  20. log_url: str
  21. log_dir: str
  22. # 中间数据记录 - 按时间顺序记录所有操作
  23. operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史,包括 get_query_suggestions 和 modify_query")
  24. eval_insrtuctions = """
  25. 你是一个专业的评估专家,负责评估给定的搜索query是否满足原始问题和需求,给出出评分和简明扼要的理由。
  26. """
  27. @dataclass
  28. class EvaluationFeedback:
  29. reason: str=Field(..., description="简明扼要的理由")
  30. score: float=Field(..., description="评估结果,1表示等价,0表示不等价,中间值表示部分等价")
  31. evaluator = Agent[None](
  32. name="评估专家",
  33. instructions=eval_insrtuctions,
  34. output_type=EvaluationFeedback,
  35. )
  36. @function_tool
  37. async def get_query_suggestions(wrapper: RunContextWrapper[RunContext], query: str):
  38. """Fetch search recommendations from Xiaohongshu."""
  39. xiaohongshu_api = XiaohongshuSearchRecommendations()
  40. query_suggestions = xiaohongshu_api.get_recommendations(keyword=query)
  41. print(query_suggestions)
  42. async def evaluate_single_query(q_sug: str, q_with_context: str):
  43. """Evaluate a single query suggestion."""
  44. eval_input = f"""
  45. {q_with_context}
  46. <待评估的推荐query>
  47. {q_sug}
  48. </待评估的推荐query>
  49. """
  50. evaluator_result = await Runner.run(evaluator, eval_input)
  51. result: EvaluationFeedback = evaluator_result.final_output
  52. return {
  53. "query": q_sug,
  54. "score": result.score,
  55. "reason": result.reason,
  56. }
  57. # 并发执行所有评估任务
  58. q_with_context = wrapper.context.q_with_context
  59. res = []
  60. if query_suggestions:
  61. res = await asyncio.gather(*[evaluate_single_query(q_sug, q_with_context) for q_sug in query_suggestions])
  62. else:
  63. res = '未返回任何推荐词'
  64. # 记录到 RunContext
  65. wrapper.context.operations_history.append({
  66. "operation_type": "get_query_suggestions",
  67. "timestamp": datetime.now().isoformat(),
  68. "query": query,
  69. "suggestions": query_suggestions,
  70. "evaluations": res,
  71. })
  72. return res
  73. @function_tool
  74. def modify_query(wrapper: RunContextWrapper[RunContext], original_query: str, operation_type: str, new_query: str, reason: str):
  75. """
  76. Modify the search query with a specific operation.
  77. Args:
  78. original_query: The original query before modification
  79. operation_type: Type of modification - must be one of: "简化", "扩展", "替换", "组合"
  80. new_query: The modified query after applying the operation
  81. reason: Detailed explanation of why this modification was made and what insight from previous suggestions led to this change
  82. Returns:
  83. A dict containing the modification record and the new query to use for next search
  84. """
  85. operation_types = ["简化", "扩展", "替换", "组合"]
  86. if operation_type not in operation_types:
  87. return {
  88. "status": "error",
  89. "message": f"Invalid operation_type. Must be one of: {', '.join(operation_types)}"
  90. }
  91. modification_record = {
  92. "original_query": original_query,
  93. "operation_type": operation_type,
  94. "new_query": new_query,
  95. "reason": reason,
  96. }
  97. # 记录到 RunContext
  98. wrapper.context.operations_history.append({
  99. "operation_type": "modify_query",
  100. "timestamp": datetime.now().isoformat(),
  101. "modification_type": operation_type,
  102. "original_query": original_query,
  103. "new_query": new_query,
  104. "reason": reason,
  105. })
  106. return {
  107. "status": "success",
  108. "modification_record": modification_record,
  109. "new_query": new_query,
  110. "message": f"Query modified successfully. Use '{new_query}' for the next search."
  111. }
  112. insrtuctions = """
  113. 你是一个专业的搜索query优化专家,擅长通过动态探索找到最符合用户搜索习惯的query。
  114. ## 核心任务
  115. 给定原始问题,通过迭代调用搜索推荐接口(get_query_suggestions),找到与原始问题语义等价且更符合平台用户搜索习惯的推荐query。
  116. ## 重要说明
  117. - **你不需要自己评估query的等价性**
  118. - get_query_suggestions 函数内部已集成评估子agent,会自动对每个推荐词进行评估
  119. - 返回结果包含:query(推荐词)、score(评分,1表示等价,0表示不等价)、reason(评估理由)
  120. - **你的职责是分析评估结果,做出决策和策略调整**
  121. ## 防止幻觉 - 关键原则
  122. - **严禁编造数据**:只能基于 get_query_suggestions 实际返回的结果进行分析
  123. - **空结果处理**:如果返回的列表为空([]),必须明确说明"未返回任何推荐词"
  124. - **不要猜测**:在 modify_query 的 reason 中,不能引用不存在的推荐词或评分
  125. - **如实记录**:每次分析都要如实反映实际返回的数据
  126. ## 工作流程
  127. ### 1. 理解原始问题
  128. - 仔细阅读<需求上下文>和<当前问题>
  129. - 提取问题的核心需求和关键概念
  130. - 明确问题的本质意图(what)、应用场景(where)、实现方式(how)
  131. ### 2. 动态探索策略
  132. **第一轮尝试:**
  133. - 使用原始问题直接调用 get_query_suggestions(query="原始问题")
  134. - **检查返回结果**:
  135. - 如果返回空列表 []:说明"该query未返回任何推荐词",需要简化或替换query
  136. - 如果有推荐词:查看每个推荐词的 score 和 reason
  137. - **做出判断**:是否有 score >= 0.8 的高分推荐词?
  138. **后续迭代:**
  139. 如果没有高分推荐词(或返回空列表),必须先调用 modify_query 记录修改,然后再次搜索:
  140. **工具使用流程:**
  141. 1. **分析评估反馈**(必须基于实际返回的数据):
  142. - **情况A - 返回空列表**:
  143. * 在 reason 中说明:"第X轮未返回任何推荐词,可能是query过于复杂或生僻"
  144. * 不能编造任何推荐词或评分
  145. - **情况B - 有推荐词但无高分**:
  146. * 哪些推荐词得分较高?具体是多少分?评估理由是什么?
  147. * 哪些推荐词偏离了原问题?如何偏离的?
  148. * 推荐词整体趋势是什么?(过于泛化/具体化/领域偏移等)
  149. 2. **决策修改策略**:基于实际评估反馈,调用 modify_query(original_query, operation_type, new_query, reason)
  150. - reason 必须引用具体的数据,不能编造
  151. 3. 使用返回的 new_query 调用 get_query_suggestions
  152. 4. 分析新的评估结果,如果仍不满足,重复步骤1-3
  153. **四种操作类型(operation_type):**
  154. - **简化**:删除冗余词汇,提取核心关键词(当推荐词过于发散时)
  155. - **扩展**:添加限定词或场景描述(当推荐词过于泛化时)
  156. - **替换**:使用同义词、行业术语或口语化表达(当推荐词偏离核心时)
  157. - **组合**:调整关键词顺序或组合方式(当推荐词结构不合理时)
  158. **每次修改的reason必须包含:**
  159. - 上一轮评估结果的关键发现(引用具体的score和reason)
  160. - 基于评估反馈,为什么这样修改
  161. - 预期这次修改会带来什么改进
  162. ### 3. 决策标准
  163. - **score >= 0.8**:认为该推荐词与原问题等价,可以作为最终结果
  164. - **0.5 <= score < 0.8**:部分等价,分析reason看是否可接受
  165. - **score < 0.5**:不等价,需要继续优化
  166. ### 4. 迭代终止条件
  167. - **成功终止**:找到至少一个 score >= 0.8 的推荐query
  168. - **尝试上限**:最多迭代5轮,避免无限循环
  169. - **无推荐词**:推荐接口返回空列表或错误
  170. ### 5. 输出要求
  171. **成功找到等价query时,输出格式:**
  172. ```
  173. 原始问题:[原问题]
  174. 优化后的query:[最终找到的等价推荐query]
  175. 评分:[score]
  176. ```
  177. **未找到等价query时,输出格式:**
  178. ```
  179. 原始问题:[原问题]
  180. 结果:未找到完全等价的推荐query
  181. 建议:[简要建议,如:直接使用原问题搜索 或 使用最接近的推荐词]
  182. ```
  183. ## 注意事项
  184. - **第一轮必须使用原始问题**:直接调用 get_query_suggestions(query="原始问题")
  185. - **后续修改必须调用 modify_query**:不能直接用新query调用 get_query_suggestions
  186. - **重点关注评估结果**:每次都要仔细分析返回的 score 和 reason
  187. - **基于数据决策**:修改策略必须基于评估反馈,不能凭空猜测
  188. - **引用具体评分**:在分析和决策时,引用具体的score数值和reason内容
  189. - **优先选择高分推荐词**:score >= 0.8 即可认为等价
  190. - **严禁编造数据**:
  191. * 如果返回空列表,必须在 reason 中明确说明"未返回任何推荐词"
  192. * 不能引用不存在的推荐词、评分或评估理由
  193. * 每次 modify_query 的 reason 必须基于上一轮实际返回的结果
  194. """.strip()
  195. async def main():
  196. current_time, log_url = set_trace()
  197. # 输入文件路径
  198. input_context_file = 'input/kg_v1_single_context.md'
  199. input_q_file = 'input/kg_v1_single_q.md'
  200. q_context = read_file_as_string(input_context_file)
  201. q = read_file_as_string(input_q_file)
  202. q_with_context = f"""
  203. <需求上下文>
  204. {q_context}
  205. </需求上下文>
  206. <当前问题>
  207. {q}
  208. </当前问题>
  209. """.strip()
  210. log_dir = os.path.join("logs", current_time)
  211. # 获取当前文件名作为版本
  212. version = os.path.basename(__file__)
  213. run_context = RunContext(
  214. version=version,
  215. input_files={
  216. "context_file": input_context_file,
  217. "q_file": input_q_file,
  218. },
  219. q_with_context=q_with_context,
  220. q_context=q_context,
  221. q=q,
  222. log_dir=log_dir,
  223. log_url=log_url,
  224. )
  225. agent = Agent[RunContext](
  226. name="Query Optimization Agent",
  227. instructions=insrtuctions,
  228. tools=[get_query_suggestions, modify_query],
  229. )
  230. result = await Runner.run(agent, input=q_with_context, context = run_context,)
  231. print(result.final_output)
  232. # 保存 RunContext 到 log_dir
  233. os.makedirs(run_context.log_dir, exist_ok=True)
  234. context_file_path = os.path.join(run_context.log_dir, "run_context.json")
  235. with open(context_file_path, "w", encoding="utf-8") as f:
  236. json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
  237. print(f"\nRunContext saved to: {context_file_path}")
  238. if __name__ == "__main__":
  239. asyncio.run(main())