sug_v4_2.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. import asyncio
  2. import json
  3. import os
  4. import argparse
  5. from datetime import datetime
  6. from agents import Agent, Runner, function_tool, AgentOutputSchema
  7. from lib.my_trace import set_trace
  8. from typing import Literal
  9. from pydantic import BaseModel, Field
  10. from lib.utils import read_file_as_string
  11. from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
  12. from agents import Agent, RunContextWrapper, Runner, function_tool
  13. from pydantic import BaseModel, Field
  14. class RunContext(BaseModel):
  15. version: str = Field(..., description="当前运行的脚本版本(文件名)")
  16. input_files: dict[str, str] = Field(..., description="输入文件路径映射,如 {'context_file': '...', 'q_file': '...'}")
  17. q_with_context: str
  18. q_context: str
  19. q: str
  20. log_url: str
  21. log_dir: str
  22. # 问题标注结果 - 直接用字符串记录
  23. question_annotation: str | None = Field(default=None, description="问题的标注结果,类似NER格式")
  24. # 中间数据记录 - 按时间顺序记录所有操作
  25. operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史,包括 get_query_suggestions 和 modify_query")
  26. # 最终输出结果
  27. final_output: str | None = Field(default=None, description="Agent的最终输出结果")
  28. # 问题标注 Agent - 三层标注
  29. question_annotation_instructions = """
  30. 你是搜索需求分析专家。给定问题(含需求背景),在原文上标注三层:本质、硬性、软性。
  31. ## 三层结构
  32. **[本质]** - 问题的核心意图,改变后是完全不同的问题
  33. - 如何获取、教程、推荐、作品、测评等
  34. **[硬]** - 在本质意图下,必须满足的约束
  35. - 地域、时间、对象、质量要求等
  36. **[软]** - 可有可无的修饰
  37. - 能体现、特色、快速、简单等
  38. ## 输出格式
  39. 词语[本质-描述]、词语[硬-描述]、词语[软-描述]
  40. ## 注意
  41. - 只输出标注后的字符串
  42. - 结合需求背景判断意图
  43. """.strip()
  44. question_annotator = Agent[None](
  45. name="问题标注专家",
  46. instructions=question_annotation_instructions,
  47. )
  48. eval_instructions = """
  49. 你是搜索query评估专家。给定原始问题标注(三层)和推荐query,评估三个分数。
  50. ## 评估目标
  51. 用这个推荐query搜索,能否找到满足原始需求的内容?
  52. ## 三层评分
  53. ### 1. essence_score(本质/意图)= 0 或 1
  54. 推荐query的本质/意图是否与原问题一致?
  55. **原问题标注中的[本质-XXX]:**
  56. - 找方法/如何获取 → 推荐词应该是方法/获取途径
  57. - 教程/学习 → 推荐词应该是教程/教学
  58. - 作品/欣赏 → 推荐词应该是作品展示
  59. - 工具/推荐 → 推荐词应该是工具推荐
  60. **评分:**
  61. - 1 = 本质一致
  62. - 0 = 本质改变(完全答非所问)
  63. ### 2. hard_score(硬性约束)= 0 或 1
  64. 在本质一致的前提下,是否满足所有硬性约束?
  65. **原问题标注中的[硬-XXX]:**地域、时间、对象、质量、工具(如果用户明确要求)等
  66. **评分:**
  67. - 1 = 所有硬性约束都满足
  68. - 0 = 任一硬性约束不满足
  69. ### 3. soft_score(软性修饰)= 0-1
  70. 软性修饰词保留了多少?
  71. **原问题标注中的[软-XXX]:**修饰词、非关键限定等
  72. **评分参考:**
  73. - 1.0 = 完整保留
  74. - 0.7-0.9 = 保留核心
  75. - 0.4-0.6 = 部分丢失
  76. - 0-0.3 = 大量丢失
  77. ## 注意
  78. - essence=0 直接拒绝,不管hard/soft多高
  79. - essence=1, hard=0 也要拒绝
  80. - essence=1, hard=1 才看soft_score
  81. """.strip()
  82. class EvaluationFeedback(BaseModel):
  83. """评估反馈模型 - 三层评分"""
  84. essence_score: Literal[0, 1] = Field(..., description="本质/意图匹配度,0或1。1=问题本质/意图一致,0=本质改变")
  85. hard_score: Literal[0, 1] = Field(..., description="硬性约束匹配度,0或1。1=所有硬性约束都满足,0=任一硬性约束不满足")
  86. soft_score: float = Field(..., description="软性修饰完整度,0-1的浮点数。1.0=完整保留,0.7-0.9=保留核心,0.4-0.6=泛化较大,0-0.3=大量丢失")
  87. reason: str = Field(..., description="评估理由,包括:1)本质/意图是否一致 2)硬性约束是否满足 3)软性修饰保留情况 4)搜索预期")
  88. evaluator = Agent[None](
  89. name="评估专家",
  90. instructions=eval_instructions,
  91. output_type=EvaluationFeedback,
  92. )
  93. @function_tool
  94. async def get_query_suggestions(wrapper: RunContextWrapper[RunContext], query: str):
  95. """Fetch search recommendations from Xiaohongshu."""
  96. # 1. 首次调用时,先标注问题(带需求背景)
  97. if wrapper.context.question_annotation is None:
  98. print("正在标注问题...")
  99. annotation_result = await Runner.run(question_annotator, wrapper.context.q_with_context)
  100. wrapper.context.question_annotation = str(annotation_result.final_output)
  101. print(f"问题标注完成:{wrapper.context.question_annotation}")
  102. # 2. 获取推荐词
  103. xiaohongshu_api = XiaohongshuSearchRecommendations()
  104. query_suggestions = xiaohongshu_api.get_recommendations(keyword=query)
  105. print(f"获取到 {len(query_suggestions) if query_suggestions else 0} 个推荐词:{query_suggestions}")
  106. # 3. 评估推荐词(三层评分)
  107. async def evaluate_single_query(q_sug: str):
  108. """Evaluate a single query suggestion."""
  109. eval_input = f"""
  110. <原始问题标注(三层)>
  111. {wrapper.context.question_annotation}
  112. </原始问题标注(三层)>
  113. <待评估的推荐query>
  114. {q_sug}
  115. </待评估的推荐query>
  116. 请评估该推荐query的三个分数:
  117. 1. essence_score: 本质/意图是否一致(0或1)
  118. 2. hard_score: 硬性约束是否满足(0或1)
  119. 3. soft_score: 软性修饰保留程度(0-1)
  120. 4. reason: 详细的评估理由
  121. """
  122. evaluator_result = await Runner.run(evaluator, eval_input)
  123. result: EvaluationFeedback = evaluator_result.final_output
  124. return {
  125. "query": q_sug,
  126. "essence_score": result.essence_score,
  127. "hard_score": result.hard_score,
  128. "soft_score": result.soft_score,
  129. "reason": result.reason,
  130. }
  131. # 并发执行所有评估任务
  132. res = []
  133. if query_suggestions:
  134. res = await asyncio.gather(*[evaluate_single_query(q_sug) for q_sug in query_suggestions])
  135. else:
  136. res = '未返回任何推荐词'
  137. # 记录到 RunContext
  138. wrapper.context.operations_history.append({
  139. "operation_type": "get_query_suggestions",
  140. "timestamp": datetime.now().isoformat(),
  141. "query": query,
  142. "suggestions": query_suggestions,
  143. "evaluations": res,
  144. })
  145. return res
  146. @function_tool
  147. def modify_query(wrapper: RunContextWrapper[RunContext], original_query: str, operation_type: str, new_query: str, reason: str):
  148. """
  149. Modify the search query with a specific operation.
  150. Args:
  151. original_query: The original query before modification
  152. operation_type: Type of modification - must be one of: "简化", "扩展", "替换", "组合"
  153. new_query: The modified query after applying the operation
  154. reason: Detailed explanation of why this modification was made and what insight from previous suggestions led to this change
  155. Returns:
  156. A dict containing the modification record and the new query to use for next search
  157. """
  158. operation_types = ["简化", "扩展", "替换", "组合"]
  159. if operation_type not in operation_types:
  160. return {
  161. "status": "error",
  162. "message": f"Invalid operation_type. Must be one of: {', '.join(operation_types)}"
  163. }
  164. modification_record = {
  165. "original_query": original_query,
  166. "operation_type": operation_type,
  167. "new_query": new_query,
  168. "reason": reason,
  169. }
  170. # 记录到 RunContext
  171. wrapper.context.operations_history.append({
  172. "operation_type": "modify_query",
  173. "timestamp": datetime.now().isoformat(),
  174. "modification_type": operation_type,
  175. "original_query": original_query,
  176. "new_query": new_query,
  177. "reason": reason,
  178. })
  179. return {
  180. "status": "success",
  181. "modification_record": modification_record,
  182. "new_query": new_query,
  183. "message": f"Query modified successfully. Use '{new_query}' for the next search."
  184. }
  185. instructions = """
  186. 你是一个专业的搜索query优化专家,擅长通过动态探索找到最符合用户搜索习惯的query。
  187. ## 核心任务
  188. 给定原始问题,通过迭代调用搜索推荐接口(get_query_suggestions),找到满足硬性要求且尽量保留软性信息的推荐query。
  189. ## 重要说明
  190. - **你不需要自己评估query的适配性**
  191. - get_query_suggestions 函数会:
  192. 1. 首次调用时自动标注问题(三层:本质、硬性、软性)
  193. 2. 对每个推荐词进行三维度评估
  194. - 返回结果包含:
  195. - **query**:推荐词
  196. - **essence_score**:本质/意图匹配度(0或1),0=本质改变,1=本质一致
  197. - **hard_score**:硬性约束匹配度(0或1),0=不满足约束,1=满足所有约束
  198. - **soft_score**:软性修饰完整度(0-1),越高表示保留的信息越完整
  199. - **reason**:详细的评估理由
  200. - **你的职责是分析评估结果,做出决策和策略调整**
  201. ## 防止幻觉 - 关键原则
  202. - **严禁编造数据**:只能基于 get_query_suggestions 实际返回的结果进行分析
  203. - **空结果处理**:如果返回的列表为空([]),必须明确说明"未返回任何推荐词"
  204. - **不要猜测**:在 modify_query 的 reason 中,不能引用不存在的推荐词或评分
  205. - **如实记录**:每次分析都要如实反映实际返回的数据
  206. ## 工作流程
  207. ### 1. 理解原始问题
  208. - 仔细阅读<需求上下文>和<当前问题>
  209. - 提取问题的核心需求和关键概念
  210. - 明确问题的本质意图(what)、应用场景(where)、实现方式(how)
  211. ### 2. 动态探索策略
  212. **第一轮尝试:**
  213. - 使用原始问题直接调用 get_query_suggestions(query="原始问题")
  214. - 第一次调用会自动标注问题(三层),后续调用会复用该标注
  215. - **检查返回结果**:
  216. - 如果返回空列表 []:说明"该query未返回任何推荐词",需要简化或替换query
  217. - 如果有推荐词:查看每个推荐词的 essence_score、hard_score、soft_score 和 reason
  218. - **做出判断**:是否有 essence_score=1 且 hard_score=1 且 soft_score >= 0.7 的推荐词?
  219. **后续迭代:**
  220. 如果没有合格推荐词(或返回空列表),必须先调用 modify_query 记录修改,然后再次搜索:
  221. **工具使用流程:**
  222. 1. **分析评估反馈**(必须基于实际返回的数据):
  223. - **情况A - 返回空列表**:
  224. * 在 reason 中说明:"第X轮未返回任何推荐词,可能是query过于复杂或生僻"
  225. * 不能编造任何推荐词或评分
  226. - **情况B - 有推荐词但无合格词**:
  227. * **首先检查 essence_score**:
  228. - 如果全是 essence_score=0:本质/意图完全不对,需要重新理解问题
  229. - 如果有 essence_score=1:本质对了,继续分析
  230. * **分析 essence_score=1 且 hard_score=1 的推荐词**:
  231. - 有哪些?soft_score 是多少?
  232. - 如果 soft_score 较低(<0.7),reason 中说明丢失了哪些信息?
  233. - 能否通过修改query提高 soft_score?
  234. * **如果 essence_score=1 但全是 hard_score=0**:
  235. - reason 中说明了哪些硬性约束不满足?(地域、时间、对象、质量等)
  236. - 需要如何调整query才能满足硬性约束?
  237. 2. **决策修改策略**:基于实际评估反馈,调用 modify_query(original_query, operation_type, new_query, reason)
  238. - reason 必须引用具体的 essence_score、hard_score、soft_score 和评估理由
  239. - 不能编造任何数据
  240. 3. 使用返回的 new_query 调用 get_query_suggestions
  241. 4. 分析新的评估结果,如果仍不满足,重复步骤1-3
  242. **四种操作类型(operation_type):**
  243. - **简化**:删除冗余词汇,提取核心关键词(当推荐词过于发散时)
  244. - **扩展**:添加限定词或场景描述(当推荐词过于泛化时)
  245. - **替换**:使用同义词、行业术语或口语化表达(当推荐词偏离核心时)
  246. - **组合**:调整关键词顺序或组合方式(当推荐词结构不合理时)
  247. **每次修改的reason必须包含:**
  248. - 上一轮评估结果的关键发现(引用具体的 essence_score、hard_score、soft_score 和评估理由)
  249. - 基于评估反馈,为什么这样修改
  250. - 预期这次修改会带来什么改进
  251. ### 3. 决策标准
  252. 采用**三级评分标准**:
  253. **优先级1:本质/意图(最高优先级)**
  254. - **essence_score = 1**:本质一致,继续检查
  255. - **essence_score = 0**:本质改变,**直接放弃**
  256. **优先级2:硬性约束(必须满足)**
  257. - **hard_score = 1**:所有约束满足,继续检查
  258. - **hard_score = 0**:约束不满足,**直接放弃**
  259. **优先级3:软性修饰(越高越好)**
  260. - **soft_score >= 0.7**:信息保留较完整,**理想结果**
  261. - **0.5 <= soft_score < 0.7**:有所丢失但可接受,**备选结果**
  262. - **soft_score < 0.5**:丢失过多,继续优化
  263. **采纳标准:**
  264. - **最优**:essence=1 且 hard=1 且 soft >= 0.7
  265. - **可接受**:essence=1 且 hard=1 且 soft >= 0.5(多次尝试后)
  266. - **不可接受**:essence=0 或 hard=0(无论soft多高)
  267. ### 4. 迭代终止条件
  268. - **成功终止**:找到 essence=1 且 hard=1 且 soft >= 0.7 的推荐query
  269. - **可接受终止**:5轮后找到 essence=1 且 hard=1 且 soft >= 0.5 的推荐query
  270. - **失败终止**:最多5轮
  271. - **无推荐词**:返回空列表或错误
  272. ### 5. 输出要求
  273. **成功找到合格query时:**
  274. ```
  275. 原始问题:[原问题]
  276. 优化后的query:[最终推荐query]
  277. 本质匹配度:[essence_score] (1=本质一致)
  278. 硬性约束匹配度:[hard_score] (1=所有约束满足)
  279. 软性修饰完整度:[soft_score] (0-1)
  280. 评估理由:[简要说明]
  281. ```
  282. **未找到合格query时:**
  283. ```
  284. 原始问题:[原问题]
  285. 结果:未找到合格推荐query
  286. 原因:[本质不符 / 硬性约束不满足 / 软性信息丢失过多]
  287. 最接近的推荐词:[如果有essence=1且hard=1的词,列出soft最高的]
  288. 建议:[简要建议]
  289. ```
  290. ## 注意事项
  291. - **第一轮必须使用原始问题**:直接调用 get_query_suggestions(query="原始问题")
  292. - 第一次调用会自动标注问题(三层),打印出标注结果
  293. - 后续调用会复用该标注进行评估
  294. - **后续修改必须调用 modify_query**:不能直接用新query调用 get_query_suggestions
  295. - **重点关注评估结果**:每次都要仔细分析返回的三个分数
  296. - **essence_score=0 直接放弃**,本质不对
  297. - **hard_score=0 也直接放弃**,约束不满足
  298. - **优先关注 essence=1 且 hard=1 的推荐词**,分析如何提升 soft_score
  299. - **基于数据决策**:修改策略必须基于评估反馈
  300. - 引用具体的 essence_score、hard_score、soft_score
  301. - 引用 reason 中的关键发现
  302. - **采纳标准明确**:
  303. - **最优**:essence=1 且 hard=1 且 soft >= 0.7,立即采纳
  304. - **可接受**:essence=1 且 hard=1 且 soft >= 0.5,多次尝试后可采纳
  305. - **不可接受**:essence=0 或 hard=0,无论soft多高都不能用
  306. - **严禁编造数据**:
  307. * 如果返回空列表,必须明确说明"未返回任何推荐词"
  308. * 不能引用不存在的推荐词、分数或评估理由
  309. * 每次 modify_query 的 reason 必须基于上一轮实际返回的结果
  310. """.strip()
  311. async def main(input_dir: str):
  312. current_time, log_url = set_trace()
  313. # 从目录中读取固定文件名
  314. input_context_file = os.path.join(input_dir, 'context.md')
  315. input_q_file = os.path.join(input_dir, 'q.md')
  316. q_context = read_file_as_string(input_context_file)
  317. q = read_file_as_string(input_q_file)
  318. q_with_context = f"""
  319. <需求上下文>
  320. {q_context}
  321. </需求上下文>
  322. <当前问题>
  323. {q}
  324. </当前问题>
  325. """.strip()
  326. # 获取当前文件名作为版本
  327. version = os.path.basename(__file__)
  328. version_name = os.path.splitext(version)[0] # 去掉 .py 后缀
  329. # 日志保存到输入目录的 output/版本/时间戳 目录下
  330. log_dir = os.path.join(input_dir, "output", version_name, current_time)
  331. run_context = RunContext(
  332. version=version,
  333. input_files={
  334. "input_dir": input_dir,
  335. "context_file": input_context_file,
  336. "q_file": input_q_file,
  337. },
  338. q_with_context=q_with_context,
  339. q_context=q_context,
  340. q=q,
  341. log_dir=log_dir,
  342. log_url=log_url,
  343. )
  344. agent = Agent[RunContext](
  345. name="Query Optimization Agent",
  346. instructions=instructions,
  347. tools=[get_query_suggestions, modify_query],
  348. )
  349. result = await Runner.run(agent, input=q_with_context, context = run_context,)
  350. print(result.final_output)
  351. # 保存最终输出到 RunContext
  352. run_context.final_output = str(result.final_output)
  353. # 保存 RunContext 到 log_dir
  354. os.makedirs(run_context.log_dir, exist_ok=True)
  355. context_file_path = os.path.join(run_context.log_dir, "run_context.json")
  356. with open(context_file_path, "w", encoding="utf-8") as f:
  357. json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
  358. print(f"\nRunContext saved to: {context_file_path}")
  359. if __name__ == "__main__":
  360. parser = argparse.ArgumentParser(description="搜索query优化工具")
  361. parser.add_argument(
  362. "--input-dir",
  363. type=str,
  364. default="input/简单扣图",
  365. help="输入目录路径,默认: input/简单扣图"
  366. )
  367. args = parser.parse_args()
  368. asyncio.run(main(args.input_dir))