yangxiaohui před 1 měsícem
rodič
revize
d7dbf3e5cb

+ 184 - 4
script/search_recommendations/xiaohongshu_search_recommendations.py

@@ -10,8 +10,9 @@ import os
 import argparse
 import time
 import ast
+import hashlib
 from datetime import datetime
-from typing import Dict, Any
+from typing import Dict, Any, Optional
 
 
 class XiaohongshuSearchRecommendations:
@@ -21,12 +22,14 @@ class XiaohongshuSearchRecommendations:
     TOOL_NAME = "Xiaohongshu_Search_Recommendations"
     PLATFORM = "xiaohongshu"  # 平台名称
 
-    def __init__(self, results_dir: str = None):
+    def __init__(self, results_dir: str = None, enable_cache: bool = True, cache_ttl: int = 86400):
         """
         初始化API客户端
 
         Args:
             results_dir: 结果输出目录,默认为脚本所在目录下的 results 文件夹
+            enable_cache: 是否启用缓存(从已保存的文件中读取),默认为 True
+            cache_ttl: 缓存有效期(秒),默认为 86400 秒(24小时)
         """
         self.api_url = f"{self.BASE_URL}/tools/call/{self.TOOL_NAME}"
 
@@ -39,7 +42,61 @@ class XiaohongshuSearchRecommendations:
             project_root = os.path.dirname(os.path.dirname(script_dir))
             self.results_base_dir = os.path.join(project_root, "data", "search_recommendations")
 
-    def get_recommendations(self, keyword: str, timeout: int = 300, max_retries: int = 10, retry_delay: int = 2) -> Dict[str, Any]:
+        # 缓存设置
+        self.enable_cache = enable_cache
+        self.cache_ttl = cache_ttl
+        self._memory_cache = {}  # 内存缓存: {keyword: (data, timestamp)}
+
+    def _get_from_cache(self, keyword: str) -> Optional[Dict[str, Any]]:
+        """
+        从缓存中获取数据(先查内存缓存,再查文件缓存)
+
+        Args:
+            keyword: 搜索关键词
+
+        Returns:
+            缓存的数据,如果没有有效缓存则返回 None
+        """
+        if not self.enable_cache:
+            return None
+
+        current_time = time.time()
+
+        # 1. 检查内存缓存
+        if keyword in self._memory_cache:
+            data, timestamp = self._memory_cache[keyword]
+            if current_time - timestamp < self.cache_ttl:
+                # print(f"从内存缓存中获取关键词 '{keyword}' 的数据")
+                return data
+            else:
+                # 内存缓存已过期,删除
+                del self._memory_cache[keyword]
+
+        # 2. 检查文件缓存(从已保存的文件中读取最新的)
+        result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
+        if os.path.exists(result_dir):
+            files = [f for f in os.listdir(result_dir) if f.endswith('.json')]
+            if files:
+                # 按文件名排序(时间戳),获取最新的文件
+                files.sort(reverse=True)
+                latest_file = os.path.join(result_dir, files[0])
+
+                # 检查文件修改时间
+                file_mtime = os.path.getmtime(latest_file)
+                if current_time - file_mtime < self.cache_ttl:
+                    try:
+                        with open(latest_file, 'r', encoding='utf-8') as f:
+                            data = json.load(f)
+                        # 更新内存缓存
+                        self._memory_cache[keyword] = (data, file_mtime)
+                        # print(f"从文件缓存中获取关键词 '{keyword}' 的数据: {latest_file}")
+                        return data
+                    except Exception as e:
+                        print(f"读取缓存文件失败: {e}")
+
+        return None
+
+    def get_recommendations(self, keyword: str, timeout: int = 300, max_retries: int = 10, retry_delay: int = 2, use_cache: bool = True) -> Dict[str, Any]:
         """
         获取小红书搜索推荐词
 
@@ -48,6 +105,7 @@ class XiaohongshuSearchRecommendations:
             timeout: 请求超时时间(秒),默认300秒
             max_retries: 最大重试次数,默认10次
             retry_delay: 重试间隔时间(秒),默认2秒
+            use_cache: 是否使用缓存,默认为 True
 
         Returns:
             API响应的JSON数据
@@ -55,6 +113,14 @@ class XiaohongshuSearchRecommendations:
         Raises:
             requests.exceptions.RequestException: 请求失败时抛出异常
         """
+        # 尝试从缓存获取
+        if use_cache:
+            cached_data = self._get_from_cache(keyword)
+            if cached_data is not None:
+                return cached_data
+
+        # 缓存未命中,发起API请求
+        # print(f"缓存未命中,发起API请求获取关键词 '{keyword}' 的数据")
         payload = {"keyword": keyword}
         last_error = None
 
@@ -76,7 +142,13 @@ class XiaohongshuSearchRecommendations:
 
                 # 成功:code == 0
                 if result.get('code') == 0:
-                    return result['data']['data']
+                    data = result['data']['data']
+                    # 保存到内存缓存
+                    self._memory_cache[keyword] = (data, time.time())
+                    # 自动保存到文件缓存
+                    if self.enable_cache:
+                        self.save_result(keyword, data)
+                    return data
 
                 # 失败:code != 0
                 last_error = f"code={result.get('code')}"
@@ -96,6 +168,114 @@ class XiaohongshuSearchRecommendations:
 
         return []
 
+    def clear_memory_cache(self, keyword: Optional[str] = None):
+        """
+        清除内存缓存
+
+        Args:
+            keyword: 要清除的关键词,如果为 None 则清除所有内存缓存
+        """
+        if keyword:
+            if keyword in self._memory_cache:
+                del self._memory_cache[keyword]
+                print(f"已清除关键词 '{keyword}' 的内存缓存")
+        else:
+            self._memory_cache.clear()
+            print("已清除所有内存缓存")
+
+    def clear_file_cache(self, keyword: Optional[str] = None, keep_latest: bool = True):
+        """
+        清除文件缓存
+
+        Args:
+            keyword: 要清除的关键词,如果为 None 则清除所有文件缓存
+            keep_latest: 是否保留最新的文件,默认为 True
+        """
+        if keyword:
+            result_dir = os.path.join(self.results_base_dir, self.PLATFORM, keyword)
+            if os.path.exists(result_dir):
+                files = [f for f in os.listdir(result_dir) if f.endswith('.json')]
+                if files:
+                    files.sort(reverse=True)
+                    # 保留最新的文件
+                    files_to_delete = files[1:] if keep_latest else files
+                    for f in files_to_delete:
+                        filepath = os.path.join(result_dir, f)
+                        os.remove(filepath)
+                        print(f"已删除缓存文件: {filepath}")
+        else:
+            platform_dir = os.path.join(self.results_base_dir, self.PLATFORM)
+            if os.path.exists(platform_dir):
+                for keyword_dir in os.listdir(platform_dir):
+                    keyword_path = os.path.join(platform_dir, keyword_dir)
+                    if os.path.isdir(keyword_path):
+                        files = [f for f in os.listdir(keyword_path) if f.endswith('.json')]
+                        if files:
+                            files.sort(reverse=True)
+                            files_to_delete = files[1:] if keep_latest else files
+                            for f in files_to_delete:
+                                filepath = os.path.join(keyword_path, f)
+                                os.remove(filepath)
+                                print(f"已删除缓存文件: {filepath}")
+
+    def get_cache_info(self, keyword: Optional[str] = None) -> Dict[str, Any]:
+        """
+        获取缓存信息
+
+        Args:
+            keyword: 要查询的关键词,如果为 None 则返回所有缓存信息
+
+        Returns:
+            缓存信息字典
+        """
+        info = {
+            "memory_cache": {},
+            "file_cache": {}
+        }
+
+        current_time = time.time()
+
+        # 内存缓存信息
+        if keyword:
+            if keyword in self._memory_cache:
+                data, timestamp = self._memory_cache[keyword]
+                info["memory_cache"][keyword] = {
+                    "count": len(data) if isinstance(data, list) else 1,
+                    "timestamp": datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S"),
+                    "age_seconds": int(current_time - timestamp),
+                    "is_expired": current_time - timestamp >= self.cache_ttl
+                }
+        else:
+            for kw, (data, timestamp) in self._memory_cache.items():
+                info["memory_cache"][kw] = {
+                    "count": len(data) if isinstance(data, list) else 1,
+                    "timestamp": datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S"),
+                    "age_seconds": int(current_time - timestamp),
+                    "is_expired": current_time - timestamp >= self.cache_ttl
+                }
+
+        # 文件缓存信息
+        platform_dir = os.path.join(self.results_base_dir, self.PLATFORM)
+        if os.path.exists(platform_dir):
+            keywords = [keyword] if keyword else os.listdir(platform_dir)
+            for kw in keywords:
+                keyword_path = os.path.join(platform_dir, kw)
+                if os.path.isdir(keyword_path):
+                    files = [f for f in os.listdir(keyword_path) if f.endswith('.json')]
+                    if files:
+                        files.sort(reverse=True)
+                        latest_file = os.path.join(keyword_path, files[0])
+                        file_mtime = os.path.getmtime(latest_file)
+                        info["file_cache"][kw] = {
+                            "file_count": len(files),
+                            "latest_file": files[0],
+                            "timestamp": datetime.fromtimestamp(file_mtime).strftime("%Y-%m-%d %H:%M:%S"),
+                            "age_seconds": int(current_time - file_mtime),
+                            "is_expired": current_time - file_mtime >= self.cache_ttl
+                        }
+
+        return info
+
     def save_result(self, keyword: str, result: Dict[str, Any]) -> str:
         """
         保存结果到文件

+ 1016 - 0
sug_v5_0_with_eval_v2_zou_yx.py

@@ -0,0 +1,1016 @@
+import asyncio
+import json
+import os
+import argparse
+from datetime import datetime
+
+from agents import Agent, Runner
+from lib.my_trace import set_trace
+from typing import Literal
+from pydantic import BaseModel, Field
+
+from lib.utils import read_file_as_string
+from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
+
+
+class RunContext(BaseModel):
+    version: str = Field(..., description="当前运行的脚本版本(文件名)")
+    input_files: dict[str, str] = Field(..., description="输入文件路径映射")
+    q_with_context: str
+    q_context: str
+    q: str
+    log_url: str
+    log_dir: str
+    question_annotation: str | None = Field(default=None, description="问题的标注结果")
+    operations_history: list[dict] = Field(default_factory=list, description="记录所有操作的历史")
+    optimization_result: dict | None = Field(default=None, description="最终优化结果对象")
+    final_output: str | None = Field(default=None, description="最终输出结果(格式化文本)")
+
+
+# ============================================================================
+# Agent 1: 问题标注专家
+# ============================================================================
+question_annotation_instructions = """
+你是搜索需求分析专家。给定问题(含需求背景),在原文上标注三层:本质、硬性、软性。
+
+## 判断标准
+
+**[本质]** - 问题的核心意图
+- 如何获取、教程、推荐、作品、测评等
+
+**[硬]** - 客观事实性约束(可明确验证、非主观判断)
+- 能明确区分类别的:地域、时间、对象、工具、操作类型
+- 特征:改变后得到完全不同类别的结果
+
+**[软]** - 主观判断性修饰(因人而异、程度性的)
+- 需要主观评价的:质量、速度、美观、特色、程度
+- 特征:改变后仍是同类结果,只是满足程度不同
+
+## 输出格式
+
+词语[本质-描述]、词语[硬-描述]、词语[软-描述]
+
+## 注意
+- 只输出标注后的字符串
+- 结合需求背景判断意图
+""".strip()
+
+question_annotator = Agent[None](
+    name="问题标注专家",
+    instructions=question_annotation_instructions,
+)
+
+
+# ============================================================================
+# Agent 2: 评估专家
+# ============================================================================
+eval_instructions = """
+你是专业的语言专家和语义相关性评判专家。你的任务是判断平台sug词条与原始query问题的相关度满足度。
+
+## 评估目标
+
+用这个推荐query搜索,能否找到满足原始需求的内容?
+
+## 三层判定流程(一次性返回所有层级的评分)
+
+### 第一层:知识识别(knowledge_recognition = 0 或 1)
+
+**什么是知识?**
+在社交媒体创作场景下,知识是指:可应用的认知内容 + 实践指导 + 问题解决方案
+
+包含三个核心要素:
+- 陈述性知识(Know-What): 是什么、有哪些、包括什么
+- 程序性知识(Know-How): 如何做、怎么实现、步骤方法
+- 策略性知识(Know-Why): 为什么、原理机制、优化策略
+
+**判定方法(三步判定法):**
+
+Step 1: 意图识别
+- 原始需求是想【知道/学会/获得】某样东西吗?→ yes 进入step2
+
+Step 2: 动词解析
+- 提取核心动词:
+  - 认知类(了解、学习、理解)
+  - 操作类(制作、拍摄、剪辑、运营)
+  - 获取类(找、下载、获取、收集)
+  - 决策类(选择、对比、评估)
+- 有明确动词 → 是知识需求
+- 无明确动词但有隐含目的 → 提取隐含动词
+- 完全无动作意图 → 非知识需求
+
+Step 3: 目标验证
+- 这个query解决后,用户会获得新的认知或能力吗? → YES则是知识
+- 这个query的答案可以被学习和应用吗? → YES则是知识
+- 这个query在寻求某个问题的解决方案吗? → YES则是知识
+
+**输出:**
+- knowledge_recognition: 1=是知识需求,0=非知识需求
+- knowledge_recognition_reason: 判定依据(包含意图识别、动词提取、目标验证的关键发现)
+
+**重要:即使knowledge_recognition=0,也要继续计算后两层得分(便于分析)**
+
+---
+
+### 第二层:知识动机判定(motivation_score = 0-1分值)
+
+**目标:** 完全理解原始query的综合意图,识别主要需求和次要需求,并进行加权评估。
+
+**评估维度:**
+
+#### 维度1: 核心意图动词识别(权重50%)
+- 显性动词直接提取:如"如何获取素材" → 核心动作="获取"
+- 隐性动词语义推理:如"川西秋天风光摄影" → 隐含动作="拍摄"或"欣赏"
+- 动作层级区分:主动作 vs 子动作
+
+**评分规则:**
+- 核心动作完全一致 → 1.0
+- 核心动作语义相近(近义词) → 0.8-0.9
+- 核心动作有包含关系(主次关系) → 0.5-0.7
+- 核心动作完全不同 → 0-0.4
+
+#### 维度2: 目标对象识别(权重30%)
+- 主要对象(What):如"获取川西秋季风光摄影素材" → 主要对象="风光摄影素材"
+- 对象的限定词:地域限定("川西")、时间限定("秋季")、质量限定("高质量")
+
+**评分规则:**
+- 主要对象+核心限定词完全匹配 → 1.0
+- 主要对象匹配,限定词部分匹配 → 0.7-0.9
+- 主要对象匹配,限定词缺失/不符 → 0.4-0.6
+- 主要对象不匹配 → 0-0.3
+
+#### 维度3: 终极目的识别(权重20%)
+**评分规则:**
+- 目的完全一致 → 1.0
+- 目的相关但路径不同 → 0.6-0.7
+- 目的不相关 → 0-0.5
+
+**综合计算公式:**
+```
+motivation_score = 核心意图动词×0.5 + 目标对象×0.3 + 终极目的×0.2
+```
+
+**输出:**
+- motivation_score: 0-1分值(>=0.9才算通过)
+- motivation_breakdown: {"核心意图动词": 0.x, "目标对象": 0.x, "终极目的": 0.x}
+
+**阈值判定:**
+- >=0.9:意图高度匹配
+- <0.9:意图不匹配,建议重新生成sug词或调整query词
+
+**注意:评估标准需要严格,对所有用例保持一致的标准**
+
+---
+
+### 第三层:相关性判定(relevance_score = 0-1分值)
+
+**目标:** 基于第二层的综合意图,评估sug词条对原始query的满足程度。
+
+**评分标准体系:**
+
+#### 高度满足(0.9-1.0)
+- 核心动作:完全一致或为标准近义词
+- 目标对象:主体+关键限定词全部匹配
+- 使用场景:完全相同或高度兼容
+- 终极目的:完全一致
+- 判定方法:逐一核对,所有维度≥0.9;替换测试(把sug词替换原query,意思不变)
+
+#### 中高满足(0.7-0.89)
+- 核心动作:一致或相近,但可能更泛化/具体化
+- 目标对象:主体匹配,但1-2个限定词缺失/泛化
+- 使用场景:基本兼容,可能略有扩展或收窄
+- 终极目的:一致但实现路径略有差异
+- 判定方法:"有效信息保留率" ≥70%
+
+#### 中低满足(0.4-0.69)
+- 核心动作:存在明显差异,但主题相关
+- 目标对象:部分匹配,关键限定词缺失或错位
+- 使用场景:有关联但场景不同
+- 终极目的:相关但实现路径完全不同
+- 判定方法:只有主题词重叠,用户需要显著改变搜索策略
+
+#### 低度/不满足(0-0.39)
+- 核心动作:完全不同或对立
+- 目标对象:主体不同或无关联
+- 使用场景:场景冲突
+- 终极目的:完全不相关
+- 判定方法:除通用词外无有效重叠,sug词满足了完全不同的需求
+
+**维度计算公式:**
+```
+relevance_score = 核心动作×0.4 + 目标对象×0.3 + 使用场景×0.15 + 终极目的×0.15
+```
+
+**特殊情况处理:**
+
+1. 泛化与具体化
+   - 泛化(sug词更广):如果原query所有要素都在覆盖范围内 → 0.75-0.85
+   - 具体化(sug词更窄):如果sug词是原query的典型子场景 → 0.7-0.8
+
+2. 同义转换宽容度
+   - 允许:获取≈下载≈寻找≈收集;技巧≈方法≈教程≈攻略
+   - 不允许:获取素材≠制作素材;学习技巧≠查看案例
+
+3. 多意图处理
+   - 识别主次意图(通过语序、连接词判断)
+   - sug词至少满足主意图 → 中高满足
+   - sug词同时满足主次意图 → 高度满足
+   - sug词只满足次意图 → 降至中低满足
+
+**输出:**
+- relevance_score: 0-1分值(>=0.9为高度满足)
+- relevance_breakdown: {"核心动作": 0.x, "目标对象": 0.x, "使用场景": 0.x, "终极目的": 0.x}
+
+---
+
+## 最终输出要求
+
+一次性返回所有三层的评估结果:
+1. knowledge_recognition (0或1) + knowledge_recognition_reason
+2. motivation_score (0-1) + motivation_breakdown
+3. relevance_score (0-1) + relevance_breakdown
+4. overall_reason: 综合评估理由(简要总结三层判定结果)
+
+**重要原则:**
+- 即使第一层knowledge_recognition=0,也要完整计算第二层和第三层
+- 即使第二层motivation_score<0.9,也要完整计算第三层
+- 所有维度的breakdown必须提供具体数值
+- 评估标准严格一致,不因用例不同而放松标准
+""".strip()
+
+class MotivationBreakdown(BaseModel):
+    """动机得分明细"""
+    核心意图动词: float = Field(..., description="核心意图动词得分,0-1")
+    目标对象: float = Field(..., description="目标对象得分,0-1")
+    终极目的: float = Field(..., description="终极目的得分,0-1")
+
+
+class RelevanceBreakdown(BaseModel):
+    """相关性得分明细"""
+    核心动作: float = Field(..., description="核心动作得分,0-1")
+    目标对象: float = Field(..., description="目标对象得分,0-1")
+    使用场景: float = Field(..., description="使用场景得分,0-1")
+    终极目的: float = Field(..., description="终极目的得分,0-1")
+
+
+class EvaluationFeedback(BaseModel):
+    """评估反馈模型 - 三层知识评估"""
+    # 第一层:知识识别
+    knowledge_recognition: Literal[0, 1] = Field(
+        ...,
+        description="是否为知识需求,1=是,0=否"
+    )
+    knowledge_recognition_reason: str = Field(
+        ...,
+        description="知识识别判定依据(意图识别、动词提取、目标验证)"
+    )
+
+    # 第二层:知识动机匹配
+    motivation_score: float = Field(
+        ...,
+        description="知识动机匹配度,0-1分值,>=0.9才通过"
+    )
+    motivation_breakdown: MotivationBreakdown = Field(
+        ...,
+        description="动机得分明细"
+    )
+
+    # 第三层:相关性评分
+    relevance_score: float = Field(
+        ...,
+        description="相关性得分,0-1分值,>=0.9为高度满足"
+    )
+    relevance_breakdown: RelevanceBreakdown = Field(
+        ...,
+        description="相关性得分明细"
+    )
+
+    overall_reason: str = Field(
+        ...,
+        description="综合评估理由"
+    )
+
+evaluator = Agent[None](
+    name="评估专家",
+    instructions=eval_instructions,
+    output_type=EvaluationFeedback,
+)
+
+
+# ============================================================================
+# Agent 3: 修改策略生成专家
+# ============================================================================
+strategy_instructions = """
+你是query修改策略专家。**模拟人在搜索引擎中的真实搜索行为**,基于反馈动态调整query。
+
+## 核心思路:搜索是探索过程,不是直达过程
+
+**关键认知:**
+1. **中间query不需要满足原始需求** - 它是探索工具,可以偏离原需求
+2. **推荐词是最宝贵的反馈信号** - 告诉你系统理解成什么了,有什么内容
+3. **每一步query都有明确的探索目的** - 不是盲目改词,而是试探和引导
+4. **最终目标:找到满足需求的推荐词** - 不是让query本身满足需求
+
+## 人的真实搜索过程
+
+**搜索的本质**:通过多步探索,利用推荐词作为桥梁,逐步引导系统
+
+**典型模式**:
+
+第1步:直接尝试
+- 目的:看系统能否直接理解
+- 结果:空列表或essence=0
+- essence=0的推荐词:告诉你系统理解成什么了
+
+第2步:降低要求,简化query
+- 目的:让系统有响应,看它在基础层面有什么
+- 推荐词虽然essence=0,但揭示了系统在某个主题有内容
+- **关键**:选一个最有潜力的推荐词
+
+第3步:基于推荐词,往目标方向引导
+- 目的:利用推荐词作为桥梁,加上目标方向的词
+- 推荐词还是essence=0,但主题在变化(接近目标)
+- **渐进式**:不求一步到位,每步都有进展
+
+第4步:继续引导或换角度
+- 如果推荐词主题不变 → 换角度
+- 如果推荐词主题在接近 → 继续引导
+
+最终:找到essence=1的推荐词
+
+**关键原则**:
+1. essence_score是评估推荐词的,不是评估中间query的
+2. essence=0的推荐词也有价值,它揭示了系统的理解方向
+3. 每一步都有明确的探索目的,看目的是否达成
+4. 通过推荐词的主题变化,判断是否在接近目标
+
+## 输入信息
+- 原始问题标注(三层):本质、硬性约束、软性修饰
+- 历史尝试记录:所有轮次的query、推荐词、评估结果
+- 当前query和推荐词评估
+
+## 分析步骤
+
+### 第一步:理解当前推荐词的信号
+**核心问题:推荐词告诉我什么信息?**
+
+**重要提醒:essence_score是评估推荐词是否满足原始需求的最终目标**
+- essence_score=1: 推荐词满足原需求的本质
+- essence_score=0: 推荐词不满足原需求的本质
+- **但中间query的目的可能不是满足原需求**,所以essence_score只是参考
+
+1. **系统理解层面**(看推荐词的主题):
+   - 空列表 → 系统完全不理解当前query
+   - 有推荐词 → 系统理解成了什么主题?
+     - 旅游?教程?素材?工具?品种介绍?
+     - 这些主题是否有助于往目标方向引导?
+
+2. **内容可用性层面**(看推荐词的价值):
+   - **即使推荐词essence=0,也可能是很好的探索起点**
+   - 例如:推荐词"川西旅游攻略"虽然essence=0,但揭示了系统认识"川西"
+   - 哪些推荐词最有潜力作为下一步的桥梁?
+
+3. **探索目的验证**:
+   - 当前query的探索目的是什么?达到了吗?
+   - 例如:目的是"看系统对川西有什么" → 达到了(有推荐词)
+   - 下一步要验证/探索什么?
+
+### 第二步:回顾历史,识别规律
+- 哪些query让系统理解方向变化了?(从"旅游"变成"摄影")
+- 哪些方向是死路?(多次essence=0且推荐词主题不变)
+- **是否有渐进的改善?**(推荐词越来越接近目标)
+
+### 第三步:选择策略类型和具体操作(带着明确的探索目的)
+
+**策略类型(strategy_type):**
+
+**refine_current(微调当前query)**
+- 适用:推荐词方向对了,需要微调让它更精确
+- 探索目的:在正确方向上精细化
+- 动作:加词/减词/换词/调整顺序
+
+**use_recommendation(选推荐词作为新起点)** ⭐ 最重要策略
+- 适用:推荐词虽然knowledge_recognition=0或relevance_score低,但**揭示了系统在这个方向有内容**
+- 探索目的:利用推荐词这个客观信号,引导系统往目标方向
+- **核心思维**:推荐词是系统给你的提示,告诉你"我有这个"
+- 动作:
+  - 选一个最有潜力的推荐词作为base_query
+  - 在它基础上加目标方向的词
+  - **这个新query可能不满足原需求,但目的是探索和引导**
+
+**change_approach(换完全不同的角度)**
+- 适用:当前方向是死路(多次尝试推荐词主题不变)
+- 探索目的:跳出当前框架,从另一个角度切入
+- 动作:换一种完全不同的表述方式
+
+**relax_constraints(放宽约束)**
+- 适用:query太复杂,系统不理解(返回空列表)
+- 探索目的:先让系统有响应,看它在最基础层面有什么
+- 动作:去掉限定词,保留核心概念
+
+---
+
+**具体操作类型(operation_type):**
+
+**增加**
+- **判定标准**:
+  * 推荐词的语义比当前query更具体,且词汇数量 > 当前query的词汇数量
+  * 推荐词包含当前query的核心关键词,且添加了能将query具体化的新词
+  * 推荐词中存在当前query中不存在的关键新词(如:特定平台名称、特定功能词、具体场景词)
+- **使用时机**:当推荐词整体趋势是添加更具体的限定词或场景描述时
+
+**删减**
+- **判定标准**(满足以下任一条件):
+  * 推荐词的词汇数量明显少于当前query
+  * 推荐词返回空列表(说明query过于复杂)
+  * 推荐词词汇数量少于query,但核心搜索意图一致
+  * 识别出query中1-2个词为"非核心冗余词",删除后不影响核心意图
+- **使用时机**:当原query过于冗长复杂,推荐词都倾向于使用更简洁的表达时
+
+**调序**
+- **判定标准**:
+  * 推荐词与query拥有完全相同或极为相似的核心关键词集合,只是词汇排列顺序不同
+  * 推荐词的词序更自然或更符合用户搜索习惯
+  * 多个推荐词都倾向于使用与query不同但意思一致的词序
+- **使用时机**:当推荐词与原query关键词相同但顺序不同,且新顺序更符合用户习惯时
+
+**替换**
+- **判定标准**:
+  * 推荐词与query仅有1-2个核心词不同,其他词均相同
+  * 差异词是同义词、近义词或在该领域更流行/专业的替代词
+  * 推荐词中的新词在推荐列表中出现频率更高
+- **使用时机**:当推荐词显示某个词的同义词或专业术语更受欢迎时
+
+## 输出要求
+
+### 1. reasoning(推理过程)
+必须包含三部分,**重点写探索目的**:
+
+- **当前推荐词信号分析**:
+  - 系统理解成什么主题了?(旅游?教程?素材?工具?品种?)
+  - 推荐词揭示了什么信息?(系统在哪个方向有内容)
+  - **不要只看knowledge_recognition和relevance_score**:
+    - knowledge_recognition=0或relevance_score低不代表推荐词没用
+    - 关键看推荐词的主题是否有助于引导
+  - 哪个推荐词最有潜力作为下一步的桥梁?
+
+- **历史尝试与趋势**:
+  - 系统理解的主题变化:从"品种介绍"→"旅游"→"摄影"
+  - 是否在逐步接近目标?还是原地打转?
+
+- **下一步策略与探索目的**:
+  - **这一步query的探索目的是什么?**
+    - 验证系统对某个词的理解?
+    - 往某个方向引导?
+    - 利用推荐词作为桥梁?
+  - 为什么选这个base_query?
+  - 为什么选这个operation_type?
+  - 为什么这样修改?
+  - **重要**:不要纠结"这个query不满足原需求",关键是它能否达成探索目的
+
+### 2. strategy_type
+从4种策略中选择:refine_current, use_recommendation, change_approach, relax_constraints
+
+### 3. operation_type
+从4种操作中选择:增加、删减、调序、替换
+- **必须基于推荐词的特征选择**:看推荐词与当前query的差异模式
+- 参考上述"具体操作类型"的判定标准
+
+### 4. base_query
+**关键**:可以选择历史中的query,也可以选择历史推荐词
+- 如果选历史query:base_query_source = "history_query"
+- 如果选历史推荐词:base_query_source = "history_recommendation"
+
+### 5. base_query_source
+说明base_query的来源
+
+### 6. modification_actions
+列出具体的修改动作,建议格式:
+- 每个动作以操作类型开头(增加/删减/调序/替换)
+- 清晰描述具体修改了什么内容
+
+### 7. new_query
+最终的新query
+
+## 重要原则
+
+1. **推荐词是最宝贵的反馈** - 充分利用推荐词这个客观信号
+   - 即使essence=0的推荐词,也揭示了系统在这个方向有什么
+   - **优先考虑use_recommendation策略** - 选一个推荐词作为起点
+
+2. **中间query可以偏离原需求** - 每一步都有明确的探索目的
+   - 不要纠结"这个query不满足原需求"
+   - 关键是:这个query能不能帮你往正确方向引导系统
+
+3. **识别死胡同,及时换方向**
+   - 如果多次尝试推荐词主题不变 → 换方向
+   - 如果推荐词越来越偏 → 回退到之前的某个好的起点
+
+4. **保持推理简洁** - 抓住关键信息
+   - 明确说出探索目的
+   - 不要重复啰嗦
+""".strip()
+
+class ModificationStrategy(BaseModel):
+    """修改策略模型 - 模拟人的搜索调整过程"""
+    reasoning: str = Field(..., description="推理过程:1)当前推荐词分析:系统理解成什么了?2)历史尝试总结:哪些方向有效/无效?3)下一步策略:为什么这样调整?")
+
+    strategy_type: Literal[
+        "refine_current",      # 微调当前query(加词/减词/换词/换顺序)
+        "use_recommendation",  # 选择推荐词作为新起点,在它基础上修改
+        "change_approach",     # 换完全不同的表述角度
+        "relax_constraints"    # 放宽约束,去掉部分限定词
+    ] = Field(..., description="策略类型")
+
+    operation_type: Literal[
+        "增加",  # 增加query语句,添加具体化的关键词
+        "删减",  # 删减query词,去除冗余词汇
+        "调序",  # 调整query词的顺序,更符合用户习惯的词序
+        "替换",  # 替换query中的某个词,使用同义词或专业术语
+    ] = Field(..., description="具体操作类型:增加、删减、调序、替换")
+
+    base_query: str = Field(..., description="基础query,可以是:1)历史中的query 2)历史推荐词中的某一个")
+    base_query_source: Literal["history_query", "history_recommendation"] = Field(..., description="base_query的来源")
+
+    modification_actions: list[str] = Field(..., description="具体修改动作的描述,如:['去掉\"如何获取\"', '保留核心词\"川西秋季\"', '把\"素材\"改为\"图片\"']")
+
+    new_query: str = Field(..., description="修改后的新query")
+
+strategy_generator = Agent[None](
+    name="策略生成专家",
+    instructions=strategy_instructions,
+    output_type=ModificationStrategy,
+)
+
+
+# ============================================================================
+# 核心函数
+# ============================================================================
+
+async def annotate_question(q_with_context: str) -> str:
+    """标注问题(三层)"""
+    print("\n正在标注问题...")
+    result = await Runner.run(question_annotator, q_with_context)
+    annotation = str(result.final_output)
+    print(f"问题标注完成:{annotation}")
+    return annotation
+
+
+async def get_suggestions_with_eval(query: str, annotation: str, context: RunContext) -> list[dict]:
+    """获取推荐词并评估"""
+    print(f"\n正在获取推荐词:{query}")
+
+    # 1. 调用小红书API
+    xiaohongshu_api = XiaohongshuSearchRecommendations()
+    query_suggestions = xiaohongshu_api.get_recommendations(keyword=query)
+    print(f"获取到 {len(query_suggestions) if query_suggestions else 0} 个推荐词:{query_suggestions}")
+
+    if not query_suggestions:
+        # 记录到历史
+        context.operations_history.append({
+            "operation_type": "get_query_suggestions",
+            "timestamp": datetime.now().isoformat(),
+            "query": query,
+            "suggestions": [],
+            "evaluations": "未返回任何推荐词",
+        })
+        return []
+
+    # 2. 并发评估所有推荐词
+    async def evaluate_single_query(q_sug: str):
+        eval_input = f"""
+<需求上下文>
+{context.q_context}
+</需求上下文>
+
+<原始query问题>
+{context.q}
+</原始query问题>
+
+<平台sug词条>
+{q_sug}
+</平台sug词条>
+
+请对该sug词条进行三层评估(一次性返回所有层级的评分):
+
+第一层:判断sug词条是否为知识需求(knowledge_recognition: 0或1)
+第二层:评估知识动机匹配度(motivation_score: 0-1,需>=0.9)
+第三层:评估相关性得分(relevance_score: 0-1,>=0.9为高度满足)
+
+重要:即使第一层=0或第二层<0.9,也要完整计算所有层级的得分。
+"""
+        evaluator_result = await Runner.run(evaluator, eval_input)
+        result: EvaluationFeedback = evaluator_result.final_output
+        return {
+            "query": q_sug,
+            "knowledge_recognition": result.knowledge_recognition,
+            "knowledge_recognition_reason": result.knowledge_recognition_reason,
+            "motivation_score": result.motivation_score,
+            "motivation_breakdown": result.motivation_breakdown.model_dump(),
+            "relevance_score": result.relevance_score,
+            "relevance_breakdown": result.relevance_breakdown.model_dump(),
+            "overall_reason": result.overall_reason,
+        }
+
+    evaluations = await asyncio.gather(*[evaluate_single_query(q_sug) for q_sug in query_suggestions])
+
+    # 3. 记录到历史
+    context.operations_history.append({
+        "operation_type": "get_query_suggestions",
+        "timestamp": datetime.now().isoformat(),
+        "query": query,
+        "suggestions": query_suggestions,
+        "evaluations": evaluations,
+    })
+
+    return evaluations
+
+
+async def generate_modification_strategy(
+    current_query: str,
+    evaluations: list[dict],
+    annotation: str,
+    context: RunContext
+) -> ModificationStrategy:
+    """生成修改策略"""
+    print("\n正在生成修改策略...")
+
+    # 整理历史尝试记录 - 完整保留推荐词和评估结果
+    history_records = []
+    round_num = 0
+
+    for op in context.operations_history:
+        if op["operation_type"] == "get_query_suggestions":
+            round_num += 1
+            record = {
+                "round": round_num,
+                "query": op["query"],
+                "suggestions": op["suggestions"],
+                "evaluations": op["evaluations"]
+            }
+            history_records.append(record)
+        elif op["operation_type"] == "modify_query":
+            # 修改操作也记录,但不增加轮数
+            history_records.append({
+                "operation": "modify_query",
+                "strategy_type": op.get("strategy_type", op.get("modification_type")),  # 兼容旧字段
+                "operation_type_detail": op.get("operation_type_detail"),  # 新增:具体操作类型(增加、删减、调序、替换)
+                "base_query": op.get("base_query"),
+                "base_query_source": op.get("base_query_source"),
+                "modification_actions": op.get("modification_actions", []),
+                "original_query": op["original_query"],
+                "new_query": op["new_query"],
+                "reasoning": op["reasoning"]
+            })
+
+    # 格式化历史记录为JSON
+    history_json = json.dumps(history_records, ensure_ascii=False, indent=2)
+
+    strategy_input = f"""
+<原始问题标注(三层)>
+{annotation}
+</原始问题标注(三层)>
+
+<历史尝试记录(完整)>
+{history_json}
+</历史尝试记录(完整)>
+
+<当前query>
+{current_query}
+</当前query>
+
+<当前轮推荐词评估结果>
+{json.dumps(evaluations, ensure_ascii=False, indent=2) if evaluations else "空列表"}
+</当前轮推荐词评估结果>
+
+请基于所有历史尝试和当前评估结果,生成下一步的query修改策略。
+
+重点分析:
+
+1. **当前推荐词的信号**:
+   - 系统理解成什么主题了?(旅游?教程?素材?工具?品种?)
+   - 推荐词揭示了什么信息?系统在哪个方向有内容?
+   - **不要只看essence_score**:essence=0的推荐词也可能是好的探索起点
+   - 哪个推荐词最有潜力作为下一步的桥梁?
+
+2. **历史趋势分析**:
+   - 推荐词的主题变化:从"品种介绍"→"旅游"→"摄影"?
+   - 是否在逐步接近目标?还是原地打转(主题不变)?
+   - 哪些query让系统理解方向改变了?
+
+3. **确定探索目的**:
+   - 下一步query的探索目的是什么?
+     * 验证系统对某个词的理解?
+     * 往某个方向引导系统?
+     * 利用推荐词作为桥梁?
+   - **记住**:中间query不需要满足原需求,关键是达成探索目的
+"""
+    result = await Runner.run(strategy_generator, strategy_input)
+    strategy: ModificationStrategy = result.final_output
+    return strategy
+
+
+def find_qualified_queries(evaluations: list[dict]) -> dict:
+    """分级查找合格query
+
+    Returns:
+        {
+            "highly_qualified": [...],      # 高度满足:knowledge=1, motivation>=0.9, relevance>=0.9
+            "moderately_qualified": [...],  # 中高满足:knowledge=1, motivation>=0.9, relevance>=0.7
+            "lower_qualified": [...],       # 中低满足:knowledge=1, motivation>=0.9, relevance>=0.4
+        }
+    """
+    # 高度满足:knowledge=1, motivation>=0.9, relevance>=0.9
+    highly_qualified = [
+        e for e in evaluations
+        if e['knowledge_recognition'] == 1
+        and e['motivation_score'] >= 0.9
+        and e['relevance_score'] >= 0.9
+    ]
+
+    # 中高满足:knowledge=1, motivation>=0.9, 0.7<=relevance<0.9
+    moderately_qualified = [
+        e for e in evaluations
+        if e['knowledge_recognition'] == 1
+        and e['motivation_score'] >= 0.9
+        and 0.7 <= e['relevance_score'] < 0.9
+    ]
+
+    # 中低满足:knowledge=1, motivation>=0.9, 0.4<=relevance<0.7
+    lower_qualified = [
+        e for e in evaluations
+        if e['knowledge_recognition'] == 1
+        and e['motivation_score'] >= 0.9
+        and 0.4 <= e['relevance_score'] < 0.7
+    ]
+
+    return {
+        "highly_qualified": sorted(highly_qualified, key=lambda x: x['relevance_score'], reverse=True),
+        "moderately_qualified": sorted(moderately_qualified, key=lambda x: x['relevance_score'], reverse=True),
+        "lower_qualified": sorted(lower_qualified, key=lambda x: x['relevance_score'], reverse=True),
+    }
+
+
+# ============================================================================
+# 主流程(代码控制)
+# ============================================================================
+
+async def optimize_query(context: RunContext, max_rounds: int = 20) -> dict:
+    """
+    主优化流程 - 由代码控制
+
+    Args:
+        context: 运行上下文
+        max_rounds: 最大迭代轮数,默认20
+
+    返回格式:
+    {
+        "success": True/False,
+        "result": {...} or None,
+        "message": "..."
+    }
+    """
+    # 1. 标注问题(仅一次)
+    annotation = await annotate_question(context.q_with_context)
+    context.question_annotation = annotation
+
+    # 2. 迭代优化
+    current_query = context.q
+
+    for round_num in range(1, max_rounds + 1):
+        print(f"\n{'='*60}")
+        print(f"第 {round_num} 轮:{'使用原始问题' if round_num == 1 else '使用修改后的query'}")
+        print(f"当前query: {current_query}")
+        print(f"{'='*60}")
+
+        # 获取推荐词并评估
+        evaluations = await get_suggestions_with_eval(current_query, annotation, context)
+
+        if evaluations:
+            # 检查是否找到合格query(分级筛选)
+            qualified = find_qualified_queries(evaluations)
+
+            # 优先返回高度满足的query
+            if qualified["highly_qualified"]:
+                return {
+                    "success": True,
+                    "level": "highly_qualified",
+                    "results": qualified["highly_qualified"],
+                    "message": f"第{round_num}轮找到{len(qualified['highly_qualified'])}个高度满足的query(知识=1, 动机≥0.9, 相关性≥0.9)"
+                }
+
+            # 其次返回中高满足的query
+            if qualified["moderately_qualified"]:
+                return {
+                    "success": True,
+                    "level": "moderately_qualified",
+                    "results": qualified["moderately_qualified"],
+                    "message": f"第{round_num}轮找到{len(qualified['moderately_qualified'])}个中高满足的query(知识=1, 动机≥0.9, 相关性≥0.7)"
+                }
+
+        # 如果是最后一轮,不再生成策略
+        if round_num == max_rounds:
+            break
+
+        # 生成修改策略
+        print(f"\n--- 生成修改策略 ---")
+        strategy = await generate_modification_strategy(current_query, evaluations, annotation, context)
+
+        print(f"\n修改策略:")
+        print(f"  推理过程:{strategy.reasoning}")
+        print(f"  策略类型:{strategy.strategy_type}")
+        print(f"  操作类型:{strategy.operation_type}")
+        print(f"  基础query:{strategy.base_query} (来源: {strategy.base_query_source})")
+        print(f"  修改动作:{', '.join(strategy.modification_actions)}")
+        print(f"  新query:{strategy.new_query}")
+
+        # 记录修改
+        context.operations_history.append({
+            "operation_type": "modify_query",
+            "timestamp": datetime.now().isoformat(),
+            "reasoning": strategy.reasoning,
+            "strategy_type": strategy.strategy_type,
+            "operation_type_detail": strategy.operation_type,
+            "base_query": strategy.base_query,
+            "base_query_source": strategy.base_query_source,
+            "modification_actions": strategy.modification_actions,
+            "original_query": current_query,
+            "new_query": strategy.new_query,
+        })
+
+        # 更新当前query
+        current_query = strategy.new_query
+
+    # 所有轮次后仍未找到高度/中高满足的,降低标准查找
+    print(f"\n{'='*60}")
+    print(f"{max_rounds}轮后未找到高度/中高满足的query,降低标准(相关性 >= 0.4)")
+    print(f"{'='*60}")
+
+    qualified = find_qualified_queries(evaluations)
+    if qualified["lower_qualified"]:
+        return {
+            "success": True,
+            "level": "lower_qualified",
+            "results": qualified["lower_qualified"],
+            "message": f"{max_rounds}轮后找到{len(qualified['lower_qualified'])}个中低满足的query(知识=1, 动机≥0.9, 相关性≥0.4)"
+        }
+
+    # 完全失败:找出最接近的(只满足知识识别,但动机不够)
+    knowledge_ok = [
+        e for e in evaluations
+        if e['knowledge_recognition'] == 1
+    ]
+    if knowledge_ok:
+        # 返回所有满足knowledge的,按motivation_score降序
+        closest_queries = sorted(knowledge_ok, key=lambda x: x['motivation_score'], reverse=True)
+        return {
+            "success": False,
+            "level": "failed",
+            "results": closest_queries[:5],  # 只返回前5个
+            "message": f"未找到合格query,但有{len(closest_queries)}个是知识需求(knowledge=1,但motivation<0.9)"
+        }
+
+    return {
+        "success": False,
+        "level": "failed",
+        "results": [],
+        "message": "未找到任何知识类推荐词(所有推荐词的knowledge_recognition均为0)"
+    }
+
+
+# ============================================================================
+# 输出格式化
+# ============================================================================
+
+def format_output(optimization_result: dict, context: RunContext) -> str:
+    """格式化输出结果"""
+    results = optimization_result.get("results", [])
+    level = optimization_result.get("level", "")
+
+    # 满足程度映射
+    level_map = {
+        "highly_qualified": "高度满足 (90-100%)",
+        "moderately_qualified": "中高满足 (70-89%)",
+        "lower_qualified": "中低满足 (40-69%)",
+        "failed": "未通过"
+    }
+
+    if optimization_result["success"] and results:
+        output = f"原始问题:{context.q}\n"
+        output += f"满足程度:{level_map.get(level, '未知')}\n"
+        output += f"状态:{optimization_result['message']}\n\n"
+        output += "推荐query(按相关性降序):\n"
+        for i, result in enumerate(results, 1):
+            output += f"\n{i}. {result['query']}\n"
+            output += f"   - 知识识别:{'是' if result['knowledge_recognition'] == 1 else '否'} ({result['knowledge_recognition_reason'][:50]}...)\n"
+            output += f"   - 动机匹配度:{result['motivation_score']:.2f} (≥0.9通过)\n"
+            output += f"     * 核心意图动词: {result['motivation_breakdown'].get('核心意图动词', 0):.2f}\n"
+            output += f"     * 目标对象: {result['motivation_breakdown'].get('目标对象', 0):.2f}\n"
+            output += f"     * 终极目的: {result['motivation_breakdown'].get('终极目的', 0):.2f}\n"
+            output += f"   - 相关性得分:{result['relevance_score']:.2f} (≥0.9高度满足)\n"
+            output += f"     * 核心动作: {result['relevance_breakdown'].get('核心动作', 0):.2f}\n"
+            output += f"     * 目标对象: {result['relevance_breakdown'].get('目标对象', 0):.2f}\n"
+            output += f"     * 使用场景: {result['relevance_breakdown'].get('使用场景', 0):.2f}\n"
+            output += f"     * 终极目的: {result['relevance_breakdown'].get('终极目的', 0):.2f}\n"
+            output += f"   - 综合评估:{result['overall_reason'][:100]}...\n"
+        return output.strip()
+    else:
+        output = f"原始问题:{context.q}\n"
+        output += f"结果:未找到合格推荐query\n"
+        output += f"满足程度:{level_map.get(level, '未知')}\n"
+        output += f"原因:{optimization_result['message']}\n"
+
+        if results:
+            output += "\n最接近的推荐词:\n"
+            for i, result in enumerate(results[:3], 1):  # 只显示前3个
+                output += f"\n{i}. {result['query']}\n"
+                output += f"   - 知识识别:{'是' if result['knowledge_recognition'] == 1 else '否'}\n"
+                output += f"   - 动机匹配度:{result['motivation_score']:.2f}\n"
+                output += f"   - 相关性得分:{result['relevance_score']:.2f}\n"
+                output += f"   - 综合评估:{result['overall_reason'][:100]}...\n"
+
+        output += "\n建议:尝试简化问题或调整需求描述"
+        return output.strip()
+
+
+# ============================================================================
+# 主函数
+# ============================================================================
+
+async def main(input_dir: str, max_rounds: int = 20):
+    current_time, log_url = set_trace()
+
+    # 从目录中读取固定文件名
+    input_context_file = os.path.join(input_dir, 'context.md')
+    input_q_file = os.path.join(input_dir, 'q.md')
+
+    q_context = read_file_as_string(input_context_file)
+    q = read_file_as_string(input_q_file)
+    q_with_context = f"""
+<需求上下文>
+{q_context}
+</需求上下文>
+<当前问题>
+{q}
+</当前问题>
+""".strip()
+
+    # 获取当前文件名作为版本
+    version = os.path.basename(__file__)
+    version_name = os.path.splitext(version)[0]
+
+    # 日志保存目录
+    log_dir = os.path.join(input_dir, "output", version_name, current_time)
+
+    run_context = RunContext(
+        version=version,
+        input_files={
+            "input_dir": input_dir,
+            "context_file": input_context_file,
+            "q_file": input_q_file,
+        },
+        q_with_context=q_with_context,
+        q_context=q_context,
+        q=q,
+        log_dir=log_dir,
+        log_url=log_url,
+    )
+
+    # 执行优化流程(代码控制)
+    optimization_result = await optimize_query(run_context, max_rounds=max_rounds)
+
+    # 格式化输出
+    final_output = format_output(optimization_result, run_context)
+    print(f"\n{'='*60}")
+    print("最终结果")
+    print(f"{'='*60}")
+    print(final_output)
+
+    # 保存结果
+    run_context.optimization_result = optimization_result
+    run_context.final_output = final_output
+
+    # 保存 RunContext 到 log_dir
+    os.makedirs(run_context.log_dir, exist_ok=True)
+    context_file_path = os.path.join(run_context.log_dir, "run_context.json")
+    with open(context_file_path, "w", encoding="utf-8") as f:
+        json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
+    print(f"\nRunContext saved to: {context_file_path}")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="搜索query优化工具")
+    parser.add_argument(
+        "--input-dir",
+        type=str,
+        default="input/简单扣图",
+        help="输入目录路径,默认: input/简单扣图"
+    )
+    parser.add_argument(
+        "--max-rounds",
+        type=int,
+        default=20,
+        help="最大迭代轮数,默认: 20"
+    )
+    args = parser.parse_args()
+
+    asyncio.run(main(args.input_dir, max_rounds=args.max_rounds))

+ 11 - 6
sug_v6_0_progressive_exploration.py

@@ -141,7 +141,6 @@ c) **跨层级组合**
 
 ### 2. promising_signals
 列出最有价值的推荐词(来自任何已探索的query),每个说明为什么有价值
-格式:[{"query": "...", "from_level": 1, "reason": "..."}]
 
 ### 3. should_evaluate_now
 是否已经可以开始评估候选了?true/false
@@ -165,10 +164,16 @@ c) **跨层级组合**
 4. **识别死胡同**:如果某个方向的推荐词一直不相关,果断放弃
 """.strip()
 
+class PromisingSignal(BaseModel):
+    """有价值的推荐词信号"""
+    query: str = Field(..., description="推荐词")
+    from_level: int = Field(..., description="来自哪一层")
+    reason: str = Field(..., description="为什么有价值")
+
 class LevelAnalysis(BaseModel):
     """层级分析结果"""
     key_findings: str = Field(..., description="当前层的关键发现")
-    promising_signals: list[dict] = Field(..., description="有价值的推荐词信号,格式:[{\"query\": \"...\", \"from_level\": 1, \"reason\": \"...\"}]")
+    promising_signals: list[PromisingSignal] = Field(..., description="有价值的推荐词信号")
     should_evaluate_now: bool = Field(..., description="是否应该开始评估候选")
     candidates_to_evaluate: list[str] = Field(default_factory=list, description="如果should_evaluate_now=true,要评估的候选query列表")
     next_combinations: list[str] = Field(default_factory=list, description="如果should_evaluate_now=false,下一层要探索的query组合")
@@ -250,10 +255,10 @@ evaluator = Agent[None](
 # 核心函数
 # ============================================================================
 
-async def extract_keywords(q_with_context: str) -> KeywordList:
+async def extract_keywords(q: str) -> KeywordList:
     """提取关键词"""
     print("\n正在提取关键词...")
-    result = await Runner.run(keyword_extractor, q_with_context)
+    result = await Runner.run(keyword_extractor, q)
     keyword_list: KeywordList = result.final_output
     print(f"提取的关键词:{keyword_list.keywords}")
     print(f"提取理由:{keyword_list.reasoning}")
@@ -436,8 +441,8 @@ async def progressive_exploration(context: RunContext, max_levels: int = 4) -> d
     }
     """
 
-    # 阶段1:提取关键词
-    keyword_result = await extract_keywords(context.q_with_context)
+    # 阶段1:提取关键词(从原始问题提取)
+    keyword_result = await extract_keywords(context.q)
     context.keywords = keyword_result.keywords
 
     # 阶段2:渐进式探索

+ 651 - 0
sug_v6_1_intent_relevance.py

@@ -0,0 +1,651 @@
+import asyncio
+import json
+import os
+import argparse
+from datetime import datetime
+
+from agents import Agent, Runner
+from lib.my_trace import set_trace
+from typing import Literal
+from pydantic import BaseModel, Field
+
+from lib.utils import read_file_as_string
+from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
+
+
+class RunContext(BaseModel):
+    version: str = Field(..., description="当前运行的脚本版本(文件名)")
+    input_files: dict[str, str] = Field(..., description="输入文件路径映射")
+    q_with_context: str
+    q_context: str
+    q: str
+    log_url: str
+    log_dir: str
+
+    # 探索阶段记录
+    keywords: list[str] | None = Field(default=None, description="提取的关键词")
+    exploration_levels: list[dict] = Field(default_factory=list, description="每一层的探索结果")
+    level_analyses: list[dict] = Field(default_factory=list, description="每一层的主Agent分析")
+
+    # 最终结果
+    final_candidates: list[str] | None = Field(default=None, description="最终选出的候选query")
+    evaluation_results: list[dict] | None = Field(default=None, description="候选query的评估结果")
+    optimization_result: dict | None = Field(default=None, description="最终优化结果对象")
+    final_output: str | None = Field(default=None, description="最终输出结果(格式化文本)")
+
+
+# ============================================================================
+# Agent 1: 关键词提取专家
+# ============================================================================
+keyword_extraction_instructions = """
+你是关键词提取专家。给定一个搜索问题(含上下文),提取出**最细粒度的关键概念**。
+
+## 提取原则
+
+1. **细粒度优先**:拆分成最小的有意义单元
+   - 不要保留完整的长句
+   - 拆分成独立的、有搜索意义的词或短语
+
+2. **保留核心维度**:
+   - 地域/对象
+   - 时间
+   - 行为/意图:获取、教程、推荐、如何等
+   - 主题/领域
+   - 质量/属性
+
+3. **去掉无意义的虚词**:的、吗、呢等
+
+4. **保留领域专有词**:不要过度拆分专业术语
+   - 如果是常见的组合词,保持完整
+
+## 输出要求
+
+输出关键词列表,按重要性排序(最核心的在前)。
+""".strip()
+
+class KeywordList(BaseModel):
+    """关键词列表"""
+    keywords: list[str] = Field(..., description="提取的关键词,按重要性排序")
+    reasoning: str = Field(..., description="提取理由")
+
+keyword_extractor = Agent[None](
+    name="关键词提取专家",
+    instructions=keyword_extraction_instructions,
+    output_type=KeywordList,
+)
+
+
+# ============================================================================
+# Agent 2: 层级探索分析专家
+# ============================================================================
+level_analysis_instructions = """
+你是搜索空间探索分析专家。基于当前层级的探索结果,决定下一步行动。
+
+## 你的任务
+
+分析当前已探索的词汇空间,判断:
+1. **发现了什么有价值的信号?**
+2. **是否已经可以评估候选了?**
+3. **如果还不够,下一层应该探索什么组合?**
+
+## 分析维度
+
+### 1. 信号识别(最重要)
+
+看推荐词里**出现了什么主题**:
+
+**关键问题:**
+- 哪些推荐词**最接近原始需求**?
+- 哪些推荐词**揭示了有价值的方向**(即使不完全匹配)?
+- 哪些推荐词可以作为**下一层探索的桥梁**?
+- 系统对哪些概念理解得好?哪些理解偏了?
+
+### 2. 组合策略
+
+基于发现的信号,设计下一层探索:
+
+**组合类型:**
+
+a) **关键词直接组合**
+   - 两个关键词组合成新query
+
+b) **利用推荐词作为桥梁**(重要!)
+   - 发现某个推荐词很有价值 → 直接探索这个推荐词
+   - 或在推荐词基础上加其他关键词
+
+c) **跨层级组合**
+   - 结合多层发现的有价值推荐词
+   - 组合成更复杂的query
+
+### 3. 停止条件
+
+**何时可以评估候选?**
+
+满足以下之一:
+- 推荐词中出现了**明确包含原始需求多个核心要素的query**
+- 已经探索到**足够复杂的组合**(3-4个关键词),且推荐词相关
+- 探索了**3-4层**,信息已经足够丰富
+
+**何时继续探索?**
+- 当前推荐词太泛,没有接近原始需求
+- 发现了有价值的信号,但需要进一步组合验证
+- 层数还少(< 3层)
+
+## 输出要求
+
+### 1. key_findings
+总结当前层发现的关键信息,包括:
+- 哪些推荐词最有价值?
+- 系统对哪些概念理解得好/不好?
+- 发现了什么意外的方向?
+
+### 2. promising_signals
+列出最有价值的推荐词(来自任何已探索的query),每个说明为什么有价值
+
+### 3. should_evaluate_now
+是否已经可以开始评估候选了?true/false
+
+### 4. candidates_to_evaluate
+如果should_evaluate_now=true,列出应该评估的候选query
+- 可以是推荐词
+- 可以是自己构造的组合
+
+### 5. next_combinations
+如果should_evaluate_now=false,列出下一层应该探索的query组合
+
+### 6. reasoning
+详细的推理过程
+
+## 重要原则
+
+1. **不要过早评估**:至少探索2层,除非第一层就发现了完美匹配
+2. **充分利用推荐词**:推荐词是系统给的提示,要善用
+3. **保持探索方向的多样性**:不要只盯着一个方向
+4. **识别死胡同**:如果某个方向的推荐词一直不相关,果断放弃
+""".strip()
+
+class PromisingSignal(BaseModel):
+    """有价值的推荐词信号"""
+    query: str = Field(..., description="推荐词")
+    from_level: int = Field(..., description="来自哪一层")
+    reason: str = Field(..., description="为什么有价值")
+
+class LevelAnalysis(BaseModel):
+    """层级分析结果"""
+    key_findings: str = Field(..., description="当前层的关键发现")
+    promising_signals: list[PromisingSignal] = Field(..., description="有价值的推荐词信号")
+    should_evaluate_now: bool = Field(..., description="是否应该开始评估候选")
+    candidates_to_evaluate: list[str] = Field(default_factory=list, description="如果should_evaluate_now=true,要评估的候选query列表")
+    next_combinations: list[str] = Field(default_factory=list, description="如果should_evaluate_now=false,下一层要探索的query组合")
+    reasoning: str = Field(..., description="详细的推理过程")
+
+level_analyzer = Agent[None](
+    name="层级探索分析专家",
+    instructions=level_analysis_instructions,
+    output_type=LevelAnalysis,
+)
+
+
+# ============================================================================
+# Agent 3: 评估专家(简化版:意图匹配 + 相关性评分)
+# ============================================================================
+eval_instructions = """
+你是搜索query评估专家。给定原始问题和推荐query,评估两个维度。
+
+## 评估目标
+
+用这个推荐query搜索,能否找到满足原始需求的内容?
+
+## 两层评分
+
+### 1. intent_match(意图匹配)= true/false
+
+推荐query的**使用意图**是否与原问题一致?
+
+**核心问题:用户搜索这个推荐词,想做什么?**
+
+**判断标准:**
+- 原问题意图:找方法?找教程?找资源/素材?找工具?看作品?
+- 推荐词意图:如果用户搜索这个词,他的目的是什么?
+
+**示例:**
+- 原问题意图="找素材"
+  - ✅ true: "素材下载"、"素材网站"、"免费素材"(都是获取素材)
+  - ❌ false: "素材制作教程"、"如何制作素材"(意图变成学习了)
+
+- 原问题意图="学教程"
+  - ✅ true: "教程视频"、"教学步骤"、"入门指南"
+  - ❌ false: "成品展示"、"作品欣赏"(意图变成看作品了)
+
+**评分:**
+- true = 意图一致,搜索推荐词能达到原问题的目的
+- false = 意图改变,搜索推荐词无法达到原问题的目的
+
+### 2. relevance_score(相关性)= 0-1 连续分数
+
+推荐query在**主题、要素、属性**上与原问题的相关程度?
+
+**评估维度:**
+- 主题相关:核心主题是否匹配?(如:摄影、旅游、美食)
+- 要素覆盖:关键要素保留了多少?(如:地域、时间、对象、工具)
+- 属性匹配:质量、风格、特色等属性是否保留?
+
+**评分参考:**
+- 0.9-1.0 = 几乎完美匹配,所有核心要素都保留
+- 0.7-0.8 = 高度相关,核心要素保留,少数次要要素缺失
+- 0.5-0.6 = 中度相关,主题匹配但多个要素缺失
+- 0.3-0.4 = 低度相关,只有部分主题相关
+- 0-0.2 = 基本不相关
+
+## 评估策略
+
+1. **先判断 intent_match**:意图不匹配直接 false,无论相关性多高
+2. **再评估 relevance_score**:在意图匹配的前提下,计算相关性
+
+## 输出要求
+
+- intent_match: true/false
+- relevance_score: 0-1 的浮点数
+- reason: 详细的评估理由,需要说明:
+  - 原问题的意图是什么
+  - 推荐词的意图是什么
+  - 为什么判断意图匹配/不匹配
+  - 相关性分数的依据(哪些要素保留/缺失)
+""".strip()
+
+class RelevanceEvaluation(BaseModel):
+    """评估反馈模型 - 意图匹配 + 相关性"""
+    intent_match: bool = Field(..., description="意图是否匹配")
+    relevance_score: float = Field(..., description="相关性分数 0-1,分数越高越相关")
+    reason: str = Field(..., description="评估理由,需说明意图判断和相关性依据")
+
+evaluator = Agent[None](
+    name="评估专家",
+    instructions=eval_instructions,
+    output_type=RelevanceEvaluation,
+)
+
+
+# ============================================================================
+# 核心函数
+# ============================================================================
+
+async def extract_keywords(q: str) -> KeywordList:
+    """提取关键词"""
+    print("\n正在提取关键词...")
+    result = await Runner.run(keyword_extractor, q)
+    keyword_list: KeywordList = result.final_output
+    print(f"提取的关键词:{keyword_list.keywords}")
+    print(f"提取理由:{keyword_list.reasoning}")
+    return keyword_list
+
+
+async def explore_level(queries: list[str], level_num: int, context: RunContext) -> dict:
+    """探索一个层级(并发获取所有query的推荐词)"""
+    print(f"\n{'='*60}")
+    print(f"Level {level_num} 探索:{len(queries)} 个query")
+    print(f"{'='*60}")
+
+    xiaohongshu_api = XiaohongshuSearchRecommendations()
+
+    # 并发获取所有推荐词
+    async def get_single_sug(query: str):
+        print(f"  探索: {query}")
+        suggestions = xiaohongshu_api.get_recommendations(keyword=query)
+        print(f"    → {len(suggestions) if suggestions else 0} 个推荐词")
+        return {
+            "query": query,
+            "suggestions": suggestions or []
+        }
+
+    results = await asyncio.gather(*[get_single_sug(q) for q in queries])
+
+    level_data = {
+        "level": level_num,
+        "timestamp": datetime.now().isoformat(),
+        "queries": results
+    }
+
+    context.exploration_levels.append(level_data)
+    return level_data
+
+
+async def analyze_level(level_data: dict, all_levels: list[dict], original_question: str, context: RunContext) -> LevelAnalysis:
+    """分析当前层级,决定下一步"""
+    print(f"\n正在分析 Level {level_data['level']}...")
+
+    # 构造输入
+    analysis_input = f"""
+<原始问题>
+{original_question}
+</原始问题>
+
+<已探索的所有层级>
+{json.dumps(all_levels, ensure_ascii=False, indent=2)}
+</已探索的所有层级>
+
+<当前层级>
+Level {level_data['level']}
+{json.dumps(level_data['queries'], ensure_ascii=False, indent=2)}
+</当前层级>
+
+请分析当前探索状态,决定下一步行动。
+"""
+
+    result = await Runner.run(level_analyzer, analysis_input)
+    analysis: LevelAnalysis = result.final_output
+
+    print(f"\n分析结果:")
+    print(f"  关键发现:{analysis.key_findings}")
+    print(f"  有价值的信号:{len(analysis.promising_signals)} 个")
+    print(f"  是否评估:{analysis.should_evaluate_now}")
+
+    if analysis.should_evaluate_now:
+        print(f"  候选query:{analysis.candidates_to_evaluate}")
+    else:
+        print(f"  下一层探索:{analysis.next_combinations}")
+
+    # 保存分析结果
+    context.level_analyses.append({
+        "level": level_data['level'],
+        "timestamp": datetime.now().isoformat(),
+        "analysis": analysis.model_dump()
+    })
+
+    return analysis
+
+
+async def evaluate_candidates(candidates: list[str], original_question: str, context: RunContext) -> list[dict]:
+    """评估候选query"""
+    print(f"\n{'='*60}")
+    print(f"评估 {len(candidates)} 个候选query")
+    print(f"{'='*60}")
+
+    xiaohongshu_api = XiaohongshuSearchRecommendations()
+
+    async def evaluate_single_candidate(candidate: str):
+        print(f"\n评估候选:{candidate}")
+
+        # 1. 获取推荐词
+        suggestions = xiaohongshu_api.get_recommendations(keyword=candidate)
+        print(f"  获取到 {len(suggestions) if suggestions else 0} 个推荐词")
+
+        if not suggestions:
+            return {
+                "candidate": candidate,
+                "suggestions": [],
+                "evaluations": []
+            }
+
+        # 2. 评估每个推荐词
+        async def eval_single_sug(sug: str):
+            eval_input = f"""
+<原始问题>
+{original_question}
+</原始问题>
+
+<待评估的推荐query>
+{sug}
+</待评估的推荐query>
+
+请评估该推荐query:
+1. intent_match: 意图是否匹配(true/false)
+2. relevance_score: 相关性分数(0-1)
+3. reason: 详细的评估理由
+"""
+            result = await Runner.run(evaluator, eval_input)
+            evaluation: RelevanceEvaluation = result.final_output
+            return {
+                "query": sug,
+                "intent_match": evaluation.intent_match,
+                "relevance_score": evaluation.relevance_score,
+                "reason": evaluation.reason,
+            }
+
+        evaluations = await asyncio.gather(*[eval_single_sug(s) for s in suggestions])
+
+        return {
+            "candidate": candidate,
+            "suggestions": suggestions,
+            "evaluations": evaluations
+        }
+
+    results = await asyncio.gather(*[evaluate_single_candidate(c) for c in candidates])
+
+    context.evaluation_results = results
+    return results
+
+
+def find_qualified_queries(evaluation_results: list[dict], min_relevance_score: float = 0.7) -> list[dict]:
+    """
+    查找所有合格的query
+
+    筛选标准:
+    1. intent_match = True(必须满足)
+    2. relevance_score >= min_relevance_score
+
+    返回:按 relevance_score 降序排列
+    """
+    all_qualified = []
+
+    for result in evaluation_results:
+        for eval_item in result.get("evaluations", []):
+            if (eval_item['intent_match'] is True
+                and eval_item['relevance_score'] >= min_relevance_score):
+                all_qualified.append({
+                    "from_candidate": result["candidate"],
+                    **eval_item
+                })
+
+    # 按relevance_score降序排列
+    return sorted(all_qualified, key=lambda x: x['relevance_score'], reverse=True)
+
+
+# ============================================================================
+# 主流程
+# ============================================================================
+
+async def progressive_exploration(context: RunContext, max_levels: int = 4) -> dict:
+    """
+    渐进式广度探索流程
+
+    Args:
+        context: 运行上下文
+        max_levels: 最大探索层数,默认4
+
+    返回格式:
+    {
+        "success": True/False,
+        "results": [...],
+        "message": "..."
+    }
+    """
+
+    # 阶段1:提取关键词(从原始问题提取)
+    keyword_result = await extract_keywords(context.q)
+    context.keywords = keyword_result.keywords
+
+    # 阶段2:渐进式探索
+    current_level = 1
+
+    # Level 1:单个关键词
+    level_1_queries = context.keywords[:7]  # 限制最多7个关键词
+    level_1_data = await explore_level(level_1_queries, current_level, context)
+
+    # 分析Level 1
+    analysis_1 = await analyze_level(level_1_data, context.exploration_levels, context.q, context)
+
+    if analysis_1.should_evaluate_now:
+        # 直接评估
+        eval_results = await evaluate_candidates(analysis_1.candidates_to_evaluate, context.q, context)
+        qualified = find_qualified_queries(eval_results, min_relevance_score=0.7)
+
+        if qualified:
+            return {
+                "success": True,
+                "results": qualified,
+                "message": f"Level 1 即找到 {len(qualified)} 个合格query"
+            }
+
+    # Level 2 及以后:迭代探索
+    for level_num in range(2, max_levels + 1):
+        # 获取上一层的分析结果
+        prev_analysis: LevelAnalysis = context.level_analyses[-1]["analysis"]
+        prev_analysis = LevelAnalysis(**prev_analysis)  # 转回对象
+
+        if not prev_analysis.next_combinations:
+            print(f"\nLevel {level_num-1} 分析后无需继续探索")
+            break
+
+        # 探索当前层
+        level_data = await explore_level(prev_analysis.next_combinations, level_num, context)
+
+        # 分析当前层
+        analysis = await analyze_level(level_data, context.exploration_levels, context.q, context)
+
+        if analysis.should_evaluate_now:
+            # 评估候选
+            eval_results = await evaluate_candidates(analysis.candidates_to_evaluate, context.q, context)
+            qualified = find_qualified_queries(eval_results, min_relevance_score=0.7)
+
+            if qualified:
+                return {
+                    "success": True,
+                    "results": qualified,
+                    "message": f"Level {level_num} 找到 {len(qualified)} 个合格query"
+                }
+
+    # 所有层探索完,降低标准
+    print(f"\n{'='*60}")
+    print(f"探索完 {max_levels} 层,降低标准(relevance_score >= 0.5)")
+    print(f"{'='*60}")
+
+    if context.evaluation_results:
+        acceptable = find_qualified_queries(context.evaluation_results, min_relevance_score=0.5)
+        if acceptable:
+            return {
+                "success": True,
+                "results": acceptable,
+                "message": f"找到 {len(acceptable)} 个可接受query(soft_score >= 0.5)"
+            }
+
+    # 完全失败
+    return {
+        "success": False,
+        "results": [],
+        "message": "探索完所有层级,未找到合格的推荐词"
+    }
+
+
+# ============================================================================
+# 输出格式化
+# ============================================================================
+
+def format_output(optimization_result: dict, context: RunContext) -> str:
+    """格式化输出结果"""
+    results = optimization_result.get("results", [])
+
+    output = f"原始问题:{context.q}\n"
+    output += f"提取的关键词:{', '.join(context.keywords or [])}\n"
+    output += f"探索层数:{len(context.exploration_levels)}\n"
+    output += f"状态:{optimization_result['message']}\n\n"
+
+    if optimization_result["success"] and results:
+        output += "合格的推荐query(按relevance_score降序):\n"
+        for i, result in enumerate(results, 1):
+            output += f"\n{i}. {result['query']}\n"
+            output += f"   - 来自候选:{result['from_candidate']}\n"
+            output += f"   - 意图匹配:{result['intent_match']} (True=意图一致)\n"
+            output += f"   - 相关性分数:{result['relevance_score']:.2f} (0-1,越高越相关)\n"
+            output += f"   - 评估理由:{result['reason']}\n"
+    else:
+        output += "结果:未找到合格推荐query\n"
+        if context.level_analyses:
+            last_analysis = context.level_analyses[-1]["analysis"]
+            output += f"\n最后一层分析:\n{last_analysis.get('key_findings', 'N/A')}\n"
+
+    return output.strip()
+
+
+# ============================================================================
+# 主函数
+# ============================================================================
+
+async def main(input_dir: str, max_levels: int = 4):
+    current_time, log_url = set_trace()
+
+    # 从目录中读取固定文件名
+    input_context_file = os.path.join(input_dir, 'context.md')
+    input_q_file = os.path.join(input_dir, 'q.md')
+
+    q_context = read_file_as_string(input_context_file)
+    q = read_file_as_string(input_q_file)
+    q_with_context = f"""
+<需求上下文>
+{q_context}
+</需求上下文>
+<当前问题>
+{q}
+</当前问题>
+""".strip()
+
+    # 获取当前文件名作为版本
+    version = os.path.basename(__file__)
+    version_name = os.path.splitext(version)[0]
+
+    # 日志保存目录
+    log_dir = os.path.join(input_dir, "output", version_name, current_time)
+
+    run_context = RunContext(
+        version=version,
+        input_files={
+            "input_dir": input_dir,
+            "context_file": input_context_file,
+            "q_file": input_q_file,
+        },
+        q_with_context=q_with_context,
+        q_context=q_context,
+        q=q,
+        log_dir=log_dir,
+        log_url=log_url,
+    )
+
+    # 执行渐进式探索
+    optimization_result = await progressive_exploration(run_context, max_levels=max_levels)
+
+    # 格式化输出
+    final_output = format_output(optimization_result, run_context)
+    print(f"\n{'='*60}")
+    print("最终结果")
+    print(f"{'='*60}")
+    print(final_output)
+
+    # 保存结果
+    run_context.optimization_result = optimization_result
+    run_context.final_output = final_output
+
+    # 保存 RunContext 到 log_dir
+    os.makedirs(run_context.log_dir, exist_ok=True)
+    context_file_path = os.path.join(run_context.log_dir, "run_context.json")
+    with open(context_file_path, "w", encoding="utf-8") as f:
+        json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
+    print(f"\nRunContext saved to: {context_file_path}")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.1 意图匹配+相关性评分版")
+    parser.add_argument(
+        "--input-dir",
+        type=str,
+        default="input/简单扣图",
+        help="输入目录路径,默认: input/简单扣图"
+    )
+    parser.add_argument(
+        "--max-levels",
+        type=int,
+        default=4,
+        help="最大探索层数,默认: 4"
+    )
+    args = parser.parse_args()
+
+    asyncio.run(main(args.input_dir, max_levels=args.max_levels))

+ 557 - 0
sug_v6_2_combinatorial.py

@@ -0,0 +1,557 @@
+import asyncio
+import json
+import os
+import argparse
+from datetime import datetime
+from itertools import combinations
+
+from agents import Agent, Runner
+from lib.my_trace import set_trace
+from typing import Literal
+from pydantic import BaseModel, Field
+
+from lib.utils import read_file_as_string
+from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
+
+
+class RunContext(BaseModel):
+    version: str = Field(..., description="当前运行的脚本版本(文件名)")
+    input_files: dict[str, str] = Field(..., description="输入文件路径映射")
+    q_with_context: str
+    q_context: str
+    q: str
+    log_url: str
+    log_dir: str
+
+    # 分词和组合
+    keywords: list[str] | None = Field(default=None, description="提取的关键词")
+    query_combinations: dict[str, list[str]] = Field(default_factory=dict, description="各层级的query组合")
+
+    # 探索结果
+    all_sug_queries: list[dict] = Field(default_factory=list, description="所有获取到的推荐词")
+
+    # 评估结果
+    evaluation_results: list[dict] = Field(default_factory=list, description="所有推荐词的评估结果")
+    optimization_result: dict | None = Field(default=None, description="最终优化结果对象")
+    final_output: str | None = Field(default=None, description="最终输出结果(格式化文本)")
+
+
+# ============================================================================
+# Agent 1: 分词专家
+# ============================================================================
+segmentation_instructions = """
+你是中文分词专家。给定一个句子,将其分词。
+
+## 分词原则
+
+1. 去掉标点符号
+2. 拆分成最小的有意义单元
+3. 去掉助词、语气词、助动词
+4. 保留疑问词
+5. 保留实词:名词、动词、形容词、副词
+
+## 输出要求
+
+输出分词列表。
+""".strip()
+
+class SegmentationResult(BaseModel):
+    """分词结果"""
+    words: list[str] = Field(..., description="分词列表")
+    reasoning: str = Field(..., description="分词说明")
+
+segmenter = Agent[None](
+    name="分词专家",
+    instructions=segmentation_instructions,
+    output_type=SegmentationResult,
+)
+
+
+# ============================================================================
+# Agent 2: 评估专家(意图匹配 + 相关性评分)
+# ============================================================================
+eval_instructions = """
+你是搜索query评估专家。给定原始问题和推荐query,评估两个维度。
+
+## 评估目标
+
+用这个推荐query搜索,能否找到满足原始需求的内容?
+
+## 两层评分
+
+### 1. intent_match(意图匹配)= true/false
+
+推荐query的**使用意图**是否与原问题一致?
+
+**核心问题:用户搜索这个推荐词,想做什么?**
+
+**判断标准:**
+- 原问题意图:找方法?找教程?找资源/素材?找工具?看作品?
+- 推荐词意图:如果用户搜索这个词,他的目的是什么?
+
+**评分:**
+- true = 意图一致,搜索推荐词能达到原问题的目的
+- false = 意图改变,搜索推荐词无法达到原问题的目的
+
+### 2. relevance_score(相关性)= 0-1 连续分数
+
+推荐query在**主题、要素、属性**上与原问题的相关程度?
+
+**评估维度:**
+- 主题相关:核心主题是否匹配?(如:摄影、旅游、美食)
+- 要素覆盖:关键要素保留了多少?(如:地域、时间、对象、工具)
+- 属性匹配:质量、风格、特色等属性是否保留?
+
+**评分参考:**
+- 0.9-1.0 = 几乎完美匹配,所有核心要素都保留
+- 0.7-0.8 = 高度相关,核心要素保留,少数次要要素缺失
+- 0.5-0.6 = 中度相关,主题匹配但多个要素缺失
+- 0.3-0.4 = 低度相关,只有部分主题相关
+- 0-0.2 = 基本不相关
+
+## 评估策略
+
+1. **先判断 intent_match**:意图不匹配直接 false,无论相关性多高
+2. **再评估 relevance_score**:在意图匹配的前提下,计算相关性
+
+## 输出要求
+
+- intent_match: true/false
+- relevance_score: 0-1 的浮点数
+- reason: 详细的评估理由,需要说明:
+  - 原问题的意图是什么
+  - 推荐词的意图是什么
+  - 为什么判断意图匹配/不匹配
+  - 相关性分数的依据(哪些要素保留/缺失)
+""".strip()
+
+class RelevanceEvaluation(BaseModel):
+    """评估反馈模型 - 意图匹配 + 相关性"""
+    intent_match: bool = Field(..., description="意图是否匹配")
+    relevance_score: float = Field(..., description="相关性分数 0-1,分数越高越相关")
+    reason: str = Field(..., description="评估理由,需说明意图判断和相关性依据")
+
+evaluator = Agent[None](
+    name="评估专家",
+    instructions=eval_instructions,
+    output_type=RelevanceEvaluation,
+)
+
+
+# ============================================================================
+# 核心函数
+# ============================================================================
+
+async def segment_text(q: str) -> SegmentationResult:
+    """分词"""
+    print("\n正在分词...")
+    result = await Runner.run(segmenter, q)
+    seg_result: SegmentationResult = result.final_output
+    print(f"分词结果:{seg_result.words}")
+    print(f"分词说明:{seg_result.reasoning}")
+    return seg_result
+
+
+def generate_query_combinations(keywords: list[str], max_combination_size: int) -> dict[str, list[str]]:
+    """
+    生成query组合
+
+    Args:
+        keywords: 关键词列表
+        max_combination_size: 最大组合词数(N)
+
+    Returns:
+        {
+            "1-word": [...],
+            "2-word": [...],
+            "3-word": [...],
+            ...
+            "N-word": [...]
+        }
+    """
+    result = {}
+
+    for size in range(1, max_combination_size + 1):
+        if size > len(keywords):
+            break
+
+        combs = list(combinations(keywords, size))
+        queries = [''.join(comb) for comb in combs]  # 直接拼接,无空格
+        result[f"{size}-word"] = queries
+
+        print(f"\n{size}词组合:{len(queries)} 个")
+        if len(queries) <= 10:
+            for q in queries:
+                print(f"  - {q}")
+        else:
+            print(f"  - {queries[0]}")
+            print(f"  - {queries[1]}")
+            print(f"  ...")
+            print(f"  - {queries[-1]}")
+
+    return result
+
+
+async def fetch_suggestions_for_queries(queries: list[str], context: RunContext) -> list[dict]:
+    """
+    并发获取所有query的推荐词
+
+    Returns:
+        [
+            {
+                "query": "川西",
+                "suggestions": ["川西旅游", "川西攻略", ...],
+                "timestamp": "..."
+            },
+            ...
+        ]
+    """
+    print(f"\n{'='*60}")
+    print(f"获取推荐词:{len(queries)} 个query")
+    print(f"{'='*60}")
+
+    xiaohongshu_api = XiaohongshuSearchRecommendations()
+
+    async def get_single_sug(query: str):
+        print(f"  查询: {query}")
+        suggestions = xiaohongshu_api.get_recommendations(keyword=query)
+        print(f"    → {len(suggestions) if suggestions else 0} 个推荐词")
+        return {
+            "query": query,
+            "suggestions": suggestions or [],
+            "timestamp": datetime.now().isoformat()
+        }
+
+    results = await asyncio.gather(*[get_single_sug(q) for q in queries])
+    return results
+
+
+async def evaluate_all_suggestions(sug_results: list[dict], original_question: str, context: RunContext) -> list[dict]:
+    """
+    评估所有推荐词
+
+    Args:
+        sug_results: 所有query的推荐词结果
+        original_question: 原始问题
+
+    Returns:
+        [
+            {
+                "source_query": "川西秋季",
+                "sug_query": "川西秋季旅游",
+                "intent_match": True,
+                "relevance_score": 0.8,
+                "reason": "..."
+            },
+            ...
+        ]
+    """
+    print(f"\n{'='*60}")
+    print(f"评估推荐词")
+    print(f"{'='*60}")
+
+    # 收集所有推荐词
+    all_evaluations = []
+
+    async def evaluate_single_sug(source_query: str, sug_query: str):
+        eval_input = f"""
+<原始问题>
+{original_question}
+</原始问题>
+
+<待评估的推荐query>
+{sug_query}
+</待评估的推荐query>
+
+请评估该推荐query:
+1. intent_match: 意图是否匹配(true/false)
+2. relevance_score: 相关性分数(0-1)
+3. reason: 详细的评估理由
+"""
+        result = await Runner.run(evaluator, eval_input)
+        evaluation: RelevanceEvaluation = result.final_output
+        return {
+            "source_query": source_query,
+            "sug_query": sug_query,
+            "intent_match": evaluation.intent_match,
+            "relevance_score": evaluation.relevance_score,
+            "reason": evaluation.reason,
+        }
+
+    # 并发评估所有推荐词
+    tasks = []
+    for sug_result in sug_results:
+        source_query = sug_result["query"]
+        for sug in sug_result["suggestions"]:
+            tasks.append(evaluate_single_sug(source_query, sug))
+
+    if tasks:
+        print(f"  总共需要评估 {len(tasks)} 个推荐词...")
+        all_evaluations = await asyncio.gather(*tasks)
+
+    context.evaluation_results = all_evaluations
+    return all_evaluations
+
+
+def find_qualified_queries(evaluations: list[dict], min_relevance_score: float = 0.7) -> list[dict]:
+    """
+    查找所有合格的query
+
+    筛选标准:
+    1. intent_match = True(必须满足)
+    2. relevance_score >= min_relevance_score
+
+    返回:按 relevance_score 降序排列
+    """
+    qualified = [
+        e for e in evaluations
+        if e['intent_match'] is True and e['relevance_score'] >= min_relevance_score
+    ]
+
+    # 按relevance_score降序排列
+    return sorted(qualified, key=lambda x: x['relevance_score'], reverse=True)
+
+
+# ============================================================================
+# 主流程
+# ============================================================================
+
+async def combinatorial_search(context: RunContext, max_combination_size: int = 1) -> dict:
+    """
+    组合式搜索流程
+
+    Args:
+        context: 运行上下文
+        max_combination_size: 最大组合词数(N),默认1
+
+    返回格式:
+    {
+        "success": True/False,
+        "results": [...],
+        "message": "..."
+    }
+    """
+
+    # 步骤1:分词
+    seg_result = await segment_text(context.q)
+    context.keywords = seg_result.words
+
+    # 步骤2:生成query组合
+    print(f"\n{'='*60}")
+    print(f"生成query组合(最大组合数:{max_combination_size})")
+    print(f"{'='*60}")
+    query_combinations = generate_query_combinations(context.keywords, max_combination_size)
+    context.query_combinations = query_combinations
+
+    # 步骤3:获取所有query的推荐词
+    all_queries = []
+    for level, queries in query_combinations.items():
+        all_queries.extend(queries)
+
+    sug_results = await fetch_suggestions_for_queries(all_queries, context)
+    context.all_sug_queries = sug_results
+
+    # 统计
+    total_sugs = sum(len(r["suggestions"]) for r in sug_results)
+    print(f"\n总共获取到 {total_sugs} 个推荐词")
+
+    # 步骤4:评估所有推荐词
+    evaluations = await evaluate_all_suggestions(sug_results, context.q, context)
+
+    # 步骤5:筛选合格query
+    qualified = find_qualified_queries(evaluations, min_relevance_score=0.7)
+
+    if qualified:
+        return {
+            "success": True,
+            "results": qualified,
+            "message": f"找到 {len(qualified)} 个合格query(intent_match=True 且 relevance>=0.7)"
+        }
+
+    # 降低标准
+    acceptable = find_qualified_queries(evaluations, min_relevance_score=0.5)
+    if acceptable:
+        return {
+            "success": True,
+            "results": acceptable,
+            "message": f"找到 {len(acceptable)} 个可接受query(intent_match=True 且 relevance>=0.5)"
+        }
+
+    # 完全失败:返回所有intent_match=True的
+    intent_matched = [e for e in evaluations if e['intent_match'] is True]
+    if intent_matched:
+        intent_matched_sorted = sorted(intent_matched, key=lambda x: x['relevance_score'], reverse=True)
+        return {
+            "success": False,
+            "results": intent_matched_sorted[:10],  # 只返回前10个
+            "message": f"未找到高相关性query,但有 {len(intent_matched)} 个意图匹配的推荐词"
+        }
+
+    return {
+        "success": False,
+        "results": [],
+        "message": "未找到任何意图匹配的推荐词"
+    }
+
+
+# ============================================================================
+# 输出格式化
+# ============================================================================
+
+def format_output(optimization_result: dict, context: RunContext) -> str:
+    """格式化输出结果"""
+    results = optimization_result.get("results", [])
+
+    output = f"原始问题:{context.q}\n"
+    output += f"提取的关键词:{', '.join(context.keywords or [])}\n"
+    output += f"关键词数量:{len(context.keywords or [])}\n"
+    output += f"\nquery组合统计:\n"
+    for level, queries in context.query_combinations.items():
+        output += f"  - {level}: {len(queries)} 个\n"
+
+    # 统计信息
+    total_queries = sum(len(q) for q in context.query_combinations.values())
+    total_sugs = sum(len(r["suggestions"]) for r in context.all_sug_queries)
+    total_evals = len(context.evaluation_results)
+
+    output += f"\n探索统计:\n"
+    output += f"  - 总query数:{total_queries}\n"
+    output += f"  - 总推荐词数:{total_sugs}\n"
+    output += f"  - 总评估数:{total_evals}\n"
+
+    output += f"\n状态:{optimization_result['message']}\n\n"
+
+    if optimization_result["success"] and results:
+        output += "=" * 60 + "\n"
+        output += "合格的推荐query(按relevance_score降序):\n"
+        output += "=" * 60 + "\n"
+        for i, result in enumerate(results[:20], 1):  # 只显示前20个
+            output += f"\n{i}. [{result['relevance_score']:.2f}] {result['sug_query']}\n"
+            output += f"   来源:{result['source_query']}\n"
+            output += f"   意图:{'✓ 匹配' if result['intent_match'] else '✗ 不匹配'}\n"
+            output += f"   理由:{result['reason'][:150]}...\n" if len(result['reason']) > 150 else f"   理由:{result['reason']}\n"
+    else:
+        output += "=" * 60 + "\n"
+        output += "结果:未找到足够相关的推荐query\n"
+        output += "=" * 60 + "\n"
+        if results:
+            output += "\n最接近的推荐词(前10个):\n\n"
+            for i, result in enumerate(results[:10], 1):
+                output += f"{i}. [{result['relevance_score']:.2f}] {result['sug_query']}\n"
+                output += f"   来源:{result['source_query']}\n"
+                output += f"   意图:{'✓ 匹配' if result['intent_match'] else '✗ 不匹配'}\n\n"
+
+        # 按source_query分组显示
+        output += "\n" + "=" * 60 + "\n"
+        output += "按查询词分组的推荐词情况:\n"
+        output += "=" * 60 + "\n"
+
+        for sug_data in context.all_sug_queries:
+            source_q = sug_data["query"]
+            sugs = sug_data["suggestions"]
+
+            # 找到这个source_query对应的所有评估
+            related_evals = [e for e in context.evaluation_results if e["source_query"] == source_q]
+            intent_match_count = sum(1 for e in related_evals if e["intent_match"])
+            avg_relevance = sum(e["relevance_score"] for e in related_evals) / len(related_evals) if related_evals else 0
+
+            output += f"\n查询:{source_q}\n"
+            output += f"  推荐词数:{len(sugs)}\n"
+            output += f"  意图匹配数:{intent_match_count}/{len(related_evals)}\n"
+            output += f"  平均相关性:{avg_relevance:.2f}\n"
+
+            # 显示前3个推荐词
+            if sugs:
+                output += f"  示例推荐词:\n"
+                for sug in sugs[:3]:
+                    eval_item = next((e for e in related_evals if e["sug_query"] == sug), None)
+                    if eval_item:
+                        output += f"    - {sug} [意图:{'✓' if eval_item['intent_match'] else '✗'}, 相关:{eval_item['relevance_score']:.2f}]\n"
+                    else:
+                        output += f"    - {sug}\n"
+
+    return output.strip()
+
+
+# ============================================================================
+# 主函数
+# ============================================================================
+
+async def main(input_dir: str, max_combination_size: int = 1):
+    current_time, log_url = set_trace()
+
+    # 从目录中读取固定文件名
+    input_context_file = os.path.join(input_dir, 'context.md')
+    input_q_file = os.path.join(input_dir, 'q.md')
+
+    q_context = read_file_as_string(input_context_file)
+    q = read_file_as_string(input_q_file)
+    q_with_context = f"""
+<需求上下文>
+{q_context}
+</需求上下文>
+<当前问题>
+{q}
+</当前问题>
+""".strip()
+
+    # 获取当前文件名作为版本
+    version = os.path.basename(__file__)
+    version_name = os.path.splitext(version)[0]
+
+    # 日志保存目录
+    log_dir = os.path.join(input_dir, "output", version_name, current_time)
+
+    run_context = RunContext(
+        version=version,
+        input_files={
+            "input_dir": input_dir,
+            "context_file": input_context_file,
+            "q_file": input_q_file,
+        },
+        q_with_context=q_with_context,
+        q_context=q_context,
+        q=q,
+        log_dir=log_dir,
+        log_url=log_url,
+    )
+
+    # 执行组合式搜索
+    optimization_result = await combinatorial_search(run_context, max_combination_size=max_combination_size)
+
+    # 格式化输出
+    final_output = format_output(optimization_result, run_context)
+    print(f"\n{'='*60}")
+    print("最终结果")
+    print(f"{'='*60}")
+    print(final_output)
+
+    # 保存结果
+    run_context.optimization_result = optimization_result
+    run_context.final_output = final_output
+
+    # 保存 RunContext 到 log_dir
+    os.makedirs(run_context.log_dir, exist_ok=True)
+    context_file_path = os.path.join(run_context.log_dir, "run_context.json")
+    with open(context_file_path, "w", encoding="utf-8") as f:
+        json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
+    print(f"\nRunContext saved to: {context_file_path}")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.2 组合式搜索版")
+    parser.add_argument(
+        "--input-dir",
+        type=str,
+        default="input/简单扣图",
+        help="输入目录路径,默认: input/简单扣图"
+    )
+    parser.add_argument(
+        "--max-combo",
+        type=int,
+        default=1,
+        help="最大组合词数(N),默认: 1"
+    )
+    args = parser.parse_args()
+
+    asyncio.run(main(args.input_dir, max_combination_size=args.max_combo))

+ 712 - 0
sug_v6_3_with_annotation.py

@@ -0,0 +1,712 @@
+import asyncio
+import json
+import os
+import argparse
+from datetime import datetime
+from itertools import combinations, permutations
+
+from agents import Agent, Runner
+from lib.my_trace import set_trace
+from typing import Literal
+from pydantic import BaseModel, Field
+
+from lib.utils import read_file_as_string
+from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
+
+
+# ============================================================================
+# 并发控制配置
+# ============================================================================
+# API请求并发度(小红书接口)
+API_CONCURRENCY_LIMIT = 5
+
+# 模型评估并发度(GPT评估)
+MODEL_CONCURRENCY_LIMIT = 10
+
+
+class RunContext(BaseModel):
+    version: str = Field(..., description="当前运行的脚本版本(文件名)")
+    input_files: dict[str, str] = Field(..., description="输入文件路径映射")
+    q_with_context: str
+    q_context: str
+    q: str
+    log_url: str
+    log_dir: str
+
+    # 问题标注
+    question_annotation: str | None = Field(default=None, description="问题的标注结果(三层)")
+
+    # 分词和组合
+    keywords: list[str] | None = Field(default=None, description="提取的关键词")
+    query_combinations: dict[str, list[str]] = Field(default_factory=dict, description="各层级的query组合")
+
+    # 探索结果
+    all_sug_queries: list[dict] = Field(default_factory=list, description="所有获取到的推荐词")
+
+    # 评估结果
+    evaluation_results: list[dict] = Field(default_factory=list, description="所有推荐词的评估结果")
+    optimization_result: dict | None = Field(default=None, description="最终优化结果对象")
+    final_output: str | None = Field(default=None, description="最终输出结果(格式化文本)")
+
+
+# ============================================================================
+# Agent 1: 问题标注专家
+# ============================================================================
+question_annotation_instructions = """
+你是搜索需求分析专家。给定问题(含需求背景),在原文上标注三层:本质、硬性、软性。
+
+## 判断标准
+
+**[本质]** - 问题的核心意图
+- 如何获取、教程、推荐、作品、测评等
+
+**[硬]** - 客观事实性约束(可明确验证、非主观判断)
+- 能明确区分类别的:地域、时间、对象、工具、操作类型
+- 特征:改变后得到完全不同类别的结果
+
+**[软]** - 主观判断性修饰(因人而异、程度性的)
+- 需要主观评价的:质量、速度、美观、特色、程度
+- 特征:改变后仍是同类结果,只是满足程度不同
+
+## 输出格式
+
+词语[本质-描述]、词语[硬-描述]、词语[软-描述]
+
+## 注意
+- 只输出标注后的字符串
+- 结合需求背景判断意图
+""".strip()
+
+question_annotator = Agent[None](
+    name="问题标注专家",
+    instructions=question_annotation_instructions,
+)
+
+
+# ============================================================================
+# Agent 2: 分词专家
+# ============================================================================
+segmentation_instructions = """
+你是中文分词专家。给定一个句子,将其分词。
+
+## 分词原则
+
+1. 去掉标点符号
+2. 拆分成最小的有意义单元
+3. 去掉助词、语气词、助动词
+4. 保留疑问词
+5. 保留实词:名词、动词、形容词、副词
+
+## 输出要求
+
+输出分词列表。
+""".strip()
+
+class SegmentationResult(BaseModel):
+    """分词结果"""
+    words: list[str] = Field(..., description="分词列表")
+    reasoning: str = Field(..., description="分词说明")
+
+segmenter = Agent[None](
+    name="分词专家",
+    instructions=segmentation_instructions,
+    output_type=SegmentationResult,
+)
+
+
+# ============================================================================
+# Agent 3: 评估专家(意图匹配 + 相关性评分)
+# ============================================================================
+eval_instructions = """
+你是搜索query评估专家。给定原始问题、问题标注和推荐query,评估两个维度。
+
+## 输入信息
+
+你会收到:
+1. 原始问题:用户的原始表述
+2. 问题标注:对原始问题的三层标注(本质、硬性、软性)
+3. 推荐query:待评估的推荐词
+
+## 评估目标
+
+用这个推荐query搜索,能否找到满足原始需求的内容?
+
+## 两层评分
+
+### 1. intent_match(意图匹配)= true/false
+
+推荐query的**使用意图**是否与原问题的**本质**一致?
+
+**核心:只关注[本质]标注**
+- 问题标注中的 `[本质-XXX]` 标记明确了用户的核心意图
+- 判断推荐词是否能达成这个核心意图
+
+**常见本质类型:**
+- 找方法/如何获取 → 推荐词应包含方法、途径、网站、渠道等
+- 找教程 → 推荐词应是教程、教学相关
+- 找资源/素材 → 推荐词应是资源、素材本身
+- 找工具 → 推荐词应是工具推荐
+- 看作品 → 推荐词应是作品展示
+
+**评分:**
+- true = 推荐词的意图与 `[本质]` 一致
+- false = 推荐词的意图与 `[本质]` 不一致
+
+### 2. relevance_score(相关性)= 0-1 连续分数
+
+在意图匹配的前提下,推荐query在**主题、要素、属性**上与原问题的相关程度?
+
+**评估维度:**
+- 主题相关:核心主题是否匹配?(如:摄影、旅游、美食)
+- 要素覆盖:`[硬-XXX]` 标记的硬性约束保留了多少?(地域、时间、对象、工具等)
+- 属性匹配:`[软-XXX]` 标记的软性修饰保留了多少?(质量、速度、美观等)
+
+**评分参考:**
+- 0.9-1.0 = 几乎完美匹配,[硬]和[软]标注的要素都保留
+- 0.7-0.8 = 高度相关,[硬]标注的要素都保留,[软]标注少数缺失
+- 0.5-0.6 = 中度相关,[硬]标注的要素保留大部分,[软]标注多数缺失
+- 0.3-0.4 = 低度相关,[硬]标注的要素部分缺失
+- 0-0.2 = 基本不相关,[硬]标注的要素大量缺失
+
+## 评估策略
+
+1. **先看[本质]判断 intent_match**:意图不匹配直接 false
+2. **再看[硬][软]评估 relevance_score**:计算要素和属性的保留程度
+
+## 输出要求
+
+- intent_match: true/false
+- relevance_score: 0-1 的浮点数
+- reason: 详细的评估理由,需要说明:
+  - 原问题的[本质]是什么,推荐词是否匹配这个本质
+  - [硬]约束哪些保留/缺失
+  - [软]修饰哪些保留/缺失
+  - 最终相关性分数的依据
+""".strip()
+
+class RelevanceEvaluation(BaseModel):
+    """评估反馈模型 - 意图匹配 + 相关性"""
+    intent_match: bool = Field(..., description="意图是否匹配")
+    relevance_score: float = Field(..., description="相关性分数 0-1,分数越高越相关")
+    reason: str = Field(..., description="评估理由,需说明意图判断和相关性依据")
+
+evaluator = Agent[None](
+    name="评估专家",
+    instructions=eval_instructions,
+    output_type=RelevanceEvaluation,
+)
+
+
+# ============================================================================
+# 核心函数
+# ============================================================================
+
+async def annotate_question(q_with_context: str) -> str:
+    """标注问题(三层)"""
+    print("\n正在标注问题...")
+    result = await Runner.run(question_annotator, q_with_context)
+    annotation = str(result.final_output)
+    print(f"问题标注完成:{annotation}")
+    return annotation
+
+
+async def segment_text(q: str) -> SegmentationResult:
+    """分词"""
+    print("\n正在分词...")
+    result = await Runner.run(segmenter, q)
+    seg_result: SegmentationResult = result.final_output
+    print(f"分词结果:{seg_result.words}")
+    print(f"分词说明:{seg_result.reasoning}")
+    return seg_result
+
+
+def generate_query_combinations(keywords: list[str], max_combination_size: int) -> dict[str, list[str]]:
+    """
+    生成query组合(考虑词的顺序)
+
+    Args:
+        keywords: 关键词列表
+        max_combination_size: 最大组合词数(N)
+
+    Returns:
+        {
+            "1-word": [...],
+            "2-word": [...],
+            "3-word": [...],
+            ...
+            "N-word": [...]
+        }
+    """
+    result = {}
+
+    for size in range(1, max_combination_size + 1):
+        if size > len(keywords):
+            break
+
+        # 1-word组合:不需要考虑顺序
+        if size == 1:
+            queries = keywords.copy()
+        else:
+            # 多词组合:先选择size个词(combinations),再排列(permutations)
+            all_queries = []
+            combs = list(combinations(keywords, size))
+            for comb in combs:
+                # 对每个组合生成所有排列
+                perms = list(permutations(comb))
+                for perm in perms:
+                    query = ''.join(perm)  # 直接拼接,无空格
+                    all_queries.append(query)
+
+            # 去重(虽然理论上不会重复,但保险起见)
+            queries = list(dict.fromkeys(all_queries))
+
+        result[f"{size}-word"] = queries
+
+        print(f"\n{size}词组合:{len(queries)} 个")
+        if len(queries) <= 10:
+            for q in queries:
+                print(f"  - {q}")
+        else:
+            print(f"  - {queries[0]}")
+            print(f"  - {queries[1]}")
+            print(f"  ...")
+            print(f"  - {queries[-1]}")
+
+    return result
+
+
+async def fetch_suggestions_for_queries(queries: list[str], context: RunContext) -> list[dict]:
+    """
+    并发获取所有query的推荐词(带并发控制)
+
+    Returns:
+        [
+            {
+                "query": "川西",
+                "suggestions": ["川西旅游", "川西攻略", ...],
+                "timestamp": "..."
+            },
+            ...
+        ]
+    """
+    print(f"\n{'='*60}")
+    print(f"获取推荐词:{len(queries)} 个query(并发度:{API_CONCURRENCY_LIMIT})")
+    print(f"{'='*60}")
+
+    xiaohongshu_api = XiaohongshuSearchRecommendations()
+
+    # 创建信号量控制并发
+    semaphore = asyncio.Semaphore(API_CONCURRENCY_LIMIT)
+
+    async def get_single_sug(query: str):
+        async with semaphore:
+            print(f"  查询: {query}")
+            suggestions = xiaohongshu_api.get_recommendations(keyword=query)
+            print(f"    → {len(suggestions) if suggestions else 0} 个推荐词")
+            return {
+                "query": query,
+                "suggestions": suggestions or [],
+                "timestamp": datetime.now().isoformat()
+            }
+
+    results = await asyncio.gather(*[get_single_sug(q) for q in queries])
+    return results
+
+
+async def evaluate_all_suggestions(
+    sug_results: list[dict],
+    original_question: str,
+    question_annotation: str,
+    context: RunContext
+) -> list[dict]:
+    """
+    评估所有推荐词(带并发控制)
+
+    Args:
+        sug_results: 所有query的推荐词结果
+        original_question: 原始问题
+        question_annotation: 问题标注(三层)
+
+    Returns:
+        [
+            {
+                "source_query": "川西秋季",
+                "sug_query": "川西秋季旅游",
+                "intent_match": True,
+                "relevance_score": 0.8,
+                "reason": "..."
+            },
+            ...
+        ]
+    """
+    print(f"\n{'='*60}")
+    print(f"评估推荐词(并发度:{MODEL_CONCURRENCY_LIMIT})")
+    print(f"{'='*60}")
+
+    # 创建信号量控制并发
+    semaphore = asyncio.Semaphore(MODEL_CONCURRENCY_LIMIT)
+
+    # 收集所有推荐词
+    all_evaluations = []
+
+    async def evaluate_single_sug(source_query: str, sug_query: str):
+        async with semaphore:
+            eval_input = f"""
+<原始问题>
+{original_question}
+</原始问题>
+
+<问题标注(三层)>
+{question_annotation}
+</问题标注(三层)>
+
+<待评估的推荐query>
+{sug_query}
+</待评估的推荐query>
+
+请评估该推荐query:
+1. intent_match: 意图是否匹配(true/false)
+2. relevance_score: 相关性分数(0-1)
+3. reason: 详细的评估理由
+
+评估时请参考问题标注中的[本质]、[硬]、[软]标记。
+"""
+            result = await Runner.run(evaluator, eval_input)
+            evaluation: RelevanceEvaluation = result.final_output
+            return {
+                "source_query": source_query,
+                "sug_query": sug_query,
+                "intent_match": evaluation.intent_match,
+                "relevance_score": evaluation.relevance_score,
+                "reason": evaluation.reason,
+            }
+
+    # 并发评估所有推荐词
+    tasks = []
+    for sug_result in sug_results:
+        source_query = sug_result["query"]
+        for sug in sug_result["suggestions"]:
+            tasks.append(evaluate_single_sug(source_query, sug))
+
+    if tasks:
+        print(f"  总共需要评估 {len(tasks)} 个推荐词...")
+        all_evaluations = await asyncio.gather(*tasks)
+
+    context.evaluation_results = all_evaluations
+    return all_evaluations
+
+
+def find_qualified_queries(evaluations: list[dict], min_relevance_score: float = 0.7) -> list[dict]:
+    """
+    查找所有合格的query
+
+    筛选标准:
+    1. intent_match = True(必须满足)
+    2. relevance_score >= min_relevance_score
+
+    返回:按 relevance_score 降序排列
+    """
+    qualified = [
+        e for e in evaluations
+        if e['intent_match'] is True and e['relevance_score'] >= min_relevance_score
+    ]
+
+    # 按relevance_score降序排列
+    return sorted(qualified, key=lambda x: x['relevance_score'], reverse=True)
+
+
+# ============================================================================
+# 主流程
+# ============================================================================
+
+async def combinatorial_search(context: RunContext, max_combination_size: int = 1) -> dict:
+    """
+    组合式搜索流程(带问题标注)
+
+    Args:
+        context: 运行上下文
+        max_combination_size: 最大组合词数(N),默认1
+
+    返回格式:
+    {
+        "success": True/False,
+        "results": [...],
+        "message": "..."
+    }
+    """
+
+    # 步骤1:标注问题(三层)
+    annotation = await annotate_question(context.q_with_context)
+    context.question_annotation = annotation
+
+    # 步骤2:分词
+    seg_result = await segment_text(context.q)
+    context.keywords = seg_result.words
+
+    # 步骤3:生成query组合
+    print(f"\n{'='*60}")
+    print(f"生成query组合(最大组合数:{max_combination_size})")
+    print(f"{'='*60}")
+    query_combinations = generate_query_combinations(context.keywords, max_combination_size)
+    context.query_combinations = query_combinations
+
+    # 步骤4:获取所有query的推荐词
+    all_queries = []
+    for level, queries in query_combinations.items():
+        all_queries.extend(queries)
+
+    sug_results = await fetch_suggestions_for_queries(all_queries, context)
+    context.all_sug_queries = sug_results
+
+    # 统计
+    total_sugs = sum(len(r["suggestions"]) for r in sug_results)
+    print(f"\n总共获取到 {total_sugs} 个推荐词")
+
+    # 步骤5:评估所有推荐词(使用原始问题和标注)
+    evaluations = await evaluate_all_suggestions(sug_results, context.q, annotation, context)
+
+    # 步骤6:筛选合格query
+    qualified = find_qualified_queries(evaluations, min_relevance_score=0.7)
+
+    if qualified:
+        return {
+            "success": True,
+            "results": qualified,
+            "message": f"找到 {len(qualified)} 个合格query(intent_match=True 且 relevance>=0.7)"
+        }
+
+    # 降低标准
+    acceptable = find_qualified_queries(evaluations, min_relevance_score=0.5)
+    if acceptable:
+        return {
+            "success": True,
+            "results": acceptable,
+            "message": f"找到 {len(acceptable)} 个可接受query(intent_match=True 且 relevance>=0.5)"
+        }
+
+    # 完全失败:返回所有intent_match=True的
+    intent_matched = [e for e in evaluations if e['intent_match'] is True]
+    if intent_matched:
+        intent_matched_sorted = sorted(intent_matched, key=lambda x: x['relevance_score'], reverse=True)
+        return {
+            "success": False,
+            "results": intent_matched_sorted[:10],  # 只返回前10个
+            "message": f"未找到高相关性query,但有 {len(intent_matched)} 个意图匹配的推荐词"
+        }
+
+    return {
+        "success": False,
+        "results": [],
+        "message": "未找到任何意图匹配的推荐词"
+    }
+
+
+# ============================================================================
+# 输出格式化
+# ============================================================================
+
+def format_output(optimization_result: dict, context: RunContext) -> str:
+    """格式化输出结果"""
+    results = optimization_result.get("results", [])
+
+    output = f"原始问题:{context.q}\n"
+    output += f"问题标注:{context.question_annotation}\n"
+    output += f"提取的关键词:{', '.join(context.keywords or [])}\n"
+    output += f"关键词数量:{len(context.keywords or [])}\n"
+    output += f"\nquery组合统计:\n"
+    for level, queries in context.query_combinations.items():
+        output += f"  - {level}: {len(queries)} 个\n"
+
+    # 统计信息
+    total_queries = sum(len(q) for q in context.query_combinations.values())
+    total_sugs = sum(len(r["suggestions"]) for r in context.all_sug_queries)
+    total_evals = len(context.evaluation_results)
+
+    output += f"\n探索统计:\n"
+    output += f"  - 总query数:{total_queries}\n"
+    output += f"  - 总推荐词数:{total_sugs}\n"
+    output += f"  - 总评估数:{total_evals}\n"
+
+    output += f"\n状态:{optimization_result['message']}\n\n"
+
+    if optimization_result["success"] and results:
+        output += "=" * 60 + "\n"
+        output += "合格的推荐query(按relevance_score降序):\n"
+        output += "=" * 60 + "\n"
+        for i, result in enumerate(results[:20], 1):  # 只显示前20个
+            output += f"\n{i}. [{result['relevance_score']:.2f}] {result['sug_query']}\n"
+            output += f"   来源:{result['source_query']}\n"
+            output += f"   意图:{'✓ 匹配' if result['intent_match'] else '✗ 不匹配'}\n"
+            output += f"   理由:{result['reason'][:150]}...\n" if len(result['reason']) > 150 else f"   理由:{result['reason']}\n"
+    else:
+        output += "=" * 60 + "\n"
+        output += "结果:未找到足够相关的推荐query\n"
+        output += "=" * 60 + "\n"
+        if results:
+            output += "\n最接近的推荐词(前10个):\n\n"
+            for i, result in enumerate(results[:10], 1):
+                output += f"{i}. [{result['relevance_score']:.2f}] {result['sug_query']}\n"
+                output += f"   来源:{result['source_query']}\n"
+                output += f"   意图:{'✓ 匹配' if result['intent_match'] else '✗ 不匹配'}\n\n"
+
+        # 按source_query分组显示
+        output += "\n" + "=" * 60 + "\n"
+        output += "按查询词分组的推荐词情况:\n"
+        output += "=" * 60 + "\n"
+
+        for sug_data in context.all_sug_queries:
+            source_q = sug_data["query"]
+            sugs = sug_data["suggestions"]
+
+            # 找到这个source_query对应的所有评估
+            related_evals = [e for e in context.evaluation_results if e["source_query"] == source_q]
+            intent_match_count = sum(1 for e in related_evals if e["intent_match"])
+            avg_relevance = sum(e["relevance_score"] for e in related_evals) / len(related_evals) if related_evals else 0
+
+            output += f"\n查询:{source_q}\n"
+            output += f"  推荐词数:{len(sugs)}\n"
+            output += f"  意图匹配数:{intent_match_count}/{len(related_evals)}\n"
+            output += f"  平均相关性:{avg_relevance:.2f}\n"
+
+            # 显示前3个推荐词
+            if sugs:
+                output += f"  示例推荐词:\n"
+                for sug in sugs[:3]:
+                    eval_item = next((e for e in related_evals if e["sug_query"] == sug), None)
+                    if eval_item:
+                        output += f"    - {sug} [意图:{'✓' if eval_item['intent_match'] else '✗'}, 相关:{eval_item['relevance_score']:.2f}]\n"
+                    else:
+                        output += f"    - {sug}\n"
+
+    return output.strip()
+
+
+# ============================================================================
+# 主函数
+# ============================================================================
+
+async def main(
+    input_dir: str,
+    max_combination_size: int = 1,
+    api_concurrency: int = API_CONCURRENCY_LIMIT,
+    model_concurrency: int = MODEL_CONCURRENCY_LIMIT
+):
+    # 更新全局并发配置
+    global API_CONCURRENCY_LIMIT, MODEL_CONCURRENCY_LIMIT
+    API_CONCURRENCY_LIMIT = api_concurrency
+    MODEL_CONCURRENCY_LIMIT = model_concurrency
+
+    current_time, log_url = set_trace()
+
+    # 从目录中读取固定文件名
+    input_context_file = os.path.join(input_dir, 'context.md')
+    input_q_file = os.path.join(input_dir, 'q.md')
+
+    q_context = read_file_as_string(input_context_file)
+    q = read_file_as_string(input_q_file)
+    q_with_context = f"""
+<需求上下文>
+{q_context}
+</需求上下文>
+<当前问题>
+{q}
+</当前问题>
+""".strip()
+
+    # 获取当前文件名作为版本
+    version = os.path.basename(__file__)
+    version_name = os.path.splitext(version)[0]
+
+    # 日志保存目录
+    log_dir = os.path.join(input_dir, "output", version_name, current_time)
+
+    run_context = RunContext(
+        version=version,
+        input_files={
+            "input_dir": input_dir,
+            "context_file": input_context_file,
+            "q_file": input_q_file,
+        },
+        q_with_context=q_with_context,
+        q_context=q_context,
+        q=q,
+        log_dir=log_dir,
+        log_url=log_url,
+    )
+
+    print(f"\n{'='*60}")
+    print(f"并发配置")
+    print(f"{'='*60}")
+    print(f"API请求并发度:{API_CONCURRENCY_LIMIT}")
+    print(f"模型评估并发度:{MODEL_CONCURRENCY_LIMIT}")
+
+    # 执行组合式搜索(带问题标注)
+    optimization_result = await combinatorial_search(run_context, max_combination_size=max_combination_size)
+
+    # 格式化输出
+    final_output = format_output(optimization_result, run_context)
+    print(f"\n{'='*60}")
+    print("最终结果")
+    print(f"{'='*60}")
+    print(final_output)
+
+    # 保存结果
+    run_context.optimization_result = optimization_result
+    run_context.final_output = final_output
+
+    # 保存 RunContext 到 log_dir
+    os.makedirs(run_context.log_dir, exist_ok=True)
+    context_file_path = os.path.join(run_context.log_dir, "run_context.json")
+    with open(context_file_path, "w", encoding="utf-8") as f:
+        json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
+    print(f"\nRunContext saved to: {context_file_path}")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="搜索query优化工具 - v6.3 组合式搜索+问题标注版",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+示例:
+  # 默认参数
+  python sug_v6_3_with_annotation.py
+
+  # 2词组合,API并发5,模型并发20
+  python sug_v6_3_with_annotation.py --max-combo 2 --api-concurrency 5 --model-concurrency 20
+
+  # 3词组合,降低并发度
+  python sug_v6_3_with_annotation.py --max-combo 3 --api-concurrency 3 --model-concurrency 10
+        """
+    )
+    parser.add_argument(
+        "--input-dir",
+        type=str,
+        default="input/简单扣图",
+        help="输入目录路径,默认: input/简单扣图"
+    )
+    parser.add_argument(
+        "--max-combo",
+        type=int,
+        default=1,
+        help="最大组合词数(N),默认: 1"
+    )
+    parser.add_argument(
+        "--api-concurrency",
+        type=int,
+        default=API_CONCURRENCY_LIMIT,
+        help=f"API请求并发度,默认: {API_CONCURRENCY_LIMIT}"
+    )
+    parser.add_argument(
+        "--model-concurrency",
+        type=int,
+        default=MODEL_CONCURRENCY_LIMIT,
+        help=f"模型评估并发度,默认: {MODEL_CONCURRENCY_LIMIT}"
+    )
+    args = parser.parse_args()
+
+    asyncio.run(main(
+        args.input_dir,
+        max_combination_size=args.max_combo,
+        api_concurrency=args.api_concurrency,
+        model_concurrency=args.model_concurrency
+    ))

+ 129 - 0
test_cache.py

@@ -0,0 +1,129 @@
+#!/usr/bin/env python3
+"""
+测试 XiaohongshuSearchRecommendations 缓存功能
+"""
+
+import sys
+import os
+import json
+import time
+
+# 添加脚本目录到路径
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'script', 'search_recommendations'))
+
+from xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
+
+
+def print_section(title):
+    """打印分隔符"""
+    print("\n" + "="*60)
+    print(f"  {title}")
+    print("="*60)
+
+
+def test_cache():
+    """测试缓存功能"""
+
+    # 创建客户端,设置较短的缓存时间用于测试(60秒)
+    print_section("1. 初始化客户端")
+    client = XiaohongshuSearchRecommendations(enable_cache=True, cache_ttl=60)
+    print(f"✓ 客户端初始化成功")
+    print(f"  - 缓存已启用: {client.enable_cache}")
+    print(f"  - 缓存有效期: {client.cache_ttl} 秒")
+    print(f"  - 结果目录: {client.results_base_dir}")
+
+    # 测试关键词
+    test_keyword = "川西"
+
+    # 第一次请求(应该从API获取)
+    print_section(f"2. 第一次请求关键词 '{test_keyword}'")
+    print("预期: 缓存未命中,从API获取数据")
+    start_time = time.time()
+    result1 = client.get_recommendations(test_keyword)
+    elapsed1 = time.time() - start_time
+    print(f"✓ 请求完成,耗时: {elapsed1:.2f} 秒")
+    print(f"  - 获取到 {len(result1)} 条推荐词")
+    if result1:
+        print(f"  - 示例: {result1[:3]}")
+
+    # 保存结果到文件(模拟正常使用流程)
+    if result1:
+        filepath = client.save_result(test_keyword, result1)
+        print(f"✓ 结果已保存到: {filepath}")
+
+    # 查看缓存信息
+    print_section("3. 查看缓存信息")
+    cache_info = client.get_cache_info(test_keyword)
+    print("内存缓存:")
+    print(json.dumps(cache_info["memory_cache"], ensure_ascii=False, indent=2))
+    print("\n文件缓存:")
+    print(json.dumps(cache_info["file_cache"], ensure_ascii=False, indent=2))
+
+    # 第二次请求(应该从内存缓存获取)
+    print_section(f"4. 第二次请求关键词 '{test_keyword}'(内存缓存)")
+    print("预期: 从内存缓存获取")
+    start_time = time.time()
+    result2 = client.get_recommendations(test_keyword)
+    elapsed2 = time.time() - start_time
+    print(f"✓ 请求完成,耗时: {elapsed2:.2f} 秒")
+    print(f"  - 获取到 {len(result2)} 条推荐词")
+    print(f"  - 速度提升: {(elapsed1/elapsed2):.1f}x")
+
+    # 验证结果一致性
+    if result1 == result2:
+        print("✓ 缓存数据与原始数据一致")
+    else:
+        print("✗ 警告: 缓存数据与原始数据不一致")
+
+    # 清除内存缓存,测试文件缓存
+    print_section(f"5. 清除内存缓存,测试文件缓存")
+    client.clear_memory_cache(test_keyword)
+    print("✓ 内存缓存已清除")
+
+    # 第三次请求(应该从文件缓存获取)
+    print_section(f"6. 第三次请求关键词 '{test_keyword}'(文件缓存)")
+    print("预期: 从文件缓存获取")
+    start_time = time.time()
+    result3 = client.get_recommendations(test_keyword)
+    elapsed3 = time.time() - start_time
+    print(f"✓ 请求完成,耗时: {elapsed3:.2f} 秒")
+    print(f"  - 获取到 {len(result3)} 条推荐词")
+
+    # 验证结果一致性
+    if result1 == result3:
+        print("✓ 文件缓存数据与原始数据一致")
+    else:
+        print("✗ 警告: 文件缓存数据与原始数据不一致")
+
+    # 测试禁用缓存
+    print_section(f"7. 测试禁用缓存(use_cache=False)")
+    print("预期: 直接从API获取,不使用缓存")
+    start_time = time.time()
+    result4 = client.get_recommendations(test_keyword, use_cache=False)
+    elapsed4 = time.time() - start_time
+    print(f"✓ 请求完成,耗时: {elapsed4:.2f} 秒")
+    print(f"  - 获取到 {len(result4)} 条推荐词")
+
+    # 查看最终缓存状态
+    print_section("8. 最终缓存状态")
+    cache_info = client.get_cache_info()
+    print("所有内存缓存:")
+    for keyword, info in cache_info["memory_cache"].items():
+        print(f"  - {keyword}: {info}")
+    print("\n所有文件缓存:")
+    for keyword, info in cache_info["file_cache"].items():
+        print(f"  - {keyword}: {info}")
+
+    print_section("测试完成")
+    print("✓ 所有缓存功能测试通过")
+
+
+if __name__ == "__main__":
+    try:
+        test_cache()
+    except KeyboardInterrupt:
+        print("\n\n测试被用户中断")
+    except Exception as e:
+        print(f"\n\n✗ 测试失败: {e}")
+        import traceback
+        traceback.print_exc()