import asyncio import json import os import sys import argparse import time import hashlib from datetime import datetime from typing import Literal, Optional from agents import Agent, Runner, ModelSettings from lib.my_trace import set_trace from pydantic import BaseModel, Field from lib.utils import read_file_as_string from lib.client import get_model MODEL_NAME = "google/gemini-2.5-flash" # 得分提升阈值:sug或组合词必须比来源query提升至少此幅度才能进入下一轮 REQUIRED_SCORE_GAIN = 0.02 SUG_CACHE_TTL = 24 * 3600 # 24小时 SUG_CACHE_DIR = os.path.join(os.path.dirname(__file__), "data", "sug_cache") # 🆕 评估缓存配置 EVAL_CACHE_TTL = 7 * 24 * 3600 # 7天(评估结果相对稳定,可以长期缓存) EVAL_CACHE_DIR = os.path.join(os.path.dirname(__file__), "data", "eval_cache") EVAL_CACHE_FILE = os.path.join(EVAL_CACHE_DIR, "evaluation_cache.json") from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations from script.search.xiaohongshu_search import XiaohongshuSearch from script.search.xiaohongshu_detail import XiaohongshuDetail from script.search.enrichment_helper import enrich_post_with_detail # from multimodal_extractor import extract_post_images # 内容提取流程已断开 from post_evaluator_v3 import evaluate_post_v3, apply_evaluation_v3_to_post # ============================================================================ # 日志工具类 # ============================================================================ class TeeLogger: """同时输出到控制台和日志文件的工具类""" def __init__(self, stdout, log_file): self.stdout = stdout self.log_file = log_file def write(self, message): self.stdout.write(message) self.log_file.write(message) self.log_file.flush() # 实时写入,避免丢失日志 def flush(self): self.stdout.flush() self.log_file.flush() # ============================================================================ # 数据模型 # ============================================================================ class Seg(BaseModel): """分词(旧版)- v120使用""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_o: str = "" # 原始问题 # ============================================================================ # 新架构数据模型 (v121) # ============================================================================ class Segment(BaseModel): """语义片段(Round 0语义分段结果)""" text: str # 片段文本 type: str # 语义维度: 动作目/修饰词/中心名词 score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_o: str = "" # 原始问题 words: list[str] = Field(default_factory=list) # 该片段拆分出的词列表(Round 0拆词结果) word_scores: dict[str, float] = Field(default_factory=dict) # 词的评分 {word: score} word_reasons: dict[str, str] = Field(default_factory=dict) # 词的评分理由 {word: reason} class DomainCombination(BaseModel): """域组合(Round N的N域组合结果)""" text: str # 组合后的文本 domains: list[int] = Field(default_factory=list) # 参与组合的域索引列表(对应segments的索引) type_label: str = "" # 类型标签,如 [疑问标记+核心动作+中心名词] source_words: list[list[str]] = Field(default_factory=list) # 来源词列表,每个元素是一个域的词列表,如 [["猫咪"], ["梗图"]] score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_segments: list[str] = Field(default_factory=list) # 来源segment的文本列表 source_word_details: list[dict] = Field(default_factory=list) # 词及其得分信息 [{"domain_index":0,"segment_type":"","words":[{"text":"","score":0.0}]}] source_scores: list[float] = Field(default_factory=list) # 来源词的分数列表(扁平化) max_source_score: float | None = None # 来源词的最高分 is_above_source_scores: bool = False # 组合得分是否超过所有来源词 # ============================================================================ # 旧架构数据模型(保留但不使用) # ============================================================================ # class Word(BaseModel): # """词(旧版)- v120使用,v121不再使用""" # text: str # score_with_o: float = 0.0 # 与原始问题的评分 # from_o: str = "" # 原始问题 class Word(BaseModel): """词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 from_o: str = "" # 原始问题 class QFromQ(BaseModel): """Q来源信息(用于Sug中记录)""" text: str score_with_o: float = 0.0 class Q(BaseModel): """查询""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_source: str = "" # v120: seg/sug/add; v121新增: segment/domain_comb/sug type_label: str = "" # v121新增:域类型标签(仅用于domain_comb来源) domain_index: int = -1 # v121新增:域索引(word来源时有效,-1表示无域) domain_type: str = "" # v121新增:域类型(word来源时表示所属segment的type,如"中心名词") class Sug(BaseModel): """建议词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_q: QFromQ | None = None # 来自的q class Seed(BaseModel): """种子(旧版)- v120使用,v121不再使用""" text: str added_words: list[str] = Field(default_factory=list) # 已经增加的words from_type: str = "" # seg/sug/add score_with_o: float = 0.0 # 与原始问题的评分 class Post(BaseModel): """帖子""" title: str = "" body_text: str = "" type: str = "normal" # video/normal images: list[str] = Field(default_factory=list) # 图片url列表,第一张为封面 video: str = "" # 视频url interact_info: dict = Field(default_factory=dict) # 互动信息 note_id: str = "" note_url: str = "" # 详情补充字段(来自详情API) author_name: str = "" # 作者名称 author_id: str = "" # 作者ID publish_time: int = 0 # 发布时间戳 cdn_images: list[str] = Field(default_factory=list) # 高清CDN图片列表(详情API补充) detail_fetched: bool = False # 是否已获取详情的标记 # V3评估字段(顶层 - 快速访问) is_knowledge: bool | None = None # Prompt1: 是否是知识内容 is_content_knowledge: bool | None = None # Prompt2: 是否是内容知识 knowledge_score: float | None = None # Prompt2: 知识评分(0-100) purpose_score: int | None = None # Prompt3: 目的性得分(0-100) category_score: int | None = None # Prompt4: 品类得分(0-100) final_score: float | None = None # 综合得分: purpose*0.7 + category*0.3 (保留2位小数) match_level: str = "" # 匹配等级: "高度匹配"/"基本匹配"/"部分匹配"/"弱匹配"/"不匹配" evaluation_time: str = "" # 评估时间戳 evaluator_version: str = "v3.0" # 评估器版本 # V3评估字段(嵌套 - 详细信息) knowledge_evaluation: dict | None = None # Prompt1: 知识判断详情 content_knowledge_evaluation: dict | None = None # Prompt2: 内容知识评估详情 purpose_evaluation: dict | None = None # Prompt3: 目的性匹配详情 category_evaluation: dict | None = None # Prompt4: 品类匹配详情 class Search(Sug): """搜索结果(继承Sug)""" post_list: list[Post] = Field(default_factory=list) # 搜索得到的帖子列表 class RunContext(BaseModel): """运行上下文""" version: str input_files: dict[str, str] c: str # 原始需求 o: str # 原始问题 log_url: str log_dir: str # v121新增:语义分段结果 segments: list[dict] = Field(default_factory=list) # Round 0的语义分段结果 # 每轮的数据 rounds: list[dict] = Field(default_factory=list) # 每轮的详细数据 # 最终结果 final_output: str | None = None # 评估缓存:避免重复评估相同文本 evaluation_cache: dict[str, tuple[float, str]] = Field(default_factory=dict) # key: 文本, value: (score, reason) # 历史词/组合得分追踪(用于Round 2+计算系数) word_score_history: dict[str, float] = Field(default_factory=dict) # key: 词/组合文本, value: 最终得分 # 统计信息 stats_llm_calls: int = 0 # LLM评估调用次数 stats_sug_requests: int = 0 # 小红书SUG请求次数(包括缓存) stats_sug_cache_hits: int = 0 # SUG缓存命中次数 stats_search_calls: int = 0 # 搜索调用次数 # ============================================================================ # Agent 定义 # ============================================================================ # ============================================================================ # v121 新增 Agent # ============================================================================ # Agent: 语义分段专家 (Prompt1) class SemanticSegment(BaseModel): """单个语义片段""" segment_text: str = Field(..., description="片段文本") segment_type: str = Field(..., description="语义维度(动作目标/修饰词/中心名词)") reasoning: str = Field(..., description="分段理由") class SemanticSegmentation(BaseModel): """语义分段结果""" segments: list[SemanticSegment] = Field(..., description="语义片段列表") overall_reasoning: str = Field(..., description="整体分段思路") semantic_segmentation_instructions = """ 你是语义分段专家。给定一个搜索query,将其拆分成2种语义维度的片段。 ## 语义定义 ### 1. 谓宾结构 **定义**:谓语(含疑问词+动词)+ 宾语的完整语义单元 **包含**: - 疑问词:如何、什么、哪里、怎样、怎么(保留,表达方法/教程意图) - 谓语动词:获取、制作、拍摄、寻找、找到、学习、规划等 - 宾语对象:素材、教程、技巧、攻略、灵感点等核心名词 **宾语识别规则(关键)**: - 宾语是动词直接作用的对象,是句子的核心名词 - 在"X的Y"结构中,Y是中心词(宾语),X是定语 - 例如:"职场热梗的灵感点"中,"灵感点"是宾语,"职场热梗"是定语 **示例**: - "如何获取风光摄影素材" → 谓宾结构(疑问词+动词+宾语完整单元) - "怎么找到灵感点" → 谓宾结构(疑问词+动词+宾语) - "制作视频教程" → 谓宾结构(动词+宾语) - "寻找拍摄技巧" → 谓宾结构(动词+宾语) **注意**: - 谓宾结构必须包含宾语,不能只有动词 - 宾语是动作的直接对象,是句子主干的一部分 - 复合名词宾语(如"风光摄影素材")保持完整 --- ### 2. 定语 **定义**:对谓宾结构的修饰和限定 **包含**: - 地域限定:川西、北京、日本、成都 - 时间限定:秋季、冬季、春节、2024 - 属性限定:高质量、专业、简单、初级 - 其他修饰:风格、类型等有搜索价值的实词 **丢弃规则**(重要): 以下内容必须丢弃,不要作为片段: - 虚词/助词:的、地、得、了、吗、呢 - 空泛词汇:能、可以、体现、特色、相关、有关 **示例**: - "川西秋季高质量" → 定语(保留地域、时间、属性,丢弃虚词) - 原文"能体现川西秋季特色的高质量" → 提取为"川西秋季高质量" --- ## 分段原则(务必遵守) 1. **语义完整性**:谓宾结构必须完整,可独立理解 2. **定语精简**:定语只保留有搜索价值的实词,丢弃虚词和空泛词汇 3. **保留原文**:片段文本必须来自原query中的实际内容 4. **顺序保持**:片段顺序应与原query一致 --- ## 输出格式(严格遵守) **示例1:含定语的完整query** 输入:"如何获取能体现川西秋季特色的高质量风光摄影素材?" ```json { "segments": [ { "segment_text": "如何获取风光摄影素材", "segment_type": "谓宾结构", "reasoning": "如何获取表达方法意图,风光摄影素材是宾语对象" }, { "segment_text": "川西秋季高质量", "segment_type": "定语", "reasoning": "川西是地域定语,秋季是时间定语,高质量是属性定语,丢弃虚词能、体现、特色、的" } ], "overall_reasoning": "将query拆分为谓宾主干和定语修饰两部分" } ``` **示例2:"X的Y"结构(关键)** 输入:"怎么找到职场热梗的灵感点" ```json { "segments": [ { "segment_text": "怎么找到灵感点", "segment_type": "谓宾结构", "reasoning": "怎么找到是谓语,灵感点是宾语(职场热梗的灵感点中的中心词)" }, { "segment_text": "职场热梗", "segment_type": "定语", "reasoning": "修饰灵感点的定语,丢弃虚词的" } ], "overall_reasoning": "识别出灵感点是宾语中心词,职场热梗是修饰定语" } ``` ## 输出要求 - segments: 片段列表(通常2个:谓宾结构 + 定语) - segment_text: 片段文本(来自原query的实际内容) - segment_type: 语义维度(谓宾结构/定语) - reasoning: 为什么这样分段 - overall_reasoning: 整体分段思路 ## 特殊情况处理 - 如果query没有明显的定语修饰,只输出谓宾结构 - 如果query只有名词短语无动词,可以将核心名词作为"谓宾结构",其他作为"定语" ## JSON输出规范 1. **格式要求**:必须输出标准JSON格式 2. **引号规范**:字符串中如需表达引用,使用书名号《》或「」,不要使用英文引号或中文引号"" """.strip() semantic_segmenter = Agent[None]( name="语义分段专家", instructions=semantic_segmentation_instructions, model=get_model(MODEL_NAME), output_type=SemanticSegmentation, ) # ============================================================================ # v120 保留 Agent # ============================================================================ # Agent 1: 分词专家(v121用于Round 0拆词) class WordSegmentation(BaseModel): """分词结果""" words: list[str] = Field(..., description="分词结果列表") reasoning: str = Field(..., description="分词理由") word_segmentation_instructions = """ 你是分词专家。给定一个query,将其拆分成有意义的搜索单元。 ## 分词原则 1. **互不重叠原则**:分词必须是互不重叠的最小单元 - 每个词不能包含其他词的字符 - 所有词连起来应该覆盖原query的全部有效字符 - 后续系统会自动生成各种组合,无需在此阶段重复 2. **不可分割的完整单元**:以下组合作为最小单元,不可再拆分 - 疑问词+动词:怎么找到、如何获取、怎样制作、如何学习 - 独立概念的复合词:表情包、灵感点、攻略 3. **可拆分的复合词**:以下组合应拆分到最小有意义单元 - 多概念名词:风光摄影素材 → ["风光", "摄影", "素材"] - 地域+时间:川西秋季 → ["川西", "秋季"] 4. **去除虚词**:的、地、得、了、吗、呢等虚词应该丢弃 ## 示例 **输入1**: "怎么找到灵感点" **输出**: ["怎么找到", "灵感点"] **理由**: "怎么找到"作为不可分割的疑问+动词单元,"灵感点"是独立概念,二者互不重叠。系统会自动生成组合。 **输入2**: "如何获取风光摄影素材" **输出**: ["如何获取", "风光", "摄影", "素材"] **理由**: "如何获取"是不可分割单元,"风光摄影素材"拆分为最小单元。系统会自动组合出"风光摄影"、"摄影素材"等。 **输入3**: "川西秋季高质量" **输出**: ["川西", "秋季", "高质量"] **理由**: 三个独立的修饰词,互不重叠。系统会自动组合出"川西秋季"等。 ## 输出要求 返回分词列表和分词理由。 """.strip() word_segmenter = Agent[None]( name="分词专家", instructions=word_segmentation_instructions, model=get_model(MODEL_NAME), output_type=WordSegmentation, ) # Agent 2: 动机维度评估专家 + 品类维度评估专家(两阶段评估) # 动机评估的嵌套模型 class CoreMotivationExtraction(BaseModel): """核心动机提取""" 简要说明核心动机: str = Field(..., description="核心动机说明") class MotivationEvaluation(BaseModel): """动机维度评估""" 原始问题核心动机提取: CoreMotivationExtraction = Field(..., description="原始问题核心动机提取") 动机维度得分: float = Field(..., description="动机维度得分 -1~1") 简要说明动机维度相关度理由: str = Field(..., description="动机维度相关度理由") 得分为零的原因: Optional[Literal["原始问题无动机", "sug词条无动机", "动机不匹配", "不适用"]] = Field(None, description="当得分为0时的原因分类(可选,仅SUG评估使用)") class CategoryEvaluation(BaseModel): """品类维度评估""" 品类维度得分: float = Field(..., description="品类维度得分 -1~1") 简要说明品类维度相关度理由: str = Field(..., description="品类维度相关度理由") # ============================================================================ # 批量评估数据模型 # ============================================================================ class BatchMotivationItem(BaseModel): """批量动机评估中的单个SUG结果""" sug_text: str = Field(..., description="SUG文本") 原始问题核心动机提取: CoreMotivationExtraction = Field(..., description="原始问题核心动机提取") 动机维度得分: float = Field(..., description="动机维度得分 -1~1") 简要说明动机维度相关度理由: str = Field(..., description="动机维度相关度理由") 得分为零的原因: str = Field(default="不适用", description="原始问题无动机/sug词条无动机/动机不匹配/不适用") class BatchMotivationResult(BaseModel): """批量动机评估结果""" evaluations: list[BatchMotivationItem] = Field(..., description="所有SUG的动机评估结果") class BatchCategoryItem(BaseModel): """批量品类评估中的单个SUG结果""" sug_text: str = Field(..., description="SUG文本") 品类维度得分: float = Field(..., description="品类维度得分 -1~1") 简要说明品类维度相关度理由: str = Field(..., description="品类维度相关度理由") class BatchCategoryResult(BaseModel): """批量品类评估结果""" evaluations: list[BatchCategoryItem] = Field(..., description="所有SUG的品类评估结果") # ============================================================================ class ExtensionWordEvaluation(BaseModel): """延伸词评估""" 延伸词得分: float = Field(..., ge=-1, le=1, description="延伸词得分 -1~1") 简要说明延伸词维度相关度理由: str = Field(..., description="延伸词维度相关度理由") # 动机评估 prompt(统一版本) motivation_evaluation_instructions = """ # 角色 你是**专业的动机意图评估专家**。 任务:判断<平台sug词条>与<原始问题>的**动机意图匹配度**,给出**-1到1之间**的数值评分。 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<平台sug词条>**:待评估的词条,可能是单个或多个作用域的组合 --- # 核心约束 ## 维度独立性声明 【严格约束】本评估**仅评估动机意图维度**: - **只评估** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 - **评估重点**:动作本身及其语义方向 **禁止使用"主题相关"作为评分依据**:评分理由中不得出现"主题"、"内容"、"话题"等词 --- # 作用域与动作意图 ## 什么是作用域? **作用域 = 动机层 + 对象层 + 场景层** ## 动作意图的识别 ### 方法1: 显性动词直接提取 当原始问题明确包含动词时,直接提取 示例: "如何获取素材" → 核心动机 = "获取" "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习") "制作视频教程" → 核心动机 = "制作" ### 方法2: 隐性动词语义推理 当原始问题没有显性动词时,需要结合上下文推理 如果原始问题是纯名词短语,无任何动作线索: → 核心动机 = 无法识别 → 在此情况下,动机维度得分应为 0。 示例: "摄影" → 无法识别动机,动机维度得分 = 0 "川西风光" → 无法识别动机,动机维度得分 = 0 --- # 部分作用域的处理 ## 情况1:sug词条是原始问题的部分作用域 当sug词条只包含原始问题的部分作用域时,需要判断: 1. sug词条是否包含动作意图 2. 如果包含,动作是否匹配 **示例**: ``` 原始问题:"川西旅行行程规划" - 完整作用域:规划(动作)+ 旅行行程(对象)+ 川西(场景) Sug词条:"川西旅行" - 包含作用域:旅行(部分对象)+ 川西(场景) - 缺失作用域:规划(动作) - 动作意图评分:0(无动作意图) ``` **评分原则**: - 如果sug词条缺失动机层(动作) → 动作意图得分 = 0 - 如果sug词条包含动机层 → 按动作匹配度评分 --- # 评分标准 ## 【正向匹配】 ### +0.9~1.0:核心动作完全一致 **示例**: - "规划旅行行程" vs "安排旅行路线" → 0.98 - 规划≈安排,语义完全一致 - "获取素材" vs "下载素材" → 0.97 - 获取≈下载,语义完全一致 - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致 例: 原始问题"扣除猫咪主体的方法" vs sug词"扣除猫咪眼睛的方法"(子集但目的一致 **注意**:此处不考虑对象和场景是否一致,只看动作本身 ###+0.75~0.95: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs sug词"如何下载素材" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 ### +0.50~0.75:动作意图相关 **判定标准**: - 动作是实现原始意图的相关路径 - 或动作是原始意图的前置/后置步骤 **示例**: - "获取素材" vs "管理素材" → 0.65 - 管理是获取后的相关步骤 - "规划行程" vs "预订酒店" → 0.60 - 预订是规划的具体实施步骤 ### +0.25~0.50:动作意图弱相关 **判定标准**: - 动作在同一大类但方向不同 - 或动作有间接关联 **示例**: - "学习摄影技巧" vs "欣赏摄影作品" → 0.35 - 都与摄影有关,但学习≠欣赏 - "规划旅行" vs "回忆旅行" → 0.30 - 都与旅行有关,但方向不同 --- ## 【中性/无关】 ### 0:无动作意图或动作完全无关 **适用场景**: 1. 原始问题或sug词条无法识别动作 2. 两者动作意图完全无关 **示例**: - "如何获取素材" vs "摄影器材" → 0 - sug词条无动作意图 - "川西风光" vs "风光摄影作品" → 0 - 原始问题无动作意图 **理由模板**: - "sug词条无明确动作意图,无法评估动作匹配度" - "原始问题无明确动作意图,动作维度得分为0" --- ## 【负向偏离】 ### -0.2~-0.05:动作方向轻度偏离 **示例**: - "学习摄影技巧" vs "销售摄影课程" → -0.10 - 学习 vs 销售,方向有偏差 ### -0.5~-0.25:动作意图明显冲突 **示例**: - "获取免费素材" vs "购买素材" → -0.35 - 获取免费 vs 购买,明显冲突 ### -1.0~-0.55:动作意图完全相反 **示例**: - "下载素材" vs "上传素材" → -0.70 - 下载 vs 上传,方向完全相反 --- ## 得分为零的原因(语义判断) 当动机维度得分为 0 时,需要在 `得分为零的原因` 字段中选择以下之一: - **"原始问题无动机"**:原始问题是纯名词短语,无法识别任何动作意图 - **"sug词条无动机"**:sug词条中不包含任何动作意图 - **"动机不匹配"**:双方都有动作,但完全无关联 - **"不适用"**:得分不为零时使用此默认值 --- # 输出格式 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "原始问题核心动机提取": { "简要说明核心动机": "" }, "动机维度得分": "-1到1之间的小数", "简要说明动机维度相关度理由": "评估该sug词条与原始问题动机匹配程度的理由,包含作用域覆盖情况", "得分为零的原因": "原始问题无动机/sug词条无动机/动机不匹配/不适用" } ``` **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明动机维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 --- # 核心原则总结 1. **只评估动作**:完全聚焦于动作意图,不管对象和场景 2. **作用域识别**:识别作用域但只评估动机层 3. **严格标准一致性**:对所有用例使用相同的评估标准,避免评分飘移 4. **理由纯粹**:评分理由只能谈动作,不能谈对象、场景、主题 """.strip() # 品类评估 prompt category_evaluation_instructions = """ # 角色 你是**专业的内容主体评估专家**。 任务:判断<平台sug词条>与<原始问题>的**内容主体匹配度**,给出**-1到1之间**的数值评分。 --- # 输入信息 - **<原始问题>**:用户的完整需求描述 - **<平台sug词条>**:待评估的词条,可能是单个或多个作用域的组合 --- # 核心约束 ## 维度独立性声明 【严格约束】本评估**仅评估内容主体维度**: - **只评估**:名词主体 + 限定词(地域、时间、场景、质量等) - **完全忽略**:动作、意图、目的 - **评估重点**:内容本身的主题和属性 --- # 作用域与内容主体 ## 什么是作用域? **作用域 = 动机层 + 对象层 + 场景层** 在Prompt2中: - **动机层(动作)完全忽略** - **只评估对象层 + 场景层(限定词)** ## 内容主体的构成 **内容主体 = 核心名词 + 限定词** --- # 作用域覆盖度评估 ## 核心原则:越完整越高分 **完整性公式**: ``` 作用域覆盖度 = sug词条包含的作用域元素 / 原始问题的作用域元素总数 ``` **评分影响**: - 覆盖度100% → 基础高分(0.9+) - 覆盖度50-99% → 中高分(0.6-0.9) - 覆盖度<50% → 中低分(0.3-0.6) - 覆盖度=0 → 低分或0分 --- ## 部分作用域的处理 ### 情况1:sug词条包含原始问题的所有对象层和场景层元素 **评分**:0.95-1.0 **示例**: ``` 原始问题:"川西秋季风光摄影素材" - 对象层:摄影素材 - 场景层:川西 + 秋季 + 风光 Sug词条:"川西秋季风光摄影作品" - 对象层:摄影作品(≈素材) - 场景层:川西 + 秋季 + 风光 - 覆盖度:100% - 评分:0.98 ``` ### 情况2:sug词条包含部分场景层元素 **评分**:根据覆盖比例 **示例**: ``` 原始问题:"川西秋季风光摄影素材" - 对象层:摄影素材 - 场景层:川西 + 秋季 + 风光(3个元素) Sug词条:"川西风光摄影素材" - 对象层:摄影素材 ✓ - 场景层:川西 + 风光(2个元素) - 覆盖度:(1+2)/(1+3) = 75% - 评分:0.85 ``` ### 情况3:sug词条只包含对象层,无场景层 **评分**:根据对象匹配度和覆盖度 **示例**: ``` 原始问题:"川西秋季风光摄影素材" - 对象层:摄影素材 - 场景层:川西 + 秋季 + 风光 Sug词条:"摄影素材" - 对象层:摄影素材 ✓ - 场景层:无 - 覆盖度:1/4 = 25% - 评分:0.50(对象匹配但缺失所有限定) ``` ### 情况4:sug词条只包含场景层,无对象层 **评分**:较低分 **示例**: ``` 原始问题:"川西旅行行程规划" - 对象层:旅行行程 - 场景层:川西 Sug词条:"川西" - 对象层:无 - 场景层:川西 ✓ - 覆盖度:1/2 = 50% - 评分:0.35(只有场景,缺失核心对象) ``` --- # 评估核心原则 ## 原则1:只看表面词汇,禁止联想推演 **严格约束**:只能基于sug词实际包含的词汇评分 **错误案例**: - ❌ "川西旅行" vs "旅行" - 错误:"旅行可以包括川西,所以有关联" → 评分0.7 - 正确:"sug词只有'旅行',无'川西',缺失地域限定" → 评分0.50 --- # 评分标准 ## 【正向匹配】 +0.95~1.0: 核心主体+所有关键限定词完全匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西秋季风光摄影作品" +0.75~0.95: 核心主体匹配,存在限定词匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西风光摄影素材"(缺失"秋季") +0.5~0.75: 核心主体匹配,无限定词匹配或合理泛化 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"四川风光摄影" +0.3~0.5: 核心主体匹配,但限定词缺失或存在语义错位 - 特别注意"语义身份"差异,主体词出现但上下文语义不同 - 例: · "猫咪的XX行为"(猫咪是行为者) · vs "用猫咪表达XX的梗图"(猫咪是媒介) · 虽都含"猫咪+XX",但语义角色不同 +0.2~0.3: 主体词不匹配,限定词缺失或错位 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"风光摄影入门" +0.05~0.2: 主体词过度泛化或仅抽象相似 - 例: sug词是通用概念,原始问题是特定概念 sug词"每日计划"(通用)vs 原始问题 "川西旅行行程"(特定) → 评分:0.08 【中性/无关】 0: 类别明显不同,没有明确目的,无明确关联 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"人像摄影素材" - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0 【负向偏离】 -0.2~-0.05: 主体词或限定词存在误导性 - 例: 原始问题"免费摄影素材" vs sug词"付费摄影素材库" -0.5~-0.25: 主体词明显错位或品类冲突 - 例: 原始问题"风光摄影素材" vs sug词"人像修图教程" -1.0~-0.55: 完全错误的品类或有害引导 - 例: 原始问题"正版素材获取" vs sug词"盗版素材下载" --- # 输出格式 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "品类维度得分": "-1到1之间的小数", "简要说明品类维度相关度理由": "评估该sug词条与原始问题品类匹配程度的理由,包含作用域覆盖理由" } ``` **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明品类维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 --- # 核心原则总结 1. **只看名词和限定词**:完全忽略动作和意图 2. **作用域覆盖优先**:覆盖的作用域元素越多,分数越高 3. **禁止联想推演**:只看sug词实际包含的词汇 4. **通用≠特定**:通用概念不等于特定概念 5. **理由纯粹**:评分理由只能谈对象、限定词、覆盖度 """.strip() # 延伸词评估 prompt extension_word_evaluation_instructions = """ # 角色 你是**专业的延伸词语义评估专家**。 任务:识别<平台sug词条>中的延伸词,评估其对原始问题作用域的补全度和目的贡献度,给出**-1到1之间**的数值评分。 --- # 输入信息 - **<原始问题>**:用户的完整需求描述 - **<平台sug词条>**:待评估的词条,可能是单个或多个作用域的组合 --- # 核心概念 ## 什么是延伸词? **延伸词**:<平台sug词条>中出现,但不属于<原始问题>作用域范围内的词汇或概念 **关键判断**: ``` IF sug词的词汇属于原始问题的作用域元素(动机/对象/场景): → 不是延伸词,是作用域内的词 IF sug词的词汇不属于原始问题的作用域: → 是延伸词 → 由Prompt3评估 ``` --- # 作用域与延伸词 ## 作用域 **作用域 = 动机层 + 对象层 + 场景层** **非延伸词示例**(属于作用域内): ``` 原始问题:"川西旅行行程规划" 作用域: - 动机层:规划 - 对象层:旅行行程 - 场景层:川西 Sug词条:"川西旅行行程规划攻略" - "川西"→ 属于场景层,不是延伸词 - "旅行"→ 属于对象层,不是延伸词 - "行程"→ 属于对象层,不是延伸词 - "规划"→ 属于动机层,不是延伸词 - "攻略"→ 与"规划"同义,不是延伸词 - 结论:无延伸词 ``` **延伸词示例**(不属于作用域): ``` 原始问题:"川西旅行行程规划" 作用域:规划 + 旅行行程 + 川西 Sug词条:"川西旅行行程规划住宿推荐" - "住宿推荐"→ 不属于原始问题任何作用域 - 结论:延伸词 = ["住宿推荐"] ``` --- # 延伸词识别方法 ## 步骤1:提取原始问题的作用域元素 ``` 动机层:提取动作及其同义词 对象层:提取核心名词及其同义词 场景层:提取所有限定词 ``` ## 步骤2:提取sug词条的所有关键词 ``` 提取sug词条中的所有实词(名词、动词、形容词) ``` ## 步骤3:匹配判定 ``` FOR 每个sug词条关键词: IF 该词 ∈ 原始问题作用域元素(包括同义词): → 不是延伸词 ELSE: → 是延伸词 ``` ## 步骤4:同义词/相近词判定规则 ### 不算延伸词的情况: **同义词**: - 行程 ≈ 路线 ≈ 安排 ≈ 计划 - 获取 ≈ 下载 ≈ 寻找 ≈ 收集 - 技巧 ≈ 方法 ≈ 教程 ≈ 攻略 - 素材 ≈ 资源 ≈ 作品 ≈ 内容 **具体化/细化**: - 原始:"川西旅游" + sug词:"稻城亚丁"(川西的具体地点)→ 不算延伸 - 原始:"摄影技巧" + sug词:"风光摄影"(摄影的细化)→ 不算延伸 - 原始:"素材" + sug词:"高清素材"(素材的质量细化)→ 不算延伸 **判定逻辑**: ``` IF sug词的概念是原始问题概念的子集/下位词/同义词: → 不算延伸词 → 视为对原问题的细化或重述 ``` --- ### 算延伸词的情况: **新增维度**:原始问题未涉及的信息维度 - 原始:"川西旅行" + sug词:"住宿" → 延伸词 - 原始:"摄影素材" + sug词:"版权" → 延伸词 **新增限定条件**:原始问题未提及的约束 - 原始:"素材获取" + sug词:"免费" → 延伸词 - 原始:"旅行行程" + sug词:"7天" → 延伸词 **扩展主题**:相关但非原问题范围 - 原始:"川西行程" + sug词:"美食推荐" → 延伸词 - 原始:"摄影技巧" + sug词:"后期修图" → 延伸词 **工具/方法**:原始问题未提及的具体工具 - 原始:"视频剪辑" + sug词:"PR软件" → 延伸词 - 原始:"图片处理" + sug词:"PS教程" → 延伸词 --- # 延伸词类型与评分 ## 核心评估维度:对原始问题作用域的贡献 ### 维度1:作用域补全度 延伸词是否帮助sug词条更接近原始问题的完整作用域? ### 维度2:目的达成度 延伸词是否促进原始问题核心目的的达成? --- ####类型1:作用域增强型 **定义**:延伸词是原始问题核心目的,或补全关键作用域 **得分范围**:+0.12~+0.20 **判定标准**: - 使sug词条更接近原始问题的完整需求 --- ####类型2:作用域辅助型 **定义**:延伸词对核心目的有辅助作用,但非必需 **得分范围**:+0.05~+0.12 **判定标准**: - sug词条更丰富但不改变原始需求核心 --- ####类型3:作用域无关型 **定义**:延伸词与核心目的无实质关联 **得分**:0 **示例**: - 原始:"如何拍摄风光" + 延伸词:"相机品牌排行" - 评分:0 - 理由:品牌排行与拍摄技巧无关 --- ####类型4:作用域稀释型(轻度负向) **定义**:延伸词稀释原始问题的聚焦度,降低内容针对性 **得分范围**:-0.08~-0.18 **判定标准**: - 引入无关信息,分散注意力 - 降低内容的专注度和深度 - 使sug词条偏离原始问题的核心 **示例**: - 原始:"专业风光摄影技巧" + 延伸词:"手机拍照" - 评分:-0.12 - 理由:手机拍照与专业摄影需求不符,稀释专业度 - 原始:"川西深度游攻略" + 延伸词:"周边一日游" - 评分:-0.10 - 理由:一日游与深度游定位冲突,稀释深度 --- # 特殊情况处理 ## 情况1:多个延伸词同时存在 **处理方法**:分别评估每个延伸词,然后综合 **综合规则**: ``` 延伸词总得分 = Σ(每个延伸词得分) / 延伸词数量 考虑累积效应: - 多个增强型延伸词 → 总分可能超过单个最高分,但上限+0.25 - 正负延伸词并存 → 相互抵消 - 多个冲突型延伸词 → 总分下限-0.60 ``` **示例**: ``` 原始:"川西旅行行程" Sug词条:"川西旅行行程住宿美食推荐" 延伸词识别: - "住宿推荐"→ 增强型,+0.18 - "美食推荐"→ 辅助型,+0.10 总得分:(0.18 + 0.10) / 2 = 0.14 ``` --- ## 情况2:无延伸词 **处理方法**: ``` IF sug词条无延伸词: 延伸词得分 = 0 理由:"sug词条未引入延伸词,所有词汇均属于原始问题作用域范围" ``` --- ## 情况3:延伸词使sug词条更接近原始问题 **特殊加成**: ``` IF 延伸词是原始问题隐含需求的显式化: → 额外加成 +0.05 ``` **示例**: ``` 原始:"川西旅行" (隐含需要行程规划) Sug词条:"川西旅行行程规划" - "行程规划"可能被识别为延伸词,但它显式化了隐含需求 - 给予额外加成 ``` --- # 输出格式 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "延伸词得分": "-1到1之间的小数", "简要说明延伸词维度相关度理由": "评估延伸词对作用域的影响" } ``` **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明延伸词维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 --- # 核心原则总结 1. **严格区分**:作用域内的词 ≠ 延伸词 2. **同义词/细化词不算延伸**:属于作用域范围的词由其他prompt评估 3. **作用域导向**:评估延伸词是否使sug词条更接近原始问题的完整作用域 4. **目的导向**:评估延伸词是否促进核心目的达成 5. **分类明确**:准确判定延伸词类型 6. **理由充分**:每个延伸词都要说明其对作用域和目的的影响 7. **谨慎负分**:仅在明确冲突或有害时使用负分 """.strip() # 创建评估 Agent motivation_evaluator = Agent[None]( name="动机维度评估专家(后续轮次)", instructions=motivation_evaluation_instructions, model=get_model(MODEL_NAME), output_type=MotivationEvaluation) category_evaluator = Agent[None]( name="品类维度评估专家", instructions=category_evaluation_instructions, model=get_model(MODEL_NAME), output_type=CategoryEvaluation ) extension_word_evaluator = Agent[None]( name="延伸词评估专家", instructions=extension_word_evaluation_instructions, model=get_model(MODEL_NAME), output_type=ExtensionWordEvaluation, model_settings=ModelSettings(temperature=0.2) ) # ============================================================================ # 批量评估专用 Prompt 和 Agent(性能优化:每批10个SUG) # ============================================================================ # 批量动机评估prompt - 从batch_evaluation_demo.py复制(已验证有效) batch_motivation_evaluation_instructions = """ # 角色 你是**专业的动机意图评估专家**。 任务:判断<平台sug词条>与<原始问题>的**动机意图匹配度**,给出**-1到1之间**的数值评分。 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<平台sug词条列表>**:待评估的多个词条(编号1-N),每个词条需要独立评估 **批量评估说明**: - 输入格式为编号列表:1. 词条1 2. 词条2 ... - 每个词条都是独立的评估对象 - 对每个词条使用完全相同的评估标准 --- # 核心约束 ## 维度独立性声明 【严格约束】本评估**仅评估动机意图维度**: - **只评估** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 - **评估重点**:动作本身及其语义方向 **禁止使用"主题相关"作为评分依据**:评分理由中不得出现"主题"、"内容"、"话题"等词 --- # 作用域与动作意图 ## 什么是作用域? **作用域 = 动机层 + 对象层 + 场景层** ## 动作意图的识别 ### 方法1: 显性动词直接提取 当原始问题明确包含动词时,直接提取 示例: "如何获取素材" → 核心动机 = "获取" "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习") "制作视频教程" → 核心动机 = "制作" ### 方法2: 隐性动词语义推理 当原始问题没有显性动词时,需要结合上下文推理 如果原始问题是纯名词短语,无任何动作线索: → 核心动机 = 无法识别 → 在此情况下,动机维度得分应为 0。 示例: "摄影" → 无法识别动机,动机维度得分 = 0 "川西风光" → 无法识别动机,动机维度得分 = 0 --- # 部分作用域的处理 ## 情况1:sug词条是原始问题的部分作用域 当sug词条只包含原始问题的部分作用域时,需要判断: 1. sug词条是否包含动作意图 2. 如果包含,动作是否匹配 **示例**: ``` 原始问题:"川西旅行行程规划" - 完整作用域:规划(动作)+ 旅行行程(对象)+ 川西(场景) Sug词条:"川西旅行" - 包含作用域:旅行(部分对象)+ 川西(场景) - 缺失作用域:规划(动作) - 动作意图评分:0(无动作意图) ``` **评分原则**: - 如果sug词条缺失动机层(动作) → 动作意图得分 = 0 - 如果sug词条包含动机层 → 按动作匹配度评分 --- # 评分标准 ## 【正向匹配】 ### +0.9~1.0:核心动作完全一致 **示例**: - "规划旅行行程" vs "安排旅行路线" → 0.98 - 规划≈安排,语义完全一致 - "获取素材" vs "下载素材" → 0.97 - 获取≈下载,语义完全一致 - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致 例: 原始问题"扣除猫咪主体的方法" vs sug词"扣除猫咪眼睛的方法"(子集但目的一致 **注意**:此处不考虑对象和场景是否一致,只看动作本身 ###+0.75~0.95: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs sug词"如何下载素材" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 ### +0.50~0.75:动作意图相关 **判定标准**: - 动作是实现原始意图的相关路径 - 或动作是原始意图的前置/后置步骤 **示例**: - "获取素材" vs "管理素材" → 0.65 - 管理是获取后的相关步骤 - "规划行程" vs "预订酒店" → 0.60 - 预订是规划的具体实施步骤 ### +0.25~0.50:动作意图弱相关 **判定标准**: - 动作在同一大类但方向不同 - 或动作有间接关联 **示例**: - "学习摄影技巧" vs "欣赏摄影作品" → 0.35 - 都与摄影有关,但学习≠欣赏 - "规划旅行" vs "回忆旅行" → 0.30 - 都与旅行有关,但方向不同 --- ## 【中性/无关】 ### 0:无动作意图或动作完全无关 **适用场景**: 1. 原始问题或sug词条无法识别动作 2. 两者动作意图完全无关 **示例**: - "如何获取素材" vs "摄影器材" → 0 - sug词条无动作意图 - "川西风光" vs "风光摄影作品" → 0 - 原始问题无动作意图 **理由模板**: - "sug词条无明确动作意图,无法评估动作匹配度" - "原始问题无明确动作意图,动作维度得分为0" --- ## 【负向偏离】 ### -0.2~-0.05:动作方向轻度偏离 **示例**: - "学习摄影技巧" vs "销售摄影课程" → -0.10 - 学习 vs 销售,方向有偏差 ### -0.5~-0.25:动作意图明显冲突 **示例**: - "获取免费素材" vs "购买素材" → -0.35 - 获取免费 vs 购买,明显冲突 ### -1.0~-0.55:动作意图完全相反 **示例**: - "下载素材" vs "上传素材" → -0.70 - 下载 vs 上传,方向完全相反 --- ## 得分为零的原因(语义判断) 当动机维度得分为 0 时,需要在 `得分为零的原因` 字段中选择以下之一: - **"原始问题无动机"**:原始问题是纯名词短语,无法识别任何动作意图 - **"sug词条无动机"**:sug词条中不包含任何动作意图 - **"动机不匹配"**:双方都有动作,但完全无关联 - **"不适用"**:得分不为零时使用此默认值 --- # 批量评估核心原则 ## 【极其重要】独立评估原则 1. **绝对评分**:每个SUG的评分必须基于与原始问题的匹配度,使用固定的评分标准 2. **禁止相对比较**:不要比较SUG之间的好坏,不要因为"其他SUG更好"而降低某个SUG的分数 3. **标准一致性**:对第1个SUG和第10个SUG使用完全相同的评分标准 4. **独立判断**:评估SUG A时,完全不考虑SUG B/C/D的存在 **错误示例**: - ❌ "这个SUG比列表中其他的更好,给0.9" - ❌ "相比第一个SUG,这个稍差一些,给0.7" **正确示例**: - ✅ "这个SUG的动作'获取'与原始问题'获取'完全一致,根据评分标准给0.97" - ✅ "这个SUG无动作意图,根据评分标准给0" --- # 输出格式 输出结果必须为一个 **JSON 格式**,包含evaluations数组,每个元素包含: ```json { "evaluations": [ { "sug_text": "SUG文本", "原始问题核心动机提取": { "简要说明核心动机": "" }, "动机维度得分": "-1到1之间的小数", "简要说明动机维度相关度理由": "评估理由", "得分为零的原因": "原始问题无动机/sug词条无动机/动机不匹配/不适用" } ] } ``` **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明动机维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 4. **顺序严格对应(极其重要)**: - evaluations数组必须与输入的sug词条列表严格1对1对应 - 第1个元素必须是输入列表的第1个SUG,第2个元素必须是第2个SUG,以此类推 - 每个元素的sug_text必须与输入SUG完全一致(逐字匹配,包括标点) - 禁止改变顺序、禁止遗漏任何SUG、禁止重复评估 - 示例:输入"1. 秋季摄影素材 2. 川西风光" → 输出[{sug_text:"秋季摄影素材",...}, {sug_text:"川西风光",...}] - 错误示例:输出[{sug_text:"川西风光",...}, {sug_text:"秋季摄影素材",...}] ← 顺序错误❌ --- # 核心原则总结 1. **只评估动作**:完全聚焦于动作意图,不管对象和场景 2. **作用域识别**:识别作用域但只评估动机层 3. **严格标准一致性**:对所有用例使用相同的评估标准,避免评分飘移 4. **理由纯粹**:评分理由只能谈动作,不能谈对象、场景、主题 5. **独立评估**:每个SUG完全独立评估,禁止相对比较 """.strip() # 批量品类评估prompt - 从batch_evaluation_demo.py复制(与单个品类prompt类似,添加批量说明) # 注:完整prompt见batch_evaluation_demo.py:724-966行,此处使用相同内容 batch_category_evaluation_instructions = category_evaluation_instructions.replace( "- **<平台sug词条>**:待评估的词条,可能是单个或多个作用域的组合", """- **<平台sug词条列表>**:待评估的多个词条(编号1-N),每个词条需要独立评估 **批量评估说明**: - 输入格式为编号列表:1. 词条1 2. 词条2 ... - 每个词条都是独立的评估对象 - 对每个词条使用完全相同的评估标准""" ).replace( '"品类维度得分": "-1到1之间的小数",\n "简要说明品类维度相关度理由": "评估该sug词条与原始问题品类匹配程度的理由,包含作用域覆盖理由"', ''' "evaluations": [ { "sug_text": "SUG文本", "品类维度得分": "-1到1之间的小数", "简要说明品类维度相关度理由": "评估理由" } ]''' ).replace( "1. **只看名词和限定词**:完全忽略动作和意图", """## 【极其重要】独立评估原则 1. **绝对评分**:每个SUG的评分必须基于与原始问题的匹配度,使用固定的评分标准 2. **禁止相对比较**:不要比较SUG之间的好坏,不要因为"其他SUG更好"而降低某个SUG的分数 3. **标准一致性**:对第1个SUG和第10个SUG使用完全相同的评分标准 4. **独立判断**:评估SUG A时,完全不考虑SUG B/C/D的存在 --- # 核心原则总结 1. **只看名词和限定词**:完全忽略动作和意图""" ) + """ 6. **独立评估**:每个SUG完全独立评估,禁止相对比较 7. **顺序严格对应(极其重要)**:evaluations数组必须与输入的sug词条列表严格1对1对应 """ # 批量评估Agent定义 batch_motivation_evaluator = Agent[None]( name="批量动机维度评估专家", instructions=batch_motivation_evaluation_instructions, model=get_model(MODEL_NAME), output_type=BatchMotivationResult, ) batch_category_evaluator = Agent[None]( name="批量品类维度评估专家", instructions=batch_category_evaluation_instructions, model=get_model(MODEL_NAME), output_type=BatchCategoryResult, ) # ============================================================================ # Round 0 专用 Agent(v124新增 - 需求1) # ============================================================================ # Round 0 动机评估 prompt(不含延伸词) round0_motivation_evaluation_instructions = """ #角色 你是**专业的动机意图评估专家** 你的任务是:判断我给你的 <词条> 与 <原始问题> 的需求动机匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<词条>**:平台推荐的词条列表,每个词条需要单独评估。 # 核心约束 ## 维度独立性声明 【严格约束】本评估**仅评估动机意图维度**: - **只评估** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 - **评估重点**:动作本身及其语义方向 **禁止使用"主题相关"作为评分依据**:评分理由中不得出现"主题"、"内容"、"话题"等词 --- # 作用域与动作意图 ## 什么是作用域? **作用域 = 动机层 + 对象层 + 场景层** ## 动作意图的识别 ### 方法1: 显性动词直接提取 当原始问题明确包含动词时,直接提取 示例: "如何获取素材" → 核心动机 = "获取" "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习") "制作视频教程" → 核心动机 = "制作" ### 方法2: 隐性动词语义推理 当原始问题没有显性动词时,需要结合上下文推理 如果原始问题是纯名词短语,无任何动作线索: → 核心动机 = 无法识别 → 在此情况下,动机维度得分应为 0。 示例: "摄影" → 无法识别动机,动机维度得分 = 0 "川西风光" → 无法识别动机,动机维度得分 = 0 --- # 部分作用域的处理 ## 情况1:词条是原始问题的部分作用域 当词条只包含原始问题的部分作用域时,需要判断: 1. 词条是否包含动作意图 2. 如果包含,动作是否匹配 **示例**: ``` 原始问题:"川西旅行行程规划" - 完整作用域:规划(动作)+ 旅行行程(对象)+ 川西(场景) 词条:"川西旅行" - 包含作用域:旅行(部分对象)+ 川西(场景) - 缺失作用域:规划(动作) - 动作意图评分:0(无动作意图) ``` **评分原则**: - 如果sug词条缺失动机层(动作) → 动作意图得分 = 0 - 如果sug词条包含动机层 → 按动作匹配度评分 --- #评分标准: 【正向匹配】 ### +0.9~1.0:核心动作完全一致 **示例**: - "规划旅行行程" vs "安排旅行路线" → 0.98 - 规划≈安排,语义完全一致 - "获取素材" vs "下载素材" → 0.97 - 获取≈下载,语义完全一致 - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致 例: 原始问题"扣除猫咪主体的方法" vs 词条"扣除猫咪眼睛的方法"(子集但目的一致 **注意**:此处不考虑对象和场景是否一致,只看动作本身 ###+0.75~0.90: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs 词条"如何下载素材" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 ### +0.50~0.75:动作意图相关 **判定标准**: - 动作是实现原始意图的相关路径 - 或动作是原始意图的前置/后置步骤 **示例**: - "获取素材" vs "管理素材" → 0.65 - 管理是获取后的相关步骤 - "规划行程" vs "预订酒店" → 0.60 - 预订是规划的具体实施步骤 ### +0.25~0.50:动作意图弱相关 **判定标准**: - 动作在同一大类但方向不同 - 或动作有间接关联 **示例**: - "学习摄影技巧" vs "欣赏摄影作品" → 0.35 - 都与摄影有关,但学习≠欣赏 - "规划旅行" vs "回忆旅行" → 0.30 - 都与旅行有关,但方向不同 --- ## 【中性/无关】 ### 0:无动作意图或动作完全无关 **适用场景**: 1. 原始问题或词条无法识别动作 2. 两者动作意图完全无关 **示例**: - "如何获取素材" vs "摄影器材" → 0 - sug词条无动作意图 - "川西风光" vs "风光摄影作品" → 0 - 原始问题无动作意图 **理由模板**: - "sug词条无明确动作意图,无法评估动作匹配度" - "原始问题无明确动作意图,动作维度得分为0" --- ## 【负向偏离】 ### -0.2~-0.05:动作方向轻度偏离 **示例**: - "学习摄影技巧" vs "销售摄影课程" → -0.10 - 学习 vs 销售,方向有偏差 ### -0.5~-0.25:动作意图明显冲突 **示例**: - "获取免费素材" vs "购买素材" → -0.35 - 获取免费 vs 购买,明显冲突 ### -1.0~-0.55:动作意图完全相反 **示例**: - "下载素材" vs "上传素材" → -0.70 - 下载 vs 上传,方向完全相反 --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "原始问题核心动机提取": { "简要说明核心动机": "" }, "动机维度得分": "-1到1之间的小数", "简要说明动机维度相关度理由": "评估该词条与原始问题动机匹配程度的理由" } ``` #注意事项: 始终围绕动机维度:所有评估都基于"动机"维度,不偏离 核心动机必须是动词:在评估前,必须先提取原始问题的核心动机(动词),这是整个评估的基础 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当词条对原始问题动机产生误导、冲突或有害引导时给予负分 零分使用原则:当词条与原始问题动机无明确关联,既不相关也不冲突时给予零分,或原始问题无法识别动机时。 """.strip() # Round 0 品类评估 prompt(不含延伸词) round0_category_evaluation_instructions = """ #角色 你是一个 **专业的语言专家和语义相关性评判专家**。 你的任务是:判断我给你的 <词条> 与 <原始问题> 的内容主体和限定词匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 核心概念与方法论 ## 评估维度 本评估系统围绕 **品类维度** 进行: # 维度独立性警告 【严格约束】本评估**只评估品类维度**,,必须遵守以下规则: 1. **只看名词和限定词**:评估时只考虑主体、限定词的匹配度 2. **完全忽略动词**:动作意图、目的等动机信息对本维度评分无影响 ### 品类维度 **定义:** 用户"关于什么内容",即原始问题的主题对象和限定词 - 核心是 **名词+限定词**:川西秋季风光摄影素材 - 包括:核心主体 + 地域限定 + 时间限定 + 质量限定等 ## ⚠️ 品类评估核心原则(必读) ### 原则1:只看词条表面,禁止联想推演 - 只能基于词条实际包含的词汇评分 - 禁止推测"可能包含"、"可以理解为" **错误示例:** 原始问题:"川西旅行行程" vs 词条:"每日计划" - 错误 "每日计划可以包含旅行规划,所以有关联" → 这是不允许的联想 - 正确: "词条只有'每日计划',无'旅行'字眼,品类不匹配" → 正确判断 ### 原则2:通用概念 ≠ 特定概念 - **通用**:计划、方法、技巧、素材(无领域限定) - **特定**:旅行行程、摄影技巧、烘焙方法(有明确领域) IF 词条是通用 且 原始问题是特定: → 品类不匹配 → 评分0.05~0.1 关键:通用概念不等于特定概念,不能因为"抽象上都是规划"就给分 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<词条>**:平台推荐的词条列表,每个词条需要单独评估。 #判定流程 #评估架构 输入: <原始问题> + <词条> ↓ 【品类维度相关性判定】 ├→ 步骤1: 评估<词条>与<原始问题>的内容主体和限定词匹配度 └→ 输出: -1到1之间的数值 + 判定依据 相关度评估维度详解 维度2: 品类维度评估 评估对象: <词条> 与 <原始问题> 的内容主体和限定词匹配度 评分标准: 【正向匹配】 +0.95~1.0: 核心主体+所有关键限定词完全匹配 - 例: 原始问题"川西秋季风光摄影素材" vs 词条"川西秋季风光摄影作品" +0.75~0.95: 核心主体匹配,存在限定词匹配 - 例: 原始问题"川西秋季风光摄影素材" vs 词条"川西风光摄影素材"(缺失"秋季") +0.5~0.75: 核心主体匹配,无限定词匹配或合理泛化 - 例: 原始问题"川西秋季风光摄影素材" vs 词条"四川风光摄影" +0.3~0.5: 核心主体匹配,但限定词缺失或存在语义错位 - 特别注意"语义身份"差异,主体词出现但上下文语义不同 - 例: · "猫咪的XX行为"(猫咪是行为者) · vs "用猫咪表达XX的梗图"(猫咪是媒介) · 虽都含"猫咪+XX",但语义角色不同 +0.2~0.3: 主体词不匹配,限定词缺失或错位 - 例: 原始问题"川西秋季风光摄影素材" vs 词条"风光摄影入门" +0.05~0.2: 主体词过度泛化或仅抽象相似 - 例: 词条是通用概念,原始问题是特定概念 词条"每日计划"(通用)vs 原始问题 "川西旅行行程"(特定) → 评分:0.08 【中性/无关】 0: 类别明显不同,没有明确目的,无明确关联 - 例: 原始问题"川西秋季风光摄影素材" vs 词条"人像摄影素材" - 例: 原始问题无法识别动机 且 词条也无明确动作 → 0 【负向偏离】 -0.2~-0.05: 主体词或限定词存在误导性 - 例: 原始问题"免费摄影素材" vs 词条"付费摄影素材库" -0.5~-0.25: 主体词明显错位或品类冲突 - 例: 原始问题"风光摄影素材" vs 词条"人像修图教程" -1.0~-0.55: 完全错误的品类或有害引导 - 例: 原始问题"正版素材获取" vs 词条"盗版素材下载" --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "品类维度得分": "-1到1之间的小数", "简要说明品类维度相关度理由": "评估该词条与原始问题品类匹配程度的理由" } ``` --- #注意事项: 始终围绕品类维度:所有评估都基于"品类"维度,不偏离 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当词条对原始问题品类产生误导、冲突或有害引导时给予负分 零分使用原则:当词条与原始问题品类无明确关联,既不相关也不冲突时给予零分 """.strip() # 创建 Round 0 评估 Agent round0_motivation_evaluator = Agent[None]( name="Round 0动机维度评估专家", instructions=round0_motivation_evaluation_instructions, model=get_model(MODEL_NAME), output_type=MotivationEvaluation, model_settings=ModelSettings(temperature=0.2) ) round0_category_evaluator = Agent[None]( name="Round 0品类维度评估专家", instructions=round0_category_evaluation_instructions, model=get_model(MODEL_NAME), output_type=CategoryEvaluation, model_settings=ModelSettings(temperature=0.2) ) # ============================================================================ # 域内/域间 专用 Agent(v124新增 - 需求2&3) # ============================================================================ # 域内/域间 动机评估 prompt(不含延伸词) scope_motivation_evaluation_instructions = """ # 角色 你是**专业的动机意图评估专家**。 任务:判断<词条>与<同一作用域词条>的**动机意图匹配度**,给出**-1到1之间**的数值评分。 --- # 输入信息 你将接收到以下输入: **<同一作用域词条>**:用户的初始查询问题,代表用户的真实需求意图。 - **<词条>**:平台推荐的词条列表,每个词条需要单独评估。 --- # 评估架构 输入: <同一作用域词条> + <词条> ↓ 【动机维度相关性判定】 ├→ 步骤1: 评估<词条>与<同一作用域词条>的需求动机匹配度 └→ 输出: -1到1之间的数值 + 判定依据 # 核心约束 ## 维度独立性声明 【严格约束】本评估**仅评估动机意图维度**: - **只评估** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 - **评估重点**:动作本身及其语义方向 **禁止使用"主题相关"作为评分依据**:评分理由中不得出现"主题"、"内容"、"话题"等词 --- # 作用域与动作意图 ## 什么是作用域? **作用域 = 动机层 + 对象层 + 场景层** 当前任务: - **只提取动机层**:动作意图(获取、学习、规划、拍摄等) ## 动作意图的识别 ### 1. 动机维度 **定义:** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 ### 方法1: 显性动词直接提取 当原始问题明确包含动词时,直接提取 示例: "如何获取素材" → 核心动机 = "获取" "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习") "制作视频教程" → 核心动机 = "制作" ### 方法2: 隐性动词语义推理 当原始问题没有显性动词时,需要结合上下文推理 --- # 评分标准 ## 【正向匹配】 ### +0.9~1.0:核心动作完全一致 **示例**: - "规划旅行行程" vs "安排旅行路线" → 0.98 - 规划≈安排,语义完全一致 - "获取素材" vs "下载素材" → 0.97 - 获取≈下载,语义完全一致 - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致 例: 原始问题"扣除猫咪主体的方法" vs sug词"扣除猫咪眼睛的方法"(子集但目的一致 **注意**:此处不考虑对象和场景是否一致,只看动作本身 ###+0.75~0.95: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs sug词"如何下载素材" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 ### +0.50~0.75:动作意图相关 **判定标准**: - 动作是实现原始意图的相关路径 - 或动作是原始意图的前置/后置步骤 **示例**: - "获取素材" vs "管理素材" → 0.65 - 管理是获取后的相关步骤 - "规划行程" vs "预订酒店" → 0.60 - 预订是规划的具体实施步骤 ### +0.25~0.50:动作意图弱相关 **判定标准**: - 动作在同一大类但方向不同 - 或动作有间接关联 **示例**: - "学习摄影技巧" vs "欣赏摄影作品" → 0.35 - 都与摄影有关,但学习≠欣赏 - "规划旅行" vs "回忆旅行" → 0.30 - 都与旅行有关,但方向不同 --- ## 【中性/无关】 ### 0:无动作意图或动作完全无关 **适用场景**: 1. 原始问题或词条无法识别动作 2. 两者动作意图完全无关 **示例**: - "如何获取素材" vs "摄影器材" → 0 - 词条无动作意图 - "川西风光" vs "风光摄影作品" → 0 - 原始问题无动作意图 **理由模板**: - "词条无明确动作意图,无法评估动作匹配度" - "原始问题无明确动作意图,动作维度得分为0" --- ## 【负向偏离】 ### -0.2~-0.05:动作方向轻度偏离 **示例**: - "学习摄影技巧" vs "销售摄影课程" → -0.10 - 学习 vs 销售,方向有偏差 ### -0.5~-0.25:动作意图明显冲突 **示例**: - "获取免费素材" vs "购买素材" → -0.35 - 获取免费 vs 购买,明显冲突 ### -1.0~-0.55:动作意图完全相反 **示例**: - "下载素材" vs "上传素材" → -0.70 - 下载 vs 上传,方向完全相反 --- # 输出格式 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "原始问题核心动机提取": { "简要说明核心动机": "" }, "动机维度得分": "-1到1之间的小数", "简要说明动机维度相关度理由": "评估该词条与该条作用域匹配程度的理由", "得分为零的原因": "原始问题无动机/sug词条无动机/动机不匹配/不适用" } ``` --- # 核心原则总结 1. **只评估动作**:完全聚焦于动作意图,不管对象和场景 2. **作用域识别**:识别作用域但只评估动机层 3. **严格标准一致性**:对所有用例使用相同的评估标准,避免评分飘移 4. **理由纯粹**:评分理由只能谈动作,不能谈对象、场景、主题 """.strip() # 域内/域间 品类评估 prompt(不含延伸词) scope_category_evaluation_instructions = """ #角色 你是一个 **专业的语言专家和语义相关性评判专家**。 你的任务是:判断我给你的 <词条> 与 <同一作用域词条> 的内容主体和限定词匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 输入信息 你将接收到以下输入: - **<同一作用域词条>**:用户的初始查询问题,代表用户的真实需求意图。 - **<词条>**:平台推荐的词条列表,每个词条需要单独评估。 --- #判定流程 #评估架构 输入: <同一作用域词条> + <词条> ↓ 【品类维度相关性判定】 ├→ 步骤1: 评估<词条>与<同一作用域词条>的内容主体和限定词匹配度 └→ 输出: -1到1之间的数值 + 判定依据 --- # 核心概念与方法论 ## 评估维度 本评估系统围绕 **品类维度** 进行: # 维度独立性警告 【严格约束】本评估**只评估品类维度**,,必须遵守以下规则: 1. **只看名词和限定词**:评估时只考虑主体、限定词的匹配度 2. **完全忽略动词**:动作意图、目的等动机信息对本维度评分无影响 ### 品类维度 **定义:** 用户"关于什么内容",即原始问题的主题对象和限定词 - 核心是 **名词+限定词**:川西秋季风光摄影素材 - 包括:核心主体 + 地域限定 + 时间限定 + 质量限定等 ## ⚠️ 品类评估核心原则(必读) ### 原则1:只看词条表面,禁止联想推演 - 只能基于sug词实际包含的词汇评分 - 禁止推测"可能包含"、"可以理解为" **错误示例:** 原始问题:"川西旅行行程" vs sug词:"每日计划" - 错误 "每日计划可以包含旅行规划,所以有关联" → 这是不允许的联想 - 正确: "sug词只有'每日计划',无'旅行'字眼,品类不匹配" → 正确判断 ### 原则2:通用概念 ≠ 特定概念 - **通用**:计划、方法、技巧、素材(无领域限定) - **特定**:旅行行程、摄影技巧、烘焙方法(有明确领域) IF sug词是通用 且 原始问题是特定: → 品类不匹配 → 评分0.05~0.1 关键:通用概念不等于特定概念,不能因为"抽象上都是规划"就给分 --- #相关度评估维度详解 ##评估对象: <词条> 与 <同一作用域词条> 的内容主体和限定词匹配度 评分标准: 【正向匹配】 +0.95~1.0: 核心主体+所有关键限定词完全匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西秋季风光摄影作品" +0.75~0.95: 核心主体匹配,存在限定词匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西风光摄影素材"(缺失"秋季") +0.5~0.75: 核心主体匹配,无限定词匹配或合理泛化 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"四川风光摄影" +0.3~0.5: 核心主体匹配,但限定词缺失或存在语义错位 - 特别注意"语义身份"差异,主体词出现但上下文语义不同 - 例: · "猫咪的XX行为"(猫咪是行为者) · vs "用猫咪表达XX的梗图"(猫咪是媒介) · 虽都含"猫咪+XX",但语义角色不同 +0.2~0.3: 主体词不匹配,限定词缺失或错位 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"风光摄影入门" +0.05~0.2: 主体词过度泛化或仅抽象相似 - 例: sug词是通用概念,原始问题是特定概念 sug词"每日计划"(通用)vs 原始问题 "川西旅行行程"(特定) → 评分:0.08 【中性/无关】 0: 类别明显不同,没有明确目的,无明确关联 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"人像摄影素材" - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0 【负向偏离】 -0.2~-0.05: 主体词或限定词存在误导性 - 例: 原始问题"免费摄影素材" vs sug词"付费摄影素材库" -0.5~-0.25: 主体词明显错位或品类冲突 - 例: 原始问题"风光摄影素材" vs sug词"人像修图教程" -1.0~-0.55: 完全错误的品类或有害引导 - 例: 原始问题"正版素材获取" vs sug词"盗版素材下载" --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "品类维度得分": "-1到1之间的小数", "简要说明品类维度相关度理由": "评估该词条与同一作用域词条品类匹配程度的理由" } ``` --- #注意事项: 始终围绕品类维度:所有评估都基于"品类"维度,不偏离 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当词条对原始问题品类产生误导、冲突或有害引导时给予负分 零分使用原则:当词条与原始问题品类无明确关联,既不相关也不冲突时给予零分 """.strip() # 创建域内/域间评估 Agent scope_motivation_evaluator = Agent[None]( name="域内动机维度评估专家", instructions=scope_motivation_evaluation_instructions, model=get_model(MODEL_NAME), output_type=MotivationEvaluation, model_settings=ModelSettings(temperature=0.2) ) scope_category_evaluator = Agent[None]( name="域内品类维度评估专家", instructions=scope_category_evaluation_instructions, model=get_model(MODEL_NAME), output_type=CategoryEvaluation, model_settings=ModelSettings(temperature=0.2) ) # ============================================================================ # v120 保留但不使用的 Agent(v121不再使用) # ============================================================================ # # Agent 3: 加词选择专家(旧版 - v120使用,v121不再使用) # class WordCombination(BaseModel): # """单个词组合""" # selected_word: str = Field(..., description="选择的词") # combined_query: str = Field(..., description="组合后的新query") # reasoning: str = Field(..., description="选择理由") # class WordSelectionTop5(BaseModel): # """加词选择结果(Top 5)""" # combinations: list[WordCombination] = Field( # ..., # description="选择的Top 5组合(不足5个则返回所有)", # min_items=1, # max_items=5 # ) # overall_reasoning: str = Field(..., description="整体选择思路") # word_selection_instructions 已删除 (v121不再使用) # word_selector = Agent[None]( # name="加词组合专家", # instructions=word_selection_instructions, # model=get_model(MODEL_NAME), # output_type=WordSelectionTop5, # model_settings=ModelSettings(temperature=0.2), # ) # ============================================================================ # 辅助函数 # ============================================================================ # ============================================================================ # v121 新增辅助函数 # ============================================================================ def _ensure_sug_cache_dir(): """确保SUG缓存目录存在""" os.makedirs(SUG_CACHE_DIR, exist_ok=True) def _sug_cache_path(keyword: str) -> str: """根据关键词生成缓存文件路径""" key_hash = hashlib.md5(keyword.encode("utf-8")).hexdigest() return os.path.join(SUG_CACHE_DIR, f"{key_hash}.json") def load_sug_cache(keyword: str) -> Optional[list[str]]: """从持久化缓存中读取SUG结果""" if not keyword: return None cache_path = _sug_cache_path(keyword) if not os.path.exists(cache_path): return None file_age = time.time() - os.path.getmtime(cache_path) if file_age > SUG_CACHE_TTL: return None try: with open(cache_path, "r", encoding="utf-8") as f: data = json.load(f) suggestions = data.get("suggestions") if isinstance(suggestions, list): return suggestions except Exception as exc: print(f" ⚠️ 读取SUG缓存失败({keyword}): {exc}") return None def save_sug_cache(keyword: str, suggestions: list[str]): """将SUG结果写入持久化缓存""" if not keyword or not isinstance(suggestions, list): return _ensure_sug_cache_dir() cache_path = _sug_cache_path(keyword) try: payload = { "keyword": keyword, "suggestions": suggestions, "timestamp": datetime.now().isoformat() } with open(cache_path, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) except Exception as exc: print(f" ⚠️ 写入SUG缓存失败({keyword}): {exc}") def get_suggestions_with_cache(keyword: str, api: XiaohongshuSearchRecommendations, context: RunContext | None = None) -> list[str]: """带持久化缓存的SUG获取""" cached = load_sug_cache(keyword) if cached is not None: print(f" 📦 SUG缓存命中: {keyword} ({len(cached)} 个)") # 统计:SUG请求次数 + 缓存命中次数 if context is not None: context.stats_sug_requests += 1 context.stats_sug_cache_hits += 1 return cached # 统计:SUG请求次数 if context is not None: context.stats_sug_requests += 1 suggestions = api.get_recommendations(keyword=keyword) if suggestions: save_sug_cache(keyword, suggestions) return suggestions # ============================================================================ # 评估缓存持久化函数 # ============================================================================ def _ensure_eval_cache_dir(): """确保评估缓存目录存在""" os.makedirs(EVAL_CACHE_DIR, exist_ok=True) def load_eval_cache() -> dict[str, tuple[float, str]]: """从持久化缓存中读取评估结果 Returns: dict[str, tuple[float, str]]: {文本: (得分, 理由)} """ if not os.path.exists(EVAL_CACHE_FILE): print(f"📦 评估缓存文件不存在,将创建新缓存") return {} try: # 检查缓存文件年龄 file_age = time.time() - os.path.getmtime(EVAL_CACHE_FILE) if file_age > EVAL_CACHE_TTL: print(f"⚠️ 评估缓存已过期({file_age / 86400:.1f}天),清空缓存") return {} with open(EVAL_CACHE_FILE, 'r', encoding='utf-8') as f: data = json.load(f) # 转换回tuple格式 cache = {k: tuple(v) for k, v in data.items()} print(f"📦 加载评估缓存: {len(cache)} 条记录(年龄: {file_age / 3600:.1f}小时)") return cache except Exception as e: print(f"⚠️ 评估缓存加载失败: {e},使用空缓存") return {} def save_eval_cache(cache: dict[str, tuple[float, str]]): """保存评估缓存到磁盘 Args: cache: {文本: (得分, 理由)} """ try: _ensure_eval_cache_dir() # 转换为可序列化格式 data = {k: list(v) for k, v in cache.items()} with open(EVAL_CACHE_FILE, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"💾 评估缓存已保存: {len(cache)} 条记录 -> {EVAL_CACHE_FILE}") except Exception as e: print(f"⚠️ 评估缓存保存失败: {e}") def get_ordered_subsets(words: list[str], min_len: int = 1) -> list[list[str]]: """ 生成words的所有有序子集(可跳过但不可重排) 使用 itertools.combinations 生成索引组合,保持原始顺序 Args: words: 词列表 min_len: 子集最小长度 Returns: 所有可能的有序子集列表 Example: words = ["川西", "秋季", "风光"] 结果: - 长度1: ["川西"], ["秋季"], ["风光"] - 长度2: ["川西", "秋季"], ["川西", "风光"], ["秋季", "风光"] - 长度3: ["川西", "秋季", "风光"] 共 C(3,1) + C(3,2) + C(3,3) = 3 + 3 + 1 = 7种 """ from itertools import combinations subsets = [] n = len(words) # 遍历所有可能的长度(从min_len到n) for r in range(min_len, n + 1): # 生成长度为r的所有索引组合 for indices in combinations(range(n), r): # 按照原始顺序提取词 subset = [words[i] for i in indices] subsets.append(subset) return subsets def generate_domain_combinations(segments: list[Segment], n_domains: int) -> list[DomainCombination]: """ 生成N域组合 步骤: 1. 从len(segments)个域中选择n_domains个域(组合,保持顺序) 2. 对每个选中的域,生成其words的所有有序子集 3. 计算笛卡尔积,生成所有可能的组合 Args: segments: 语义片段列表 n_domains: 参与组合的域数量 Returns: 所有可能的N域组合列表 Example: 有4个域: [疑问标记, 核心动作, 修饰短语, 中心名词] n_domains=2时,选择域的方式: C(4,2) = 6种 假设选中[核心动作, 中心名词]: - 核心动作的words: ["获取"], 子集: ["获取"] - 中心名词的words: ["风光", "摄影", "素材"], 子集: 7种 则该域选择下的组合数: 1 * 7 = 7种 """ from itertools import combinations, product all_combinations = [] n = len(segments) # 检查参数有效性 if n_domains > n or n_domains < 1: return [] # 1. 选择n_domains个域(保持原始顺序) for domain_indices in combinations(range(n), n_domains): selected_segments = [segments[i] for i in domain_indices] # 新增:如果所有域都只有1个词,跳过(单段落单词不组合) if all(len(seg.words) == 1 for seg in selected_segments): continue # 2. 为每个选中的域生成其words的所有有序子集 domain_subsets = [] for seg in selected_segments: if len(seg.words) == 0: # 如果某个域没有词,跳过该域组合 domain_subsets = [] break subsets = get_ordered_subsets(seg.words, min_len=1) domain_subsets.append(subsets) # 如果某个域没有词,跳过 if len(domain_subsets) != n_domains: continue # 3. 计算笛卡尔积 for word_combination in product(*domain_subsets): # word_combination 是一个tuple,每个元素是一个词列表 # 例如: (["获取"], ["风光", "摄影"]) # 计算总词数 total_words = sum(len(words) for words in word_combination) # 如果总词数<=1,跳过(组词必须大于1个词) if total_words <= 1: continue # 将所有词连接成一个字符串 combined_text = "".join(["".join(words) for words in word_combination]) # 生成类型标签 type_labels = [selected_segments[i].type for i in range(n_domains)] type_label = "[" + "+".join(type_labels) + "]" # 创建DomainCombination对象 comb = DomainCombination( text=combined_text, domains=list(domain_indices), type_label=type_label, source_words=[list(words) for words in word_combination], # 保存来源词 from_segments=[seg.text for seg in selected_segments] ) all_combinations.append(comb) return all_combinations def extract_words_from_segments(segments: list[Segment]) -> list[Q]: """ 从 segments 中提取所有 words,转换为 Q 对象列表 用于 Round 1 的输入:将 Round 0 的 words 转换为可用于请求SUG的 query 列表 Args: segments: Round 0 的语义片段列表 Returns: list[Q]: word 列表,每个 word 作为一个 Q 对象 """ q_list = [] for seg_idx, segment in enumerate(segments): for word in segment.words: # 从 segment.word_scores 获取该 word 的评分 word_score = segment.word_scores.get(word, 0.0) word_reason = segment.word_reasons.get(word, "") # 创建 Q 对象 q = Q( text=word, score_with_o=word_score, reason=word_reason, from_source="word", # 标记来源为 word type_label=f"[{segment.type}]", # 保留域信息 domain_index=seg_idx, # 添加域索引 domain_type=segment.type # 添加域类型(如"中心名词"、"核心动作") ) q_list.append(q) return q_list # ============================================================================ # v120 保留辅助函数 # ============================================================================ def calculate_final_score( motivation_score: float, category_score: float, extension_score: float, zero_reason: Optional[str], extension_reason: str = "" ) -> tuple[float, str]: """ 三维评估综合打分 实现动态权重分配: - 情况1:标准情况 → 动机50% + 品类40% + 延伸词10% - 情况2:原始问题无动机 → 品类70% + 延伸词30% - 情况3:sug词条无动机 → 品类80% + 延伸词20% - 情况4:无延伸词 → 动机70% + 品类30% - 规则3:负分传导 → 核心维度严重负向时上限为0 - 规则4:完美匹配加成 → 双维度≥0.95时加成+0.10 Args: motivation_score: 动机维度得分 -1~1 category_score: 品类维度得分 -1~1 extension_score: 延伸词得分 -1~1 zero_reason: 当motivation_score=0时的原因(可选) extension_reason: 延伸词评估理由,用于判断是否无延伸词 Returns: (最终得分, 规则说明) """ # 情况2:原始问题无动作意图 if motivation_score == 0 and zero_reason == "原始问题无动机": W1, W2, W3 = 0.0, 0.70, 0.30 base_score = category_score * W2 + extension_score * W3 rule_applied = "情况2:原始问题无动作意图,权重调整为 品类70% + 延伸词30%" # 情况3:sug词条无动作意图(但原始问题有) elif motivation_score == 0 and zero_reason == "sug词条无动机": W1, W2, W3 = 0.0, 0.80, 0.20 base_score = category_score * W2 + extension_score * W3 rule_applied = "情况3:sug词条无动作意图,权重调整为 品类80% + 延伸词20%" # 情况4:无延伸词 elif extension_score == 0: W1, W2, W3 = 0.70, 0.30, 0.0 base_score = motivation_score * W1 + category_score * W2 rule_applied = "情况4:无延伸词,权重调整为 动机70% + 品类30%" else: # 情况1:标准权重 W1, W2, W3 = 0.50, 0.40, 0.10 base_score = motivation_score * W1 + category_score * W2 + extension_score * W3 rule_applied = "" # 规则4:完美匹配加成 if motivation_score >= 0.95 and category_score >= 0.95: base_score += 0.10 rule_applied += (" + " if rule_applied else "") + "规则4:双维度完美匹配,加成+0.10" # 规则3:负分传导 if motivation_score <= -0.5 or category_score <= -0.5: base_score = min(base_score, 0) rule_applied += (" + " if rule_applied else "") + "规则3:核心维度严重负向,上限=0" # 边界处理 final_score = max(-1.0, min(1.0, base_score)) return final_score, rule_applied def calculate_final_score_v2( motivation_score: float, category_score: float ) -> tuple[float, str]: """ 两维评估综合打分(v124新增 - 需求1) 用于Round 0分词评估和域内/域间评估,不含延伸词维度 基础权重:动机70% + 品类30% 应用规则: - 规则A:动机高分保护机制 IF 动机维度得分 ≥ 0.8: 品类得分即使为0或轻微负向(-0.2~0) → 最终得分应该不低于0.7 解释: 当目的高度一致时,品类的泛化不应导致"弱相关" - 规则B:动机低分限制机制 IF 动机维度得分 ≤ 0.2: 无论品类得分多高 → 最终得分不高于0.5 解释: 目的不符时,品类匹配的价值有限 - 规则C:动机负向决定机制 IF 动机维度得分 < 0: → 最终得分为0 解释: 动作意图冲突时,推荐具有误导性,不应为正相关 Args: motivation_score: 动机维度得分 -1~1 category_score: 品类维度得分 -1~1 Returns: (最终得分, 规则说明) """ rule_applied = "" # 规则C:动机负向决定机制 if motivation_score < 0: final_score = 0.0 rule_applied = "规则C:动机负向,最终得分=0" return final_score, rule_applied # 基础加权计算: 动机70% + 品类30% base_score = motivation_score * 0.7 + category_score * 0.3 # 规则A:动机高分保护机制 if motivation_score >= 0.8: if base_score < 0.7: final_score = 0.7 rule_applied = f"规则A:动机高分保护(动机{motivation_score:.2f}≥0.8),最终得分下限=0.7" else: final_score = base_score rule_applied = f"规则A:动机高分保护生效(动机{motivation_score:.2f}≥0.8),实际得分{base_score:.2f}已≥0.7" # 规则B:动机低分限制机制 elif motivation_score <= 0.2: if base_score > 0.5: final_score = 0.5 rule_applied = f"规则B:动机低分限制(动机{motivation_score:.2f}≤0.2),最终得分上限=0.5" else: final_score = base_score rule_applied = f"规则B:动机低分限制生效(动机{motivation_score:.2f}≤0.2),实际得分{base_score:.2f}已≤0.5" # 无规则触发 else: final_score = base_score rule_applied = "" # 边界处理 final_score = max(-1.0, min(1.0, final_score)) return final_score, rule_applied def clean_json_string(text: str) -> str: """清理JSON中的非法控制字符(保留 \t \n \r)""" import re # 移除除了 \t(09) \n(0A) \r(0D) 之外的所有控制字符 return re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', text) def process_note_data(note: dict) -> Post: """处理搜索接口返回的帖子数据""" note_card = note.get("note_card", {}) image_list = note_card.get("image_list", []) interact_info = note_card.get("interact_info", {}) user_info = note_card.get("user", {}) # ========== 调试日志 START ========== # note_id = note.get("id", "") # # # 1. 打印完整的 note 结构 # print(f"\n[DEBUG] ===== 处理帖子 {note_id} =====") # print(f"[DEBUG] note 的所有键: {list(note.keys())}") # print(f"[DEBUG] note 完整数据 (前2000字符):") # print(json.dumps(note, ensure_ascii=False, indent=2)[:2000]) # # # 2. 打印 note_card 信息 # print(f"\n[DEBUG] note_card 的所有键: {list(note_card.keys())}") # # # 3. 检查 desc 字段 # raw_desc = note_card.get("desc") # print(f"\n[DEBUG] desc 字段:") # print(f" - 类型: {type(raw_desc).__name__}") # print(f" - 长度: {len(raw_desc) if raw_desc else 0}") # print(f" - 完整内容: {repr(raw_desc)}") # # # 4. 检查是否有其他可能包含完整内容的字段 # print(f"\n[DEBUG] 检查其他可能的内容字段:") # for potential_field in ["full_desc", "content", "full_content", "note_text", "body", "full_body", "title", "display_title"]: # if potential_field in note_card: # value = note_card.get(potential_field) # print(f" - 发现字段 '{potential_field}': 长度={len(str(value))}, 值={repr(str(value)[:200])}") # # # 5. 检查顶层 note 对象中是否有详细内容 # print(f"\n[DEBUG] 检查 note 顶层字段:") # for top_field in ["note_info", "detail", "content", "desc"]: # if top_field in note: # value = note.get(top_field) # print(f" - 发现顶层字段 '{top_field}': 类型={type(value).__name__}, 内容={repr(str(value)[:200])}") # # print(f"[DEBUG] ===== 数据检查完成 =====\n") # ========== 调试日志 END ========== # 提取图片URL - 支持字符串和字典两种格式 images = [] for img in image_list: if isinstance(img, str): # 预处理后的字符串格式(来自xiaohongshu_search.py的_preprocess_response) images.append(img) elif isinstance(img, dict): # 原始字典格式 - 尝试新字段名 image_url,如果不存在则尝试旧字段名 url_default img_url = img.get("image_url") or img.get("url_default") if img_url: images.append(img_url) # 判断类型 note_type = note_card.get("type", "normal") video_url = "" if note_type == "video": video_info = note_card.get("video", {}) if isinstance(video_info, dict): # 尝试获取视频URL video_url = video_info.get("media", {}).get("stream", {}).get("h264", [{}])[0].get("master_url", "") # 构造 Post 对象 post = Post( note_id=note.get("id") or "", title=note_card.get("display_title") or "", body_text=note_card.get("desc") or "", type=note_type, images=images, video=video_url, interact_info={ "liked_count": interact_info.get("liked_count", 0), "collected_count": interact_info.get("collected_count", 0), "comment_count": interact_info.get("comment_count", 0), "shared_count": interact_info.get("shared_count", 0) }, note_url=f"https://www.xiaohongshu.com/explore/{note.get('id', '')}" ) # # 打印最终构造的 Post 对象 # print(f"\n[DEBUG] ===== 构造的 Post 对象 =====") # print(f"[DEBUG] - note_id: {post.note_id}") # print(f"[DEBUG] - title: {post.title}") # print(f"[DEBUG] - body_text 长度: {len(post.body_text)}") # print(f"[DEBUG] - body_text 完整内容: {repr(post.body_text)}") # print(f"[DEBUG] - type: {post.type}") # print(f"[DEBUG] - images 数量: {len(post.images)}") # print(f"[DEBUG] - interact_info: {post.interact_info}") # print(f"[DEBUG] ===== Post 对象构造完成 =====\n") return post async def evaluate_with_o(text: str, o: str, cache: dict[str, tuple[float, str]] | None = None, context: RunContext | None = None, **kwargs) -> tuple[float, str]: """评估文本与原始问题o的相关度 采用两阶段评估 + 代码计算规则: 1. 动机维度评估(权重70%) 2. 品类维度评估(权重30%) 3. 应用规则A/B/C调整得分 Args: text: 待评估的文本 o: 原始问题 cache: 评估缓存(可选),用于避免重复评估 context: 运行上下文(可选),用于统计 Returns: tuple[float, str]: (最终相关度分数, 综合评估理由) """ # 统计LLM调用(无论是否缓存命中都计数,因为是"评估比对"次数) if context is not None: context.stats_llm_calls += 3 # 3个评估器 # 检查缓存 if cache is not None and text in cache: cached_score, cached_reason = cache[text] print(f" ⚡ 缓存命中: {text} -> {cached_score:.2f}") return cached_score, cached_reason # 准备输入 eval_input = f""" <原始问题> {o} <平台sug词条> {text} 请评估平台sug词条与原始问题的匹配度。 """ # 添加重试机制 max_retries = 2 last_error = None for attempt in range(max_retries): try: # 并发调用三个评估器 motivation_task = Runner.run(motivation_evaluator, eval_input) category_task = Runner.run(category_evaluator, eval_input) extension_task = Runner.run(extension_word_evaluator, eval_input) motivation_result, category_result, extension_result = await asyncio.gather( motivation_task, category_task, extension_task ) # 获取评估结果 motivation_eval: MotivationEvaluation = motivation_result.final_output category_eval: CategoryEvaluation = category_result.final_output extension_eval: ExtensionWordEvaluation = extension_result.final_output # 提取得分 motivation_score = motivation_eval.动机维度得分 category_score = category_eval.品类维度得分 extension_score = extension_eval.延伸词得分 zero_reason = motivation_eval.得分为零的原因 # 应用规则计算最终得分 final_score, rule_applied = calculate_final_score( motivation_score, category_score, extension_score, zero_reason, extension_eval.简要说明延伸词维度相关度理由 ) # 组合评估理由 core_motivation = motivation_eval.原始问题核心动机提取.简要说明核心动机 motivation_reason = motivation_eval.简要说明动机维度相关度理由 category_reason = category_eval.简要说明品类维度相关度理由 extension_reason = extension_eval.简要说明延伸词维度相关度理由 combined_reason = ( f'【评估对象】词条"{text}" vs 原始问题"{o}"\n' f"【核心动机】{core_motivation}\n" f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n" f"【品类维度 {category_score:.2f}】{category_reason}\n" f"【延伸词维度 {extension_score:.2f}】{extension_reason}\n" f"【最终得分 {final_score:.2f}】" ) # 添加规则说明 if rule_applied: combined_reason += f"\n【规则说明】{rule_applied}" # 存入缓存 if cache is not None: cache[text] = (final_score, combined_reason) return final_score, combined_reason except Exception as e: last_error = e error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}") print(f" 正在重试...") await asyncio.sleep(1) # 等待1秒后重试 else: print(f" ❌ 评估失败 (已达最大重试次数): {error_msg[:150]}") # 所有重试失败后,返回默认值 fallback_reason = f"评估失败(重试{max_retries}次): {str(last_error)[:200]}" print(f" 使用默认值: score=0.0, reason={fallback_reason[:100]}...") return 0.0, fallback_reason async def evaluate_batch_with_o( texts: list[str], o: str, cache: dict[str, tuple[float, str]] | None = None, context: RunContext | None = None, round_num: int = 1 ) -> list[tuple[float, str]]: """批量评估函数(每批最多10个)- Round 1+ 对多个SUG进行批量评估,自动分批处理(每批最多10个) 使用批量Agent一次性评估多个SUG,显著提升性能 Args: texts: 待评估的SUG列表 o: 原始问题 cache: 评估缓存(可选) context: 运行上下文(可选),用于统计 round_num: 轮次编号,用于日志输出 Returns: list[tuple[float, str]]: 每个SUG的(最终得分, 评估理由)列表,顺序与输入严格对应 """ import time BATCH_SIZE = 10 # 每批最多10个SUG results = [] # 分批处理 for batch_idx in range(0, len(texts), BATCH_SIZE): batch_texts = texts[batch_idx:batch_idx + BATCH_SIZE] batch_start_time = time.time() print(f" [Round {round_num} 批量评估] 批次{batch_idx//BATCH_SIZE + 1}: 评估 {len(batch_texts)} 个SUG...") # 先检查缓存,分离已缓存和未缓存的 cached_results = {} uncached_texts = [] uncached_indices = [] for i, text in enumerate(batch_texts): if cache is not None and text in cache: cached_results[i] = cache[text] print(f" ⚡ 缓存命中: {text} -> {cache[text][0]:.2f}") else: uncached_texts.append(text) uncached_indices.append(i) # 如果全部命中缓存,直接返回 if not uncached_texts: print(f" ✅ 全部命中缓存,跳过批量评估") results.extend([cached_results[i] for i in range(len(batch_texts))]) continue # 构建批量评估输入 sug_list_str = "\n".join([f"{i}. {sug}" for i, sug in enumerate(uncached_texts, 1)]) batch_input = f""" <原始问题> {o} <平台sug词条列表> {sug_list_str} 请对以上所有SUG每一个进行完全独立评估。 """ # 统计LLM调用(批量调用计为2次:动机+品类) if context is not None: context.stats_llm_calls += 2 # 添加重试机制 max_retries = 2 last_error = None batch_success = False for attempt in range(max_retries): try: # 并发调用批量评估器(不含延伸词) motivation_task = Runner.run(batch_motivation_evaluator, batch_input) category_task = Runner.run(batch_category_evaluator, batch_input) motivation_result, category_result = await asyncio.gather( motivation_task, category_task ) batch_motivation: BatchMotivationResult = motivation_result.final_output batch_category: BatchCategoryResult = category_result.final_output # 验证返回数量 if len(batch_motivation.evaluations) != len(uncached_texts): raise ValueError(f"动机评估数量不匹配: 期望{len(uncached_texts)},实际{len(batch_motivation.evaluations)}") if len(batch_category.evaluations) != len(uncached_texts): raise ValueError(f"品类评估数量不匹配: 期望{len(uncached_texts)},实际{len(batch_category.evaluations)}") # 验证顺序 for i, (expected_text, mot_item, cat_item) in enumerate(zip(uncached_texts, batch_motivation.evaluations, batch_category.evaluations)): if mot_item.sug_text != expected_text: raise ValueError(f"动机评估顺序错误: 位置{i+1}期望'{expected_text}',实际'{mot_item.sug_text}'") if cat_item.sug_text != expected_text: raise ValueError(f"品类评估顺序错误: 位置{i+1}期望'{expected_text}',实际'{cat_item.sug_text}'") # 处理每个SUG的结果 batch_results_temp = [] for mot_item, cat_item in zip(batch_motivation.evaluations, batch_category.evaluations): motivation_score = mot_item.动机维度得分 category_score = cat_item.品类维度得分 zero_reason = mot_item.得分为零的原因 # 应用规则计算最终得分(不含延伸词维度) final_score, rule_applied = calculate_final_score_v2( motivation_score, category_score ) # 组合评估理由 core_motivation = mot_item.原始问题核心动机提取.简要说明核心动机 motivation_reason = mot_item.简要说明动机维度相关度理由 category_reason = cat_item.简要说明品类维度相关度理由 combined_reason = ( f'【评估对象】词条"{mot_item.sug_text}" vs 原始问题"{o}"\n' f"【核心动机】{core_motivation}\n" f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n" f"【品类维度 {category_score:.2f}】{category_reason}\n" f"【最终得分 {final_score:.2f}】" ) if rule_applied: combined_reason += f"\n【规则说明】{rule_applied}" batch_results_temp.append((final_score, combined_reason)) # 存入缓存 if cache is not None: cache[mot_item.sug_text] = (final_score, combined_reason) # 合并缓存结果和批量评估结果 final_batch_results = [] uncached_idx = 0 for i in range(len(batch_texts)): if i in cached_results: final_batch_results.append(cached_results[i]) else: final_batch_results.append(batch_results_temp[uncached_idx]) uncached_idx += 1 results.extend(final_batch_results) batch_success = True batch_elapsed = time.time() - batch_start_time print(f" ✅ 批次{batch_idx//BATCH_SIZE + 1}完成: {len(uncached_texts)}个SUG,耗时{batch_elapsed:.2f}秒") break except Exception as e: last_error = e error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 批量评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}") print(f" 正在重试...") await asyncio.sleep(1) else: print(f" ❌ 批量评估失败 (已达最大重试次数): {error_msg[:150]}") # 如果批量评估失败,回退到单个评估 if not batch_success: print(f" ⚠️ 批量评估失败,回退到单个评估模式...") for text in uncached_texts: try: score, reason = await evaluate_with_o(text, o, cache, context) batch_results_temp.append((score, reason)) except Exception as e: print(f" ❌ 单个评估也失败: {text[:30]}... - {str(e)[:100]}") batch_results_temp.append((0.0, f"评估失败: {str(e)[:100]}")) # 合并结果 final_batch_results = [] uncached_idx = 0 for i in range(len(batch_texts)): if i in cached_results: final_batch_results.append(cached_results[i]) else: final_batch_results.append(batch_results_temp[uncached_idx]) uncached_idx += 1 results.extend(final_batch_results) return results async def evaluate_with_o_round0(text: str, o: str, cache: dict[str, tuple[float, str]] | None = None) -> tuple[float, str]: """Round 0专用评估函数(v124新增 - 需求1) 用于评估segment和word与原始问题的相关度 不含延伸词维度,使用Round 0专用Prompt和新评分逻辑 采用两维评估: 1. 动机维度评估(权重70%) 2. 品类维度评估(权重30%) 3. 应用规则A/B/C调整得分 Args: text: 待评估的文本(segment或word) o: 原始问题 cache: 评估缓存(可选),用于避免重复评估 Returns: tuple[float, str]: (最终相关度分数, 综合评估理由) """ # 检查缓存 cache_key = f"round0:{text}:{o}" # 添加前缀以区分不同评估类型 if cache is not None and cache_key in cache: cached_score, cached_reason = cache[cache_key] print(f" ⚡ Round0缓存命中: {text} -> {cached_score:.2f}") return cached_score, cached_reason # 准备输入 eval_input = f""" <原始问题> {o} <词条> {text} 请评估词条与原始问题的匹配度。 """ # 添加重试机制 max_retries = 2 last_error = None for attempt in range(max_retries): try: # 并发调用两个评估器(不含延伸词) motivation_task = Runner.run(round0_motivation_evaluator, eval_input) category_task = Runner.run(round0_category_evaluator, eval_input) motivation_result, category_result = await asyncio.gather( motivation_task, category_task ) # 获取评估结果 motivation_eval: MotivationEvaluation = motivation_result.final_output category_eval: CategoryEvaluation = category_result.final_output # 提取得分 motivation_score = motivation_eval.动机维度得分 category_score = category_eval.品类维度得分 # 应用新规则计算最终得分 final_score, rule_applied = calculate_final_score_v2( motivation_score, category_score ) # 组合评估理由 core_motivation = motivation_eval.原始问题核心动机提取.简要说明核心动机 motivation_reason = motivation_eval.简要说明动机维度相关度理由 category_reason = category_eval.简要说明品类维度相关度理由 combined_reason = ( f'【评估对象】词条"{text}" vs 原始问题"{o}"\n' f"【核心动机】{core_motivation}\n" f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n" f"【品类维度 {category_score:.2f}】{category_reason}\n" f"【最终得分 {final_score:.2f}】" ) # 添加规则说明 if rule_applied: combined_reason += f"\n【规则说明】{rule_applied}" # 存入缓存 if cache is not None: cache[cache_key] = (final_score, combined_reason) return final_score, combined_reason except Exception as e: last_error = e error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ Round0评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}") print(f" 正在重试...") await asyncio.sleep(1) else: print(f" ❌ Round0评估失败 (已达最大重试次数): {error_msg[:150]}") # 所有重试失败后,返回默认值 fallback_reason = f"Round0评估失败(重试{max_retries}次): {str(last_error)[:200]}" print(f" 使用默认值: score=0.0, reason={fallback_reason[:100]}...") return 0.0, fallback_reason async def evaluate_batch_with_o_round0( texts: list[str], o: str, cache: dict[str, tuple[float, str]] | None = None ) -> list[tuple[float, str]]: """批量评估函数(每批最多10个)- Round 0 专用 对多个words进行批量评估,自动分批处理(每批最多10个) 使用批量Agent一次性评估多个words,显著提升性能 专用于Round 0的segment和word评估 Args: texts: 待评估的word列表 o: 原始问题 cache: 评估缓存(可选) Returns: list[tuple[float, str]]: 每个word的(最终得分, 评估理由)列表,顺序与输入严格对应 """ import time BATCH_SIZE = 10 # 每批最多10个words results = [] # 分批处理 for batch_idx in range(0, len(texts), BATCH_SIZE): batch_texts = texts[batch_idx:batch_idx + BATCH_SIZE] batch_start_time = time.time() print(f" [Round 0 批量评估] 批次{batch_idx//BATCH_SIZE + 1}: 评估 {len(batch_texts)} 个words...") # 先检查缓存,分离已缓存和未缓存的 cached_results = {} uncached_texts = [] uncached_indices = [] for i, text in enumerate(batch_texts): cache_key = f"round0:{text}:{o}" if cache is not None and cache_key in cache: cached_results[i] = cache[cache_key] print(f" ⚡ Round0缓存命中: {text} -> {cache[cache_key][0]:.2f}") else: uncached_texts.append(text) uncached_indices.append(i) # 如果全部命中缓存,直接返回 if not uncached_texts: print(f" ✅ 全部命中缓存,跳过批量评估") results.extend([cached_results[i] for i in range(len(batch_texts))]) continue # 构建批量评估输入 word_list_str = "\n".join([f"{i}. {word}" for i, word in enumerate(uncached_texts, 1)]) batch_input = f""" <原始问题> {o} <词条列表> {word_list_str} 请对以上所有词条每一个进行完全独立评估。 """ # 添加重试机制 max_retries = 2 last_error = None batch_success = False for attempt in range(max_retries): try: # 并发调用批量评估器(不含延伸词,使用Round 0专用prompt) # 注意: Round 0使用与Round 1+相同的批量Agent,因为prompt中已包含所有必要约束 motivation_task = Runner.run(batch_motivation_evaluator, batch_input) category_task = Runner.run(batch_category_evaluator, batch_input) motivation_result, category_result = await asyncio.gather( motivation_task, category_task ) batch_motivation: BatchMotivationResult = motivation_result.final_output batch_category: BatchCategoryResult = category_result.final_output # 验证返回数量 if len(batch_motivation.evaluations) != len(uncached_texts): raise ValueError(f"Round0动机评估数量不匹配: 期望{len(uncached_texts)},实际{len(batch_motivation.evaluations)}") if len(batch_category.evaluations) != len(uncached_texts): raise ValueError(f"Round0品类评估数量不匹配: 期望{len(uncached_texts)},实际{len(batch_category.evaluations)}") # 验证顺序 for i, (expected_text, mot_item, cat_item) in enumerate(zip(uncached_texts, batch_motivation.evaluations, batch_category.evaluations)): if mot_item.sug_text != expected_text: raise ValueError(f"Round0动机评估顺序错误: 位置{i+1}期望'{expected_text}',实际'{mot_item.sug_text}'") if cat_item.sug_text != expected_text: raise ValueError(f"Round0品类评估顺序错误: 位置{i+1}期望'{expected_text}',实际'{cat_item.sug_text}'") # 处理每个word的结果 batch_results_temp = [] for mot_item, cat_item in zip(batch_motivation.evaluations, batch_category.evaluations): motivation_score = mot_item.动机维度得分 category_score = cat_item.品类维度得分 # 应用Round 0专用规则计算最终得分(不含延伸词) final_score, rule_applied = calculate_final_score_v2( motivation_score, category_score ) # 组合评估理由 core_motivation = mot_item.原始问题核心动机提取.简要说明核心动机 motivation_reason = mot_item.简要说明动机维度相关度理由 category_reason = cat_item.简要说明品类维度相关度理由 combined_reason = ( f'【评估对象】词条"{mot_item.sug_text}" vs 原始问题"{o}"\n' f"【核心动机】{core_motivation}\n" f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n" f"【品类维度 {category_score:.2f}】{category_reason}\n" f"【最终得分 {final_score:.2f}】" ) if rule_applied: combined_reason += f"\n【规则说明】{rule_applied}" batch_results_temp.append((final_score, combined_reason)) # 存入缓存(使用round0前缀) if cache is not None: cache_key = f"round0:{mot_item.sug_text}:{o}" cache[cache_key] = (final_score, combined_reason) # 合并缓存结果和批量评估结果 final_batch_results = [] uncached_idx = 0 for i in range(len(batch_texts)): if i in cached_results: final_batch_results.append(cached_results[i]) else: final_batch_results.append(batch_results_temp[uncached_idx]) uncached_idx += 1 results.extend(final_batch_results) batch_success = True batch_elapsed = time.time() - batch_start_time print(f" ✅ 批次{batch_idx//BATCH_SIZE + 1}完成: {len(uncached_texts)}个words,耗时{batch_elapsed:.2f}秒") break except Exception as e: last_error = e error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ Round0批量评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}") print(f" 正在重试...") await asyncio.sleep(1) else: print(f" ❌ Round0批量评估失败 (已达最大重试次数): {error_msg[:150]}") # 如果批量评估失败,回退到单个评估 if not batch_success: print(f" ⚠️ Round0批量评估失败,回退到单个评估模式...") batch_results_temp = [] for text in uncached_texts: try: score, reason = await evaluate_with_o_round0(text, o, cache) batch_results_temp.append((score, reason)) except Exception as e: print(f" ❌ Round0单个评估也失败: {text[:30]}... - {str(e)[:100]}") batch_results_temp.append((0.0, f"Round0评估失败: {str(e)[:100]}")) # 合并结果 final_batch_results = [] uncached_idx = 0 for i in range(len(batch_texts)): if i in cached_results: final_batch_results.append(cached_results[i]) else: final_batch_results.append(batch_results_temp[uncached_idx]) uncached_idx += 1 results.extend(final_batch_results) return results async def evaluate_within_scope(text: str, scope_text: str, cache: dict[str, tuple[float, str]] | None = None, context: RunContext | None = None) -> tuple[float, str]: """域内/域间专用评估函数(v124新增 - 需求2&3) 用于评估词条与作用域词条(单域或域组合)的相关度 不含延伸词维度,使用域内专用Prompt和新评分逻辑 采用两维评估: 1. 动机维度评估(权重70%) 2. 品类维度评估(权重30%) 3. 应用规则A/B/C调整得分 Args: text: 待评估的词条 scope_text: 作用域词条(可以是单域词条或域组合词条) cache: 评估缓存(可选),用于避免重复评估 context: 运行上下文(可选),用于统计 Returns: tuple[float, str]: (最终相关度分数, 综合评估理由) """ # 统计LLM调用(无论是否缓存命中都计数) if context is not None: context.stats_llm_calls += 2 # 2个评估器 # 检查缓存 cache_key = f"scope:{text}:{scope_text}" # 添加前缀以区分不同评估类型 if cache is not None and cache_key in cache: cached_score, cached_reason = cache[cache_key] print(f" ⚡ 域内缓存命中: {text} -> {cached_score:.2f}") return cached_score, cached_reason # 准备输入 eval_input = f""" <同一作用域词条> {scope_text} <词条> {text} 请评估词条与同一作用域词条的匹配度。 """ # 添加重试机制 max_retries = 2 last_error = None for attempt in range(max_retries): try: # 并发调用两个评估器(不含延伸词) motivation_task = Runner.run(scope_motivation_evaluator, eval_input) category_task = Runner.run(scope_category_evaluator, eval_input) motivation_result, category_result = await asyncio.gather( motivation_task, category_task ) # 获取评估结果 motivation_eval: MotivationEvaluation = motivation_result.final_output category_eval: CategoryEvaluation = category_result.final_output # 提取得分 motivation_score = motivation_eval.动机维度得分 category_score = category_eval.品类维度得分 # 应用新规则计算最终得分 final_score, rule_applied = calculate_final_score_v2( motivation_score, category_score ) # 组合评估理由 core_motivation = motivation_eval.原始问题核心动机提取.简要说明核心动机 motivation_reason = motivation_eval.简要说明动机维度相关度理由 category_reason = category_eval.简要说明品类维度相关度理由 combined_reason = ( f'【评估对象】词条"{text}" vs 作用域词条"{scope_text}"\n' f"【核心动机】{core_motivation}\n" f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n" f"【品类维度 {category_score:.2f}】{category_reason}\n" f"【最终得分 {final_score:.2f}】" ) # 添加规则说明 if rule_applied: combined_reason += f"\n【规则说明】{rule_applied}" # 存入缓存 if cache is not None: cache[cache_key] = (final_score, combined_reason) return final_score, combined_reason except Exception as e: last_error = e error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 域内评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}") print(f" 正在重试...") await asyncio.sleep(1) else: print(f" ❌ 域内评估失败 (已达最大重试次数): {error_msg[:150]}") # 所有重试失败后,返回默认值 fallback_reason = f"域内评估失败(重试{max_retries}次): {str(last_error)[:200]}" print(f" 使用默认值: score=0.0, reason={fallback_reason[:100]}...") return 0.0, fallback_reason # ============================================================================ # v125 新增辅助函数(用于新评分逻辑) # ============================================================================ def get_source_word_score( word_text: str, segment: Segment, context: RunContext ) -> float: """ 查找来源词的得分 查找顺序: 1. 先查 segment.word_scores (Round 0的单个词) 2. 再查 context.word_score_history (Round 1+的组合) Args: word_text: 词文本 segment: 该词所在的segment context: 运行上下文 Returns: 词的得分,找不到返回0.0 """ # 优先查Round 0的词得分 if word_text in segment.word_scores: return segment.word_scores[word_text] # 其次查历史组合得分 if word_text in context.word_score_history: return context.word_score_history[word_text] # 都找不到 print(f" ⚠️ 警告: 未找到来源词得分: {word_text}") return 0.0 async def evaluate_domain_combination_round1( comb: DomainCombination, segments: list[Segment], context: RunContext ) -> tuple[float, str]: """ Round 1 域内组合评估(新逻辑) 最终得分 = 品类得分 × 原始域得分 Args: comb: 域内组合对象 segments: 所有segment列表 context: 运行上下文 Returns: (最终得分, 评估理由) """ # 统计LLM调用 context.stats_llm_calls += 1 # 1个评估器 # 获取所属segment domain_idx = comb.domains[0] if comb.domains else 0 segment = segments[domain_idx] if 0 <= domain_idx < len(segments) else None if not segment: return 0.0, "错误: 无法找到所属segment" # 拼接作用域文本 scope_text = segment.text # 准备输入 eval_input = f""" <同一作用域词条> {scope_text} <词条> {comb.text} 请评估词条与同一作用域词条的匹配度。 """ # 只调用品类评估器 try: category_result = await Runner.run(scope_category_evaluator, eval_input) category_eval: CategoryEvaluation = category_result.final_output category_score = category_eval.品类维度得分 category_reason = category_eval.简要说明品类维度相关度理由 except Exception as e: print(f" ❌ Round 1品类评估失败: {e}") return 0.0, f"评估失败: {str(e)[:100]}" # 计算最终得分 domain_score = segment.score_with_o final_score = category_score * domain_score # 组合评估理由 combined_reason = ( f'【Round 1 域内评估】\n' f'【评估对象】组合"{comb.text}" vs 作用域"{scope_text}"\n' f'【品类得分】{category_score:.2f} - {category_reason}\n' f'【原始域得分】{domain_score:.2f}\n' f'【计算公式】品类得分 × 域得分 = {category_score:.2f} × {domain_score:.2f}\n' f'【最终得分】{final_score:.2f}' ) return final_score, combined_reason async def evaluate_domain_combination_round2plus( comb: DomainCombination, segments: list[Segment], context: RunContext ) -> tuple[float, str]: """ Round 2+ 域间组合评估(新逻辑) 步骤: 1. 用现有逻辑评估得到 base_score 2. 计算加权系数 = Σ(来源词得分) / Σ(域得分) 3. 最终得分 = base_score × 系数,截断到1.0 Args: comb: 域间组合对象 segments: 所有segment列表 context: 运行上下文 Returns: (最终得分, 评估理由) """ # 步骤1: 现有逻辑评估(域内评估) scope_text = "".join(comb.from_segments) base_score, base_reason = await evaluate_within_scope( comb.text, scope_text, context.evaluation_cache, context ) # 步骤2: 计算加权系数 total_source_score = 0.0 total_domain_score = 0.0 coefficient_details = [] for domain_idx, source_words_list in zip(comb.domains, comb.source_words): # 获取segment segment = segments[domain_idx] if 0 <= domain_idx < len(segments) else None if not segment: continue domain_score = segment.score_with_o total_domain_score += domain_score # 如果该域贡献了多个词(组合),需要拼接后查找 if len(source_words_list) == 1: # 单个词 source_word_text = source_words_list[0] else: # 多个词组合 source_word_text = "".join(source_words_list) # 查找来源词得分 source_score = get_source_word_score(source_word_text, segment, context) total_source_score += source_score coefficient_details.append( f" 域{domain_idx}[{segment.type}]: \"{source_word_text}\"得分={source_score:.2f}, 域得分={domain_score:.2f}" ) # 计算系数 if total_domain_score > 0: coefficient = total_source_score / total_domain_score else: coefficient = 0.0 # 步骤3: 计算最终得分并截断 final_score = base_score * total_source_score final_score = min(1.0, max(-1.0, final_score)) # 截断到[-1.0, 1.0] # 组合评估理由 coefficient_detail_str = "\n".join(coefficient_details) combined_reason = ( f'【Round 2+ 域间评估】\n' f'【评估对象】组合"{comb.text}"\n' f'{base_reason}\n' f'【加权系数计算】\n' f'{total_source_score}\n' f' 来源词总得分: {total_source_score:.2f}\n' f' 系数: {total_source_score:.2f}' f'【计算公式】base_score × 系数 = {base_score:.2f} × {total_source_score:.2f}\n' f'【最终得分(截断后)】{final_score:.2f}' ) return final_score, combined_reason # ============================================================================ # 核心流程函数 # ============================================================================ async def initialize(o: str, context: RunContext) -> tuple[list[Seg], list[Word], list[Q], list[Seed]]: """ 初始化阶段 Returns: (seg_list, word_list_1, q_list_1, seed_list) """ print(f"\n{'='*60}") print(f"初始化阶段") print(f"{'='*60}") # 1. 分词:原始问题(o) ->分词-> seg_list print(f"\n[步骤1] 分词...") result = await Runner.run(word_segmenter, o) segmentation: WordSegmentation = result.final_output seg_list = [] for word in segmentation.words: seg_list.append(Seg(text=word, from_o=o)) print(f"分词结果: {[s.text for s in seg_list]}") print(f"分词理由: {segmentation.reasoning}") # 2. 分词评估:seg_list -> 每个seg与o进行评分(使用信号量限制并发数) print(f"\n[步骤2] 评估每个分词与原始问题的相关度...") MAX_CONCURRENT_SEG_EVALUATIONS = 10 seg_semaphore = asyncio.Semaphore(MAX_CONCURRENT_SEG_EVALUATIONS) async def evaluate_seg(seg: Seg) -> Seg: async with seg_semaphore: # 初始化阶段的分词评估使用第一轮 prompt (round_num=1) seg.score_with_o, seg.reason = await evaluate_with_o(seg.text, o, context.evaluation_cache, context=context, round_num=1) return seg if seg_list: print(f" 开始评估 {len(seg_list)} 个分词(并发限制: {MAX_CONCURRENT_SEG_EVALUATIONS})...") eval_tasks = [evaluate_seg(seg) for seg in seg_list] await asyncio.gather(*eval_tasks) for seg in seg_list: print(f" {seg.text}: {seg.score_with_o:.2f}") # 3. 构建word_list_1: seg_list -> word_list_1(固定词库) print(f"\n[步骤3] 构建word_list_1(固定词库)...") word_list_1 = [] for seg in seg_list: word_list_1.append(Word( text=seg.text, score_with_o=seg.score_with_o, from_o=o )) print(f"word_list_1(固定): {[w.text for w in word_list_1]}") # 4. 构建q_list_1:seg_list 作为 q_list_1 print(f"\n[步骤4] 构建q_list_1...") q_list_1 = [] for seg in seg_list: q_list_1.append(Q( text=seg.text, score_with_o=seg.score_with_o, reason=seg.reason, from_source="seg" )) print(f"q_list_1: {[q.text for q in q_list_1]}") # 5. 构建seed_list: seg_list -> seed_list print(f"\n[步骤5] 构建seed_list...") seed_list = [] for seg in seg_list: seed_list.append(Seed( text=seg.text, added_words=[], from_type="seg", score_with_o=seg.score_with_o )) print(f"seed_list: {[s.text for s in seed_list]}") return seg_list, word_list_1, q_list_1, seed_list async def run_round( round_num: int, q_list: list[Q], word_list_1: list[Word], seed_list: list[Seed], o: str, context: RunContext, xiaohongshu_api: XiaohongshuSearchRecommendations, xiaohongshu_search: XiaohongshuSearch, xiaohongshu_detail: XiaohongshuDetail, sug_threshold: float = 0.7, enable_evaluation: bool = False ) -> tuple[list[Q], list[Seed], list[Search]]: """ 运行一轮 Args: round_num: 轮次编号 q_list: 当前轮的q列表 word_list_1: 固定的词库(第0轮分词结果) seed_list: 当前的seed列表 o: 原始问题 context: 运行上下文 xiaohongshu_api: 建议词API xiaohongshu_search: 搜索API sug_threshold: suggestion的阈值 Returns: (q_list_next, seed_list_next, search_list) """ print(f"\n{'='*60}") print(f"第{round_num}轮") print(f"{'='*60}") round_data = { "round_num": round_num, "input_q_list": [{"text": q.text, "score": q.score_with_o, "type": "query"} for q in q_list], "input_word_list_1_size": len(word_list_1), "input_seed_list_size": len(seed_list) } # 1. 请求sug:q_list -> 每个q请求sug接口 -> sug_list_list print(f"\n[步骤1] 为每个q请求建议词...") sug_list_list = [] # list of list for q in q_list: print(f"\n 处理q: {q.text}") suggestions = get_suggestions_with_cache(q.text, xiaohongshu_api, context) q_sug_list = [] if suggestions: print(f" 获取到 {len(suggestions)} 个建议词") for sug_text in suggestions: sug = Sug( text=sug_text, from_q=QFromQ(text=q.text, score_with_o=q.score_with_o) ) q_sug_list.append(sug) else: print(f" 未获取到建议词") sug_list_list.append(q_sug_list) # 2. sug评估:sug_list_list -> 每个sug与o进行评分(并发) print(f"\n[步骤2] 评估每个建议词与原始问题的相关度...") # 2.1 收集所有需要评估的sug,并记录它们所属的q all_sugs = [] sug_to_q_map = {} # 记录每个sug属于哪个q for i, q_sug_list in enumerate(sug_list_list): if q_sug_list: q_text = q_list[i].text for sug in q_sug_list: all_sugs.append(sug) sug_to_q_map[id(sug)] = q_text # 2.2 批量评估所有sug(每批最多10个) # 🚀 性能优化:使用批量评估替代单个并发评估,显著提升性能 if all_sugs: print(f" 开始批量评估 {len(all_sugs)} 个建议词(每批最多10个)...") # 提取所有sug的text sug_texts = [sug.text for sug in all_sugs] # 批量评估 batch_results = await evaluate_batch_with_o( texts=sug_texts, o=o, cache=context.evaluation_cache, context=context, round_num=round_num ) # 将结果分配回sug对象 for sug, (score, reason) in zip(all_sugs, batch_results): sug.score_with_o = score sug.reason = reason # 2.3 打印结果并组织到sug_details sug_details = {} # 保存每个Q对应的sug列表 for i, q_sug_list in enumerate(sug_list_list): if q_sug_list: q_text = q_list[i].text print(f"\n 来自q '{q_text}' 的建议词:") sug_details[q_text] = [] for sug in q_sug_list: print(f" {sug.text}: {sug.score_with_o:.2f}") # 保存到sug_details sug_details[q_text].append({ "text": sug.text, "score": sug.score_with_o, "reason": sug.reason, "type": "sug" }) # 2.4 剪枝判断(已禁用 - 保留所有分支) pruned_query_texts = set() if False: # 原: if round_num >= 2: # 剪枝功能已禁用,保留代码以便后续调整 print(f"\n[剪枝判断] 第{round_num}轮开始应用剪枝策略...") for i, q in enumerate(q_list): q_sug_list = sug_list_list[i] if len(q_sug_list) == 0: continue # 没有sug则不剪枝 # 剪枝条件1: 所有sug分数都低于query分数 all_lower_than_query = all(sug.score_with_o < q.score_with_o for sug in q_sug_list) # 剪枝条件2: 所有sug分数都低于0.5 all_below_threshold = all(sug.score_with_o < 0.5 for sug in q_sug_list) if all_lower_than_query and all_below_threshold: pruned_query_texts.add(q.text) max_sug_score = max(sug.score_with_o for sug in q_sug_list) print(f" 🔪 剪枝: {q.text} (query分数:{q.score_with_o:.2f}, sug最高分:{max_sug_score:.2f}, 全部<0.5)") if pruned_query_texts: print(f" 本轮共剪枝 {len(pruned_query_texts)} 个query") else: print(f" 本轮无query被剪枝") else: print(f"\n[剪枝判断] 剪枝功能已禁用,保留所有分支") # 3. search_list构建 print(f"\n[步骤3] 构建search_list(阈值>{sug_threshold})...") search_list = [] high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold] if high_score_sugs: print(f" 找到 {len(high_score_sugs)} 个高分建议词") # 并发搜索 async def search_for_sug(sug: Sug) -> Search: print(f" 搜索: {sug.text}") try: search_result = xiaohongshu_search.search(keyword=sug.text) # xiaohongshu_search.search() 已经返回解析后的数据 notes = search_result.get("data", {}).get("data", []) post_list = [] for note in notes[:10]: # 只取前10个 try: post = process_note_data(note) post_list.append(post) except Exception as e: print(f" ⚠️ 解析帖子失败 {note.get('id', 'unknown')}: {str(e)[:50]}") # 补充详情信息(仅视频类型需要补充视频URL) video_posts = [p for p in post_list if p.type == "video"] if video_posts: print(f" 补充详情({len(video_posts)}个视频)...") for post in video_posts: try: detail_response = xiaohongshu_detail.get_detail(post.note_id) enrich_post_with_detail(post, detail_response) except Exception as e: print(f" ⚠️ 详情补充失败 {post.note_id}: {str(e)[:50]}") print(f" → 找到 {len(post_list)} 个帖子") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=post_list ) except Exception as e: print(f" ✗ 搜索失败: {e}") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=[] ) search_tasks = [search_for_sug(sug) for sug in high_score_sugs] search_list = await asyncio.gather(*search_tasks) # 评估搜索结果中的帖子 if enable_evaluation: print(f"\n[评估] 评估搜索结果中的帖子...") for search in search_list: if search.post_list: print(f" 评估来自 '{search.text}' 的 {len(search.post_list)} 个帖子") # 对每个帖子进行评估 (V3) for post in search.post_list: knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level = await evaluate_post_v3(post, o, semaphore=None) if knowledge_eval: apply_evaluation_v3_to_post(post, knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level) else: print(f"\n[评估] 实时评估已关闭 (使用 --enable-evaluation 启用)") else: print(f" 没有高分建议词,search_list为空") # 4. 构建q_list_next print(f"\n[步骤4] 构建q_list_next...") q_list_next = [] existing_q_texts = set() # 用于去重 add_word_details = {} # 保存每个seed对应的组合词列表 all_seed_combinations = [] # 保存本轮所有seed的组合词(用于后续构建seed_list_next) # 4.1 对于seed_list中的每个seed,从word_list_1中选词组合,产生Top 5 print(f"\n 4.1 为每个seed加词(产生Top 5组合)...") for seed in seed_list: print(f"\n 处理seed: {seed.text}") # 剪枝检查:跳过被剪枝的seed if seed.text in pruned_query_texts: print(f" ⊗ 跳过被剪枝的seed: {seed.text}") continue # 从固定词库word_list_1筛选候选词 candidate_words = [] for word in word_list_1: # 检查词是否已在seed中 if word.text in seed.text: continue # 检查词是否已被添加过 if word.text in seed.added_words: continue candidate_words.append(word) if not candidate_words: print(f" 没有可用的候选词") continue print(f" 候选词数量: {len(candidate_words)}") # 调用Agent一次性选择并组合Top 5(添加重试机制) candidate_words_text = ', '.join([w.text for w in candidate_words]) selection_input = f""" <原始问题> {o} <当前Seed> {seed.text} <候选词列表> {candidate_words_text} 请从候选词列表中选择最多5个最合适的词,分别与当前seed组合成新的query。 """ # 重试机制 max_retries = 2 selection_result = None for attempt in range(max_retries): try: result = await Runner.run(word_selector, selection_input) selection_result = result.final_output break # 成功则跳出 except Exception as e: error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 选词失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:100]}") await asyncio.sleep(1) else: print(f" ❌ 选词失败,跳过该seed: {error_msg[:100]}") break if selection_result is None: print(f" 跳过seed: {seed.text}") continue print(f" Agent选择了 {len(selection_result.combinations)} 个组合") print(f" 整体选择思路: {selection_result.overall_reasoning}") # 并发评估所有组合的相关度 async def evaluate_combination(comb: WordCombination) -> dict: combined = comb.combined_query # 验证:组合结果必须包含完整的seed和word # 检查是否包含seed的所有字符 seed_chars_in_combined = all(char in combined for char in seed.text) # 检查是否包含word的所有字符 word_chars_in_combined = all(char in combined for char in comb.selected_word) if not seed_chars_in_combined or not word_chars_in_combined: print(f" ⚠️ 警告:组合不完整") print(f" Seed: {seed.text}") print(f" Word: {comb.selected_word}") print(f" 组合: {combined}") print(f" 包含完整seed? {seed_chars_in_combined}") print(f" 包含完整word? {word_chars_in_combined}") # 返回极低分数,让这个组合不会被选中 return { 'word': comb.selected_word, 'query': combined, 'score': -1.0, # 极低分数 'reason': f"组合不完整:缺少seed或word的部分内容", 'reasoning': comb.reasoning } # 正常评估,根据轮次选择 prompt score, reason = await evaluate_with_o(combined, o, context.evaluation_cache, context=context, round_num=round_num) return { 'word': comb.selected_word, 'query': combined, 'score': score, 'reason': reason, 'reasoning': comb.reasoning } eval_tasks = [evaluate_combination(comb) for comb in selection_result.combinations] top_5 = await asyncio.gather(*eval_tasks) print(f" 评估完成,得到 {len(top_5)} 个组合") # 将Top 5全部加入q_list_next(去重检查 + 得分过滤) for comb in top_5: # 得分过滤:组合词必须比种子提升至少REQUIRED_SCORE_GAIN才能加入下一轮 if comb['score'] < seed.score_with_o + REQUIRED_SCORE_GAIN: print(f" ⊗ 跳过低分: {comb['query']} (分数{comb['score']:.2f} < 种子{seed.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})") continue # 去重检查 if comb['query'] in existing_q_texts: print(f" ⊗ 跳过重复: {comb['query']}") continue print(f" ✓ {comb['query']} (分数: {comb['score']:.2f} > 种子: {seed.score_with_o:.2f})") new_q = Q( text=comb['query'], score_with_o=comb['score'], reason=comb['reason'], from_source="add" ) q_list_next.append(new_q) existing_q_texts.add(comb['query']) # 记录到去重集合 # 记录已添加的词 seed.added_words.append(comb['word']) # 保存到add_word_details add_word_details[seed.text] = [ { "text": comb['query'], "score": comb['score'], "reason": comb['reason'], "selected_word": comb['word'], "seed_score": seed.score_with_o, # 添加原始种子的得分 "type": "add" } for comb in top_5 ] # 保存到all_seed_combinations(用于构建seed_list_next) # 附加seed_score,用于后续过滤 for comb in top_5: comb['seed_score'] = seed.score_with_o all_seed_combinations.extend(top_5) # 4.2 对于sug_list_list中,每个sug大于来自的query分数,加到q_list_next(去重检查) print(f"\n 4.2 将高分sug加入q_list_next...") for sug in all_sugs: # 剪枝检查:跳过来自被剪枝query的sug if sug.from_q and sug.from_q.text in pruned_query_texts: print(f" ⊗ 跳过来自被剪枝query的sug: {sug.text} (来源: {sug.from_q.text})") continue # sug必须比来源query提升至少REQUIRED_SCORE_GAIN才能加入下一轮 if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN: # 去重检查 if sug.text in existing_q_texts: print(f" ⊗ 跳过重复: {sug.text}") continue new_q = Q( text=sug.text, score_with_o=sug.score_with_o, reason=sug.reason, from_source="sug" ) q_list_next.append(new_q) existing_q_texts.add(sug.text) # 记录到去重集合 print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} >= 来源query: {sug.from_q.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})") # 5. 构建seed_list_next(关键修改:不保留上一轮的seed) print(f"\n[步骤5] 构建seed_list_next(不保留上轮seed)...") seed_list_next = [] existing_seed_texts = set() # 5.1 加入本轮所有组合词(只加入得分提升的) print(f" 5.1 加入本轮所有组合词(得分过滤)...") for comb in all_seed_combinations: # 得分过滤:组合词必须比种子提升至少REQUIRED_SCORE_GAIN才作为下一轮种子 seed_score = comb.get('seed_score', 0) if comb['score'] < seed_score + REQUIRED_SCORE_GAIN: print(f" ⊗ 跳过低分: {comb['query']} (分数{comb['score']:.2f} < 种子{seed_score:.2f} + {REQUIRED_SCORE_GAIN:.2f})") continue if comb['query'] not in existing_seed_texts: new_seed = Seed( text=comb['query'], added_words=[], # 新seed的added_words清空 from_type="add", score_with_o=comb['score'] ) seed_list_next.append(new_seed) existing_seed_texts.add(comb['query']) print(f" ✓ {comb['query']} (分数: {comb['score']:.2f} >= 种子: {seed_score:.2f} + {REQUIRED_SCORE_GAIN:.2f})") # 5.2 加入高分sug print(f" 5.2 加入高分sug...") for sug in all_sugs: # 剪枝检查:跳过来自被剪枝query的sug if sug.from_q and sug.from_q.text in pruned_query_texts: continue # sug必须比来源query提升至少REQUIRED_SCORE_GAIN才作为下一轮种子 if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN and sug.text not in existing_seed_texts: new_seed = Seed( text=sug.text, added_words=[], from_type="sug", score_with_o=sug.score_with_o ) seed_list_next.append(new_seed) existing_seed_texts.add(sug.text) print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} >= 来源query: {sug.from_q.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})") # 序列化搜索结果数据(包含帖子详情) search_results_data = [] for search in search_list: search_results_data.append({ "text": search.text, "score_with_o": search.score_with_o, "post_list": [post.model_dump() for post in search.post_list] }) # 记录本轮数据 round_data.update({ "sug_count": len(all_sugs), "high_score_sug_count": len(high_score_sugs), "search_count": len(search_list), "total_posts": sum(len(s.post_list) for s in search_list), "q_list_next_size": len(q_list_next), "seed_list_next_size": len(seed_list_next), "total_combinations": len(all_seed_combinations), "pruned_query_count": len(pruned_query_texts), "pruned_queries": list(pruned_query_texts), "output_q_list": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "from": q.from_source, "type": "query"} for q in q_list_next], "seed_list_next": [{"text": seed.text, "from": seed.from_type, "score": seed.score_with_o} for seed in seed_list_next], "sug_details": sug_details, "add_word_details": add_word_details, "search_results": search_results_data }) context.rounds.append(round_data) print(f"\n本轮总结:") print(f" 建议词数量: {len(all_sugs)}") print(f" 高分建议词: {len(high_score_sugs)}") print(f" 搜索数量: {len(search_list)}") print(f" 帖子总数: {sum(len(s.post_list) for s in search_list)}") print(f" 组合词数量: {len(all_seed_combinations)}") print(f" 下轮q数量: {len(q_list_next)}") print(f" 下轮seed数量: {len(seed_list_next)}") return q_list_next, seed_list_next, search_list async def iterative_loop( context: RunContext, max_rounds: int = 2, sug_threshold: float = 0.7, enable_evaluation: bool = False ): """主迭代循环""" print(f"\n{'='*60}") print(f"开始迭代循环") print(f"最大轮数: {max_rounds}") print(f"sug阈值: {sug_threshold}") print(f"{'='*60}") # 初始化 seg_list, word_list_1, q_list, seed_list = await initialize(context.o, context) # API实例 xiaohongshu_api = XiaohongshuSearchRecommendations() xiaohongshu_search = XiaohongshuSearch() xiaohongshu_detail = XiaohongshuDetail() # 详情API客户端 # 保存初始化数据 context.rounds.append({ "round_num": 0, "type": "initialization", "seg_list": [{"text": s.text, "score": s.score_with_o, "reason": s.reason, "type": "seg"} for s in seg_list], "word_list_1": [{"text": w.text, "score": w.score_with_o} for w in word_list_1], "q_list_1": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "type": "query"} for q in q_list], "seed_list": [{"text": s.text, "from_type": s.from_type, "score": s.score_with_o, "type": "seed"} for s in seed_list] }) # 收集所有搜索结果 all_search_list = [] # 迭代 round_num = 1 while q_list and round_num <= max_rounds: q_list, seed_list, search_list = await run_round( round_num=round_num, q_list=q_list, word_list_1=word_list_1, # 传递固定词库 seed_list=seed_list, o=context.o, context=context, xiaohongshu_api=xiaohongshu_api, xiaohongshu_search=xiaohongshu_search, sug_threshold=sug_threshold, enable_evaluation=enable_evaluation ) all_search_list.extend(search_list) round_num += 1 print(f"\n{'='*60}") print(f"迭代完成") print(f" 总轮数: {round_num - 1}") print(f" 总搜索次数: {len(all_search_list)}") print(f" 总帖子数: {sum(len(s.post_list) for s in all_search_list)}") print(f"{'='*60}") return all_search_list # ============================================================================ # v121 新架构核心流程函数 # ============================================================================ async def initialize_v2(o: str, context: RunContext) -> list[Segment]: """ v121 Round 0 初始化阶段 流程: 1. 语义分段: 调用 semantic_segmenter 将原始问题拆分成语义片段 2. 拆词: 对每个segment调用 word_segmenter 进行拆词 3. 评估: 对每个segment和词进行评估 4. 不进行组合(Round 0只分段和拆词) Returns: 语义片段列表 (Segment) """ print(f"\n{'='*60}") print(f"Round 0: 初始化阶段(语义分段 + 拆词)") print(f"{'='*60}") # 1. 语义分段 print(f"\n[步骤1] 语义分段...") result = await Runner.run(semantic_segmenter, o) segmentation: SemanticSegmentation = result.final_output print(f"语义分段结果: {len(segmentation.segments)} 个片段") print(f"整体分段思路: {segmentation.overall_reasoning}") segment_list = [] for seg_item in segmentation.segments: segment = Segment( text=seg_item.segment_text, type=seg_item.segment_type, from_o=o ) segment_list.append(segment) print(f" - [{segment.type}] {segment.text}") # 2. 对每个segment拆词并评估 print(f"\n[步骤2] 对每个segment拆词并评估...") # 2.1 先对所有segment拆词(并发) MAX_CONCURRENT_EVALUATIONS = 30 semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS) async def segment_words(segment: Segment) -> Segment: """对segment进行拆词""" async with semaphore: word_result = await Runner.run(word_segmenter, segment.text) word_segmentation: WordSegmentation = word_result.final_output segment.words = word_segmentation.words return segment if segment_list: print(f" [步骤2.1] 对 {len(segment_list)} 个segment进行拆词...") word_tasks = [segment_words(seg) for seg in segment_list] await asyncio.gather(*word_tasks) # 2.2 批量评估所有segments print(f" [步骤2.2] 批量评估 {len(segment_list)} 个segments...") segment_texts = [seg.text for seg in segment_list] segment_results = await evaluate_batch_with_o_round0( texts=segment_texts, o=o, cache=context.evaluation_cache ) # 分配segment评估结果 for segment, (score, reason) in zip(segment_list, segment_results): segment.score_with_o = score segment.reason = reason # 2.3 收集所有words并批量评估 all_words = [] word_to_segments = {} # 记录每个word属于哪些segments for segment in segment_list: for word in segment.words: if word not in word_to_segments: all_words.append(word) word_to_segments[word] = [] word_to_segments[word].append(segment) if all_words: print(f" [步骤2.3] 批量评估 {len(all_words)} 个words(去重后)...") word_results = await evaluate_batch_with_o_round0( texts=all_words, o=o, cache=context.evaluation_cache ) # 分配word评估结果到所有相关的segments for word, (score, reason) in zip(all_words, word_results): for segment in word_to_segments[word]: segment.word_scores[word] = score segment.word_reasons[word] = reason # 打印步骤1结果 print(f"\n[步骤1: 分段及拆词 结果]") for segment in segment_list: print(f" [{segment.type}] {segment.text} (分数: {segment.score_with_o:.2f})") print(f" 拆词: {segment.words}") for word in segment.words: score = segment.word_scores.get(word, 0.0) print(f" - {word}: {score:.2f}") # 保存到context(保留旧格式以兼容) context.segments = [ { "text": seg.text, "type": seg.type, "score": seg.score_with_o, "reason": seg.reason, "words": seg.words, "word_scores": seg.word_scores, "word_reasons": seg.word_reasons } for seg in segment_list ] # 保存 Round 0 到 context.rounds(新格式用于可视化) context.rounds.append({ "round_num": 0, "type": "initialization", "segments": [ { "text": seg.text, "type": seg.type, "domain_index": idx, "score": seg.score_with_o, "reason": seg.reason, "words": [ { "text": word, "score": seg.word_scores.get(word, 0.0), "reason": seg.word_reasons.get(word, "") } for word in seg.words ] } for idx, seg in enumerate(segment_list) ] }) # 🆕 存储Round 0的所有word得分到历史记录 print(f"\n[存储Round 0词得分到历史记录]") for segment in segment_list: for word, score in segment.word_scores.items(): context.word_score_history[word] = score print(f" {word}: {score:.2f}") print(f"\n[Round 0 完成]") print(f" 分段数: {len(segment_list)}") total_words = sum(len(seg.words) for seg in segment_list) print(f" 总词数: {total_words}") return segment_list async def run_round_v2( round_num: int, query_input: list[Q], segments: list[Segment], o: str, context: RunContext, xiaohongshu_api: XiaohongshuSearchRecommendations, xiaohongshu_search: XiaohongshuSearch, xiaohongshu_detail: XiaohongshuDetail, sug_threshold: float = 0.7, enable_evaluation: bool = False ) -> tuple[list[Q], list[Search], dict]: """ v121 Round N 执行 正确的流程顺序: 1. 为 query_input 请求SUG 2. 评估SUG 3. 高分SUG搜索(含多模态提取) 4. N域组合(从segments生成) 5. 评估组合 6. 生成 q_list_next(组合 + 高分SUG) Args: round_num: 轮次编号 (1-4) query_input: 本轮的输入query列表(Round 1是words,Round 2+是上轮输出) segments: 语义片段列表(用于组合) o: 原始问题 context: 运行上下文 xiaohongshu_api: 建议词API xiaohongshu_search: 搜索API sug_threshold: SUG搜索阈值 Returns: (q_list_next, search_list, extraction_results) """ print(f"\n{'='*60}") print(f"Round {round_num}: {round_num}域组合") print(f"{'='*60}") round_data = { "round_num": round_num, "n_domains": round_num, "input_query_count": len(query_input) } MAX_CONCURRENT_EVALUATIONS = 30 # 🚀 性能优化:从5提升到30,并发评估能力提升6倍 semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS) # 步骤1: 为 query_input 请求SUG print(f"\n[步骤1] 为{len(query_input)}个输入query请求SUG...") all_sugs = [] sug_details = {} for q in query_input: suggestions = get_suggestions_with_cache(q.text, xiaohongshu_api, context) if suggestions: print(f" {q.text}: 获取到 {len(suggestions)} 个SUG") for sug_text in suggestions: sug = Sug( text=sug_text, from_q=QFromQ(text=q.text, score_with_o=q.score_with_o) ) all_sugs.append(sug) else: print(f" {q.text}: 未获取到SUG") print(f" 共获取 {len(all_sugs)} 个SUG") # 步骤2: 评估SUG if len(all_sugs) > 0: print(f"\n[步骤2] 评估{len(all_sugs)}个SUG...") async def evaluate_sug(sug: Sug) -> Sug: async with semaphore: sug.score_with_o, sug.reason = await evaluate_with_o( sug.text, o, context.evaluation_cache, context=context ) return sug eval_tasks = [evaluate_sug(sug) for sug in all_sugs] await asyncio.gather(*eval_tasks) # 打印结果 for sug in all_sugs: print(f" {sug.text}: {sug.score_with_o:.2f}") if sug.from_q: if sug.from_q.text not in sug_details: sug_details[sug.from_q.text] = [] sug_details[sug.from_q.text].append({ "text": sug.text, "score": sug.score_with_o, "reason": sug.reason, "type": "sug" }) # 定义通用搜索函数(供步骤2.5、3、5.5共用) async def search_keyword(text: str, score: float, source_type: str) -> Search: """通用搜索函数""" print(f" 搜索: {text} (来源: {source_type})") # 统计:搜索调用次数 context.stats_search_calls += 1 try: search_result = xiaohongshu_search.search(keyword=text) notes = search_result.get("data", {}).get("data", []) post_list = [] for note in notes[:10]: try: post = process_note_data(note) post_list.append(post) except Exception as e: print(f" ⚠️ 解析帖子失败 {note.get('id', 'unknown')}: {str(e)[:50]}") # 补充详情信息(仅视频类型需要补充视频URL) video_posts = [p for p in post_list if p.type == "video"] if video_posts: print(f" 补充详情({len(video_posts)}个视频)...") for post in video_posts: try: detail_response = xiaohongshu_detail.get_detail(post.note_id) enrich_post_with_detail(post, detail_response) except Exception as e: print(f" ⚠️ 详情补充失败 {post.note_id}: {str(e)[:50]}") print(f" → 找到 {len(post_list)} 个帖子") return Search(text=text, score_with_o=score, post_list=post_list) except Exception as e: print(f" ✗ 搜索失败: {e}") return Search(text=text, score_with_o=score, post_list=[]) # 初始化search_list search_list = [] # 步骤2.5: 搜索高分query_input print(f"\n[步骤2.5] 搜索高分输入query(阈值 > {sug_threshold})...") high_score_queries = [q for q in query_input if q.score_with_o > sug_threshold] print(f" 找到 {len(high_score_queries)} 个高分输入query") if high_score_queries: query_search_tasks = [search_keyword(q.text, q.score_with_o, "query_input") for q in high_score_queries] query_searches = await asyncio.gather(*query_search_tasks) search_list.extend(query_searches) # 评估搜索结果中的帖子 if enable_evaluation: print(f"\n[评估] 评估query_input搜索结果中的帖子...") for search in query_searches: if search.post_list: print(f" 评估来自 '{search.text}' 的 {len(search.post_list)} 个帖子") for post in search.post_list: knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level = await evaluate_post_v3(post, o, semaphore=None) if knowledge_eval: apply_evaluation_v3_to_post(post, knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level) # 步骤3: 搜索高分SUG print(f"\n[步骤3] 搜索高分SUG(阈值 > {sug_threshold})...") high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold] print(f" 找到 {len(high_score_sugs)} 个高分SUG") if high_score_sugs: sug_search_tasks = [search_keyword(sug.text, sug.score_with_o, "sug") for sug in high_score_sugs] sug_searches = await asyncio.gather(*sug_search_tasks) search_list.extend(sug_searches) # 评估搜索结果中的帖子 if enable_evaluation: print(f"\n[评估] 评估SUG搜索结果中的帖子...") for search in sug_searches: if search.post_list: print(f" 评估来自 '{search.text}' 的 {len(search.post_list)} 个帖子") for post in search.post_list: knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level = await evaluate_post_v3(post, o, semaphore=None) if knowledge_eval: apply_evaluation_v3_to_post(post, knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level) # 步骤4: 生成N域组合 print(f"\n[步骤4] 生成{round_num}域组合...") domain_combinations = generate_domain_combinations(segments, round_num) print(f" 生成了 {len(domain_combinations)} 个组合") if len(domain_combinations) == 0: print(f" 无法生成{round_num}域组合") # 即使无法组合,也返回高分SUG作为下轮输入 q_list_next = [] for sug in all_sugs: if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN: q = Q( text=sug.text, score_with_o=sug.score_with_o, reason=sug.reason, from_source="sug", type_label="" ) q_list_next.append(q) round_data.update({ "domain_combinations_count": 0, "sug_count": len(all_sugs), "high_score_sug_count": len(high_score_sugs), "search_count": len(search_list), "sug_details": sug_details, "q_list_next_size": len(q_list_next) }) context.rounds.append(round_data) return q_list_next, search_list # 步骤5: 评估所有组合 print(f"\n[步骤5] 评估{len(domain_combinations)}个组合...") async def evaluate_combination(comb: DomainCombination) -> DomainCombination: async with semaphore: # 🆕 根据轮次选择评估逻辑 if round_num == 1: # Round 1: 域内评估(新逻辑) comb.score_with_o, comb.reason = await evaluate_domain_combination_round1( comb, segments, context ) else: # Round 2+: 域间评估(新逻辑) comb.score_with_o, comb.reason = await evaluate_domain_combination_round2plus( comb, segments, context ) # 🆕 存储组合得分到历史记录 context.word_score_history[comb.text] = comb.score_with_o return comb eval_tasks = [evaluate_combination(comb) for comb in domain_combinations] await asyncio.gather(*eval_tasks) # 排序 - 已注释,保持原始顺序 # domain_combinations.sort(key=lambda x: x.score_with_o, reverse=True) # 打印所有组合(保持原始顺序) evaluation_strategy = 'Round 1 域内评估(品类×域得分)' if round_num == 1 else 'Round 2+ 域间评估(加权系数调整)' print(f" 评估完成,共{len(domain_combinations)}个组合 [策略: {evaluation_strategy}]") for i, comb in enumerate(domain_combinations, 1): print(f" {i}. {comb.text} {comb.type_label} (分数: {comb.score_with_o:.2f})") # 为每个组合补充来源词分数信息,并判断是否超过所有来源词得分 for comb in domain_combinations: word_details = [] flat_scores: list[float] = [] for domain_index, words in zip(comb.domains, comb.source_words): segment = segments[domain_index] if 0 <= domain_index < len(segments) else None segment_type = segment.type if segment else "" segment_text = segment.text if segment else "" items = [] for word in words: score = 0.0 if segment and word in segment.word_scores: score = segment.word_scores[word] items.append({ "text": word, "score": score }) flat_scores.append(score) word_details.append({ "domain_index": domain_index, "segment_type": segment_type, "segment_text": segment_text, "words": items }) comb.source_word_details = word_details comb.source_scores = flat_scores comb.max_source_score = max(flat_scores) if flat_scores else None comb.is_above_source_scores = bool(flat_scores) and all( comb.score_with_o > score for score in flat_scores ) # 步骤5.5: 搜索高分组合词 print(f"\n[步骤5.5] 搜索高分组合词(阈值 > {sug_threshold})...") high_score_combinations = [comb for comb in domain_combinations if comb.score_with_o > sug_threshold] print(f" 找到 {len(high_score_combinations)} 个高分组合词") if high_score_combinations: comb_search_tasks = [search_keyword(comb.text, comb.score_with_o, "combination") for comb in high_score_combinations] comb_searches = await asyncio.gather(*comb_search_tasks) search_list.extend(comb_searches) # 评估搜索结果中的帖子 if enable_evaluation: print(f"\n[评估] 评估组合词搜索结果中的帖子...") for search in comb_searches: if search.post_list: print(f" 评估来自 '{search.text}' 的 {len(search.post_list)} 个帖子") for post in search.post_list: knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level = await evaluate_post_v3(post, o, semaphore=None) if knowledge_eval: apply_evaluation_v3_to_post(post, knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level) # 步骤6: 构建 q_list_next(组合 + 高分SUG) print(f"\n[步骤6] 生成下轮输入...") q_list_next: list[Q] = [] # 6.1 添加高增益SUG(满足增益条件),并按分数排序 sug_candidates: list[tuple[Q, Sug]] = [] for sug in all_sugs: if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN: q = Q( text=sug.text, score_with_o=sug.score_with_o, reason=sug.reason, from_source="sug", type_label="" ) sug_candidates.append((q, sug)) sug_candidates.sort(key=lambda item: item[0].score_with_o, reverse=True) q_list_next.extend([item[0] for item in sug_candidates]) high_gain_sugs = [item[1] for item in sug_candidates] print(f" 添加 {len(high_gain_sugs)} 个高增益SUG(增益 ≥ {REQUIRED_SCORE_GAIN:.2f})") # 6.2 添加高分组合(需超过所有来源词得分),并按分数排序 combination_candidates: list[tuple[Q, DomainCombination]] = [] for comb in domain_combinations: if comb.is_above_source_scores and comb.score_with_o > 0: domains_str = ','.join([f'D{d}' for d in comb.domains]) if comb.domains else '' q = Q( text=comb.text, score_with_o=comb.score_with_o, reason=comb.reason, from_source="domain_comb", type_label=comb.type_label, domain_type=domains_str # 添加域信息 ) combination_candidates.append((q, comb)) combination_candidates.sort(key=lambda item: item[0].score_with_o, reverse=True) q_list_next.extend([item[0] for item in combination_candidates]) high_score_combinations = [item[1] for item in combination_candidates] print(f" 添加 {len(high_score_combinations)} 个高分组合(组合得分 > 所有来源词)") # 保存round数据(包含完整帖子信息) search_results_data = [] for search in search_list: search_results_data.append({ "text": search.text, "score_with_o": search.score_with_o, "post_list": [post.model_dump() for post in search.post_list] }) round_data.update({ "input_queries": [{"text": q.text, "score": q.score_with_o, "from_source": q.from_source, "type": "input", "domain_index": q.domain_index, "domain_type": q.domain_type} for q in query_input], "domain_combinations_count": len(domain_combinations), "domain_combinations": [ { "text": comb.text, "type_label": comb.type_label, "score": comb.score_with_o, "reason": comb.reason, "domains": comb.domains, "source_words": comb.source_words, "from_segments": comb.from_segments, "source_word_details": comb.source_word_details, "source_scores": comb.source_scores, "is_above_source_scores": comb.is_above_source_scores, "max_source_score": comb.max_source_score } for comb in domain_combinations ], "high_score_combinations": [ { "text": item[0].text, "score": item[0].score_with_o, "type_label": item[0].type_label, "type": "combination", "is_above_source_scores": item[1].is_above_source_scores } for item in combination_candidates ], "sug_count": len(all_sugs), "sug_details": sug_details, "high_score_sug_count": len(high_score_sugs), "high_gain_sugs": [{"text": q.text, "score": q.score_with_o, "type": "sug"} for q in q_list_next if q.from_source == "sug"], "search_count": len(search_list), "search_results": search_results_data, "q_list_next_size": len(q_list_next), "q_list_next_sections": { "sugs": [ { "text": item[0].text, "score": item[0].score_with_o, "from_source": "sug" } for item in sug_candidates ], "domain_combinations": [ { "text": item[0].text, "score": item[0].score_with_o, "from_source": "domain_comb", "is_above_source_scores": item[1].is_above_source_scores } for item in combination_candidates ] } }) context.rounds.append(round_data) print(f"\nRound {round_num} 总结:") print(f" 输入Query数: {len(query_input)}") print(f" 域组合数: {len(domain_combinations)}") print(f" 高分组合: {len(high_score_combinations)}") print(f" SUG数: {len(all_sugs)}") print(f" 高分SUG数: {len(high_score_sugs)}") print(f" 高增益SUG: {len(high_gain_sugs)}") print(f" 搜索数: {len(search_list)}") # print(f" 提取帖子数: {len(extraction_results)}") # 内容提取流程已断开 print(f" 下轮Query数: {len(q_list_next)}") return q_list_next, search_list # 不再返回提取结果 async def iterative_loop_v2( context: RunContext, max_rounds: int = 4, sug_threshold: float = 0.7, enable_evaluation: bool = False ): """v121 主迭代循环""" import time print(f"\n{'='*60}") print(f"开始v121迭代循环(语义分段跨域组词版)") print(f"最大轮数: {max_rounds}") print(f"sug阈值: {sug_threshold}") print(f"{'='*60}") # Round 0: 初始化(语义分段 + 拆词) print(f"\n{'='*60}") print(f"Round 0: 初始化(语义分段 + 拆词)") print(f"{'='*60}") round0_start_time = time.time() segments = await initialize_v2(context.o, context) round0_elapsed = time.time() - round0_start_time print(f"\n✅ Round 0 完成,耗时: {round0_elapsed:.2f}秒") # API实例 xiaohongshu_api = XiaohongshuSearchRecommendations() xiaohongshu_search = XiaohongshuSearch() xiaohongshu_detail = XiaohongshuDetail() # 详情API客户端 # 收集所有搜索结果 all_search_list = [] # all_extraction_results = {} # 内容提取流程已断开 # 准备 Round 1 的输入:从 segments 提取所有 words query_input = extract_words_from_segments(segments) print(f"\n提取了 {len(query_input)} 个词作为 Round 1 的输入") # Round 1-N: 迭代循环 num_segments = len(segments) actual_max_rounds = min(max_rounds, num_segments) round_num = 1 rounds_elapsed_times = [] # 记录每轮耗时 while query_input and round_num <= actual_max_rounds: round_start_time = time.time() query_input, search_list = await run_round_v2( # 不再接收提取结果 round_num=round_num, query_input=query_input, # 传递上一轮的输出 segments=segments, o=context.o, context=context, xiaohongshu_api=xiaohongshu_api, xiaohongshu_search=xiaohongshu_search, xiaohongshu_detail=xiaohongshu_detail, sug_threshold=sug_threshold, enable_evaluation=enable_evaluation ) round_elapsed = time.time() - round_start_time rounds_elapsed_times.append(round_elapsed) print(f"\n✅ Round {round_num} 完成,耗时: {round_elapsed:.2f}秒") all_search_list.extend(search_list) # all_extraction_results.update(extraction_results) # 内容提取流程已断开 # 如果没有新的query,提前结束 if not query_input: print(f"\n第{round_num}轮后无新query生成,提前结束迭代") break round_num += 1 print(f"\n{'='*60}") print(f"迭代完成") print(f" 实际轮数: {round_num}") print(f" 总搜索次数: {len(all_search_list)}") print(f" 总帖子数: {sum(len(s.post_list) for s in all_search_list)}") # print(f" 提取帖子数: {len(all_extraction_results)}") # 内容提取流程已断开 print(f"\n[耗时统计]") print(f" Round 0 耗时: {round0_elapsed:.2f}秒") for i, elapsed in enumerate(rounds_elapsed_times, 1): print(f" Round {i} 耗时: {elapsed:.2f}秒") total_rounds_time = round0_elapsed + sum(rounds_elapsed_times) print(f" 所有轮次总耗时: {total_rounds_time:.2f}秒 ({total_rounds_time/60:.2f}分钟)") print(f"\n[统计信息]") print(f" LLM评估调用: {context.stats_llm_calls} 次") print(f" SUG请求: {context.stats_sug_requests} 次 (缓存命中: {context.stats_sug_cache_hits} 次)") print(f" 搜索调用: {context.stats_search_calls} 次") print(f"{'='*60}") return all_search_list # 不再返回提取结果 # ============================================================================ # 主函数 # ============================================================================ async def main(input_dir: str, max_rounds: int = 2, sug_threshold: float = 0.7, visualize: bool = False, enable_evaluation: bool = False): """主函数""" import time total_start_time = time.time() # 记录总开始时间 current_time, log_url = set_trace() # 读取输入 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') c = read_file_as_string(input_context_file) # 原始需求 o = read_file_as_string(input_q_file) # 原始问题 # 版本信息 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 日志目录 log_dir = os.path.join(input_dir, "output", version_name, current_time) # 🆕 加载持久化评估缓存 evaluation_cache = load_eval_cache() # 创建运行上下文 run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, c=c, o=o, log_dir=log_dir, log_url=log_url, evaluation_cache=evaluation_cache, # 🆕 使用加载的缓存 ) # 创建日志目录 os.makedirs(run_context.log_dir, exist_ok=True) # 配置日志文件 log_file_path = os.path.join(run_context.log_dir, "run.log") log_file = open(log_file_path, 'w', encoding='utf-8') # 重定向stdout到TeeLogger(同时输出到控制台和文件) original_stdout = sys.stdout sys.stdout = TeeLogger(original_stdout, log_file) try: print(f"📝 日志文件: {log_file_path}") print(f"{'='*60}\n") # 执行迭代 (v121: 使用新架构) all_search_list = await iterative_loop_v2( # 不再接收提取结果 run_context, max_rounds=max_rounds, sug_threshold=sug_threshold, enable_evaluation=enable_evaluation ) # 格式化输出 output = f"原始需求:{run_context.c}\n" output += f"原始问题:{run_context.o}\n" output += f"总搜索次数:{len(all_search_list)}\n" output += f"总帖子数:{sum(len(s.post_list) for s in all_search_list)}\n" # output += f"提取帖子数:{len(all_extraction_results)}\n" # 内容提取流程已断开 # 计算总耗时 total_elapsed_time = time.time() - total_start_time output += f"\n统计信息:\n" output += f" 总耗时: {total_elapsed_time:.2f}秒 ({total_elapsed_time/60:.2f}分钟)\n" output += f" LLM评估调用: {run_context.stats_llm_calls} 次\n" output += f" SUG请求: {run_context.stats_sug_requests} 次 (缓存命中: {run_context.stats_sug_cache_hits} 次)\n" output += f" 搜索调用: {run_context.stats_search_calls} 次\n" output += "\n" + "="*60 + "\n" if all_search_list: output += "【搜索结果】\n\n" for idx, search in enumerate(all_search_list, 1): output += f"{idx}. 搜索词: {search.text} (分数: {search.score_with_o:.2f})\n" output += f" 帖子数: {len(search.post_list)}\n" if search.post_list: for post_idx, post in enumerate(search.post_list[:3], 1): # 只显示前3个 output += f" {post_idx}) {post.title}\n" output += f" URL: {post.note_url}\n" output += "\n" else: output += "未找到搜索结果\n" run_context.final_output = output print(f"\n{'='*60}") print("最终结果") print(f"{'='*60}") print(output) # 保存上下文文件 context_file_path = os.path.join(run_context.log_dir, "run_context.json") context_dict = run_context.model_dump() with open(context_file_path, "w", encoding="utf-8") as f: json.dump(context_dict, f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") # 保存详细的搜索结果 search_results_path = os.path.join(run_context.log_dir, "search_results.json") search_results_data = [s.model_dump() for s in all_search_list] with open(search_results_path, "w", encoding="utf-8") as f: json.dump(search_results_data, f, ensure_ascii=False, indent=2) print(f"Search results saved to: {search_results_path}") # # 🆕 保存图片提取结果 - 内容提取流程已断开 # if all_extraction_results: # extraction_path = os.path.join(run_context.log_dir, "search_extract.json") # extraction_data = { # note_id: extraction.model_dump() # for note_id, extraction in all_extraction_results.items() # } # with open(extraction_path, "w", encoding="utf-8") as f: # json.dump(extraction_data, f, ensure_ascii=False, indent=2) # print(f"Image extractions saved to: {extraction_path}") # print(f" 提取了 {len(all_extraction_results)} 个帖子的图片内容") # 可视化 if visualize: import subprocess output_html = os.path.join(run_context.log_dir, "visualization.html") print(f"\n🎨 生成可视化HTML...") # 获取绝对路径 abs_context_file = os.path.abspath(context_file_path) abs_output_html = os.path.abspath(output_html) # 运行可视化脚本 result = subprocess.run([ "node", "visualization/knowledge_search_traverse/index.js", abs_context_file, abs_output_html ]) if result.returncode == 0: print(f"✅ 可视化已生成: {output_html}") else: print(f"❌ 可视化生成失败") finally: # 🆕 保存评估缓存 save_eval_cache(run_context.evaluation_cache) # 恢复stdout sys.stdout = original_stdout log_file.close() print(f"\n📝 运行日志已保存: {log_file_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.1.2.121 语义分段跨域组词版") parser.add_argument( "--input-dir", type=str, default="input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?", help="输入目录路径,默认: input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?" ) parser.add_argument( "--max-rounds", type=int, default=4, help="最大轮数,默认: 4" ) parser.add_argument( "--sug-threshold", type=float, default=0.7, help="suggestion阈值,默认: 0.7" ) parser.add_argument( "--visualize", action="store_true", default=True, help="运行完成后自动生成可视化HTML" ) parser.add_argument( "--enable-evaluation", action="store_true", default=False, help="是否启用实时评估功能,默认: 关闭" ) args = parser.parse_args() asyncio.run(main(args.input_dir, max_rounds=args.max_rounds, sug_threshold=args.sug_threshold, visualize=args.visualize, enable_evaluation=args.enable_evaluation))