| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623 |
- import asyncio
- import json
- import os
- import argparse
- from datetime import datetime
- from agents import Agent, Runner
- from lib.my_trace import set_trace
- from typing import Literal
- from pydantic import BaseModel, Field
- from lib.utils import read_file_as_string
- from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
- class RunContext(BaseModel):
- version: str = Field(..., description="当前运行的脚本版本(文件名)")
- input_files: dict[str, str] = Field(..., description="输入文件路径映射")
- q_with_context: str
- q_context: str
- q: str
- log_url: str
- log_dir: str
- question_annotation: str | None = Field(default=None, description="问题的标注结果")
- operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史")
- final_output: str | None = Field(default=None, description="最终输出结果")
- # ============================================================================
- # Agent 1: 问题标注专家
- # ============================================================================
- question_annotation_instructions = """
- 你是搜索需求分析专家。给定问题(含需求背景),在原文上标注三层:本质、硬性、软性。
- ## 三层结构
- **[本质]** - 问题的核心意图,改变后是完全不同的问题
- - 如何获取、教程、推荐、作品、测评等
- **[硬]** - 在本质意图下,必须满足的约束
- - 地域、时间、对象、质量要求等
- **[软]** - 可有可无的修饰
- - 能体现、特色、快速、简单等
- ## 输出格式
- 词语[本质-描述]、词语[硬-描述]、词语[软-描述]
- ## 示例
- 输入:如何获取能体现川西秋季特色的高质量风光摄影素材?
- 输出:如何获取[本质-找方法] 川西[硬-地域] 秋季[硬-季节] 高质量[硬-质量] 风光摄影素材[硬-对象] 能体现[软-修饰] 特色[软-修饰]
- 输入:PS抠图教程
- 输出:PS[硬-工具] 抠图[硬-需求] 教程[本质-学习]
- 输入:川西秋季风光摄影作品
- 输出:川西[硬-地域] 秋季[硬-季节] 风光摄影[硬-对象] 作品[本质-欣赏]
- ## 注意
- - 只输出标注后的字符串
- - 结合需求背景判断意图
- """.strip()
- question_annotator = Agent[None](
- name="问题标注专家",
- instructions=question_annotation_instructions,
- )
- # ============================================================================
- # Agent 2: 评估专家
- # ============================================================================
- eval_instructions = """
- 你是搜索query评估专家。给定原始问题标注(三层)和推荐query,评估三个分数。
- ## 评估目标
- 用这个推荐query搜索,能否找到满足原始需求的内容?
- ## 三层评分
- ### 1. essence_score(本质/意图)= 0 或 1
- 推荐query的本质/意图是否与原问题一致?
- **原问题标注中的[本质-XXX]:**
- - 找方法/如何获取 → 推荐词应该是方法/获取途径
- - 教程/学习 → 推荐词应该是教程/教学
- - 作品/欣赏 → 推荐词应该是作品展示
- - 工具/推荐 → 推荐词应该是工具推荐
- **评分:**
- - 1 = 本质一致
- - 0 = 本质改变(完全答非所问)
- ### 2. hard_score(硬性约束)= 0 或 1
- 在本质一致的前提下,是否满足所有硬性约束?
- **原问题标注中的[硬-XXX]:**地域、时间、对象、质量、工具等
- **评分:**
- - 1 = 所有硬性约束都满足
- - 0 = 任一硬性约束不满足
- ### 3. soft_score(软性修饰)= 0-1
- 软性修饰词保留了多少?
- **评分参考:**
- - 1.0 = 完整保留
- - 0.7-0.9 = 保留核心
- - 0.4-0.6 = 部分丢失
- - 0-0.3 = 大量丢失
- ## 示例
- **原问题标注:** 如何获取[本质-找方法] 川西[硬-地域] 秋季[硬-季节] 高质量[硬-质量] 风光摄影素材[硬-对象] 能体现[软-修饰] 特色[软-修饰]
- **推荐query1:** 川西秋季风光摄影素材视频
- - essence_score=0(找方法→找素材本身,本质变了)
- - hard_score=1(地域、季节、对象都符合)
- - soft_score=0.5(丢失"高质量")
- - reason: 本质改变,用户要找获取素材的方法,推荐词是素材内容本身
- **推荐query2:** 川西秋季风光摄影素材网站推荐
- - essence_score=1(找方法→推荐网站,本质一致)
- - hard_score=1(所有硬性约束满足)
- - soft_score=0.8(保留核心,"高质量"未明确但推荐通常筛选过)
- - reason: 本质一致,硬性约束满足,软性略有丢失但可接受
- ## 注意
- - essence=0 直接拒绝,不管hard/soft多高
- - essence=1, hard=0 也要拒绝
- - essence=1, hard=1 才看soft_score
- """.strip()
- class EvaluationFeedback(BaseModel):
- """评估反馈模型 - 三层评分"""
- essence_score: Literal[0, 1] = Field(..., description="本质/意图匹配度,0或1")
- hard_score: Literal[0, 1] = Field(..., description="硬性约束匹配度,0或1")
- soft_score: float = Field(..., description="软性修饰完整度,0-1")
- reason: str = Field(..., description="评估理由")
- evaluator = Agent[None](
- name="评估专家",
- instructions=eval_instructions,
- output_type=EvaluationFeedback,
- )
- # ============================================================================
- # Agent 3: 修改策略生成专家
- # ============================================================================
- strategy_instructions = """
- 你是query修改策略专家。基于所有历史尝试,生成下一步的query修改策略。
- ## 输入
- - 原始问题标注(三层)
- - 当前query
- - 当前轮推荐词及其评估结果(essence_score, hard_score, soft_score, reason)
- - **历史尝试记录**:之前所有轮次的query、修改策略、评估结果
- ## 分析维度
- ### 0. 首先查看历史尝试(关键!)
- - 之前用过哪些query?结果如何?
- - 哪些操作类型已经尝试过?(简化、扩展、替换、组合)
- - 哪些方向是死路?(多次简化仍返回空/essence=0)
- - 是否有改进趋势?(从空列表→有推荐词,从essence=0→essence=1)
- - **避免重复无效的策略**
- ### 1. 检查当前轮是否有推荐词
- - 如果返回空列表:query可能过于复杂或生僻
- - 结合历史:之前是否也返回空列表?如果多次空列表,需要大幅调整
- ### 2. 分析 essence_score
- - 如果全是 essence_score=0:本质方向错了
- - 结合历史:之前essence情况如何?是否曾经有essence=1的结果?
- ### 3. 分析 hard_score(仅看 essence_score=1 的)
- - 如果全是 hard_score=0:查看reason,哪些硬性约束不满足?
- - 结合历史:哪些约束反复不满足?
- ### 4. 分析 soft_score(仅看 essence=1 且 hard=1 的)
- - 如果 soft_score < 0.7:查看reason,丢失了哪些信息?
- - 结合历史:soft_score的变化趋势?
- ## 四种操作类型
- **简化**:删除冗余词汇,提取核心关键词
- - 适用场景:query过于复杂,返回空列表或推荐词发散
- - 注意:如果历史中已多次简化仍无效,应尝试其他操作
- **扩展**:添加限定词或场景描述
- - 适用场景:hard_score低,需要补充约束信息
- - 注意:查看历史中哪些约束信息被丢失
- **替换**:使用同义词、行业术语或口语化表达
- - 适用场景:essence_score低,核心概念理解偏差
- - 注意:如果历史显示简化无效,替换可能是突破口
- **组合**:调整关键词顺序或组合方式
- - 适用场景:结构不合理,需要调整重点
- - 注意:基于历史中哪些关键词组合更有效
- ## 输出要求
- 必须输出以下字段:
- - operation_type: "简化" | "扩展" | "替换" | "组合"
- - new_query: 修改后的新query
- - reason: 详细理由,必须包括:
- 1. **历史尝试总结**:之前尝试了哪些query和策略?结果如何?
- 2. **当前轮评估发现**:引用具体的分数和理由
- 3. **为什么这样修改**:结合历史经验说明
- 4. **预期改进**:基于历史趋势的预期
- ## 注意
- - **优先考虑历史经验,避免重复失败的策略**
- - 理由必须基于实际数据,不能编造
- - 如果返回空列表,必须在reason中说明,并结合历史判断原因
- """.strip()
- class ModificationStrategy(BaseModel):
- """修改策略模型"""
- operation_type: Literal["简化", "扩展", "替换", "组合"] = Field(..., description="操作类型")
- new_query: str = Field(..., description="修改后的新query")
- reason: str = Field(..., description="修改理由")
- strategy_generator = Agent[None](
- name="策略生成专家",
- instructions=strategy_instructions,
- output_type=ModificationStrategy,
- )
- # ============================================================================
- # 核心函数
- # ============================================================================
- async def annotate_question(q_with_context: str) -> str:
- """标注问题(三层)"""
- print("\n正在标注问题...")
- result = await Runner.run(question_annotator, q_with_context)
- annotation = str(result.final_output)
- print(f"问题标注完成:{annotation}")
- return annotation
- async def get_suggestions_with_eval(query: str, annotation: str, context: RunContext) -> list[dict]:
- """获取推荐词并评估"""
- print(f"\n正在获取推荐词:{query}")
- # 1. 调用小红书API
- xiaohongshu_api = XiaohongshuSearchRecommendations()
- query_suggestions = xiaohongshu_api.get_recommendations(keyword=query)
- print(f"获取到 {len(query_suggestions) if query_suggestions else 0} 个推荐词:{query_suggestions}")
- if not query_suggestions:
- # 记录到历史
- context.operations_history.append({
- "operation_type": "get_query_suggestions",
- "timestamp": datetime.now().isoformat(),
- "query": query,
- "suggestions": [],
- "evaluations": "未返回任何推荐词",
- })
- return []
- # 2. 并发评估所有推荐词
- async def evaluate_single_query(q_sug: str):
- eval_input = f"""
- <原始问题标注(三层)>
- {annotation}
- </原始问题标注(三层)>
- <待评估的推荐query>
- {q_sug}
- </待评估的推荐query>
- 请评估该推荐query的三个分数:
- 1. essence_score: 本质/意图是否一致(0或1)
- 2. hard_score: 硬性约束是否满足(0或1)
- 3. soft_score: 软性修饰保留程度(0-1)
- 4. reason: 详细的评估理由
- """
- evaluator_result = await Runner.run(evaluator, eval_input)
- result: EvaluationFeedback = evaluator_result.final_output
- return {
- "query": q_sug,
- "essence_score": result.essence_score,
- "hard_score": result.hard_score,
- "soft_score": result.soft_score,
- "reason": result.reason,
- }
- evaluations = await asyncio.gather(*[evaluate_single_query(q_sug) for q_sug in query_suggestions])
- # 3. 记录到历史
- context.operations_history.append({
- "operation_type": "get_query_suggestions",
- "timestamp": datetime.now().isoformat(),
- "query": query,
- "suggestions": query_suggestions,
- "evaluations": evaluations,
- })
- return evaluations
- async def generate_modification_strategy(
- current_query: str,
- evaluations: list[dict],
- annotation: str,
- context: RunContext
- ) -> ModificationStrategy:
- """生成修改策略"""
- print("\n正在生成修改策略...")
- # 整理历史尝试记录 - 完整保留推荐词和评估结果
- history_records = []
- round_num = 0
- for op in context.operations_history:
- if op["operation_type"] == "get_query_suggestions":
- round_num += 1
- record = {
- "round": round_num,
- "query": op["query"],
- "suggestions": op["suggestions"],
- "evaluations": op["evaluations"]
- }
- history_records.append(record)
- elif op["operation_type"] == "modify_query":
- # 修改操作也记录,但不增加轮数
- history_records.append({
- "operation": "modify_query",
- "modification_type": op["modification_type"],
- "original_query": op["original_query"],
- "new_query": op["new_query"],
- "reason": op["reason"]
- })
- # 格式化历史记录为JSON
- history_json = json.dumps(history_records, ensure_ascii=False, indent=2)
- strategy_input = f"""
- <原始问题标注(三层)>
- {annotation}
- </原始问题标注(三层)>
- <历史尝试记录(完整)>
- {history_json}
- </历史尝试记录(完整)>
- <当前query>
- {current_query}
- </当前query>
- <当前轮推荐词评估结果>
- {json.dumps(evaluations, ensure_ascii=False, indent=2) if evaluations else "空列表"}
- </当前轮推荐词评估结果>
- 请基于所有历史尝试和当前评估结果,生成下一步的query修改策略。
- 重点关注:
- 1. **查看历史中每个推荐词的详细评估**:
- - 哪些推荐词的essence_score=1?它们的表述有什么特点?
- - 哪些推荐词的hard_score=1?它们保留了哪些约束?
- - 每个推荐词的reason中提到了什么问题?
- 2. **避免重复失败的方向**:
- - 如果某个方向多次返回essence=0,避免继续
- - 如果某个操作类型反复无效,尝试其他类型
- 3. **识别有效的表述方式**:
- - 历史中是否有essence=1的推荐词?学习它们的表述
- - 哪些关键词组合更容易被正确理解?
- 4. **基于趋势做决策**:
- - 是否有改进趋势(空列表→有推荐词,essence=0→essence=1)?
- - 还是陷入死胡同(多次尝试无改善)?
- """
- result = await Runner.run(strategy_generator, strategy_input)
- strategy: ModificationStrategy = result.final_output
- return strategy
- def find_best_qualified_query(evaluations: list[dict], min_soft_score: float = 0.7) -> dict | None:
- """查找最佳合格query"""
- qualified = [
- e for e in evaluations
- if e['essence_score'] == 1
- and e['hard_score'] == 1
- and e['soft_score'] >= min_soft_score
- ]
- if qualified:
- return max(qualified, key=lambda x: x['soft_score'])
- return None
- # ============================================================================
- # 主流程(代码控制)
- # ============================================================================
- async def optimize_query(context: RunContext, max_rounds: int = 20) -> dict:
- """
- 主优化流程 - 由代码控制
- Args:
- context: 运行上下文
- max_rounds: 最大迭代轮数,默认20
- 返回格式:
- {
- "success": True/False,
- "result": {...} or None,
- "message": "..."
- }
- """
- # 1. 标注问题(仅一次)
- annotation = await annotate_question(context.q_with_context)
- context.question_annotation = annotation
- # 2. 迭代优化
- current_query = context.q
- for round_num in range(1, max_rounds + 1):
- print(f"\n{'='*60}")
- print(f"第 {round_num} 轮:{'使用原始问题' if round_num == 1 else '使用修改后的query'}")
- print(f"当前query: {current_query}")
- print(f"{'='*60}")
- # 获取推荐词并评估
- evaluations = await get_suggestions_with_eval(current_query, annotation, context)
- if evaluations:
- # 检查是否找到合格query
- best = find_best_qualified_query(evaluations, min_soft_score=0.7)
- if best:
- return {
- "success": True,
- "result": best,
- "message": f"第{round_num}轮找到合格query"
- }
- # 如果是最后一轮,不再生成策略
- if round_num == max_rounds:
- break
- # 生成修改策略
- print(f"\n--- 生成修改策略 ---")
- strategy = await generate_modification_strategy(current_query, evaluations, annotation, context)
- print(f"\n修改策略:")
- print(f" 操作类型:{strategy.operation_type}")
- print(f" 原query:{current_query}")
- print(f" 新query:{strategy.new_query}")
- print(f" 理由:{strategy.reason}")
- # 记录修改
- context.operations_history.append({
- "operation_type": "modify_query",
- "timestamp": datetime.now().isoformat(),
- "modification_type": strategy.operation_type,
- "original_query": current_query,
- "new_query": strategy.new_query,
- "reason": strategy.reason,
- })
- # 更新当前query
- current_query = strategy.new_query
- # 所有轮次后仍未找到,降低标准查找
- print(f"\n{'='*60}")
- print(f"{max_rounds}轮后未找到最优query,降低标准(soft_score >= 0.5)")
- print(f"{'='*60}")
- best_acceptable = find_best_qualified_query(evaluations, min_soft_score=0.5)
- if best_acceptable:
- return {
- "success": True,
- "result": best_acceptable,
- "message": f"{max_rounds}轮后找到可接受query(soft_score >= 0.5)"
- }
- # 完全失败:找出最接近的
- essence_hard_ok = [
- e for e in evaluations
- if e['essence_score'] == 1 and e['hard_score'] == 1
- ]
- if essence_hard_ok:
- closest = max(essence_hard_ok, key=lambda x: x['soft_score'])
- return {
- "success": False,
- "result": closest,
- "message": f"未找到合格query,最接近的soft_score={closest['soft_score']}"
- }
- return {
- "success": False,
- "result": None,
- "message": "未找到任何满足本质和硬性约束的推荐词"
- }
- # ============================================================================
- # 输出格式化
- # ============================================================================
- def format_output(optimization_result: dict, context: RunContext) -> str:
- """格式化输出结果"""
- if optimization_result["success"]:
- result = optimization_result["result"]
- return f"""
- 原始问题:{context.q}
- 优化后的query:{result['query']}
- 本质匹配度:{result['essence_score']} (1=本质一致)
- 硬性约束匹配度:{result['hard_score']} (1=所有约束满足)
- 软性修饰完整度:{result['soft_score']} (0-1)
- 评估理由:{result['reason']}
- 状态:{optimization_result['message']}
- """.strip()
- else:
- output = f"""
- 原始问题:{context.q}
- 结果:未找到合格推荐query
- 原因:{optimization_result['message']}
- """
- if optimization_result["result"]:
- result = optimization_result["result"]
- output += f"""
- 最接近的推荐词:{result['query']}
- - essence_score: {result['essence_score']}
- - hard_score: {result['hard_score']}
- - soft_score: {result['soft_score']}
- - reason: {result['reason']}
- """
- output += "\n建议:尝试简化问题或调整需求描述"
- return output.strip()
- # ============================================================================
- # 主函数
- # ============================================================================
- async def main(input_dir: str, max_rounds: int = 20):
- current_time, log_url = set_trace()
- # 从目录中读取固定文件名
- input_context_file = os.path.join(input_dir, 'context.md')
- input_q_file = os.path.join(input_dir, 'q.md')
- q_context = read_file_as_string(input_context_file)
- q = read_file_as_string(input_q_file)
- q_with_context = f"""
- <需求上下文>
- {q_context}
- </需求上下文>
- <当前问题>
- {q}
- </当前问题>
- """.strip()
- # 获取当前文件名作为版本
- version = os.path.basename(__file__)
- version_name = os.path.splitext(version)[0]
- # 日志保存目录
- log_dir = os.path.join(input_dir, "output", version_name, current_time)
- run_context = RunContext(
- version=version,
- input_files={
- "input_dir": input_dir,
- "context_file": input_context_file,
- "q_file": input_q_file,
- },
- q_with_context=q_with_context,
- q_context=q_context,
- q=q,
- log_dir=log_dir,
- log_url=log_url,
- )
- # 执行优化流程(代码控制)
- optimization_result = await optimize_query(run_context, max_rounds=max_rounds)
- # 格式化输出
- final_output = format_output(optimization_result, run_context)
- print(f"\n{'='*60}")
- print("最终结果")
- print(f"{'='*60}")
- print(final_output)
- # 保存结果
- run_context.final_output = final_output
- # 保存 RunContext 到 log_dir
- os.makedirs(run_context.log_dir, exist_ok=True)
- context_file_path = os.path.join(run_context.log_dir, "run_context.json")
- with open(context_file_path, "w", encoding="utf-8") as f:
- json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
- print(f"\nRunContext saved to: {context_file_path}")
- if __name__ == "__main__":
- parser = argparse.ArgumentParser(description="搜索query优化工具")
- parser.add_argument(
- "--input-dir",
- type=str,
- default="input/简单扣图",
- help="输入目录路径,默认: input/简单扣图"
- )
- parser.add_argument(
- "--max-rounds",
- type=int,
- default=20,
- help="最大迭代轮数,默认: 20"
- )
- args = parser.parse_args()
- asyncio.run(main(args.input_dir, max_rounds=args.max_rounds))
|