yangxiaohui 1 mês atrás
pai
commit
7e160503d6
1 arquivos alterados com 627 adições e 0 exclusões
  1. 627 0
      sug_v6_0_progressive_exploration.py

+ 627 - 0
sug_v6_0_progressive_exploration.py

@@ -0,0 +1,627 @@
+import asyncio
+import json
+import os
+import argparse
+from datetime import datetime
+
+from agents import Agent, Runner
+from lib.my_trace import set_trace
+from typing import Literal
+from pydantic import BaseModel, Field
+
+from lib.utils import read_file_as_string
+from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
+
+
+class RunContext(BaseModel):
+    version: str = Field(..., description="当前运行的脚本版本(文件名)")
+    input_files: dict[str, str] = Field(..., description="输入文件路径映射")
+    q_with_context: str
+    q_context: str
+    q: str
+    log_url: str
+    log_dir: str
+
+    # 探索阶段记录
+    keywords: list[str] | None = Field(default=None, description="提取的关键词")
+    exploration_levels: list[dict] = Field(default_factory=list, description="每一层的探索结果")
+    level_analyses: list[dict] = Field(default_factory=list, description="每一层的主Agent分析")
+
+    # 最终结果
+    final_candidates: list[str] | None = Field(default=None, description="最终选出的候选query")
+    evaluation_results: list[dict] | None = Field(default=None, description="候选query的评估结果")
+    optimization_result: dict | None = Field(default=None, description="最终优化结果对象")
+    final_output: str | None = Field(default=None, description="最终输出结果(格式化文本)")
+
+
+# ============================================================================
+# Agent 1: 关键词提取专家
+# ============================================================================
+keyword_extraction_instructions = """
+你是关键词提取专家。给定一个搜索问题(含上下文),提取出**最细粒度的关键概念**。
+
+## 提取原则
+
+1. **细粒度优先**:拆分成最小的有意义单元
+   - 不要保留完整的长句
+   - 拆分成独立的、有搜索意义的词或短语
+
+2. **保留核心维度**:
+   - 地域/对象
+   - 时间
+   - 行为/意图:获取、教程、推荐、如何等
+   - 主题/领域
+   - 质量/属性
+
+3. **去掉无意义的虚词**:的、吗、呢等
+
+4. **保留领域专有词**:不要过度拆分专业术语
+   - 如果是常见的组合词,保持完整
+
+## 输出要求
+
+输出关键词列表,按重要性排序(最核心的在前)。
+""".strip()
+
+class KeywordList(BaseModel):
+    """关键词列表"""
+    keywords: list[str] = Field(..., description="提取的关键词,按重要性排序")
+    reasoning: str = Field(..., description="提取理由")
+
+keyword_extractor = Agent[None](
+    name="关键词提取专家",
+    instructions=keyword_extraction_instructions,
+    output_type=KeywordList,
+)
+
+
+# ============================================================================
+# Agent 2: 层级探索分析专家
+# ============================================================================
+level_analysis_instructions = """
+你是搜索空间探索分析专家。基于当前层级的探索结果,决定下一步行动。
+
+## 你的任务
+
+分析当前已探索的词汇空间,判断:
+1. **发现了什么有价值的信号?**
+2. **是否已经可以评估候选了?**
+3. **如果还不够,下一层应该探索什么组合?**
+
+## 分析维度
+
+### 1. 信号识别(最重要)
+
+看推荐词里**出现了什么主题**:
+
+**关键问题:**
+- 哪些推荐词**最接近原始需求**?
+- 哪些推荐词**揭示了有价值的方向**(即使不完全匹配)?
+- 哪些推荐词可以作为**下一层探索的桥梁**?
+- 系统对哪些概念理解得好?哪些理解偏了?
+
+### 2. 组合策略
+
+基于发现的信号,设计下一层探索:
+
+**组合类型:**
+
+a) **关键词直接组合**
+   - 两个关键词组合成新query
+
+b) **利用推荐词作为桥梁**(重要!)
+   - 发现某个推荐词很有价值 → 直接探索这个推荐词
+   - 或在推荐词基础上加其他关键词
+
+c) **跨层级组合**
+   - 结合多层发现的有价值推荐词
+   - 组合成更复杂的query
+
+### 3. 停止条件
+
+**何时可以评估候选?**
+
+满足以下之一:
+- 推荐词中出现了**明确包含原始需求多个核心要素的query**
+- 已经探索到**足够复杂的组合**(3-4个关键词),且推荐词相关
+- 探索了**3-4层**,信息已经足够丰富
+
+**何时继续探索?**
+- 当前推荐词太泛,没有接近原始需求
+- 发现了有价值的信号,但需要进一步组合验证
+- 层数还少(< 3层)
+
+## 输出要求
+
+### 1. key_findings
+总结当前层发现的关键信息,包括:
+- 哪些推荐词最有价值?
+- 系统对哪些概念理解得好/不好?
+- 发现了什么意外的方向?
+
+### 2. promising_signals
+列出最有价值的推荐词(来自任何已探索的query),每个说明为什么有价值
+格式:[{"query": "...", "from_level": 1, "reason": "..."}]
+
+### 3. should_evaluate_now
+是否已经可以开始评估候选了?true/false
+
+### 4. candidates_to_evaluate
+如果should_evaluate_now=true,列出应该评估的候选query
+- 可以是推荐词
+- 可以是自己构造的组合
+
+### 5. next_combinations
+如果should_evaluate_now=false,列出下一层应该探索的query组合
+
+### 6. reasoning
+详细的推理过程
+
+## 重要原则
+
+1. **不要过早评估**:至少探索2层,除非第一层就发现了完美匹配
+2. **充分利用推荐词**:推荐词是系统给的提示,要善用
+3. **保持探索方向的多样性**:不要只盯着一个方向
+4. **识别死胡同**:如果某个方向的推荐词一直不相关,果断放弃
+""".strip()
+
+class LevelAnalysis(BaseModel):
+    """层级分析结果"""
+    key_findings: str = Field(..., description="当前层的关键发现")
+    promising_signals: list[dict] = Field(..., description="有价值的推荐词信号,格式:[{\"query\": \"...\", \"from_level\": 1, \"reason\": \"...\"}]")
+    should_evaluate_now: bool = Field(..., description="是否应该开始评估候选")
+    candidates_to_evaluate: list[str] = Field(default_factory=list, description="如果should_evaluate_now=true,要评估的候选query列表")
+    next_combinations: list[str] = Field(default_factory=list, description="如果should_evaluate_now=false,下一层要探索的query组合")
+    reasoning: str = Field(..., description="详细的推理过程")
+
+level_analyzer = Agent[None](
+    name="层级探索分析专家",
+    instructions=level_analysis_instructions,
+    output_type=LevelAnalysis,
+)
+
+
+# ============================================================================
+# Agent 3: 评估专家(复用v5_3的评估逻辑)
+# ============================================================================
+eval_instructions = """
+你是搜索query评估专家。给定原始问题和推荐query,评估三个分数。
+
+## 评估目标
+
+用这个推荐query搜索,能否找到满足原始需求的内容?
+
+## 三层评分
+
+### 1. essence_score(本质/意图)= 0 或 1
+
+推荐query的本质/意图是否与原问题一致?
+
+**判断标准:**
+- 原问题的核心意图是什么?(找方法、找教程、找作品、找工具、找资源等)
+- 推荐词是否明确表达了相同的意图?
+
+**评分原则:**
+- 1 = 本质一致,推荐词**明确表达**相同意图
+- 0 = 本质改变或**不够明确**
+
+### 2. hard_score(硬性约束)= 0 或 1
+
+在本质一致的前提下,是否满足所有硬性约束?
+
+**硬性约束**:地域、时间、对象、工具等客观可验证的限定
+
+**评分:**
+- 1 = 所有硬性约束都满足
+- 0 = 任一硬性约束不满足
+
+### 3. soft_score(软性修饰)= 0-1
+
+软性修饰词(质量、特色、美观等主观评价)保留了多少?
+
+**评分参考:**
+- 1.0 = 完整保留
+- 0.7-0.9 = 保留核心
+- 0.4-0.6 = 部分丢失
+- 0-0.3 = 大量丢失
+
+## 注意
+
+- essence=0 直接拒绝,不管hard/soft多高
+- essence=1, hard=0 也要拒绝
+- essence=1, hard=1 才看soft_score
+""".strip()
+
+class EvaluationFeedback(BaseModel):
+    """评估反馈模型 - 三层评分"""
+    essence_score: Literal[0, 1] = Field(..., description="本质/意图匹配度,0或1")
+    hard_score: Literal[0, 1] = Field(..., description="硬性约束匹配度,0或1")
+    soft_score: float = Field(..., description="软性修饰完整度,0-1")
+    reason: str = Field(..., description="评估理由")
+
+evaluator = Agent[None](
+    name="评估专家",
+    instructions=eval_instructions,
+    output_type=EvaluationFeedback,
+)
+
+
+# ============================================================================
+# 核心函数
+# ============================================================================
+
+async def extract_keywords(q_with_context: str) -> KeywordList:
+    """提取关键词"""
+    print("\n正在提取关键词...")
+    result = await Runner.run(keyword_extractor, q_with_context)
+    keyword_list: KeywordList = result.final_output
+    print(f"提取的关键词:{keyword_list.keywords}")
+    print(f"提取理由:{keyword_list.reasoning}")
+    return keyword_list
+
+
+async def explore_level(queries: list[str], level_num: int, context: RunContext) -> dict:
+    """探索一个层级(并发获取所有query的推荐词)"""
+    print(f"\n{'='*60}")
+    print(f"Level {level_num} 探索:{len(queries)} 个query")
+    print(f"{'='*60}")
+
+    xiaohongshu_api = XiaohongshuSearchRecommendations()
+
+    # 并发获取所有推荐词
+    async def get_single_sug(query: str):
+        print(f"  探索: {query}")
+        suggestions = xiaohongshu_api.get_recommendations(keyword=query)
+        print(f"    → {len(suggestions) if suggestions else 0} 个推荐词")
+        return {
+            "query": query,
+            "suggestions": suggestions or []
+        }
+
+    results = await asyncio.gather(*[get_single_sug(q) for q in queries])
+
+    level_data = {
+        "level": level_num,
+        "timestamp": datetime.now().isoformat(),
+        "queries": results
+    }
+
+    context.exploration_levels.append(level_data)
+    return level_data
+
+
+async def analyze_level(level_data: dict, all_levels: list[dict], original_question: str, context: RunContext) -> LevelAnalysis:
+    """分析当前层级,决定下一步"""
+    print(f"\n正在分析 Level {level_data['level']}...")
+
+    # 构造输入
+    analysis_input = f"""
+<原始问题>
+{original_question}
+</原始问题>
+
+<已探索的所有层级>
+{json.dumps(all_levels, ensure_ascii=False, indent=2)}
+</已探索的所有层级>
+
+<当前层级>
+Level {level_data['level']}
+{json.dumps(level_data['queries'], ensure_ascii=False, indent=2)}
+</当前层级>
+
+请分析当前探索状态,决定下一步行动。
+"""
+
+    result = await Runner.run(level_analyzer, analysis_input)
+    analysis: LevelAnalysis = result.final_output
+
+    print(f"\n分析结果:")
+    print(f"  关键发现:{analysis.key_findings}")
+    print(f"  有价值的信号:{len(analysis.promising_signals)} 个")
+    print(f"  是否评估:{analysis.should_evaluate_now}")
+
+    if analysis.should_evaluate_now:
+        print(f"  候选query:{analysis.candidates_to_evaluate}")
+    else:
+        print(f"  下一层探索:{analysis.next_combinations}")
+
+    # 保存分析结果
+    context.level_analyses.append({
+        "level": level_data['level'],
+        "timestamp": datetime.now().isoformat(),
+        "analysis": analysis.model_dump()
+    })
+
+    return analysis
+
+
+async def evaluate_candidates(candidates: list[str], original_question: str, context: RunContext) -> list[dict]:
+    """评估候选query"""
+    print(f"\n{'='*60}")
+    print(f"评估 {len(candidates)} 个候选query")
+    print(f"{'='*60}")
+
+    xiaohongshu_api = XiaohongshuSearchRecommendations()
+
+    async def evaluate_single_candidate(candidate: str):
+        print(f"\n评估候选:{candidate}")
+
+        # 1. 获取推荐词
+        suggestions = xiaohongshu_api.get_recommendations(keyword=candidate)
+        print(f"  获取到 {len(suggestions) if suggestions else 0} 个推荐词")
+
+        if not suggestions:
+            return {
+                "candidate": candidate,
+                "suggestions": [],
+                "evaluations": []
+            }
+
+        # 2. 评估每个推荐词
+        async def eval_single_sug(sug: str):
+            eval_input = f"""
+<原始问题>
+{original_question}
+</原始问题>
+
+<待评估的推荐query>
+{sug}
+</待评估的推荐query>
+
+请评估该推荐query的三个分数:
+1. essence_score: 本质/意图是否一致(0或1)
+2. hard_score: 硬性约束是否满足(0或1)
+3. soft_score: 软性修饰保留程度(0-1)
+4. reason: 详细的评估理由
+"""
+            result = await Runner.run(evaluator, eval_input)
+            evaluation: EvaluationFeedback = result.final_output
+            return {
+                "query": sug,
+                "essence_score": evaluation.essence_score,
+                "hard_score": evaluation.hard_score,
+                "soft_score": evaluation.soft_score,
+                "reason": evaluation.reason,
+            }
+
+        evaluations = await asyncio.gather(*[eval_single_sug(s) for s in suggestions])
+
+        return {
+            "candidate": candidate,
+            "suggestions": suggestions,
+            "evaluations": evaluations
+        }
+
+    results = await asyncio.gather(*[evaluate_single_candidate(c) for c in candidates])
+
+    context.evaluation_results = results
+    return results
+
+
+def find_qualified_queries(evaluation_results: list[dict], min_soft_score: float = 0.7) -> list[dict]:
+    """查找所有合格的query"""
+    all_qualified = []
+
+    for result in evaluation_results:
+        for eval_item in result.get("evaluations", []):
+            if (eval_item['essence_score'] == 1
+                and eval_item['hard_score'] == 1
+                and eval_item['soft_score'] >= min_soft_score):
+                all_qualified.append({
+                    "from_candidate": result["candidate"],
+                    **eval_item
+                })
+
+    # 按soft_score降序排列
+    return sorted(all_qualified, key=lambda x: x['soft_score'], reverse=True)
+
+
+# ============================================================================
+# 主流程
+# ============================================================================
+
+async def progressive_exploration(context: RunContext, max_levels: int = 4) -> dict:
+    """
+    渐进式广度探索流程
+
+    Args:
+        context: 运行上下文
+        max_levels: 最大探索层数,默认4
+
+    返回格式:
+    {
+        "success": True/False,
+        "results": [...],
+        "message": "..."
+    }
+    """
+
+    # 阶段1:提取关键词
+    keyword_result = await extract_keywords(context.q_with_context)
+    context.keywords = keyword_result.keywords
+
+    # 阶段2:渐进式探索
+    current_level = 1
+
+    # Level 1:单个关键词
+    level_1_queries = context.keywords[:7]  # 限制最多7个关键词
+    level_1_data = await explore_level(level_1_queries, current_level, context)
+
+    # 分析Level 1
+    analysis_1 = await analyze_level(level_1_data, context.exploration_levels, context.q, context)
+
+    if analysis_1.should_evaluate_now:
+        # 直接评估
+        eval_results = await evaluate_candidates(analysis_1.candidates_to_evaluate, context.q, context)
+        qualified = find_qualified_queries(eval_results, min_soft_score=0.7)
+
+        if qualified:
+            return {
+                "success": True,
+                "results": qualified,
+                "message": f"Level 1 即找到 {len(qualified)} 个合格query"
+            }
+
+    # Level 2 及以后:迭代探索
+    for level_num in range(2, max_levels + 1):
+        # 获取上一层的分析结果
+        prev_analysis: LevelAnalysis = context.level_analyses[-1]["analysis"]
+        prev_analysis = LevelAnalysis(**prev_analysis)  # 转回对象
+
+        if not prev_analysis.next_combinations:
+            print(f"\nLevel {level_num-1} 分析后无需继续探索")
+            break
+
+        # 探索当前层
+        level_data = await explore_level(prev_analysis.next_combinations, level_num, context)
+
+        # 分析当前层
+        analysis = await analyze_level(level_data, context.exploration_levels, context.q, context)
+
+        if analysis.should_evaluate_now:
+            # 评估候选
+            eval_results = await evaluate_candidates(analysis.candidates_to_evaluate, context.q, context)
+            qualified = find_qualified_queries(eval_results, min_soft_score=0.7)
+
+            if qualified:
+                return {
+                    "success": True,
+                    "results": qualified,
+                    "message": f"Level {level_num} 找到 {len(qualified)} 个合格query"
+                }
+
+    # 所有层探索完,降低标准
+    print(f"\n{'='*60}")
+    print(f"探索完 {max_levels} 层,降低标准(soft_score >= 0.5)")
+    print(f"{'='*60}")
+
+    if context.evaluation_results:
+        acceptable = find_qualified_queries(context.evaluation_results, min_soft_score=0.5)
+        if acceptable:
+            return {
+                "success": True,
+                "results": acceptable,
+                "message": f"找到 {len(acceptable)} 个可接受query(soft_score >= 0.5)"
+            }
+
+    # 完全失败
+    return {
+        "success": False,
+        "results": [],
+        "message": "探索完所有层级,未找到合格的推荐词"
+    }
+
+
+# ============================================================================
+# 输出格式化
+# ============================================================================
+
+def format_output(optimization_result: dict, context: RunContext) -> str:
+    """格式化输出结果"""
+    results = optimization_result.get("results", [])
+
+    output = f"原始问题:{context.q}\n"
+    output += f"提取的关键词:{', '.join(context.keywords or [])}\n"
+    output += f"探索层数:{len(context.exploration_levels)}\n"
+    output += f"状态:{optimization_result['message']}\n\n"
+
+    if optimization_result["success"] and results:
+        output += "合格的推荐query(按soft_score降序):\n"
+        for i, result in enumerate(results, 1):
+            output += f"\n{i}. {result['query']}\n"
+            output += f"   - 来自候选:{result['from_candidate']}\n"
+            output += f"   - 本质匹配度:{result['essence_score']} (1=本质一致)\n"
+            output += f"   - 硬性约束匹配度:{result['hard_score']} (1=所有约束满足)\n"
+            output += f"   - 软性修饰完整度:{result['soft_score']:.2f} (0-1)\n"
+            output += f"   - 评估理由:{result['reason']}\n"
+    else:
+        output += "结果:未找到合格推荐query\n"
+        if context.level_analyses:
+            last_analysis = context.level_analyses[-1]["analysis"]
+            output += f"\n最后一层分析:\n{last_analysis.get('key_findings', 'N/A')}\n"
+
+    return output.strip()
+
+
+# ============================================================================
+# 主函数
+# ============================================================================
+
+async def main(input_dir: str, max_levels: int = 4):
+    current_time, log_url = set_trace()
+
+    # 从目录中读取固定文件名
+    input_context_file = os.path.join(input_dir, 'context.md')
+    input_q_file = os.path.join(input_dir, 'q.md')
+
+    q_context = read_file_as_string(input_context_file)
+    q = read_file_as_string(input_q_file)
+    q_with_context = f"""
+<需求上下文>
+{q_context}
+</需求上下文>
+<当前问题>
+{q}
+</当前问题>
+""".strip()
+
+    # 获取当前文件名作为版本
+    version = os.path.basename(__file__)
+    version_name = os.path.splitext(version)[0]
+
+    # 日志保存目录
+    log_dir = os.path.join(input_dir, "output", version_name, current_time)
+
+    run_context = RunContext(
+        version=version,
+        input_files={
+            "input_dir": input_dir,
+            "context_file": input_context_file,
+            "q_file": input_q_file,
+        },
+        q_with_context=q_with_context,
+        q_context=q_context,
+        q=q,
+        log_dir=log_dir,
+        log_url=log_url,
+    )
+
+    # 执行渐进式探索
+    optimization_result = await progressive_exploration(run_context, max_levels=max_levels)
+
+    # 格式化输出
+    final_output = format_output(optimization_result, run_context)
+    print(f"\n{'='*60}")
+    print("最终结果")
+    print(f"{'='*60}")
+    print(final_output)
+
+    # 保存结果
+    run_context.optimization_result = optimization_result
+    run_context.final_output = final_output
+
+    # 保存 RunContext 到 log_dir
+    os.makedirs(run_context.log_dir, exist_ok=True)
+    context_file_path = os.path.join(run_context.log_dir, "run_context.json")
+    with open(context_file_path, "w", encoding="utf-8") as f:
+        json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
+    print(f"\nRunContext saved to: {context_file_path}")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="搜索query优化工具 - 渐进式广度探索版")
+    parser.add_argument(
+        "--input-dir",
+        type=str,
+        default="input/简单扣图",
+        help="输入目录路径,默认: input/简单扣图"
+    )
+    parser.add_argument(
+        "--max-levels",
+        type=int,
+        default=4,
+        help="最大探索层数,默认: 4"
+    )
+    args = parser.parse_args()
+
+    asyncio.run(main(args.input_dir, max_levels=args.max_levels))