sug_v5.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
  1. import asyncio
  2. import json
  3. import os
  4. import argparse
  5. from datetime import datetime
  6. from agents import Agent, Runner
  7. from lib.my_trace import set_trace
  8. from typing import Literal
  9. from pydantic import BaseModel, Field
  10. from lib.utils import read_file_as_string
  11. from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
  12. class RunContext(BaseModel):
  13. version: str = Field(..., description="当前运行的脚本版本(文件名)")
  14. input_files: dict[str, str] = Field(..., description="输入文件路径映射")
  15. q_with_context: str
  16. q_context: str
  17. q: str
  18. log_url: str
  19. log_dir: str
  20. question_annotation: str | None = Field(default=None, description="问题的标注结果")
  21. operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史")
  22. final_output: str | None = Field(default=None, description="最终输出结果")
  23. # ============================================================================
  24. # Agent 1: 问题标注专家
  25. # ============================================================================
  26. question_annotation_instructions = """
  27. 你是搜索需求分析专家。给定问题(含需求背景),在原文上标注三层:本质、硬性、软性。
  28. ## 三层结构
  29. **[本质]** - 问题的核心意图,改变后是完全不同的问题
  30. - 如何获取、教程、推荐、作品、测评等
  31. **[硬]** - 在本质意图下,必须满足的约束
  32. - 地域、时间、对象、质量要求等
  33. **[软]** - 可有可无的修饰
  34. - 能体现、特色、快速、简单等
  35. ## 输出格式
  36. 词语[本质-描述]、词语[硬-描述]、词语[软-描述]
  37. ## 示例
  38. 输入:如何获取能体现川西秋季特色的高质量风光摄影素材?
  39. 输出:如何获取[本质-找方法] 川西[硬-地域] 秋季[硬-季节] 高质量[硬-质量] 风光摄影素材[硬-对象] 能体现[软-修饰] 特色[软-修饰]
  40. 输入:PS抠图教程
  41. 输出:PS[硬-工具] 抠图[硬-需求] 教程[本质-学习]
  42. 输入:川西秋季风光摄影作品
  43. 输出:川西[硬-地域] 秋季[硬-季节] 风光摄影[硬-对象] 作品[本质-欣赏]
  44. ## 注意
  45. - 只输出标注后的字符串
  46. - 结合需求背景判断意图
  47. """.strip()
  48. question_annotator = Agent[None](
  49. name="问题标注专家",
  50. instructions=question_annotation_instructions,
  51. )
  52. # ============================================================================
  53. # Agent 2: 评估专家
  54. # ============================================================================
  55. eval_instructions = """
  56. 你是搜索query评估专家。给定原始问题标注(三层)和推荐query,评估三个分数。
  57. ## 评估目标
  58. 用这个推荐query搜索,能否找到满足原始需求的内容?
  59. ## 三层评分
  60. ### 1. essence_score(本质/意图)= 0 或 1
  61. 推荐query的本质/意图是否与原问题一致?
  62. **原问题标注中的[本质-XXX]:**
  63. - 找方法/如何获取 → 推荐词应该是方法/获取途径
  64. - 教程/学习 → 推荐词应该是教程/教学
  65. - 作品/欣赏 → 推荐词应该是作品展示
  66. - 工具/推荐 → 推荐词应该是工具推荐
  67. **评分:**
  68. - 1 = 本质一致
  69. - 0 = 本质改变(完全答非所问)
  70. ### 2. hard_score(硬性约束)= 0 或 1
  71. 在本质一致的前提下,是否满足所有硬性约束?
  72. **原问题标注中的[硬-XXX]:**地域、时间、对象、质量、工具等
  73. **评分:**
  74. - 1 = 所有硬性约束都满足
  75. - 0 = 任一硬性约束不满足
  76. ### 3. soft_score(软性修饰)= 0-1
  77. 软性修饰词保留了多少?
  78. **评分参考:**
  79. - 1.0 = 完整保留
  80. - 0.7-0.9 = 保留核心
  81. - 0.4-0.6 = 部分丢失
  82. - 0-0.3 = 大量丢失
  83. ## 示例
  84. **原问题标注:** 如何获取[本质-找方法] 川西[硬-地域] 秋季[硬-季节] 高质量[硬-质量] 风光摄影素材[硬-对象] 能体现[软-修饰] 特色[软-修饰]
  85. **推荐query1:** 川西秋季风光摄影素材视频
  86. - essence_score=0(找方法→找素材本身,本质变了)
  87. - hard_score=1(地域、季节、对象都符合)
  88. - soft_score=0.5(丢失"高质量")
  89. - reason: 本质改变,用户要找获取素材的方法,推荐词是素材内容本身
  90. **推荐query2:** 川西秋季风光摄影素材网站推荐
  91. - essence_score=1(找方法→推荐网站,本质一致)
  92. - hard_score=1(所有硬性约束满足)
  93. - soft_score=0.8(保留核心,"高质量"未明确但推荐通常筛选过)
  94. - reason: 本质一致,硬性约束满足,软性略有丢失但可接受
  95. ## 注意
  96. - essence=0 直接拒绝,不管hard/soft多高
  97. - essence=1, hard=0 也要拒绝
  98. - essence=1, hard=1 才看soft_score
  99. """.strip()
  100. class EvaluationFeedback(BaseModel):
  101. """评估反馈模型 - 三层评分"""
  102. essence_score: Literal[0, 1] = Field(..., description="本质/意图匹配度,0或1")
  103. hard_score: Literal[0, 1] = Field(..., description="硬性约束匹配度,0或1")
  104. soft_score: float = Field(..., description="软性修饰完整度,0-1")
  105. reason: str = Field(..., description="评估理由")
  106. evaluator = Agent[None](
  107. name="评估专家",
  108. instructions=eval_instructions,
  109. output_type=EvaluationFeedback,
  110. )
  111. # ============================================================================
  112. # Agent 3: 修改策略生成专家
  113. # ============================================================================
  114. strategy_instructions = """
  115. 你是query修改策略专家。基于所有历史尝试,生成下一步的query修改策略。
  116. ## 输入
  117. - 原始问题标注(三层)
  118. - 当前query
  119. - 当前轮推荐词及其评估结果(essence_score, hard_score, soft_score, reason)
  120. - **历史尝试记录**:之前所有轮次的query、修改策略、评估结果
  121. ## 分析维度
  122. ### 0. 首先查看历史尝试(关键!)
  123. - 之前用过哪些query?结果如何?
  124. - 哪些操作类型已经尝试过?(简化、扩展、替换、组合)
  125. - 哪些方向是死路?(多次简化仍返回空/essence=0)
  126. - 是否有改进趋势?(从空列表→有推荐词,从essence=0→essence=1)
  127. - **避免重复无效的策略**
  128. ### 1. 检查当前轮是否有推荐词
  129. - 如果返回空列表:query可能过于复杂或生僻
  130. - 结合历史:之前是否也返回空列表?如果多次空列表,需要大幅调整
  131. ### 2. 分析 essence_score
  132. - 如果全是 essence_score=0:本质方向错了
  133. - 结合历史:之前essence情况如何?是否曾经有essence=1的结果?
  134. ### 3. 分析 hard_score(仅看 essence_score=1 的)
  135. - 如果全是 hard_score=0:查看reason,哪些硬性约束不满足?
  136. - 结合历史:哪些约束反复不满足?
  137. ### 4. 分析 soft_score(仅看 essence=1 且 hard=1 的)
  138. - 如果 soft_score < 0.7:查看reason,丢失了哪些信息?
  139. - 结合历史:soft_score的变化趋势?
  140. ## 四种操作类型
  141. **简化**:删除冗余词汇,提取核心关键词
  142. - 适用场景:query过于复杂,返回空列表或推荐词发散
  143. - 注意:如果历史中已多次简化仍无效,应尝试其他操作
  144. **扩展**:添加限定词或场景描述
  145. - 适用场景:hard_score低,需要补充约束信息
  146. - 注意:查看历史中哪些约束信息被丢失
  147. **替换**:使用同义词、行业术语或口语化表达
  148. - 适用场景:essence_score低,核心概念理解偏差
  149. - 注意:如果历史显示简化无效,替换可能是突破口
  150. **组合**:调整关键词顺序或组合方式
  151. - 适用场景:结构不合理,需要调整重点
  152. - 注意:基于历史中哪些关键词组合更有效
  153. ## 输出要求
  154. 必须输出以下字段:
  155. - operation_type: "简化" | "扩展" | "替换" | "组合"
  156. - new_query: 修改后的新query
  157. - reason: 详细理由,必须包括:
  158. 1. **历史尝试总结**:之前尝试了哪些query和策略?结果如何?
  159. 2. **当前轮评估发现**:引用具体的分数和理由
  160. 3. **为什么这样修改**:结合历史经验说明
  161. 4. **预期改进**:基于历史趋势的预期
  162. ## 注意
  163. - **优先考虑历史经验,避免重复失败的策略**
  164. - 理由必须基于实际数据,不能编造
  165. - 如果返回空列表,必须在reason中说明,并结合历史判断原因
  166. """.strip()
  167. class ModificationStrategy(BaseModel):
  168. """修改策略模型"""
  169. operation_type: Literal["简化", "扩展", "替换", "组合"] = Field(..., description="操作类型")
  170. new_query: str = Field(..., description="修改后的新query")
  171. reason: str = Field(..., description="修改理由")
  172. strategy_generator = Agent[None](
  173. name="策略生成专家",
  174. instructions=strategy_instructions,
  175. output_type=ModificationStrategy,
  176. )
  177. # ============================================================================
  178. # 核心函数
  179. # ============================================================================
  180. async def annotate_question(q_with_context: str) -> str:
  181. """标注问题(三层)"""
  182. print("\n正在标注问题...")
  183. result = await Runner.run(question_annotator, q_with_context)
  184. annotation = str(result.final_output)
  185. print(f"问题标注完成:{annotation}")
  186. return annotation
  187. async def get_suggestions_with_eval(query: str, annotation: str, context: RunContext) -> list[dict]:
  188. """获取推荐词并评估"""
  189. print(f"\n正在获取推荐词:{query}")
  190. # 1. 调用小红书API
  191. xiaohongshu_api = XiaohongshuSearchRecommendations()
  192. query_suggestions = xiaohongshu_api.get_recommendations(keyword=query)
  193. print(f"获取到 {len(query_suggestions) if query_suggestions else 0} 个推荐词:{query_suggestions}")
  194. if not query_suggestions:
  195. # 记录到历史
  196. context.operations_history.append({
  197. "operation_type": "get_query_suggestions",
  198. "timestamp": datetime.now().isoformat(),
  199. "query": query,
  200. "suggestions": [],
  201. "evaluations": "未返回任何推荐词",
  202. })
  203. return []
  204. # 2. 并发评估所有推荐词
  205. async def evaluate_single_query(q_sug: str):
  206. eval_input = f"""
  207. <原始问题标注(三层)>
  208. {annotation}
  209. </原始问题标注(三层)>
  210. <待评估的推荐query>
  211. {q_sug}
  212. </待评估的推荐query>
  213. 请评估该推荐query的三个分数:
  214. 1. essence_score: 本质/意图是否一致(0或1)
  215. 2. hard_score: 硬性约束是否满足(0或1)
  216. 3. soft_score: 软性修饰保留程度(0-1)
  217. 4. reason: 详细的评估理由
  218. """
  219. evaluator_result = await Runner.run(evaluator, eval_input)
  220. result: EvaluationFeedback = evaluator_result.final_output
  221. return {
  222. "query": q_sug,
  223. "essence_score": result.essence_score,
  224. "hard_score": result.hard_score,
  225. "soft_score": result.soft_score,
  226. "reason": result.reason,
  227. }
  228. evaluations = await asyncio.gather(*[evaluate_single_query(q_sug) for q_sug in query_suggestions])
  229. # 3. 记录到历史
  230. context.operations_history.append({
  231. "operation_type": "get_query_suggestions",
  232. "timestamp": datetime.now().isoformat(),
  233. "query": query,
  234. "suggestions": query_suggestions,
  235. "evaluations": evaluations,
  236. })
  237. return evaluations
  238. async def generate_modification_strategy(
  239. current_query: str,
  240. evaluations: list[dict],
  241. annotation: str,
  242. context: RunContext
  243. ) -> ModificationStrategy:
  244. """生成修改策略"""
  245. print("\n正在生成修改策略...")
  246. # 整理历史尝试记录 - 完整保留推荐词和评估结果
  247. history_records = []
  248. round_num = 0
  249. for op in context.operations_history:
  250. if op["operation_type"] == "get_query_suggestions":
  251. round_num += 1
  252. record = {
  253. "round": round_num,
  254. "query": op["query"],
  255. "suggestions": op["suggestions"],
  256. "evaluations": op["evaluations"]
  257. }
  258. history_records.append(record)
  259. elif op["operation_type"] == "modify_query":
  260. # 修改操作也记录,但不增加轮数
  261. history_records.append({
  262. "operation": "modify_query",
  263. "modification_type": op["modification_type"],
  264. "original_query": op["original_query"],
  265. "new_query": op["new_query"],
  266. "reason": op["reason"]
  267. })
  268. # 格式化历史记录为JSON
  269. history_json = json.dumps(history_records, ensure_ascii=False, indent=2)
  270. strategy_input = f"""
  271. <原始问题标注(三层)>
  272. {annotation}
  273. </原始问题标注(三层)>
  274. <历史尝试记录(完整)>
  275. {history_json}
  276. </历史尝试记录(完整)>
  277. <当前query>
  278. {current_query}
  279. </当前query>
  280. <当前轮推荐词评估结果>
  281. {json.dumps(evaluations, ensure_ascii=False, indent=2) if evaluations else "空列表"}
  282. </当前轮推荐词评估结果>
  283. 请基于所有历史尝试和当前评估结果,生成下一步的query修改策略。
  284. 重点关注:
  285. 1. **查看历史中每个推荐词的详细评估**:
  286. - 哪些推荐词的essence_score=1?它们的表述有什么特点?
  287. - 哪些推荐词的hard_score=1?它们保留了哪些约束?
  288. - 每个推荐词的reason中提到了什么问题?
  289. 2. **避免重复失败的方向**:
  290. - 如果某个方向多次返回essence=0,避免继续
  291. - 如果某个操作类型反复无效,尝试其他类型
  292. 3. **识别有效的表述方式**:
  293. - 历史中是否有essence=1的推荐词?学习它们的表述
  294. - 哪些关键词组合更容易被正确理解?
  295. 4. **基于趋势做决策**:
  296. - 是否有改进趋势(空列表→有推荐词,essence=0→essence=1)?
  297. - 还是陷入死胡同(多次尝试无改善)?
  298. """
  299. result = await Runner.run(strategy_generator, strategy_input)
  300. strategy: ModificationStrategy = result.final_output
  301. return strategy
  302. def find_best_qualified_query(evaluations: list[dict], min_soft_score: float = 0.7) -> dict | None:
  303. """查找最佳合格query"""
  304. qualified = [
  305. e for e in evaluations
  306. if e['essence_score'] == 1
  307. and e['hard_score'] == 1
  308. and e['soft_score'] >= min_soft_score
  309. ]
  310. if qualified:
  311. return max(qualified, key=lambda x: x['soft_score'])
  312. return None
  313. # ============================================================================
  314. # 主流程(代码控制)
  315. # ============================================================================
  316. async def optimize_query(context: RunContext, max_rounds: int = 20) -> dict:
  317. """
  318. 主优化流程 - 由代码控制
  319. Args:
  320. context: 运行上下文
  321. max_rounds: 最大迭代轮数,默认20
  322. 返回格式:
  323. {
  324. "success": True/False,
  325. "result": {...} or None,
  326. "message": "..."
  327. }
  328. """
  329. # 1. 标注问题(仅一次)
  330. annotation = await annotate_question(context.q_with_context)
  331. context.question_annotation = annotation
  332. # 2. 迭代优化
  333. current_query = context.q
  334. for round_num in range(1, max_rounds + 1):
  335. print(f"\n{'='*60}")
  336. print(f"第 {round_num} 轮:{'使用原始问题' if round_num == 1 else '使用修改后的query'}")
  337. print(f"当前query: {current_query}")
  338. print(f"{'='*60}")
  339. # 获取推荐词并评估
  340. evaluations = await get_suggestions_with_eval(current_query, annotation, context)
  341. if evaluations:
  342. # 检查是否找到合格query
  343. best = find_best_qualified_query(evaluations, min_soft_score=0.7)
  344. if best:
  345. return {
  346. "success": True,
  347. "result": best,
  348. "message": f"第{round_num}轮找到合格query"
  349. }
  350. # 如果是最后一轮,不再生成策略
  351. if round_num == max_rounds:
  352. break
  353. # 生成修改策略
  354. print(f"\n--- 生成修改策略 ---")
  355. strategy = await generate_modification_strategy(current_query, evaluations, annotation, context)
  356. print(f"\n修改策略:")
  357. print(f" 操作类型:{strategy.operation_type}")
  358. print(f" 原query:{current_query}")
  359. print(f" 新query:{strategy.new_query}")
  360. print(f" 理由:{strategy.reason}")
  361. # 记录修改
  362. context.operations_history.append({
  363. "operation_type": "modify_query",
  364. "timestamp": datetime.now().isoformat(),
  365. "modification_type": strategy.operation_type,
  366. "original_query": current_query,
  367. "new_query": strategy.new_query,
  368. "reason": strategy.reason,
  369. })
  370. # 更新当前query
  371. current_query = strategy.new_query
  372. # 所有轮次后仍未找到,降低标准查找
  373. print(f"\n{'='*60}")
  374. print(f"{max_rounds}轮后未找到最优query,降低标准(soft_score >= 0.5)")
  375. print(f"{'='*60}")
  376. best_acceptable = find_best_qualified_query(evaluations, min_soft_score=0.5)
  377. if best_acceptable:
  378. return {
  379. "success": True,
  380. "result": best_acceptable,
  381. "message": f"{max_rounds}轮后找到可接受query(soft_score >= 0.5)"
  382. }
  383. # 完全失败:找出最接近的
  384. essence_hard_ok = [
  385. e for e in evaluations
  386. if e['essence_score'] == 1 and e['hard_score'] == 1
  387. ]
  388. if essence_hard_ok:
  389. closest = max(essence_hard_ok, key=lambda x: x['soft_score'])
  390. return {
  391. "success": False,
  392. "result": closest,
  393. "message": f"未找到合格query,最接近的soft_score={closest['soft_score']}"
  394. }
  395. return {
  396. "success": False,
  397. "result": None,
  398. "message": "未找到任何满足本质和硬性约束的推荐词"
  399. }
  400. # ============================================================================
  401. # 输出格式化
  402. # ============================================================================
  403. def format_output(optimization_result: dict, context: RunContext) -> str:
  404. """格式化输出结果"""
  405. if optimization_result["success"]:
  406. result = optimization_result["result"]
  407. return f"""
  408. 原始问题:{context.q}
  409. 优化后的query:{result['query']}
  410. 本质匹配度:{result['essence_score']} (1=本质一致)
  411. 硬性约束匹配度:{result['hard_score']} (1=所有约束满足)
  412. 软性修饰完整度:{result['soft_score']} (0-1)
  413. 评估理由:{result['reason']}
  414. 状态:{optimization_result['message']}
  415. """.strip()
  416. else:
  417. output = f"""
  418. 原始问题:{context.q}
  419. 结果:未找到合格推荐query
  420. 原因:{optimization_result['message']}
  421. """
  422. if optimization_result["result"]:
  423. result = optimization_result["result"]
  424. output += f"""
  425. 最接近的推荐词:{result['query']}
  426. - essence_score: {result['essence_score']}
  427. - hard_score: {result['hard_score']}
  428. - soft_score: {result['soft_score']}
  429. - reason: {result['reason']}
  430. """
  431. output += "\n建议:尝试简化问题或调整需求描述"
  432. return output.strip()
  433. # ============================================================================
  434. # 主函数
  435. # ============================================================================
  436. async def main(input_dir: str, max_rounds: int = 20):
  437. current_time, log_url = set_trace()
  438. # 从目录中读取固定文件名
  439. input_context_file = os.path.join(input_dir, 'context.md')
  440. input_q_file = os.path.join(input_dir, 'q.md')
  441. q_context = read_file_as_string(input_context_file)
  442. q = read_file_as_string(input_q_file)
  443. q_with_context = f"""
  444. <需求上下文>
  445. {q_context}
  446. </需求上下文>
  447. <当前问题>
  448. {q}
  449. </当前问题>
  450. """.strip()
  451. # 获取当前文件名作为版本
  452. version = os.path.basename(__file__)
  453. version_name = os.path.splitext(version)[0]
  454. # 日志保存目录
  455. log_dir = os.path.join(input_dir, "output", version_name, current_time)
  456. run_context = RunContext(
  457. version=version,
  458. input_files={
  459. "input_dir": input_dir,
  460. "context_file": input_context_file,
  461. "q_file": input_q_file,
  462. },
  463. q_with_context=q_with_context,
  464. q_context=q_context,
  465. q=q,
  466. log_dir=log_dir,
  467. log_url=log_url,
  468. )
  469. # 执行优化流程(代码控制)
  470. optimization_result = await optimize_query(run_context, max_rounds=max_rounds)
  471. # 格式化输出
  472. final_output = format_output(optimization_result, run_context)
  473. print(f"\n{'='*60}")
  474. print("最终结果")
  475. print(f"{'='*60}")
  476. print(final_output)
  477. # 保存结果
  478. run_context.final_output = final_output
  479. # 保存 RunContext 到 log_dir
  480. os.makedirs(run_context.log_dir, exist_ok=True)
  481. context_file_path = os.path.join(run_context.log_dir, "run_context.json")
  482. with open(context_file_path, "w", encoding="utf-8") as f:
  483. json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
  484. print(f"\nRunContext saved to: {context_file_path}")
  485. if __name__ == "__main__":
  486. parser = argparse.ArgumentParser(description="搜索query优化工具")
  487. parser.add_argument(
  488. "--input-dir",
  489. type=str,
  490. default="input/简单扣图",
  491. help="输入目录路径,默认: input/简单扣图"
  492. )
  493. parser.add_argument(
  494. "--max-rounds",
  495. type=int,
  496. default=20,
  497. help="最大迭代轮数,默认: 20"
  498. )
  499. args = parser.parse_args()
  500. asyncio.run(main(args.input_dir, max_rounds=args.max_rounds))