import asyncio import json import os import sys import argparse import time import hashlib from datetime import datetime from typing import Literal, Optional from agents import Agent, Runner, ModelSettings from lib.my_trace import set_trace from pydantic import BaseModel, Field from lib.utils import read_file_as_string from lib.client import get_model MODEL_NAME = "google/gemini-2.5-flash" # 得分提升阈值:sug或组合词必须比来源query提升至少此幅度才能进入下一轮 REQUIRED_SCORE_GAIN = 0.02 SUG_CACHE_TTL = 24 * 3600 # 24小时 SUG_CACHE_DIR = os.path.join(os.path.dirname(__file__), "data", "sug_cache") from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations from script.search.xiaohongshu_search import XiaohongshuSearch from script.search.xiaohongshu_detail import XiaohongshuDetail from script.search.enrichment_helper import enrich_post_with_detail # from multimodal_extractor import extract_post_images # 内容提取流程已断开 from post_evaluator_v3 import evaluate_post_v3, apply_evaluation_v3_to_post # ============================================================================ # 日志工具类 # ============================================================================ class TeeLogger: """同时输出到控制台和日志文件的工具类""" def __init__(self, stdout, log_file): self.stdout = stdout self.log_file = log_file def write(self, message): self.stdout.write(message) self.log_file.write(message) self.log_file.flush() # 实时写入,避免丢失日志 def flush(self): self.stdout.flush() self.log_file.flush() # ============================================================================ # 数据模型 # ============================================================================ class Seg(BaseModel): """分词(旧版)- v120使用""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_o: str = "" # 原始问题 # ============================================================================ # 新架构数据模型 (v121) # ============================================================================ class Segment(BaseModel): """语义片段(Round 0语义分段结果)""" text: str # 片段文本 type: str # 语义维度: 动作目/修饰词/中心名词 score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_o: str = "" # 原始问题 words: list[str] = Field(default_factory=list) # 该片段拆分出的词列表(Round 0拆词结果) word_scores: dict[str, float] = Field(default_factory=dict) # 词的评分 {word: score} word_reasons: dict[str, str] = Field(default_factory=dict) # 词的评分理由 {word: reason} class DomainCombination(BaseModel): """域组合(Round N的N域组合结果)""" text: str # 组合后的文本 domains: list[int] = Field(default_factory=list) # 参与组合的域索引列表(对应segments的索引) type_label: str = "" # 类型标签,如 [疑问标记+核心动作+中心名词] source_words: list[list[str]] = Field(default_factory=list) # 来源词列表,每个元素是一个域的词列表,如 [["猫咪"], ["梗图"]] score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_segments: list[str] = Field(default_factory=list) # 来源segment的文本列表 source_word_details: list[dict] = Field(default_factory=list) # 词及其得分信息 [{"domain_index":0,"segment_type":"","words":[{"text":"","score":0.0}]}] source_scores: list[float] = Field(default_factory=list) # 来源词的分数列表(扁平化) max_source_score: float | None = None # 来源词的最高分 is_above_source_scores: bool = False # 组合得分是否超过所有来源词 # ============================================================================ # 旧架构数据模型(保留但不使用) # ============================================================================ # class Word(BaseModel): # """词(旧版)- v120使用,v121不再使用""" # text: str # score_with_o: float = 0.0 # 与原始问题的评分 # from_o: str = "" # 原始问题 class Word(BaseModel): """词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 from_o: str = "" # 原始问题 class QFromQ(BaseModel): """Q来源信息(用于Sug中记录)""" text: str score_with_o: float = 0.0 class Q(BaseModel): """查询""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_source: str = "" # v120: seg/sug/add; v121新增: segment/domain_comb/sug type_label: str = "" # v121新增:域类型标签(仅用于domain_comb来源) domain_index: int = -1 # v121新增:域索引(word来源时有效,-1表示无域) domain_type: str = "" # v121新增:域类型(word来源时表示所属segment的type,如"中心名词") class Sug(BaseModel): """建议词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_q: QFromQ | None = None # 来自的q class Seed(BaseModel): """种子(旧版)- v120使用,v121不再使用""" text: str added_words: list[str] = Field(default_factory=list) # 已经增加的words from_type: str = "" # seg/sug/add score_with_o: float = 0.0 # 与原始问题的评分 class Post(BaseModel): """帖子""" title: str = "" body_text: str = "" type: str = "normal" # video/normal images: list[str] = Field(default_factory=list) # 图片url列表,第一张为封面 video: str = "" # 视频url interact_info: dict = Field(default_factory=dict) # 互动信息 note_id: str = "" note_url: str = "" # 详情补充字段(来自详情API) author_name: str = "" # 作者名称 author_id: str = "" # 作者ID publish_time: int = 0 # 发布时间戳 cdn_images: list[str] = Field(default_factory=list) # 高清CDN图片列表(详情API补充) detail_fetched: bool = False # 是否已获取详情的标记 # V3评估字段(顶层 - 快速访问) is_knowledge: bool | None = None # Prompt1: 是否是知识内容 is_content_knowledge: bool | None = None # Prompt2: 是否是内容知识 knowledge_score: float | None = None # Prompt2: 知识评分(0-100) purpose_score: int | None = None # Prompt3: 目的性得分(0-100) category_score: int | None = None # Prompt4: 品类得分(0-100) final_score: float | None = None # 综合得分: purpose*0.7 + category*0.3 (保留2位小数) match_level: str = "" # 匹配等级: "高度匹配"/"基本匹配"/"部分匹配"/"弱匹配"/"不匹配" evaluation_time: str = "" # 评估时间戳 evaluator_version: str = "v3.0" # 评估器版本 # V3评估字段(嵌套 - 详细信息) knowledge_evaluation: dict | None = None # Prompt1: 知识判断详情 content_knowledge_evaluation: dict | None = None # Prompt2: 内容知识评估详情 purpose_evaluation: dict | None = None # Prompt3: 目的性匹配详情 category_evaluation: dict | None = None # Prompt4: 品类匹配详情 class Search(Sug): """搜索结果(继承Sug)""" post_list: list[Post] = Field(default_factory=list) # 搜索得到的帖子列表 class RunContext(BaseModel): """运行上下文""" version: str input_files: dict[str, str] c: str # 原始需求 o: str # 原始问题 log_url: str log_dir: str # v121新增:语义分段结果 segments: list[dict] = Field(default_factory=list) # Round 0的语义分段结果 # 每轮的数据 rounds: list[dict] = Field(default_factory=list) # 每轮的详细数据 # 最终结果 final_output: str | None = None # 评估缓存:避免重复评估相同文本 evaluation_cache: dict[str, tuple[float, str]] = Field(default_factory=dict) # key: 文本, value: (score, reason) # 历史词/组合得分追踪(用于Round 2+计算系数) word_score_history: dict[str, float] = Field(default_factory=dict) # key: 词/组合文本, value: 最终得分 # ============================================================================ # Agent 定义 # ============================================================================ # ============================================================================ # v121 新增 Agent # ============================================================================ # Agent: 语义分段专家 (Prompt1) class SemanticSegment(BaseModel): """单个语义片段""" segment_text: str = Field(..., description="片段文本") segment_type: str = Field(..., description="语义维度(动作目标/修饰词/中心名词)") reasoning: str = Field(..., description="分段理由") class SemanticSegmentation(BaseModel): """语义分段结果""" segments: list[SemanticSegment] = Field(..., description="语义片段列表") overall_reasoning: str = Field(..., description="整体分段思路") semantic_segmentation_instructions = """ 你是语义分段专家。给定一个搜索query,将其拆分成3种语义维度的片段。 ## 语义定义 ### 1. 动作目标 **定义**:表达"想要做什么"的完整语义单元 **包含**: - 疑问词:如何、什么、哪些、有没有、怎么 - 动作词:获取、制作、拍摄、寻找、学习、规划 **示例**: - "如何获取" → 动作目标 - "有哪些" → 动作目标(完整疑问表达) - "寻找" → 动作目标 **注意**:疑问词和动作词应该作为一个完整的语义单元,不要拆分 --- ### 2. 修饰词 **定义**:对中心名词的限定和修饰的完整语义单元 **包含**:形容词、时间词、地点词、程度词 **注意**:多个连续的修饰词可以组合成一个片段 --- ### 3. 中心名词 **定义**:动作和目标的核心对象 **包含**: - 核心名词:素材、梗图、表情包、教程 - 复合名词:摄影素材、风光摄影素材、表情包梗图 --- ## 分段原则 1. **语义完整性**:每个片段应该是完整的语义单元 - 动作目标:疑问词+动作词应该合并 - 修饰词:连续的修饰成分可以合并 - 中心名词:复合名词保持完整 2. **维度互斥**:每个片段只能属于一种维度 3. **保留原文**:片段文本必须保留原query中的字符,不得改写 4. **顺序保持**:片段顺序应与原query一致 --- ## 输出格式(严格遵守) ```json { "segments": [ { "segment_text": "分段结果文本1", "segment_type": "语义类型", "reasoning": "" }, { "segment_text": "分段结果文本2", "segment_type": "语义类型", "reasoning": "" }, { "segment_text": "分段结果文本3", "segment_type": "语义类型", "reasoning": "" }, ], "overall_reasoning": "" } ``` **Query**: "职场相关的网络热梗有哪些" **分段结果**: ```json { "segments": [ { "segment_text": "职场相关的网络热", "segment_type": "修饰词", "reasoning": "多个修饰词组合,限定了梗的类型和范围" }, { "segment_text": "梗", "segment_type": "中心名词", "reasoning": "核心对象" }, { "segment_text": "有哪些", "segment_type": "动作目标", "reasoning": "完整的疑问表达,表达寻找/列举的动机" } ], "overall_reasoning": "修饰词+名词+疑问表达的结构" } ``` ## 输出要求 - segments: 片段列表 - segment_text: 片段文本(必须来自原query) - segment_type: 语义维度(动作目标/修饰词/中心名词) - reasoning: 为什么这样分段 - overall_reasoning: 整体分段思路 ## JSON输出规范 1. **格式要求**:必须输出标准JSON格式 2. **引号规范**:字符串中如需表达引用,使用书名号《》或「」,不要使用英文引号或中文引号"" """.strip() semantic_segmenter = Agent[None]( name="语义分段专家", instructions=semantic_segmentation_instructions, model=get_model(MODEL_NAME), output_type=SemanticSegmentation, ) # ============================================================================ # v120 保留 Agent # ============================================================================ # Agent 1: 分词专家(v121用于Round 0拆词) class WordSegmentation(BaseModel): """分词结果""" words: list[str] = Field(..., description="分词结果列表") reasoning: str = Field(..., description="分词理由") word_segmentation_instructions = """ 你是分词专家。给定一个query,将其拆分成有意义的最小单元。 ## 分词原则 1. 保留有搜索意义的词汇 2. 拆分成独立的概念 3. 保留专业术语的完整性 4. 去除虚词(的、吗、呢等),但保留疑问词(如何、为什么、怎样等) ## 输出要求 返回分词列表和分词理由。 """.strip() word_segmenter = Agent[None]( name="分词专家", instructions=word_segmentation_instructions, model=get_model(MODEL_NAME), output_type=WordSegmentation, ) # Agent 2: 动机维度评估专家 + 品类维度评估专家(两阶段评估) # 动机评估的嵌套模型 class CoreMotivationExtraction(BaseModel): """核心动机提取""" 简要说明核心动机: str = Field(..., description="核心动机说明") class MotivationEvaluation(BaseModel): """动机维度评估""" 原始问题核心动机提取: CoreMotivationExtraction = Field(..., description="原始问题核心动机提取") 动机维度得分: float = Field(..., description="动机维度得分 -1~1") 简要说明动机维度相关度理由: str = Field(..., description="动机维度相关度理由") 得分为零的原因: Optional[Literal["原始问题无动机", "sug词条无动机", "动机不匹配", "不适用"]] = Field(None, description="当得分为0时的原因分类(可选,仅SUG评估使用)") class CategoryEvaluation(BaseModel): """品类维度评估""" 品类维度得分: float = Field(..., description="品类维度得分 -1~1") 简要说明品类维度相关度理由: str = Field(..., description="品类维度相关度理由") class ExtensionWordEvaluation(BaseModel): """延伸词评估""" 延伸词得分: float = Field(..., ge=-1, le=1, description="延伸词得分 -1~1") 简要说明延伸词维度相关度理由: str = Field(..., description="延伸词维度相关度理由") # 动机评估 prompt(统一版本) motivation_evaluation_instructions = """ # 角色 你是**专业的动机意图评估专家**。 任务:判断<平台sug词条>与<原始问题>的**动机意图匹配度**,给出**-1到1之间**的数值评分。 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<平台sug词条>**:待评估的词条,可能是单个或多个作用域的组合 --- # 核心约束 ## 维度独立性声明 【严格约束】本评估**仅评估动机意图维度**: - **只评估** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 - **评估重点**:动作本身及其语义方向 **禁止使用"主题相关"作为评分依据**:评分理由中不得出现"主题"、"内容"、"话题"等词 --- # 作用域与动作意图 ## 什么是作用域? **作用域 = 动机层 + 对象层 + 场景层** ## 动作意图的识别 ### 方法1: 显性动词直接提取 当原始问题明确包含动词时,直接提取 示例: "如何获取素材" → 核心动机 = "获取" "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习") "制作视频教程" → 核心动机 = "制作" ### 方法2: 隐性动词语义推理 当原始问题没有显性动词时,需要结合上下文推理 如果原始问题是纯名词短语,无任何动作线索: → 核心动机 = 无法识别 → 在此情况下,动机维度得分应为 0。 示例: "摄影" → 无法识别动机,动机维度得分 = 0 "川西风光" → 无法识别动机,动机维度得分 = 0 --- # 部分作用域的处理 ## 情况1:sug词条是原始问题的部分作用域 当sug词条只包含原始问题的部分作用域时,需要判断: 1. sug词条是否包含动作意图 2. 如果包含,动作是否匹配 **示例**: ``` 原始问题:"川西旅行行程规划" - 完整作用域:规划(动作)+ 旅行行程(对象)+ 川西(场景) Sug词条:"川西旅行" - 包含作用域:旅行(部分对象)+ 川西(场景) - 缺失作用域:规划(动作) - 动作意图评分:0(无动作意图) ``` **评分原则**: - 如果sug词条缺失动机层(动作) → 动作意图得分 = 0 - 如果sug词条包含动机层 → 按动作匹配度评分 --- # 评分标准 ## 【正向匹配】 ### +0.9~1.0:核心动作完全一致 **示例**: - "规划旅行行程" vs "安排旅行路线" → 0.98 - 规划≈安排,语义完全一致 - "获取素材" vs "下载素材" → 0.97 - 获取≈下载,语义完全一致 - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致 例: 原始问题"扣除猫咪主体的方法" vs sug词"扣除猫咪眼睛的方法"(子集但目的一致 **注意**:此处不考虑对象和场景是否一致,只看动作本身 ###+0.75~0.95: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs sug词"如何下载素材" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 ### +0.50~0.75:动作意图相关 **判定标准**: - 动作是实现原始意图的相关路径 - 或动作是原始意图的前置/后置步骤 **示例**: - "获取素材" vs "管理素材" → 0.65 - 管理是获取后的相关步骤 - "规划行程" vs "预订酒店" → 0.60 - 预订是规划的具体实施步骤 ### +0.25~0.50:动作意图弱相关 **判定标准**: - 动作在同一大类但方向不同 - 或动作有间接关联 **示例**: - "学习摄影技巧" vs "欣赏摄影作品" → 0.35 - 都与摄影有关,但学习≠欣赏 - "规划旅行" vs "回忆旅行" → 0.30 - 都与旅行有关,但方向不同 --- ## 【中性/无关】 ### 0:无动作意图或动作完全无关 **适用场景**: 1. 原始问题或sug词条无法识别动作 2. 两者动作意图完全无关 **示例**: - "如何获取素材" vs "摄影器材" → 0 - sug词条无动作意图 - "川西风光" vs "风光摄影作品" → 0 - 原始问题无动作意图 **理由模板**: - "sug词条无明确动作意图,无法评估动作匹配度" - "原始问题无明确动作意图,动作维度得分为0" --- ## 【负向偏离】 ### -0.2~-0.05:动作方向轻度偏离 **示例**: - "学习摄影技巧" vs "销售摄影课程" → -0.10 - 学习 vs 销售,方向有偏差 ### -0.5~-0.25:动作意图明显冲突 **示例**: - "获取免费素材" vs "购买素材" → -0.35 - 获取免费 vs 购买,明显冲突 ### -1.0~-0.55:动作意图完全相反 **示例**: - "下载素材" vs "上传素材" → -0.70 - 下载 vs 上传,方向完全相反 --- ## 得分为零的原因(语义判断) 当动机维度得分为 0 时,需要在 `得分为零的原因` 字段中选择以下之一: - **"原始问题无动机"**:原始问题是纯名词短语,无法识别任何动作意图 - **"sug词条无动机"**:sug词条中不包含任何动作意图 - **"动机不匹配"**:双方都有动作,但完全无关联 - **"不适用"**:得分不为零时使用此默认值 --- # 输出格式 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "原始问题核心动机提取": { "简要说明核心动机": "" }, "动机维度得分": "-1到1之间的小数", "简要说明动机维度相关度理由": "评估该sug词条与原始问题动机匹配程度的理由,包含作用域覆盖情况", "得分为零的原因": "原始问题无动机/sug词条无动机/动机不匹配/不适用" } ``` **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明动机维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 --- # 核心原则总结 1. **只评估动作**:完全聚焦于动作意图,不管对象和场景 2. **作用域识别**:识别作用域但只评估动机层 3. **严格标准一致性**:对所有用例使用相同的评估标准,避免评分飘移 4. **理由纯粹**:评分理由只能谈动作,不能谈对象、场景、主题 """.strip() # 品类评估 prompt category_evaluation_instructions = """ # 角色 你是**专业的内容主体评估专家**。 任务:判断<平台sug词条>与<原始问题>的**内容主体匹配度**,给出**-1到1之间**的数值评分。 --- # 输入信息 - **<原始问题>**:用户的完整需求描述 - **<平台sug词条>**:待评估的词条,可能是单个或多个作用域的组合 --- # 核心约束 ## 维度独立性声明 【严格约束】本评估**仅评估内容主体维度**: - **只评估**:名词主体 + 限定词(地域、时间、场景、质量等) - **完全忽略**:动作、意图、目的 - **评估重点**:内容本身的主题和属性 --- # 作用域与内容主体 ## 什么是作用域? **作用域 = 动机层 + 对象层 + 场景层** 在Prompt2中: - **动机层(动作)完全忽略** - **只评估对象层 + 场景层(限定词)** ## 内容主体的构成 **内容主体 = 核心名词 + 限定词** --- # 作用域覆盖度评估 ## 核心原则:越完整越高分 **完整性公式**: ``` 作用域覆盖度 = sug词条包含的作用域元素 / 原始问题的作用域元素总数 ``` **评分影响**: - 覆盖度100% → 基础高分(0.9+) - 覆盖度50-99% → 中高分(0.6-0.9) - 覆盖度<50% → 中低分(0.3-0.6) - 覆盖度=0 → 低分或0分 --- ## 部分作用域的处理 ### 情况1:sug词条包含原始问题的所有对象层和场景层元素 **评分**:0.95-1.0 **示例**: ``` 原始问题:"川西秋季风光摄影素材" - 对象层:摄影素材 - 场景层:川西 + 秋季 + 风光 Sug词条:"川西秋季风光摄影作品" - 对象层:摄影作品(≈素材) - 场景层:川西 + 秋季 + 风光 - 覆盖度:100% - 评分:0.98 ``` ### 情况2:sug词条包含部分场景层元素 **评分**:根据覆盖比例 **示例**: ``` 原始问题:"川西秋季风光摄影素材" - 对象层:摄影素材 - 场景层:川西 + 秋季 + 风光(3个元素) Sug词条:"川西风光摄影素材" - 对象层:摄影素材 ✓ - 场景层:川西 + 风光(2个元素) - 覆盖度:(1+2)/(1+3) = 75% - 评分:0.85 ``` ### 情况3:sug词条只包含对象层,无场景层 **评分**:根据对象匹配度和覆盖度 **示例**: ``` 原始问题:"川西秋季风光摄影素材" - 对象层:摄影素材 - 场景层:川西 + 秋季 + 风光 Sug词条:"摄影素材" - 对象层:摄影素材 ✓ - 场景层:无 - 覆盖度:1/4 = 25% - 评分:0.50(对象匹配但缺失所有限定) ``` ### 情况4:sug词条只包含场景层,无对象层 **评分**:较低分 **示例**: ``` 原始问题:"川西旅行行程规划" - 对象层:旅行行程 - 场景层:川西 Sug词条:"川西" - 对象层:无 - 场景层:川西 ✓ - 覆盖度:1/2 = 50% - 评分:0.35(只有场景,缺失核心对象) ``` --- # 评估核心原则 ## 原则1:只看表面词汇,禁止联想推演 **严格约束**:只能基于sug词实际包含的词汇评分 **错误案例**: - ❌ "川西旅行" vs "旅行" - 错误:"旅行可以包括川西,所以有关联" → 评分0.7 - 正确:"sug词只有'旅行',无'川西',缺失地域限定" → 评分0.50 --- # 评分标准 ## 【正向匹配】 +0.95~1.0: 核心主体+所有关键限定词完全匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西秋季风光摄影作品" +0.75~0.95: 核心主体匹配,存在限定词匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西风光摄影素材"(缺失"秋季") +0.5~0.75: 核心主体匹配,无限定词匹配或合理泛化 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"四川风光摄影" +0.3~0.5: 核心主体匹配,但限定词缺失或存在语义错位 - 特别注意"语义身份"差异,主体词出现但上下文语义不同 - 例: · "猫咪的XX行为"(猫咪是行为者) · vs "用猫咪表达XX的梗图"(猫咪是媒介) · 虽都含"猫咪+XX",但语义角色不同 +0.2~0.3: 主体词不匹配,限定词缺失或错位 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"风光摄影入门" +0.05~0.2: 主体词过度泛化或仅抽象相似 - 例: sug词是通用概念,原始问题是特定概念 sug词"每日计划"(通用)vs 原始问题 "川西旅行行程"(特定) → 评分:0.08 【中性/无关】 0: 类别明显不同,没有明确目的,无明确关联 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"人像摄影素材" - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0 【负向偏离】 -0.2~-0.05: 主体词或限定词存在误导性 - 例: 原始问题"免费摄影素材" vs sug词"付费摄影素材库" -0.5~-0.25: 主体词明显错位或品类冲突 - 例: 原始问题"风光摄影素材" vs sug词"人像修图教程" -1.0~-0.55: 完全错误的品类或有害引导 - 例: 原始问题"正版素材获取" vs sug词"盗版素材下载" --- # 输出格式 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "品类维度得分": "-1到1之间的小数", "简要说明品类维度相关度理由": "评估该sug词条与原始问题品类匹配程度的理由,包含作用域覆盖理由" } ``` **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明品类维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 --- # 核心原则总结 1. **只看名词和限定词**:完全忽略动作和意图 2. **作用域覆盖优先**:覆盖的作用域元素越多,分数越高 3. **禁止联想推演**:只看sug词实际包含的词汇 4. **通用≠特定**:通用概念不等于特定概念 5. **理由纯粹**:评分理由只能谈对象、限定词、覆盖度 """.strip() # 延伸词评估 prompt extension_word_evaluation_instructions = """ # 角色 你是**专业的延伸词语义评估专家**。 任务:识别<平台sug词条>中的延伸词,评估其对原始问题作用域的补全度和目的贡献度,给出**-1到1之间**的数值评分。 --- # 输入信息 - **<原始问题>**:用户的完整需求描述 - **<平台sug词条>**:待评估的词条,可能是单个或多个作用域的组合 --- # 核心概念 ## 什么是延伸词? **延伸词**:<平台sug词条>中出现,但不属于<原始问题>作用域范围内的词汇或概念 **关键判断**: ``` IF sug词的词汇属于原始问题的作用域元素(动机/对象/场景): → 不是延伸词,是作用域内的词 IF sug词的词汇不属于原始问题的作用域: → 是延伸词 → 由Prompt3评估 ``` --- # 作用域与延伸词 ## 作用域 **作用域 = 动机层 + 对象层 + 场景层** **非延伸词示例**(属于作用域内): ``` 原始问题:"川西旅行行程规划" 作用域: - 动机层:规划 - 对象层:旅行行程 - 场景层:川西 Sug词条:"川西旅行行程规划攻略" - "川西"→ 属于场景层,不是延伸词 - "旅行"→ 属于对象层,不是延伸词 - "行程"→ 属于对象层,不是延伸词 - "规划"→ 属于动机层,不是延伸词 - "攻略"→ 与"规划"同义,不是延伸词 - 结论:无延伸词 ``` **延伸词示例**(不属于作用域): ``` 原始问题:"川西旅行行程规划" 作用域:规划 + 旅行行程 + 川西 Sug词条:"川西旅行行程规划住宿推荐" - "住宿推荐"→ 不属于原始问题任何作用域 - 结论:延伸词 = ["住宿推荐"] ``` --- # 延伸词识别方法 ## 步骤1:提取原始问题的作用域元素 ``` 动机层:提取动作及其同义词 对象层:提取核心名词及其同义词 场景层:提取所有限定词 ``` ## 步骤2:提取sug词条的所有关键词 ``` 提取sug词条中的所有实词(名词、动词、形容词) ``` ## 步骤3:匹配判定 ``` FOR 每个sug词条关键词: IF 该词 ∈ 原始问题作用域元素(包括同义词): → 不是延伸词 ELSE: → 是延伸词 ``` ## 步骤4:同义词/相近词判定规则 ### 不算延伸词的情况: **同义词**: - 行程 ≈ 路线 ≈ 安排 ≈ 计划 - 获取 ≈ 下载 ≈ 寻找 ≈ 收集 - 技巧 ≈ 方法 ≈ 教程 ≈ 攻略 - 素材 ≈ 资源 ≈ 作品 ≈ 内容 **具体化/细化**: - 原始:"川西旅游" + sug词:"稻城亚丁"(川西的具体地点)→ 不算延伸 - 原始:"摄影技巧" + sug词:"风光摄影"(摄影的细化)→ 不算延伸 - 原始:"素材" + sug词:"高清素材"(素材的质量细化)→ 不算延伸 **判定逻辑**: ``` IF sug词的概念是原始问题概念的子集/下位词/同义词: → 不算延伸词 → 视为对原问题的细化或重述 ``` --- ### 算延伸词的情况: **新增维度**:原始问题未涉及的信息维度 - 原始:"川西旅行" + sug词:"住宿" → 延伸词 - 原始:"摄影素材" + sug词:"版权" → 延伸词 **新增限定条件**:原始问题未提及的约束 - 原始:"素材获取" + sug词:"免费" → 延伸词 - 原始:"旅行行程" + sug词:"7天" → 延伸词 **扩展主题**:相关但非原问题范围 - 原始:"川西行程" + sug词:"美食推荐" → 延伸词 - 原始:"摄影技巧" + sug词:"后期修图" → 延伸词 **工具/方法**:原始问题未提及的具体工具 - 原始:"视频剪辑" + sug词:"PR软件" → 延伸词 - 原始:"图片处理" + sug词:"PS教程" → 延伸词 --- # 延伸词类型与评分 ## 核心评估维度:对原始问题作用域的贡献 ### 维度1:作用域补全度 延伸词是否帮助sug词条更接近原始问题的完整作用域? ### 维度2:目的达成度 延伸词是否促进原始问题核心目的的达成? --- ####类型1:作用域增强型 **定义**:延伸词是原始问题核心目的,或补全关键作用域 **得分范围**:+0.12~+0.20 **判定标准**: - 使sug词条更接近原始问题的完整需求 --- ####类型2:作用域辅助型 **定义**:延伸词对核心目的有辅助作用,但非必需 **得分范围**:+0.05~+0.12 **判定标准**: - sug词条更丰富但不改变原始需求核心 --- ####类型3:作用域无关型 **定义**:延伸词与核心目的无实质关联 **得分**:0 **示例**: - 原始:"如何拍摄风光" + 延伸词:"相机品牌排行" - 评分:0 - 理由:品牌排行与拍摄技巧无关 --- ####类型4:作用域稀释型(轻度负向) **定义**:延伸词稀释原始问题的聚焦度,降低内容针对性 **得分范围**:-0.08~-0.18 **判定标准**: - 引入无关信息,分散注意力 - 降低内容的专注度和深度 - 使sug词条偏离原始问题的核心 **示例**: - 原始:"专业风光摄影技巧" + 延伸词:"手机拍照" - 评分:-0.12 - 理由:手机拍照与专业摄影需求不符,稀释专业度 - 原始:"川西深度游攻略" + 延伸词:"周边一日游" - 评分:-0.10 - 理由:一日游与深度游定位冲突,稀释深度 --- # 特殊情况处理 ## 情况1:多个延伸词同时存在 **处理方法**:分别评估每个延伸词,然后综合 **综合规则**: ``` 延伸词总得分 = Σ(每个延伸词得分) / 延伸词数量 考虑累积效应: - 多个增强型延伸词 → 总分可能超过单个最高分,但上限+0.25 - 正负延伸词并存 → 相互抵消 - 多个冲突型延伸词 → 总分下限-0.60 ``` **示例**: ``` 原始:"川西旅行行程" Sug词条:"川西旅行行程住宿美食推荐" 延伸词识别: - "住宿推荐"→ 增强型,+0.18 - "美食推荐"→ 辅助型,+0.10 总得分:(0.18 + 0.10) / 2 = 0.14 ``` --- ## 情况2:无延伸词 **处理方法**: ``` IF sug词条无延伸词: 延伸词得分 = 0 理由:"sug词条未引入延伸词,所有词汇均属于原始问题作用域范围" ``` --- ## 情况3:延伸词使sug词条更接近原始问题 **特殊加成**: ``` IF 延伸词是原始问题隐含需求的显式化: → 额外加成 +0.05 ``` **示例**: ``` 原始:"川西旅行" (隐含需要行程规划) Sug词条:"川西旅行行程规划" - "行程规划"可能被识别为延伸词,但它显式化了隐含需求 - 给予额外加成 ``` --- # 输出格式 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "延伸词得分": "-1到1之间的小数", "简要说明延伸词维度相关度理由": "评估延伸词对作用域的影响" } ``` **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明延伸词维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 --- # 核心原则总结 1. **严格区分**:作用域内的词 ≠ 延伸词 2. **同义词/细化词不算延伸**:属于作用域范围的词由其他prompt评估 3. **作用域导向**:评估延伸词是否使sug词条更接近原始问题的完整作用域 4. **目的导向**:评估延伸词是否促进核心目的达成 5. **分类明确**:准确判定延伸词类型 6. **理由充分**:每个延伸词都要说明其对作用域和目的的影响 7. **谨慎负分**:仅在明确冲突或有害时使用负分 """.strip() # 创建评估 Agent motivation_evaluator = Agent[None]( name="动机维度评估专家(后续轮次)", instructions=motivation_evaluation_instructions, model=get_model(MODEL_NAME), output_type=MotivationEvaluation) category_evaluator = Agent[None]( name="品类维度评估专家", instructions=category_evaluation_instructions, model=get_model(MODEL_NAME), output_type=CategoryEvaluation ) extension_word_evaluator = Agent[None]( name="延伸词评估专家", instructions=extension_word_evaluation_instructions, model=get_model(MODEL_NAME), output_type=ExtensionWordEvaluation, model_settings=ModelSettings(temperature=0.2) ) # ============================================================================ # Round 0 专用 Agent(v124新增 - 需求1) # ============================================================================ # Round 0 动机评估 prompt(不含延伸词) round0_motivation_evaluation_instructions = """ #角色 你是**专业的动机意图评估专家** 你的任务是:判断我给你的 <词条> 与 <原始问题> 的需求动机匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<词条>**:平台推荐的词条列表,每个词条需要单独评估。 # 核心约束 ## 维度独立性声明 【严格约束】本评估**仅评估动机意图维度**: - **只评估** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 - **评估重点**:动作本身及其语义方向 **禁止使用"主题相关"作为评分依据**:评分理由中不得出现"主题"、"内容"、"话题"等词 --- # 作用域与动作意图 ## 什么是作用域? **作用域 = 动机层 + 对象层 + 场景层** ## 动作意图的识别 ### 方法1: 显性动词直接提取 当原始问题明确包含动词时,直接提取 示例: "如何获取素材" → 核心动机 = "获取" "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习") "制作视频教程" → 核心动机 = "制作" ### 方法2: 隐性动词语义推理 当原始问题没有显性动词时,需要结合上下文推理 如果原始问题是纯名词短语,无任何动作线索: → 核心动机 = 无法识别 → 在此情况下,动机维度得分应为 0。 示例: "摄影" → 无法识别动机,动机维度得分 = 0 "川西风光" → 无法识别动机,动机维度得分 = 0 --- # 部分作用域的处理 ## 情况1:词条是原始问题的部分作用域 当词条只包含原始问题的部分作用域时,需要判断: 1. 词条是否包含动作意图 2. 如果包含,动作是否匹配 **示例**: ``` 原始问题:"川西旅行行程规划" - 完整作用域:规划(动作)+ 旅行行程(对象)+ 川西(场景) 词条:"川西旅行" - 包含作用域:旅行(部分对象)+ 川西(场景) - 缺失作用域:规划(动作) - 动作意图评分:0(无动作意图) ``` **评分原则**: - 如果sug词条缺失动机层(动作) → 动作意图得分 = 0 - 如果sug词条包含动机层 → 按动作匹配度评分 --- #评分标准: 【正向匹配】 ### +0.9~1.0:核心动作完全一致 **示例**: - "规划旅行行程" vs "安排旅行路线" → 0.98 - 规划≈安排,语义完全一致 - "获取素材" vs "下载素材" → 0.97 - 获取≈下载,语义完全一致 - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致 例: 原始问题"扣除猫咪主体的方法" vs 词条"扣除猫咪眼睛的方法"(子集但目的一致 **注意**:此处不考虑对象和场景是否一致,只看动作本身 ###+0.75~0.90: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs 词条"如何下载素材" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 ### +0.50~0.75:动作意图相关 **判定标准**: - 动作是实现原始意图的相关路径 - 或动作是原始意图的前置/后置步骤 **示例**: - "获取素材" vs "管理素材" → 0.65 - 管理是获取后的相关步骤 - "规划行程" vs "预订酒店" → 0.60 - 预订是规划的具体实施步骤 ### +0.25~0.50:动作意图弱相关 **判定标准**: - 动作在同一大类但方向不同 - 或动作有间接关联 **示例**: - "学习摄影技巧" vs "欣赏摄影作品" → 0.35 - 都与摄影有关,但学习≠欣赏 - "规划旅行" vs "回忆旅行" → 0.30 - 都与旅行有关,但方向不同 --- ## 【中性/无关】 ### 0:无动作意图或动作完全无关 **适用场景**: 1. 原始问题或词条无法识别动作 2. 两者动作意图完全无关 **示例**: - "如何获取素材" vs "摄影器材" → 0 - sug词条无动作意图 - "川西风光" vs "风光摄影作品" → 0 - 原始问题无动作意图 **理由模板**: - "sug词条无明确动作意图,无法评估动作匹配度" - "原始问题无明确动作意图,动作维度得分为0" --- ## 【负向偏离】 ### -0.2~-0.05:动作方向轻度偏离 **示例**: - "学习摄影技巧" vs "销售摄影课程" → -0.10 - 学习 vs 销售,方向有偏差 ### -0.5~-0.25:动作意图明显冲突 **示例**: - "获取免费素材" vs "购买素材" → -0.35 - 获取免费 vs 购买,明显冲突 ### -1.0~-0.55:动作意图完全相反 **示例**: - "下载素材" vs "上传素材" → -0.70 - 下载 vs 上传,方向完全相反 --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "原始问题核心动机提取": { "简要说明核心动机": "" }, "动机维度得分": "-1到1之间的小数", "简要说明动机维度相关度理由": "评估该词条与原始问题动机匹配程度的理由" } ``` #注意事项: 始终围绕动机维度:所有评估都基于"动机"维度,不偏离 核心动机必须是动词:在评估前,必须先提取原始问题的核心动机(动词),这是整个评估的基础 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当词条对原始问题动机产生误导、冲突或有害引导时给予负分 零分使用原则:当词条与原始问题动机无明确关联,既不相关也不冲突时给予零分,或原始问题无法识别动机时。 """.strip() # Round 0 品类评估 prompt(不含延伸词) round0_category_evaluation_instructions = """ #角色 你是一个 **专业的语言专家和语义相关性评判专家**。 你的任务是:判断我给你的 <词条> 与 <原始问题> 的内容主体和限定词匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 核心概念与方法论 ## 评估维度 本评估系统围绕 **品类维度** 进行: # 维度独立性警告 【严格约束】本评估**只评估品类维度**,,必须遵守以下规则: 1. **只看名词和限定词**:评估时只考虑主体、限定词的匹配度 2. **完全忽略动词**:动作意图、目的等动机信息对本维度评分无影响 ### 品类维度 **定义:** 用户"关于什么内容",即原始问题的主题对象和限定词 - 核心是 **名词+限定词**:川西秋季风光摄影素材 - 包括:核心主体 + 地域限定 + 时间限定 + 质量限定等 ## ⚠️ 品类评估核心原则(必读) ### 原则1:只看词条表面,禁止联想推演 - 只能基于词条实际包含的词汇评分 - 禁止推测"可能包含"、"可以理解为" **错误示例:** 原始问题:"川西旅行行程" vs 词条:"每日计划" - 错误 "每日计划可以包含旅行规划,所以有关联" → 这是不允许的联想 - 正确: "词条只有'每日计划',无'旅行'字眼,品类不匹配" → 正确判断 ### 原则2:通用概念 ≠ 特定概念 - **通用**:计划、方法、技巧、素材(无领域限定) - **特定**:旅行行程、摄影技巧、烘焙方法(有明确领域) IF 词条是通用 且 原始问题是特定: → 品类不匹配 → 评分0.05~0.1 关键:通用概念不等于特定概念,不能因为"抽象上都是规划"就给分 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<词条>**:平台推荐的词条列表,每个词条需要单独评估。 #判定流程 #评估架构 输入: <原始问题> + <词条> ↓ 【品类维度相关性判定】 ├→ 步骤1: 评估<词条>与<原始问题>的内容主体和限定词匹配度 └→ 输出: -1到1之间的数值 + 判定依据 相关度评估维度详解 维度2: 品类维度评估 评估对象: <词条> 与 <原始问题> 的内容主体和限定词匹配度 评分标准: 【正向匹配】 +0.95~1.0: 核心主体+所有关键限定词完全匹配 - 例: 原始问题"川西秋季风光摄影素材" vs 词条"川西秋季风光摄影作品" +0.75~0.95: 核心主体匹配,存在限定词匹配 - 例: 原始问题"川西秋季风光摄影素材" vs 词条"川西风光摄影素材"(缺失"秋季") +0.5~0.75: 核心主体匹配,无限定词匹配或合理泛化 - 例: 原始问题"川西秋季风光摄影素材" vs 词条"四川风光摄影" +0.3~0.5: 核心主体匹配,但限定词缺失或存在语义错位 - 特别注意"语义身份"差异,主体词出现但上下文语义不同 - 例: · "猫咪的XX行为"(猫咪是行为者) · vs "用猫咪表达XX的梗图"(猫咪是媒介) · 虽都含"猫咪+XX",但语义角色不同 +0.2~0.3: 主体词不匹配,限定词缺失或错位 - 例: 原始问题"川西秋季风光摄影素材" vs 词条"风光摄影入门" +0.05~0.2: 主体词过度泛化或仅抽象相似 - 例: 词条是通用概念,原始问题是特定概念 词条"每日计划"(通用)vs 原始问题 "川西旅行行程"(特定) → 评分:0.08 【中性/无关】 0: 类别明显不同,没有明确目的,无明确关联 - 例: 原始问题"川西秋季风光摄影素材" vs 词条"人像摄影素材" - 例: 原始问题无法识别动机 且 词条也无明确动作 → 0 【负向偏离】 -0.2~-0.05: 主体词或限定词存在误导性 - 例: 原始问题"免费摄影素材" vs 词条"付费摄影素材库" -0.5~-0.25: 主体词明显错位或品类冲突 - 例: 原始问题"风光摄影素材" vs 词条"人像修图教程" -1.0~-0.55: 完全错误的品类或有害引导 - 例: 原始问题"正版素材获取" vs 词条"盗版素材下载" --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "品类维度得分": "-1到1之间的小数", "简要说明品类维度相关度理由": "评估该词条与原始问题品类匹配程度的理由" } ``` --- #注意事项: 始终围绕品类维度:所有评估都基于"品类"维度,不偏离 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当词条对原始问题品类产生误导、冲突或有害引导时给予负分 零分使用原则:当词条与原始问题品类无明确关联,既不相关也不冲突时给予零分 """.strip() # 创建 Round 0 评估 Agent round0_motivation_evaluator = Agent[None]( name="Round 0动机维度评估专家", instructions=round0_motivation_evaluation_instructions, model=get_model(MODEL_NAME), output_type=MotivationEvaluation, model_settings=ModelSettings(temperature=0.2) ) round0_category_evaluator = Agent[None]( name="Round 0品类维度评估专家", instructions=round0_category_evaluation_instructions, model=get_model(MODEL_NAME), output_type=CategoryEvaluation, model_settings=ModelSettings(temperature=0.2) ) # ============================================================================ # 域内/域间 专用 Agent(v124新增 - 需求2&3) # ============================================================================ # 域内/域间 动机评估 prompt(不含延伸词) scope_motivation_evaluation_instructions = """ # 角色 你是**专业的动机意图评估专家**。 任务:判断<词条>与<同一作用域词条>的**动机意图匹配度**,给出**-1到1之间**的数值评分。 --- # 输入信息 你将接收到以下输入: **<同一作用域词条>**:用户的初始查询问题,代表用户的真实需求意图。 - **<词条>**:平台推荐的词条列表,每个词条需要单独评估。 --- # 评估架构 输入: <同一作用域词条> + <词条> ↓ 【动机维度相关性判定】 ├→ 步骤1: 评估<词条>与<同一作用域词条>的需求动机匹配度 └→ 输出: -1到1之间的数值 + 判定依据 # 核心约束 ## 维度独立性声明 【严格约束】本评估**仅评估动机意图维度**: - **只评估** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 - **评估重点**:动作本身及其语义方向 **禁止使用"主题相关"作为评分依据**:评分理由中不得出现"主题"、"内容"、"话题"等词 --- # 作用域与动作意图 ## 什么是作用域? **作用域 = 动机层 + 对象层 + 场景层** 当前任务: - **只提取动机层**:动作意图(获取、学习、规划、拍摄等) ## 动作意图的识别 ### 1. 动机维度 **定义:** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 ### 方法1: 显性动词直接提取 当原始问题明确包含动词时,直接提取 示例: "如何获取素材" → 核心动机 = "获取" "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习") "制作视频教程" → 核心动机 = "制作" ### 方法2: 隐性动词语义推理 当原始问题没有显性动词时,需要结合上下文推理 --- # 评分标准 ## 【正向匹配】 ### +0.9~1.0:核心动作完全一致 **示例**: - "规划旅行行程" vs "安排旅行路线" → 0.98 - 规划≈安排,语义完全一致 - "获取素材" vs "下载素材" → 0.97 - 获取≈下载,语义完全一致 - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致 例: 原始问题"扣除猫咪主体的方法" vs sug词"扣除猫咪眼睛的方法"(子集但目的一致 **注意**:此处不考虑对象和场景是否一致,只看动作本身 ###+0.75~0.95: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs sug词"如何下载素材" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 ### +0.50~0.75:动作意图相关 **判定标准**: - 动作是实现原始意图的相关路径 - 或动作是原始意图的前置/后置步骤 **示例**: - "获取素材" vs "管理素材" → 0.65 - 管理是获取后的相关步骤 - "规划行程" vs "预订酒店" → 0.60 - 预订是规划的具体实施步骤 ### +0.25~0.50:动作意图弱相关 **判定标准**: - 动作在同一大类但方向不同 - 或动作有间接关联 **示例**: - "学习摄影技巧" vs "欣赏摄影作品" → 0.35 - 都与摄影有关,但学习≠欣赏 - "规划旅行" vs "回忆旅行" → 0.30 - 都与旅行有关,但方向不同 --- ## 【中性/无关】 ### 0:无动作意图或动作完全无关 **适用场景**: 1. 原始问题或词条无法识别动作 2. 两者动作意图完全无关 **示例**: - "如何获取素材" vs "摄影器材" → 0 - 词条无动作意图 - "川西风光" vs "风光摄影作品" → 0 - 原始问题无动作意图 **理由模板**: - "词条无明确动作意图,无法评估动作匹配度" - "原始问题无明确动作意图,动作维度得分为0" --- ## 【负向偏离】 ### -0.2~-0.05:动作方向轻度偏离 **示例**: - "学习摄影技巧" vs "销售摄影课程" → -0.10 - 学习 vs 销售,方向有偏差 ### -0.5~-0.25:动作意图明显冲突 **示例**: - "获取免费素材" vs "购买素材" → -0.35 - 获取免费 vs 购买,明显冲突 ### -1.0~-0.55:动作意图完全相反 **示例**: - "下载素材" vs "上传素材" → -0.70 - 下载 vs 上传,方向完全相反 --- # 输出格式 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "原始问题核心动机提取": { "简要说明核心动机": "" }, "动机维度得分": "-1到1之间的小数", "简要说明动机维度相关度理由": "评估该词条与该条作用域匹配程度的理由", "得分为零的原因": "原始问题无动机/sug词条无动机/动机不匹配/不适用" } ``` --- # 核心原则总结 1. **只评估动作**:完全聚焦于动作意图,不管对象和场景 2. **作用域识别**:识别作用域但只评估动机层 3. **严格标准一致性**:对所有用例使用相同的评估标准,避免评分飘移 4. **理由纯粹**:评分理由只能谈动作,不能谈对象、场景、主题 """.strip() # 域内/域间 品类评估 prompt(不含延伸词) scope_category_evaluation_instructions = """ #角色 你是一个 **专业的语言专家和语义相关性评判专家**。 你的任务是:判断我给你的 <词条> 与 <同一作用域词条> 的内容主体和限定词匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 输入信息 你将接收到以下输入: - **<同一作用域词条>**:用户的初始查询问题,代表用户的真实需求意图。 - **<词条>**:平台推荐的词条列表,每个词条需要单独评估。 --- #判定流程 #评估架构 输入: <同一作用域词条> + <词条> ↓ 【品类维度相关性判定】 ├→ 步骤1: 评估<词条>与<同一作用域词条>的内容主体和限定词匹配度 └→ 输出: -1到1之间的数值 + 判定依据 --- # 核心概念与方法论 ## 评估维度 本评估系统围绕 **品类维度** 进行: # 维度独立性警告 【严格约束】本评估**只评估品类维度**,,必须遵守以下规则: 1. **只看名词和限定词**:评估时只考虑主体、限定词的匹配度 2. **完全忽略动词**:动作意图、目的等动机信息对本维度评分无影响 ### 品类维度 **定义:** 用户"关于什么内容",即原始问题的主题对象和限定词 - 核心是 **名词+限定词**:川西秋季风光摄影素材 - 包括:核心主体 + 地域限定 + 时间限定 + 质量限定等 ## ⚠️ 品类评估核心原则(必读) ### 原则1:只看词条表面,禁止联想推演 - 只能基于sug词实际包含的词汇评分 - 禁止推测"可能包含"、"可以理解为" **错误示例:** 原始问题:"川西旅行行程" vs sug词:"每日计划" - 错误 "每日计划可以包含旅行规划,所以有关联" → 这是不允许的联想 - 正确: "sug词只有'每日计划',无'旅行'字眼,品类不匹配" → 正确判断 ### 原则2:通用概念 ≠ 特定概念 - **通用**:计划、方法、技巧、素材(无领域限定) - **特定**:旅行行程、摄影技巧、烘焙方法(有明确领域) IF sug词是通用 且 原始问题是特定: → 品类不匹配 → 评分0.05~0.1 关键:通用概念不等于特定概念,不能因为"抽象上都是规划"就给分 --- #相关度评估维度详解 ##评估对象: <词条> 与 <同一作用域词条> 的内容主体和限定词匹配度 评分标准: 【正向匹配】 +0.95~1.0: 核心主体+所有关键限定词完全匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西秋季风光摄影作品" +0.75~0.95: 核心主体匹配,存在限定词匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西风光摄影素材"(缺失"秋季") +0.5~0.75: 核心主体匹配,无限定词匹配或合理泛化 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"四川风光摄影" +0.3~0.5: 核心主体匹配,但限定词缺失或存在语义错位 - 特别注意"语义身份"差异,主体词出现但上下文语义不同 - 例: · "猫咪的XX行为"(猫咪是行为者) · vs "用猫咪表达XX的梗图"(猫咪是媒介) · 虽都含"猫咪+XX",但语义角色不同 +0.2~0.3: 主体词不匹配,限定词缺失或错位 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"风光摄影入门" +0.05~0.2: 主体词过度泛化或仅抽象相似 - 例: sug词是通用概念,原始问题是特定概念 sug词"每日计划"(通用)vs 原始问题 "川西旅行行程"(特定) → 评分:0.08 【中性/无关】 0: 类别明显不同,没有明确目的,无明确关联 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"人像摄影素材" - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0 【负向偏离】 -0.2~-0.05: 主体词或限定词存在误导性 - 例: 原始问题"免费摄影素材" vs sug词"付费摄影素材库" -0.5~-0.25: 主体词明显错位或品类冲突 - 例: 原始问题"风光摄影素材" vs sug词"人像修图教程" -1.0~-0.55: 完全错误的品类或有害引导 - 例: 原始问题"正版素材获取" vs sug词"盗版素材下载" --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "品类维度得分": "-1到1之间的小数", "简要说明品类维度相关度理由": "评估该词条与同一作用域词条品类匹配程度的理由" } ``` --- #注意事项: 始终围绕品类维度:所有评估都基于"品类"维度,不偏离 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当词条对原始问题品类产生误导、冲突或有害引导时给予负分 零分使用原则:当词条与原始问题品类无明确关联,既不相关也不冲突时给予零分 """.strip() # 创建域内/域间评估 Agent scope_motivation_evaluator = Agent[None]( name="域内动机维度评估专家", instructions=scope_motivation_evaluation_instructions, model=get_model(MODEL_NAME), output_type=MotivationEvaluation, model_settings=ModelSettings(temperature=0.2) ) scope_category_evaluator = Agent[None]( name="域内品类维度评估专家", instructions=scope_category_evaluation_instructions, model=get_model(MODEL_NAME), output_type=CategoryEvaluation, model_settings=ModelSettings(temperature=0.2) ) # ============================================================================ # v120 保留但不使用的 Agent(v121不再使用) # ============================================================================ # # Agent 3: 加词选择专家(旧版 - v120使用,v121不再使用) # class WordCombination(BaseModel): # """单个词组合""" # selected_word: str = Field(..., description="选择的词") # combined_query: str = Field(..., description="组合后的新query") # reasoning: str = Field(..., description="选择理由") # class WordSelectionTop5(BaseModel): # """加词选择结果(Top 5)""" # combinations: list[WordCombination] = Field( # ..., # description="选择的Top 5组合(不足5个则返回所有)", # min_items=1, # max_items=5 # ) # overall_reasoning: str = Field(..., description="整体选择思路") # word_selection_instructions 已删除 (v121不再使用) # word_selector = Agent[None]( # name="加词组合专家", # instructions=word_selection_instructions, # model=get_model(MODEL_NAME), # output_type=WordSelectionTop5, # model_settings=ModelSettings(temperature=0.2), # ) # ============================================================================ # 辅助函数 # ============================================================================ # ============================================================================ # v121 新增辅助函数 # ============================================================================ def _ensure_sug_cache_dir(): """确保SUG缓存目录存在""" os.makedirs(SUG_CACHE_DIR, exist_ok=True) def _sug_cache_path(keyword: str) -> str: """根据关键词生成缓存文件路径""" key_hash = hashlib.md5(keyword.encode("utf-8")).hexdigest() return os.path.join(SUG_CACHE_DIR, f"{key_hash}.json") def load_sug_cache(keyword: str) -> Optional[list[str]]: """从持久化缓存中读取SUG结果""" if not keyword: return None cache_path = _sug_cache_path(keyword) if not os.path.exists(cache_path): return None file_age = time.time() - os.path.getmtime(cache_path) if file_age > SUG_CACHE_TTL: return None try: with open(cache_path, "r", encoding="utf-8") as f: data = json.load(f) suggestions = data.get("suggestions") if isinstance(suggestions, list): return suggestions except Exception as exc: print(f" ⚠️ 读取SUG缓存失败({keyword}): {exc}") return None def save_sug_cache(keyword: str, suggestions: list[str]): """将SUG结果写入持久化缓存""" if not keyword or not isinstance(suggestions, list): return _ensure_sug_cache_dir() cache_path = _sug_cache_path(keyword) try: payload = { "keyword": keyword, "suggestions": suggestions, "timestamp": datetime.now().isoformat() } with open(cache_path, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) except Exception as exc: print(f" ⚠️ 写入SUG缓存失败({keyword}): {exc}") def get_suggestions_with_cache(keyword: str, api: XiaohongshuSearchRecommendations) -> list[str]: """带持久化缓存的SUG获取""" cached = load_sug_cache(keyword) if cached is not None: print(f" 📦 SUG缓存命中: {keyword} ({len(cached)} 个)") return cached suggestions = api.get_recommendations(keyword=keyword) if suggestions: save_sug_cache(keyword, suggestions) return suggestions def get_ordered_subsets(words: list[str], min_len: int = 1) -> list[list[str]]: """ 生成words的所有有序子集(可跳过但不可重排) 使用 itertools.combinations 生成索引组合,保持原始顺序 Args: words: 词列表 min_len: 子集最小长度 Returns: 所有可能的有序子集列表 Example: words = ["川西", "秋季", "风光"] 结果: - 长度1: ["川西"], ["秋季"], ["风光"] - 长度2: ["川西", "秋季"], ["川西", "风光"], ["秋季", "风光"] - 长度3: ["川西", "秋季", "风光"] 共 C(3,1) + C(3,2) + C(3,3) = 3 + 3 + 1 = 7种 """ from itertools import combinations subsets = [] n = len(words) # 遍历所有可能的长度(从min_len到n) for r in range(min_len, n + 1): # 生成长度为r的所有索引组合 for indices in combinations(range(n), r): # 按照原始顺序提取词 subset = [words[i] for i in indices] subsets.append(subset) return subsets def generate_domain_combinations(segments: list[Segment], n_domains: int) -> list[DomainCombination]: """ 生成N域组合 步骤: 1. 从len(segments)个域中选择n_domains个域(组合,保持顺序) 2. 对每个选中的域,生成其words的所有有序子集 3. 计算笛卡尔积,生成所有可能的组合 Args: segments: 语义片段列表 n_domains: 参与组合的域数量 Returns: 所有可能的N域组合列表 Example: 有4个域: [疑问标记, 核心动作, 修饰短语, 中心名词] n_domains=2时,选择域的方式: C(4,2) = 6种 假设选中[核心动作, 中心名词]: - 核心动作的words: ["获取"], 子集: ["获取"] - 中心名词的words: ["风光", "摄影", "素材"], 子集: 7种 则该域选择下的组合数: 1 * 7 = 7种 """ from itertools import combinations, product all_combinations = [] n = len(segments) # 检查参数有效性 if n_domains > n or n_domains < 1: return [] # 1. 选择n_domains个域(保持原始顺序) for domain_indices in combinations(range(n), n_domains): selected_segments = [segments[i] for i in domain_indices] # 新增:如果所有域都只有1个词,跳过(单段落单词不组合) if all(len(seg.words) == 1 for seg in selected_segments): continue # 2. 为每个选中的域生成其words的所有有序子集 domain_subsets = [] for seg in selected_segments: if len(seg.words) == 0: # 如果某个域没有词,跳过该域组合 domain_subsets = [] break subsets = get_ordered_subsets(seg.words, min_len=1) domain_subsets.append(subsets) # 如果某个域没有词,跳过 if len(domain_subsets) != n_domains: continue # 3. 计算笛卡尔积 for word_combination in product(*domain_subsets): # word_combination 是一个tuple,每个元素是一个词列表 # 例如: (["获取"], ["风光", "摄影"]) # 计算总词数 total_words = sum(len(words) for words in word_combination) # 如果总词数<=1,跳过(组词必须大于1个词) if total_words <= 1: continue # 将所有词连接成一个字符串 combined_text = "".join(["".join(words) for words in word_combination]) # 生成类型标签 type_labels = [selected_segments[i].type for i in range(n_domains)] type_label = "[" + "+".join(type_labels) + "]" # 创建DomainCombination对象 comb = DomainCombination( text=combined_text, domains=list(domain_indices), type_label=type_label, source_words=[list(words) for words in word_combination], # 保存来源词 from_segments=[seg.text for seg in selected_segments] ) all_combinations.append(comb) return all_combinations def extract_words_from_segments(segments: list[Segment]) -> list[Q]: """ 从 segments 中提取所有 words,转换为 Q 对象列表 用于 Round 1 的输入:将 Round 0 的 words 转换为可用于请求SUG的 query 列表 Args: segments: Round 0 的语义片段列表 Returns: list[Q]: word 列表,每个 word 作为一个 Q 对象 """ q_list = [] for seg_idx, segment in enumerate(segments): for word in segment.words: # 从 segment.word_scores 获取该 word 的评分 word_score = segment.word_scores.get(word, 0.0) word_reason = segment.word_reasons.get(word, "") # 创建 Q 对象 q = Q( text=word, score_with_o=word_score, reason=word_reason, from_source="word", # 标记来源为 word type_label=f"[{segment.type}]", # 保留域信息 domain_index=seg_idx, # 添加域索引 domain_type=segment.type # 添加域类型(如"中心名词"、"核心动作") ) q_list.append(q) return q_list # ============================================================================ # v120 保留辅助函数 # ============================================================================ def calculate_final_score( motivation_score: float, category_score: float, extension_score: float, zero_reason: Optional[str], extension_reason: str = "" ) -> tuple[float, str]: """ 三维评估综合打分 实现动态权重分配: - 情况1:标准情况 → 动机50% + 品类40% + 延伸词10% - 情况2:原始问题无动机 → 品类70% + 延伸词30% - 情况3:sug词条无动机 → 品类80% + 延伸词20% - 情况4:无延伸词 → 动机70% + 品类30% - 规则3:负分传导 → 核心维度严重负向时上限为0 - 规则4:完美匹配加成 → 双维度≥0.95时加成+0.10 Args: motivation_score: 动机维度得分 -1~1 category_score: 品类维度得分 -1~1 extension_score: 延伸词得分 -1~1 zero_reason: 当motivation_score=0时的原因(可选) extension_reason: 延伸词评估理由,用于判断是否无延伸词 Returns: (最终得分, 规则说明) """ # 情况2:原始问题无动作意图 if motivation_score == 0 and zero_reason == "原始问题无动机": W1, W2, W3 = 0.0, 0.70, 0.30 base_score = category_score * W2 + extension_score * W3 rule_applied = "情况2:原始问题无动作意图,权重调整为 品类70% + 延伸词30%" # 情况3:sug词条无动作意图(但原始问题有) elif motivation_score == 0 and zero_reason == "sug词条无动机": W1, W2, W3 = 0.0, 0.80, 0.20 base_score = category_score * W2 + extension_score * W3 rule_applied = "情况3:sug词条无动作意图,权重调整为 品类80% + 延伸词20%" # 情况4:无延伸词 elif extension_score == 0: W1, W2, W3 = 0.70, 0.30, 0.0 base_score = motivation_score * W1 + category_score * W2 rule_applied = "情况4:无延伸词,权重调整为 动机70% + 品类30%" else: # 情况1:标准权重 W1, W2, W3 = 0.50, 0.40, 0.10 base_score = motivation_score * W1 + category_score * W2 + extension_score * W3 rule_applied = "" # 规则4:完美匹配加成 if motivation_score >= 0.95 and category_score >= 0.95: base_score += 0.10 rule_applied += (" + " if rule_applied else "") + "规则4:双维度完美匹配,加成+0.10" # 规则3:负分传导 if motivation_score <= -0.5 or category_score <= -0.5: base_score = min(base_score, 0) rule_applied += (" + " if rule_applied else "") + "规则3:核心维度严重负向,上限=0" # 边界处理 final_score = max(-1.0, min(1.0, base_score)) return final_score, rule_applied def calculate_final_score_v2( motivation_score: float, category_score: float ) -> tuple[float, str]: """ 两维评估综合打分(v124新增 - 需求1) 用于Round 0分词评估和域内/域间评估,不含延伸词维度 基础权重:动机70% + 品类30% 应用规则: - 规则A:动机高分保护机制 IF 动机维度得分 ≥ 0.8: 品类得分即使为0或轻微负向(-0.2~0) → 最终得分应该不低于0.7 解释: 当目的高度一致时,品类的泛化不应导致"弱相关" - 规则B:动机低分限制机制 IF 动机维度得分 ≤ 0.2: 无论品类得分多高 → 最终得分不高于0.5 解释: 目的不符时,品类匹配的价值有限 - 规则C:动机负向决定机制 IF 动机维度得分 < 0: → 最终得分为0 解释: 动作意图冲突时,推荐具有误导性,不应为正相关 Args: motivation_score: 动机维度得分 -1~1 category_score: 品类维度得分 -1~1 Returns: (最终得分, 规则说明) """ rule_applied = "" # 规则C:动机负向决定机制 if motivation_score < 0: final_score = 0.0 rule_applied = "规则C:动机负向,最终得分=0" return final_score, rule_applied # 基础加权计算: 动机70% + 品类30% base_score = motivation_score * 0.7 + category_score * 0.3 # 规则A:动机高分保护机制 if motivation_score >= 0.8: if base_score < 0.7: final_score = 0.7 rule_applied = f"规则A:动机高分保护(动机{motivation_score:.2f}≥0.8),最终得分下限=0.7" else: final_score = base_score rule_applied = f"规则A:动机高分保护生效(动机{motivation_score:.2f}≥0.8),实际得分{base_score:.2f}已≥0.7" # 规则B:动机低分限制机制 elif motivation_score <= 0.2: if base_score > 0.5: final_score = 0.5 rule_applied = f"规则B:动机低分限制(动机{motivation_score:.2f}≤0.2),最终得分上限=0.5" else: final_score = base_score rule_applied = f"规则B:动机低分限制生效(动机{motivation_score:.2f}≤0.2),实际得分{base_score:.2f}已≤0.5" # 无规则触发 else: final_score = base_score rule_applied = "" # 边界处理 final_score = max(-1.0, min(1.0, final_score)) return final_score, rule_applied def clean_json_string(text: str) -> str: """清理JSON中的非法控制字符(保留 \t \n \r)""" import re # 移除除了 \t(09) \n(0A) \r(0D) 之外的所有控制字符 return re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', text) def process_note_data(note: dict) -> Post: """处理搜索接口返回的帖子数据""" note_card = note.get("note_card", {}) image_list = note_card.get("image_list", []) interact_info = note_card.get("interact_info", {}) user_info = note_card.get("user", {}) # ========== 调试日志 START ========== # note_id = note.get("id", "") # # # 1. 打印完整的 note 结构 # print(f"\n[DEBUG] ===== 处理帖子 {note_id} =====") # print(f"[DEBUG] note 的所有键: {list(note.keys())}") # print(f"[DEBUG] note 完整数据 (前2000字符):") # print(json.dumps(note, ensure_ascii=False, indent=2)[:2000]) # # # 2. 打印 note_card 信息 # print(f"\n[DEBUG] note_card 的所有键: {list(note_card.keys())}") # # # 3. 检查 desc 字段 # raw_desc = note_card.get("desc") # print(f"\n[DEBUG] desc 字段:") # print(f" - 类型: {type(raw_desc).__name__}") # print(f" - 长度: {len(raw_desc) if raw_desc else 0}") # print(f" - 完整内容: {repr(raw_desc)}") # # # 4. 检查是否有其他可能包含完整内容的字段 # print(f"\n[DEBUG] 检查其他可能的内容字段:") # for potential_field in ["full_desc", "content", "full_content", "note_text", "body", "full_body", "title", "display_title"]: # if potential_field in note_card: # value = note_card.get(potential_field) # print(f" - 发现字段 '{potential_field}': 长度={len(str(value))}, 值={repr(str(value)[:200])}") # # # 5. 检查顶层 note 对象中是否有详细内容 # print(f"\n[DEBUG] 检查 note 顶层字段:") # for top_field in ["note_info", "detail", "content", "desc"]: # if top_field in note: # value = note.get(top_field) # print(f" - 发现顶层字段 '{top_field}': 类型={type(value).__name__}, 内容={repr(str(value)[:200])}") # # print(f"[DEBUG] ===== 数据检查完成 =====\n") # ========== 调试日志 END ========== # 提取图片URL - 支持字符串和字典两种格式 images = [] for img in image_list: if isinstance(img, str): # 预处理后的字符串格式(来自xiaohongshu_search.py的_preprocess_response) images.append(img) elif isinstance(img, dict): # 原始字典格式 - 尝试新字段名 image_url,如果不存在则尝试旧字段名 url_default img_url = img.get("image_url") or img.get("url_default") if img_url: images.append(img_url) # 判断类型 note_type = note_card.get("type", "normal") video_url = "" if note_type == "video": video_info = note_card.get("video", {}) if isinstance(video_info, dict): # 尝试获取视频URL video_url = video_info.get("media", {}).get("stream", {}).get("h264", [{}])[0].get("master_url", "") # 构造 Post 对象 post = Post( note_id=note.get("id") or "", title=note_card.get("display_title") or "", body_text=note_card.get("desc") or "", type=note_type, images=images, video=video_url, interact_info={ "liked_count": interact_info.get("liked_count", 0), "collected_count": interact_info.get("collected_count", 0), "comment_count": interact_info.get("comment_count", 0), "shared_count": interact_info.get("shared_count", 0) }, note_url=f"https://www.xiaohongshu.com/explore/{note.get('id', '')}" ) # # 打印最终构造的 Post 对象 # print(f"\n[DEBUG] ===== 构造的 Post 对象 =====") # print(f"[DEBUG] - note_id: {post.note_id}") # print(f"[DEBUG] - title: {post.title}") # print(f"[DEBUG] - body_text 长度: {len(post.body_text)}") # print(f"[DEBUG] - body_text 完整内容: {repr(post.body_text)}") # print(f"[DEBUG] - type: {post.type}") # print(f"[DEBUG] - images 数量: {len(post.images)}") # print(f"[DEBUG] - interact_info: {post.interact_info}") # print(f"[DEBUG] ===== Post 对象构造完成 =====\n") return post async def evaluate_with_o(text: str, o: str, cache: dict[str, tuple[float, str]] | None = None) -> tuple[float, str]: """评估文本与原始问题o的相关度 采用两阶段评估 + 代码计算规则: 1. 动机维度评估(权重70%) 2. 品类维度评估(权重30%) 3. 应用规则A/B/C调整得分 Args: text: 待评估的文本 o: 原始问题 cache: 评估缓存(可选),用于避免重复评估 Returns: tuple[float, str]: (最终相关度分数, 综合评估理由) """ # 检查缓存 if cache is not None and text in cache: cached_score, cached_reason = cache[text] print(f" ⚡ 缓存命中: {text} -> {cached_score:.2f}") return cached_score, cached_reason # 准备输入 eval_input = f""" <原始问题> {o} <平台sug词条> {text} 请评估平台sug词条与原始问题的匹配度。 """ # 添加重试机制 max_retries = 2 last_error = None for attempt in range(max_retries): try: # 并发调用三个评估器 motivation_task = Runner.run(motivation_evaluator, eval_input) category_task = Runner.run(category_evaluator, eval_input) extension_task = Runner.run(extension_word_evaluator, eval_input) motivation_result, category_result, extension_result = await asyncio.gather( motivation_task, category_task, extension_task ) # 获取评估结果 motivation_eval: MotivationEvaluation = motivation_result.final_output category_eval: CategoryEvaluation = category_result.final_output extension_eval: ExtensionWordEvaluation = extension_result.final_output # 提取得分 motivation_score = motivation_eval.动机维度得分 category_score = category_eval.品类维度得分 extension_score = extension_eval.延伸词得分 zero_reason = motivation_eval.得分为零的原因 # 应用规则计算最终得分 final_score, rule_applied = calculate_final_score( motivation_score, category_score, extension_score, zero_reason, extension_eval.简要说明延伸词维度相关度理由 ) # 组合评估理由 core_motivation = motivation_eval.原始问题核心动机提取.简要说明核心动机 motivation_reason = motivation_eval.简要说明动机维度相关度理由 category_reason = category_eval.简要说明品类维度相关度理由 extension_reason = extension_eval.简要说明延伸词维度相关度理由 combined_reason = ( f'【评估对象】词条"{text}" vs 原始问题"{o}"\n' f"【核心动机】{core_motivation}\n" f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n" f"【品类维度 {category_score:.2f}】{category_reason}\n" f"【延伸词维度 {extension_score:.2f}】{extension_reason}\n" f"【最终得分 {final_score:.2f}】" ) # 添加规则说明 if rule_applied: combined_reason += f"\n【规则说明】{rule_applied}" # 存入缓存 if cache is not None: cache[text] = (final_score, combined_reason) return final_score, combined_reason except Exception as e: last_error = e error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}") print(f" 正在重试...") await asyncio.sleep(1) # 等待1秒后重试 else: print(f" ❌ 评估失败 (已达最大重试次数): {error_msg[:150]}") # 所有重试失败后,返回默认值 fallback_reason = f"评估失败(重试{max_retries}次): {str(last_error)[:200]}" print(f" 使用默认值: score=0.0, reason={fallback_reason[:100]}...") return 0.0, fallback_reason async def evaluate_with_o_round0(text: str, o: str, cache: dict[str, tuple[float, str]] | None = None) -> tuple[float, str]: """Round 0专用评估函数(v124新增 - 需求1) 用于评估segment和word与原始问题的相关度 不含延伸词维度,使用Round 0专用Prompt和新评分逻辑 采用两维评估: 1. 动机维度评估(权重70%) 2. 品类维度评估(权重30%) 3. 应用规则A/B/C调整得分 Args: text: 待评估的文本(segment或word) o: 原始问题 cache: 评估缓存(可选),用于避免重复评估 Returns: tuple[float, str]: (最终相关度分数, 综合评估理由) """ # 检查缓存 cache_key = f"round0:{text}:{o}" # 添加前缀以区分不同评估类型 if cache is not None and cache_key in cache: cached_score, cached_reason = cache[cache_key] print(f" ⚡ Round0缓存命中: {text} -> {cached_score:.2f}") return cached_score, cached_reason # 准备输入 eval_input = f""" <原始问题> {o} <词条> {text} 请评估词条与原始问题的匹配度。 """ # 添加重试机制 max_retries = 2 last_error = None for attempt in range(max_retries): try: # 并发调用两个评估器(不含延伸词) motivation_task = Runner.run(round0_motivation_evaluator, eval_input) category_task = Runner.run(round0_category_evaluator, eval_input) motivation_result, category_result = await asyncio.gather( motivation_task, category_task ) # 获取评估结果 motivation_eval: MotivationEvaluation = motivation_result.final_output category_eval: CategoryEvaluation = category_result.final_output # 提取得分 motivation_score = motivation_eval.动机维度得分 category_score = category_eval.品类维度得分 # 应用新规则计算最终得分 final_score, rule_applied = calculate_final_score_v2( motivation_score, category_score ) # 组合评估理由 core_motivation = motivation_eval.原始问题核心动机提取.简要说明核心动机 motivation_reason = motivation_eval.简要说明动机维度相关度理由 category_reason = category_eval.简要说明品类维度相关度理由 combined_reason = ( f'【评估对象】词条"{text}" vs 原始问题"{o}"\n' f"【核心动机】{core_motivation}\n" f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n" f"【品类维度 {category_score:.2f}】{category_reason}\n" f"【最终得分 {final_score:.2f}】" ) # 添加规则说明 if rule_applied: combined_reason += f"\n【规则说明】{rule_applied}" # 存入缓存 if cache is not None: cache[cache_key] = (final_score, combined_reason) return final_score, combined_reason except Exception as e: last_error = e error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ Round0评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}") print(f" 正在重试...") await asyncio.sleep(1) else: print(f" ❌ Round0评估失败 (已达最大重试次数): {error_msg[:150]}") # 所有重试失败后,返回默认值 fallback_reason = f"Round0评估失败(重试{max_retries}次): {str(last_error)[:200]}" print(f" 使用默认值: score=0.0, reason={fallback_reason[:100]}...") return 0.0, fallback_reason async def evaluate_within_scope(text: str, scope_text: str, cache: dict[str, tuple[float, str]] | None = None) -> tuple[float, str]: """域内/域间专用评估函数(v124新增 - 需求2&3) 用于评估词条与作用域词条(单域或域组合)的相关度 不含延伸词维度,使用域内专用Prompt和新评分逻辑 采用两维评估: 1. 动机维度评估(权重70%) 2. 品类维度评估(权重30%) 3. 应用规则A/B/C调整得分 Args: text: 待评估的词条 scope_text: 作用域词条(可以是单域词条或域组合词条) cache: 评估缓存(可选),用于避免重复评估 Returns: tuple[float, str]: (最终相关度分数, 综合评估理由) """ # 检查缓存 cache_key = f"scope:{text}:{scope_text}" # 添加前缀以区分不同评估类型 if cache is not None and cache_key in cache: cached_score, cached_reason = cache[cache_key] print(f" ⚡ 域内缓存命中: {text} -> {cached_score:.2f}") return cached_score, cached_reason # 准备输入 eval_input = f""" <同一作用域词条> {scope_text} <词条> {text} 请评估词条与同一作用域词条的匹配度。 """ # 添加重试机制 max_retries = 2 last_error = None for attempt in range(max_retries): try: # 并发调用两个评估器(不含延伸词) motivation_task = Runner.run(scope_motivation_evaluator, eval_input) category_task = Runner.run(scope_category_evaluator, eval_input) motivation_result, category_result = await asyncio.gather( motivation_task, category_task ) # 获取评估结果 motivation_eval: MotivationEvaluation = motivation_result.final_output category_eval: CategoryEvaluation = category_result.final_output # 提取得分 motivation_score = motivation_eval.动机维度得分 category_score = category_eval.品类维度得分 # 应用新规则计算最终得分 final_score, rule_applied = calculate_final_score_v2( motivation_score, category_score ) # 组合评估理由 core_motivation = motivation_eval.原始问题核心动机提取.简要说明核心动机 motivation_reason = motivation_eval.简要说明动机维度相关度理由 category_reason = category_eval.简要说明品类维度相关度理由 combined_reason = ( f'【评估对象】词条"{text}" vs 作用域词条"{scope_text}"\n' f"【核心动机】{core_motivation}\n" f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n" f"【品类维度 {category_score:.2f}】{category_reason}\n" f"【最终得分 {final_score:.2f}】" ) # 添加规则说明 if rule_applied: combined_reason += f"\n【规则说明】{rule_applied}" # 存入缓存 if cache is not None: cache[cache_key] = (final_score, combined_reason) return final_score, combined_reason except Exception as e: last_error = e error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 域内评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}") print(f" 正在重试...") await asyncio.sleep(1) else: print(f" ❌ 域内评估失败 (已达最大重试次数): {error_msg[:150]}") # 所有重试失败后,返回默认值 fallback_reason = f"域内评估失败(重试{max_retries}次): {str(last_error)[:200]}" print(f" 使用默认值: score=0.0, reason={fallback_reason[:100]}...") return 0.0, fallback_reason # ============================================================================ # v125 新增辅助函数(用于新评分逻辑) # ============================================================================ def get_source_word_score( word_text: str, segment: Segment, context: RunContext ) -> float: """ 查找来源词的得分 查找顺序: 1. 先查 segment.word_scores (Round 0的单个词) 2. 再查 context.word_score_history (Round 1+的组合) Args: word_text: 词文本 segment: 该词所在的segment context: 运行上下文 Returns: 词的得分,找不到返回0.0 """ # 优先查Round 0的词得分 if word_text in segment.word_scores: return segment.word_scores[word_text] # 其次查历史组合得分 if word_text in context.word_score_history: return context.word_score_history[word_text] # 都找不到 print(f" ⚠️ 警告: 未找到来源词得分: {word_text}") return 0.0 async def evaluate_domain_combination_round1( comb: DomainCombination, segments: list[Segment], context: RunContext ) -> tuple[float, str]: """ Round 1 域内组合评估(新逻辑) 最终得分 = 品类得分 × 原始域得分 Args: comb: 域内组合对象 segments: 所有segment列表 context: 运行上下文 Returns: (最终得分, 评估理由) """ # 获取所属segment domain_idx = comb.domains[0] if comb.domains else 0 segment = segments[domain_idx] if 0 <= domain_idx < len(segments) else None if not segment: return 0.0, "错误: 无法找到所属segment" # 拼接作用域文本 scope_text = segment.text # 准备输入 eval_input = f""" <同一作用域词条> {scope_text} <词条> {comb.text} 请评估词条与同一作用域词条的匹配度。 """ # 只调用品类评估器 try: category_result = await Runner.run(scope_category_evaluator, eval_input) category_eval: CategoryEvaluation = category_result.final_output category_score = category_eval.品类维度得分 category_reason = category_eval.简要说明品类维度相关度理由 except Exception as e: print(f" ❌ Round 1品类评估失败: {e}") return 0.0, f"评估失败: {str(e)[:100]}" # 计算最终得分 domain_score = segment.score_with_o final_score = category_score * domain_score # 组合评估理由 combined_reason = ( f'【Round 1 域内评估】\n' f'【评估对象】组合"{comb.text}" vs 作用域"{scope_text}"\n' f'【品类得分】{category_score:.2f} - {category_reason}\n' f'【原始域得分】{domain_score:.2f}\n' f'【计算公式】品类得分 × 域得分 = {category_score:.2f} × {domain_score:.2f}\n' f'【最终得分】{final_score:.2f}' ) return final_score, combined_reason async def evaluate_domain_combination_round2plus( comb: DomainCombination, segments: list[Segment], context: RunContext ) -> tuple[float, str]: """ Round 2+ 域间组合评估(新逻辑) 步骤: 1. 用现有逻辑评估得到 base_score 2. 计算加权系数 = Σ(来源词得分) / Σ(域得分) 3. 最终得分 = base_score × 系数,截断到1.0 Args: comb: 域间组合对象 segments: 所有segment列表 context: 运行上下文 Returns: (最终得分, 评估理由) """ # 步骤1: 现有逻辑评估(域内评估) scope_text = "".join(comb.from_segments) base_score, base_reason = await evaluate_within_scope( comb.text, scope_text, context.evaluation_cache ) # 步骤2: 计算加权系数 total_source_score = 0.0 total_domain_score = 0.0 coefficient_details = [] for domain_idx, source_words_list in zip(comb.domains, comb.source_words): # 获取segment segment = segments[domain_idx] if 0 <= domain_idx < len(segments) else None if not segment: continue domain_score = segment.score_with_o total_domain_score += domain_score # 如果该域贡献了多个词(组合),需要拼接后查找 if len(source_words_list) == 1: # 单个词 source_word_text = source_words_list[0] else: # 多个词组合 source_word_text = "".join(source_words_list) # 查找来源词得分 source_score = get_source_word_score(source_word_text, segment, context) total_source_score += source_score coefficient_details.append( f" 域{domain_idx}[{segment.type}]: \"{source_word_text}\"得分={source_score:.2f}, 域得分={domain_score:.2f}" ) # 计算系数 if total_domain_score > 0: coefficient = total_source_score / total_domain_score else: coefficient = 0.0 # 步骤3: 计算最终得分并截断 final_score = base_score * total_source_score final_score = min(1.0, max(-1.0, final_score)) # 截断到[-1.0, 1.0] # 组合评估理由 coefficient_detail_str = "\n".join(coefficient_details) combined_reason = ( f'【Round 2+ 域间评估】\n' f'【评估对象】组合"{comb.text}"\n' f'{base_reason}\n' f'【加权系数计算】\n' f'{total_source_score}\n' f' 来源词总得分: {total_source_score:.2f}\n' f' 系数: {total_source_score:.2f}' f'【计算公式】base_score × 系数 = {base_score:.2f} × {total_source_score:.2f}\n' f'【最终得分(截断后)】{final_score:.2f}' ) return final_score, combined_reason # ============================================================================ # 核心流程函数 # ============================================================================ async def initialize(o: str, context: RunContext) -> tuple[list[Seg], list[Word], list[Q], list[Seed]]: """ 初始化阶段 Returns: (seg_list, word_list_1, q_list_1, seed_list) """ print(f"\n{'='*60}") print(f"初始化阶段") print(f"{'='*60}") # 1. 分词:原始问题(o) ->分词-> seg_list print(f"\n[步骤1] 分词...") result = await Runner.run(word_segmenter, o) segmentation: WordSegmentation = result.final_output seg_list = [] for word in segmentation.words: seg_list.append(Seg(text=word, from_o=o)) print(f"分词结果: {[s.text for s in seg_list]}") print(f"分词理由: {segmentation.reasoning}") # 2. 分词评估:seg_list -> 每个seg与o进行评分(使用信号量限制并发数) print(f"\n[步骤2] 评估每个分词与原始问题的相关度...") MAX_CONCURRENT_SEG_EVALUATIONS = 10 seg_semaphore = asyncio.Semaphore(MAX_CONCURRENT_SEG_EVALUATIONS) async def evaluate_seg(seg: Seg) -> Seg: async with seg_semaphore: # 初始化阶段的分词评估使用第一轮 prompt (round_num=1) seg.score_with_o, seg.reason = await evaluate_with_o(seg.text, o, context.evaluation_cache, round_num=1) return seg if seg_list: print(f" 开始评估 {len(seg_list)} 个分词(并发限制: {MAX_CONCURRENT_SEG_EVALUATIONS})...") eval_tasks = [evaluate_seg(seg) for seg in seg_list] await asyncio.gather(*eval_tasks) for seg in seg_list: print(f" {seg.text}: {seg.score_with_o:.2f}") # 3. 构建word_list_1: seg_list -> word_list_1(固定词库) print(f"\n[步骤3] 构建word_list_1(固定词库)...") word_list_1 = [] for seg in seg_list: word_list_1.append(Word( text=seg.text, score_with_o=seg.score_with_o, from_o=o )) print(f"word_list_1(固定): {[w.text for w in word_list_1]}") # 4. 构建q_list_1:seg_list 作为 q_list_1 print(f"\n[步骤4] 构建q_list_1...") q_list_1 = [] for seg in seg_list: q_list_1.append(Q( text=seg.text, score_with_o=seg.score_with_o, reason=seg.reason, from_source="seg" )) print(f"q_list_1: {[q.text for q in q_list_1]}") # 5. 构建seed_list: seg_list -> seed_list print(f"\n[步骤5] 构建seed_list...") seed_list = [] for seg in seg_list: seed_list.append(Seed( text=seg.text, added_words=[], from_type="seg", score_with_o=seg.score_with_o )) print(f"seed_list: {[s.text for s in seed_list]}") return seg_list, word_list_1, q_list_1, seed_list async def run_round( round_num: int, q_list: list[Q], word_list_1: list[Word], seed_list: list[Seed], o: str, context: RunContext, xiaohongshu_api: XiaohongshuSearchRecommendations, xiaohongshu_search: XiaohongshuSearch, xiaohongshu_detail: XiaohongshuDetail, sug_threshold: float = 0.7, enable_evaluation: bool = False ) -> tuple[list[Q], list[Seed], list[Search]]: """ 运行一轮 Args: round_num: 轮次编号 q_list: 当前轮的q列表 word_list_1: 固定的词库(第0轮分词结果) seed_list: 当前的seed列表 o: 原始问题 context: 运行上下文 xiaohongshu_api: 建议词API xiaohongshu_search: 搜索API sug_threshold: suggestion的阈值 Returns: (q_list_next, seed_list_next, search_list) """ print(f"\n{'='*60}") print(f"第{round_num}轮") print(f"{'='*60}") round_data = { "round_num": round_num, "input_q_list": [{"text": q.text, "score": q.score_with_o, "type": "query"} for q in q_list], "input_word_list_1_size": len(word_list_1), "input_seed_list_size": len(seed_list) } # 1. 请求sug:q_list -> 每个q请求sug接口 -> sug_list_list print(f"\n[步骤1] 为每个q请求建议词...") sug_list_list = [] # list of list for q in q_list: print(f"\n 处理q: {q.text}") suggestions = get_suggestions_with_cache(q.text, xiaohongshu_api) q_sug_list = [] if suggestions: print(f" 获取到 {len(suggestions)} 个建议词") for sug_text in suggestions: sug = Sug( text=sug_text, from_q=QFromQ(text=q.text, score_with_o=q.score_with_o) ) q_sug_list.append(sug) else: print(f" 未获取到建议词") sug_list_list.append(q_sug_list) # 2. sug评估:sug_list_list -> 每个sug与o进行评分(并发) print(f"\n[步骤2] 评估每个建议词与原始问题的相关度...") # 2.1 收集所有需要评估的sug,并记录它们所属的q all_sugs = [] sug_to_q_map = {} # 记录每个sug属于哪个q for i, q_sug_list in enumerate(sug_list_list): if q_sug_list: q_text = q_list[i].text for sug in q_sug_list: all_sugs.append(sug) sug_to_q_map[id(sug)] = q_text # 2.2 并发评估所有sug(使用信号量限制并发数) # 每个 evaluate_sug 内部会并发调用 2 个 LLM,所以这里限制为 5,实际并发 LLM 请求为 10 MAX_CONCURRENT_EVALUATIONS = 5 semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS) async def evaluate_sug(sug: Sug) -> Sug: async with semaphore: # 限制并发数 # 根据轮次选择 prompt: 第一轮使用 round1 prompt,后续使用标准 prompt sug.score_with_o, sug.reason = await evaluate_with_o(sug.text, o, context.evaluation_cache, round_num=round_num) return sug if all_sugs: print(f" 开始评估 {len(all_sugs)} 个建议词(并发限制: {MAX_CONCURRENT_EVALUATIONS})...") eval_tasks = [evaluate_sug(sug) for sug in all_sugs] await asyncio.gather(*eval_tasks) # 2.3 打印结果并组织到sug_details sug_details = {} # 保存每个Q对应的sug列表 for i, q_sug_list in enumerate(sug_list_list): if q_sug_list: q_text = q_list[i].text print(f"\n 来自q '{q_text}' 的建议词:") sug_details[q_text] = [] for sug in q_sug_list: print(f" {sug.text}: {sug.score_with_o:.2f}") # 保存到sug_details sug_details[q_text].append({ "text": sug.text, "score": sug.score_with_o, "reason": sug.reason, "type": "sug" }) # 2.4 剪枝判断(已禁用 - 保留所有分支) pruned_query_texts = set() if False: # 原: if round_num >= 2: # 剪枝功能已禁用,保留代码以便后续调整 print(f"\n[剪枝判断] 第{round_num}轮开始应用剪枝策略...") for i, q in enumerate(q_list): q_sug_list = sug_list_list[i] if len(q_sug_list) == 0: continue # 没有sug则不剪枝 # 剪枝条件1: 所有sug分数都低于query分数 all_lower_than_query = all(sug.score_with_o < q.score_with_o for sug in q_sug_list) # 剪枝条件2: 所有sug分数都低于0.5 all_below_threshold = all(sug.score_with_o < 0.5 for sug in q_sug_list) if all_lower_than_query and all_below_threshold: pruned_query_texts.add(q.text) max_sug_score = max(sug.score_with_o for sug in q_sug_list) print(f" 🔪 剪枝: {q.text} (query分数:{q.score_with_o:.2f}, sug最高分:{max_sug_score:.2f}, 全部<0.5)") if pruned_query_texts: print(f" 本轮共剪枝 {len(pruned_query_texts)} 个query") else: print(f" 本轮无query被剪枝") else: print(f"\n[剪枝判断] 剪枝功能已禁用,保留所有分支") # 3. search_list构建 print(f"\n[步骤3] 构建search_list(阈值>{sug_threshold})...") search_list = [] high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold] if high_score_sugs: print(f" 找到 {len(high_score_sugs)} 个高分建议词") # 并发搜索 async def search_for_sug(sug: Sug) -> Search: print(f" 搜索: {sug.text}") try: search_result = xiaohongshu_search.search(keyword=sug.text) # xiaohongshu_search.search() 已经返回解析后的数据 notes = search_result.get("data", {}).get("data", []) post_list = [] for note in notes[:10]: # 只取前10个 try: post = process_note_data(note) post_list.append(post) except Exception as e: print(f" ⚠️ 解析帖子失败 {note.get('id', 'unknown')}: {str(e)[:50]}") # 补充详情信息(仅视频类型需要补充视频URL) video_posts = [p for p in post_list if p.type == "video"] if video_posts: print(f" 补充详情({len(video_posts)}个视频)...") for post in video_posts: try: detail_response = xiaohongshu_detail.get_detail(post.note_id) enrich_post_with_detail(post, detail_response) except Exception as e: print(f" ⚠️ 详情补充失败 {post.note_id}: {str(e)[:50]}") print(f" → 找到 {len(post_list)} 个帖子") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=post_list ) except Exception as e: print(f" ✗ 搜索失败: {e}") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=[] ) search_tasks = [search_for_sug(sug) for sug in high_score_sugs] search_list = await asyncio.gather(*search_tasks) # 评估搜索结果中的帖子 if enable_evaluation: print(f"\n[评估] 评估搜索结果中的帖子...") for search in search_list: if search.post_list: print(f" 评估来自 '{search.text}' 的 {len(search.post_list)} 个帖子") # 对每个帖子进行评估 (V3) for post in search.post_list: knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level = await evaluate_post_v3(post, o, semaphore=None) if knowledge_eval: apply_evaluation_v3_to_post(post, knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level) else: print(f"\n[评估] 实时评估已关闭 (使用 --enable-evaluation 启用)") else: print(f" 没有高分建议词,search_list为空") # 4. 构建q_list_next print(f"\n[步骤4] 构建q_list_next...") q_list_next = [] existing_q_texts = set() # 用于去重 add_word_details = {} # 保存每个seed对应的组合词列表 all_seed_combinations = [] # 保存本轮所有seed的组合词(用于后续构建seed_list_next) # 4.1 对于seed_list中的每个seed,从word_list_1中选词组合,产生Top 5 print(f"\n 4.1 为每个seed加词(产生Top 5组合)...") for seed in seed_list: print(f"\n 处理seed: {seed.text}") # 剪枝检查:跳过被剪枝的seed if seed.text in pruned_query_texts: print(f" ⊗ 跳过被剪枝的seed: {seed.text}") continue # 从固定词库word_list_1筛选候选词 candidate_words = [] for word in word_list_1: # 检查词是否已在seed中 if word.text in seed.text: continue # 检查词是否已被添加过 if word.text in seed.added_words: continue candidate_words.append(word) if not candidate_words: print(f" 没有可用的候选词") continue print(f" 候选词数量: {len(candidate_words)}") # 调用Agent一次性选择并组合Top 5(添加重试机制) candidate_words_text = ', '.join([w.text for w in candidate_words]) selection_input = f""" <原始问题> {o} <当前Seed> {seed.text} <候选词列表> {candidate_words_text} 请从候选词列表中选择最多5个最合适的词,分别与当前seed组合成新的query。 """ # 重试机制 max_retries = 2 selection_result = None for attempt in range(max_retries): try: result = await Runner.run(word_selector, selection_input) selection_result = result.final_output break # 成功则跳出 except Exception as e: error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 选词失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:100]}") await asyncio.sleep(1) else: print(f" ❌ 选词失败,跳过该seed: {error_msg[:100]}") break if selection_result is None: print(f" 跳过seed: {seed.text}") continue print(f" Agent选择了 {len(selection_result.combinations)} 个组合") print(f" 整体选择思路: {selection_result.overall_reasoning}") # 并发评估所有组合的相关度 async def evaluate_combination(comb: WordCombination) -> dict: combined = comb.combined_query # 验证:组合结果必须包含完整的seed和word # 检查是否包含seed的所有字符 seed_chars_in_combined = all(char in combined for char in seed.text) # 检查是否包含word的所有字符 word_chars_in_combined = all(char in combined for char in comb.selected_word) if not seed_chars_in_combined or not word_chars_in_combined: print(f" ⚠️ 警告:组合不完整") print(f" Seed: {seed.text}") print(f" Word: {comb.selected_word}") print(f" 组合: {combined}") print(f" 包含完整seed? {seed_chars_in_combined}") print(f" 包含完整word? {word_chars_in_combined}") # 返回极低分数,让这个组合不会被选中 return { 'word': comb.selected_word, 'query': combined, 'score': -1.0, # 极低分数 'reason': f"组合不完整:缺少seed或word的部分内容", 'reasoning': comb.reasoning } # 正常评估,根据轮次选择 prompt score, reason = await evaluate_with_o(combined, o, context.evaluation_cache, round_num=round_num) return { 'word': comb.selected_word, 'query': combined, 'score': score, 'reason': reason, 'reasoning': comb.reasoning } eval_tasks = [evaluate_combination(comb) for comb in selection_result.combinations] top_5 = await asyncio.gather(*eval_tasks) print(f" 评估完成,得到 {len(top_5)} 个组合") # 将Top 5全部加入q_list_next(去重检查 + 得分过滤) for comb in top_5: # 得分过滤:组合词必须比种子提升至少REQUIRED_SCORE_GAIN才能加入下一轮 if comb['score'] < seed.score_with_o + REQUIRED_SCORE_GAIN: print(f" ⊗ 跳过低分: {comb['query']} (分数{comb['score']:.2f} < 种子{seed.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})") continue # 去重检查 if comb['query'] in existing_q_texts: print(f" ⊗ 跳过重复: {comb['query']}") continue print(f" ✓ {comb['query']} (分数: {comb['score']:.2f} > 种子: {seed.score_with_o:.2f})") new_q = Q( text=comb['query'], score_with_o=comb['score'], reason=comb['reason'], from_source="add" ) q_list_next.append(new_q) existing_q_texts.add(comb['query']) # 记录到去重集合 # 记录已添加的词 seed.added_words.append(comb['word']) # 保存到add_word_details add_word_details[seed.text] = [ { "text": comb['query'], "score": comb['score'], "reason": comb['reason'], "selected_word": comb['word'], "seed_score": seed.score_with_o, # 添加原始种子的得分 "type": "add" } for comb in top_5 ] # 保存到all_seed_combinations(用于构建seed_list_next) # 附加seed_score,用于后续过滤 for comb in top_5: comb['seed_score'] = seed.score_with_o all_seed_combinations.extend(top_5) # 4.2 对于sug_list_list中,每个sug大于来自的query分数,加到q_list_next(去重检查) print(f"\n 4.2 将高分sug加入q_list_next...") for sug in all_sugs: # 剪枝检查:跳过来自被剪枝query的sug if sug.from_q and sug.from_q.text in pruned_query_texts: print(f" ⊗ 跳过来自被剪枝query的sug: {sug.text} (来源: {sug.from_q.text})") continue # sug必须比来源query提升至少REQUIRED_SCORE_GAIN才能加入下一轮 if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN: # 去重检查 if sug.text in existing_q_texts: print(f" ⊗ 跳过重复: {sug.text}") continue new_q = Q( text=sug.text, score_with_o=sug.score_with_o, reason=sug.reason, from_source="sug" ) q_list_next.append(new_q) existing_q_texts.add(sug.text) # 记录到去重集合 print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} >= 来源query: {sug.from_q.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})") # 5. 构建seed_list_next(关键修改:不保留上一轮的seed) print(f"\n[步骤5] 构建seed_list_next(不保留上轮seed)...") seed_list_next = [] existing_seed_texts = set() # 5.1 加入本轮所有组合词(只加入得分提升的) print(f" 5.1 加入本轮所有组合词(得分过滤)...") for comb in all_seed_combinations: # 得分过滤:组合词必须比种子提升至少REQUIRED_SCORE_GAIN才作为下一轮种子 seed_score = comb.get('seed_score', 0) if comb['score'] < seed_score + REQUIRED_SCORE_GAIN: print(f" ⊗ 跳过低分: {comb['query']} (分数{comb['score']:.2f} < 种子{seed_score:.2f} + {REQUIRED_SCORE_GAIN:.2f})") continue if comb['query'] not in existing_seed_texts: new_seed = Seed( text=comb['query'], added_words=[], # 新seed的added_words清空 from_type="add", score_with_o=comb['score'] ) seed_list_next.append(new_seed) existing_seed_texts.add(comb['query']) print(f" ✓ {comb['query']} (分数: {comb['score']:.2f} >= 种子: {seed_score:.2f} + {REQUIRED_SCORE_GAIN:.2f})") # 5.2 加入高分sug print(f" 5.2 加入高分sug...") for sug in all_sugs: # 剪枝检查:跳过来自被剪枝query的sug if sug.from_q and sug.from_q.text in pruned_query_texts: continue # sug必须比来源query提升至少REQUIRED_SCORE_GAIN才作为下一轮种子 if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN and sug.text not in existing_seed_texts: new_seed = Seed( text=sug.text, added_words=[], from_type="sug", score_with_o=sug.score_with_o ) seed_list_next.append(new_seed) existing_seed_texts.add(sug.text) print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} >= 来源query: {sug.from_q.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})") # 序列化搜索结果数据(包含帖子详情) search_results_data = [] for search in search_list: search_results_data.append({ "text": search.text, "score_with_o": search.score_with_o, "post_list": [post.model_dump() for post in search.post_list] }) # 记录本轮数据 round_data.update({ "sug_count": len(all_sugs), "high_score_sug_count": len(high_score_sugs), "search_count": len(search_list), "total_posts": sum(len(s.post_list) for s in search_list), "q_list_next_size": len(q_list_next), "seed_list_next_size": len(seed_list_next), "total_combinations": len(all_seed_combinations), "pruned_query_count": len(pruned_query_texts), "pruned_queries": list(pruned_query_texts), "output_q_list": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "from": q.from_source, "type": "query"} for q in q_list_next], "seed_list_next": [{"text": seed.text, "from": seed.from_type, "score": seed.score_with_o} for seed in seed_list_next], "sug_details": sug_details, "add_word_details": add_word_details, "search_results": search_results_data }) context.rounds.append(round_data) print(f"\n本轮总结:") print(f" 建议词数量: {len(all_sugs)}") print(f" 高分建议词: {len(high_score_sugs)}") print(f" 搜索数量: {len(search_list)}") print(f" 帖子总数: {sum(len(s.post_list) for s in search_list)}") print(f" 组合词数量: {len(all_seed_combinations)}") print(f" 下轮q数量: {len(q_list_next)}") print(f" 下轮seed数量: {len(seed_list_next)}") return q_list_next, seed_list_next, search_list async def iterative_loop( context: RunContext, max_rounds: int = 2, sug_threshold: float = 0.7, enable_evaluation: bool = False ): """主迭代循环""" print(f"\n{'='*60}") print(f"开始迭代循环") print(f"最大轮数: {max_rounds}") print(f"sug阈值: {sug_threshold}") print(f"{'='*60}") # 初始化 seg_list, word_list_1, q_list, seed_list = await initialize(context.o, context) # API实例 xiaohongshu_api = XiaohongshuSearchRecommendations() xiaohongshu_search = XiaohongshuSearch() xiaohongshu_detail = XiaohongshuDetail() # 详情API客户端 # 保存初始化数据 context.rounds.append({ "round_num": 0, "type": "initialization", "seg_list": [{"text": s.text, "score": s.score_with_o, "reason": s.reason, "type": "seg"} for s in seg_list], "word_list_1": [{"text": w.text, "score": w.score_with_o} for w in word_list_1], "q_list_1": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "type": "query"} for q in q_list], "seed_list": [{"text": s.text, "from_type": s.from_type, "score": s.score_with_o, "type": "seed"} for s in seed_list] }) # 收集所有搜索结果 all_search_list = [] # 迭代 round_num = 1 while q_list and round_num <= max_rounds: q_list, seed_list, search_list = await run_round( round_num=round_num, q_list=q_list, word_list_1=word_list_1, # 传递固定词库 seed_list=seed_list, o=context.o, context=context, xiaohongshu_api=xiaohongshu_api, xiaohongshu_search=xiaohongshu_search, sug_threshold=sug_threshold, enable_evaluation=enable_evaluation ) all_search_list.extend(search_list) round_num += 1 print(f"\n{'='*60}") print(f"迭代完成") print(f" 总轮数: {round_num - 1}") print(f" 总搜索次数: {len(all_search_list)}") print(f" 总帖子数: {sum(len(s.post_list) for s in all_search_list)}") print(f"{'='*60}") return all_search_list # ============================================================================ # v121 新架构核心流程函数 # ============================================================================ async def initialize_v2(o: str, context: RunContext) -> list[Segment]: """ v121 Round 0 初始化阶段 流程: 1. 语义分段: 调用 semantic_segmenter 将原始问题拆分成语义片段 2. 拆词: 对每个segment调用 word_segmenter 进行拆词 3. 评估: 对每个segment和词进行评估 4. 不进行组合(Round 0只分段和拆词) Returns: 语义片段列表 (Segment) """ print(f"\n{'='*60}") print(f"Round 0: 初始化阶段(语义分段 + 拆词)") print(f"{'='*60}") # 1. 语义分段 print(f"\n[步骤1] 语义分段...") result = await Runner.run(semantic_segmenter, o) segmentation: SemanticSegmentation = result.final_output print(f"语义分段结果: {len(segmentation.segments)} 个片段") print(f"整体分段思路: {segmentation.overall_reasoning}") segment_list = [] for seg_item in segmentation.segments: segment = Segment( text=seg_item.segment_text, type=seg_item.segment_type, from_o=o ) segment_list.append(segment) print(f" - [{segment.type}] {segment.text}") # 2. 对每个segment拆词并评估 print(f"\n[步骤2] 对每个segment拆词并评估...") MAX_CONCURRENT_EVALUATIONS = 5 semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS) async def process_segment(segment: Segment) -> Segment: """处理单个segment: 拆词 + 评估segment + 评估词""" async with semaphore: # 2.1 拆词 word_result = await Runner.run(word_segmenter, segment.text) word_segmentation: WordSegmentation = word_result.final_output segment.words = word_segmentation.words # 2.2 评估segment与原始问题的相关度(使用Round 0专用评估) segment.score_with_o, segment.reason = await evaluate_with_o_round0( segment.text, o, context.evaluation_cache ) # 2.3 评估每个词与原始问题的相关度(使用Round 0专用评估) word_eval_tasks = [] for word in segment.words: async def eval_word(w: str) -> tuple[str, float, str]: score, reason = await evaluate_with_o_round0(w, o, context.evaluation_cache) return w, score, reason word_eval_tasks.append(eval_word(word)) word_results = await asyncio.gather(*word_eval_tasks) for word, score, reason in word_results: segment.word_scores[word] = score segment.word_reasons[word] = reason return segment if segment_list: print(f" 开始处理 {len(segment_list)} 个segment(并发限制: {MAX_CONCURRENT_EVALUATIONS})...") process_tasks = [process_segment(seg) for seg in segment_list] await asyncio.gather(*process_tasks) # 打印步骤1结果 print(f"\n[步骤1: 分段及拆词 结果]") for segment in segment_list: print(f" [{segment.type}] {segment.text} (分数: {segment.score_with_o:.2f})") print(f" 拆词: {segment.words}") for word in segment.words: score = segment.word_scores.get(word, 0.0) print(f" - {word}: {score:.2f}") # 保存到context(保留旧格式以兼容) context.segments = [ { "text": seg.text, "type": seg.type, "score": seg.score_with_o, "reason": seg.reason, "words": seg.words, "word_scores": seg.word_scores, "word_reasons": seg.word_reasons } for seg in segment_list ] # 保存 Round 0 到 context.rounds(新格式用于可视化) context.rounds.append({ "round_num": 0, "type": "initialization", "segments": [ { "text": seg.text, "type": seg.type, "domain_index": idx, "score": seg.score_with_o, "reason": seg.reason, "words": [ { "text": word, "score": seg.word_scores.get(word, 0.0), "reason": seg.word_reasons.get(word, "") } for word in seg.words ] } for idx, seg in enumerate(segment_list) ] }) # 🆕 存储Round 0的所有word得分到历史记录 print(f"\n[存储Round 0词得分到历史记录]") for segment in segment_list: for word, score in segment.word_scores.items(): context.word_score_history[word] = score print(f" {word}: {score:.2f}") print(f"\n[Round 0 完成]") print(f" 分段数: {len(segment_list)}") total_words = sum(len(seg.words) for seg in segment_list) print(f" 总词数: {total_words}") return segment_list async def run_round_v2( round_num: int, query_input: list[Q], segments: list[Segment], o: str, context: RunContext, xiaohongshu_api: XiaohongshuSearchRecommendations, xiaohongshu_search: XiaohongshuSearch, xiaohongshu_detail: XiaohongshuDetail, sug_threshold: float = 0.7, enable_evaluation: bool = False ) -> tuple[list[Q], list[Search], dict]: """ v121 Round N 执行 正确的流程顺序: 1. 为 query_input 请求SUG 2. 评估SUG 3. 高分SUG搜索(含多模态提取) 4. N域组合(从segments生成) 5. 评估组合 6. 生成 q_list_next(组合 + 高分SUG) Args: round_num: 轮次编号 (1-4) query_input: 本轮的输入query列表(Round 1是words,Round 2+是上轮输出) segments: 语义片段列表(用于组合) o: 原始问题 context: 运行上下文 xiaohongshu_api: 建议词API xiaohongshu_search: 搜索API sug_threshold: SUG搜索阈值 Returns: (q_list_next, search_list, extraction_results) """ print(f"\n{'='*60}") print(f"Round {round_num}: {round_num}域组合") print(f"{'='*60}") round_data = { "round_num": round_num, "n_domains": round_num, "input_query_count": len(query_input) } MAX_CONCURRENT_EVALUATIONS = 5 semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS) # 步骤1: 为 query_input 请求SUG print(f"\n[步骤1] 为{len(query_input)}个输入query请求SUG...") all_sugs = [] sug_details = {} for q in query_input: suggestions = get_suggestions_with_cache(q.text, xiaohongshu_api) if suggestions: print(f" {q.text}: 获取到 {len(suggestions)} 个SUG") for sug_text in suggestions: sug = Sug( text=sug_text, from_q=QFromQ(text=q.text, score_with_o=q.score_with_o) ) all_sugs.append(sug) else: print(f" {q.text}: 未获取到SUG") print(f" 共获取 {len(all_sugs)} 个SUG") # 步骤2: 评估SUG if len(all_sugs) > 0: print(f"\n[步骤2] 评估{len(all_sugs)}个SUG...") async def evaluate_sug(sug: Sug) -> Sug: async with semaphore: sug.score_with_o, sug.reason = await evaluate_with_o( sug.text, o, context.evaluation_cache ) return sug eval_tasks = [evaluate_sug(sug) for sug in all_sugs] await asyncio.gather(*eval_tasks) # 打印结果 for sug in all_sugs: print(f" {sug.text}: {sug.score_with_o:.2f}") if sug.from_q: if sug.from_q.text not in sug_details: sug_details[sug.from_q.text] = [] sug_details[sug.from_q.text].append({ "text": sug.text, "score": sug.score_with_o, "reason": sug.reason, "type": "sug" }) # 步骤3: 搜索高分SUG print(f"\n[步骤3] 搜索高分SUG(阈值 > {sug_threshold})...") high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold] print(f" 找到 {len(high_score_sugs)} 个高分SUG") search_list = [] # extraction_results = {} # 内容提取流程已断开 if len(high_score_sugs) > 0: async def search_for_sug(sug: Sug) -> Search: """返回Search结果""" print(f" 搜索: {sug.text}") # post_extractions = {} # 内容提取流程已断开 try: search_result = xiaohongshu_search.search(keyword=sug.text) # xiaohongshu_search.search() 已经返回解析后的数据 notes = search_result.get("data", {}).get("data", []) post_list = [] for note in notes[:10]: try: post = process_note_data(note) # # 🆕 多模态提取(搜索后立即处理) - 内容提取流程已断开 # if post.type == "normal" and len(post.images) > 0: # extraction = await extract_post_images(post) # if extraction: # post_extractions[post.note_id] = extraction post_list.append(post) except Exception as e: print(f" ⚠️ 解析帖子失败 {note.get('id', 'unknown')}: {str(e)[:50]}") # 补充详情信息(仅视频类型需要补充视频URL) video_posts = [p for p in post_list if p.type == "video"] if video_posts: print(f" 补充详情({len(video_posts)}个视频)...") for post in video_posts: try: detail_response = xiaohongshu_detail.get_detail(post.note_id) enrich_post_with_detail(post, detail_response) except Exception as e: print(f" ⚠️ 详情补充失败 {post.note_id}: {str(e)[:50]}") print(f" → 找到 {len(post_list)} 个帖子") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=post_list ) # , post_extractions # 内容提取流程已断开 except Exception as e: print(f" ✗ 搜索失败: {e}") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=[] ) # , {} # 内容提取流程已断开 search_tasks = [search_for_sug(sug) for sug in high_score_sugs] results = await asyncio.gather(*search_tasks) # 收集搜索结果 for search in results: search_list.append(search) # extraction_results.update(extractions) # 内容提取流程已断开 # 评估搜索结果中的帖子 if enable_evaluation: print(f"\n[评估] 评估搜索结果中的帖子...") for search in search_list: if search.post_list: print(f" 评估来自 '{search.text}' 的 {len(search.post_list)} 个帖子") # 对每个帖子进行评估 (V3) for post in search.post_list: knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level = await evaluate_post_v3(post, o, semaphore=None) if knowledge_eval: apply_evaluation_v3_to_post(post, knowledge_eval, content_eval, purpose_eval, category_eval, final_score, match_level) else: print(f"\n[评估] 实时评估已关闭 (使用 --enable-evaluation 启用)") # 步骤4: 生成N域组合 print(f"\n[步骤4] 生成{round_num}域组合...") domain_combinations = generate_domain_combinations(segments, round_num) print(f" 生成了 {len(domain_combinations)} 个组合") if len(domain_combinations) == 0: print(f" 无法生成{round_num}域组合") # 即使无法组合,也返回高分SUG作为下轮输入 q_list_next = [] for sug in all_sugs: if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN: q = Q( text=sug.text, score_with_o=sug.score_with_o, reason=sug.reason, from_source="sug", type_label="" ) q_list_next.append(q) round_data.update({ "domain_combinations_count": 0, "sug_count": len(all_sugs), "high_score_sug_count": len(high_score_sugs), "search_count": len(search_list), "sug_details": sug_details, "q_list_next_size": len(q_list_next) }) context.rounds.append(round_data) return q_list_next, search_list # 步骤5: 评估所有组合 print(f"\n[步骤5] 评估{len(domain_combinations)}个组合...") async def evaluate_combination(comb: DomainCombination) -> DomainCombination: async with semaphore: # 🆕 根据轮次选择评估逻辑 if round_num == 1: # Round 1: 域内评估(新逻辑) comb.score_with_o, comb.reason = await evaluate_domain_combination_round1( comb, segments, context ) else: # Round 2+: 域间评估(新逻辑) comb.score_with_o, comb.reason = await evaluate_domain_combination_round2plus( comb, segments, context ) # 🆕 存储组合得分到历史记录 context.word_score_history[comb.text] = comb.score_with_o return comb eval_tasks = [evaluate_combination(comb) for comb in domain_combinations] await asyncio.gather(*eval_tasks) # 排序 - 已注释,保持原始顺序 # domain_combinations.sort(key=lambda x: x.score_with_o, reverse=True) # 打印所有组合(保持原始顺序) evaluation_strategy = 'Round 1 域内评估(品类×域得分)' if round_num == 1 else 'Round 2+ 域间评估(加权系数调整)' print(f" 评估完成,共{len(domain_combinations)}个组合 [策略: {evaluation_strategy}]") for i, comb in enumerate(domain_combinations, 1): print(f" {i}. {comb.text} {comb.type_label} (分数: {comb.score_with_o:.2f})") # 为每个组合补充来源词分数信息,并判断是否超过所有来源词得分 for comb in domain_combinations: word_details = [] flat_scores: list[float] = [] for domain_index, words in zip(comb.domains, comb.source_words): segment = segments[domain_index] if 0 <= domain_index < len(segments) else None segment_type = segment.type if segment else "" segment_text = segment.text if segment else "" items = [] for word in words: score = 0.0 if segment and word in segment.word_scores: score = segment.word_scores[word] items.append({ "text": word, "score": score }) flat_scores.append(score) word_details.append({ "domain_index": domain_index, "segment_type": segment_type, "segment_text": segment_text, "words": items }) comb.source_word_details = word_details comb.source_scores = flat_scores comb.max_source_score = max(flat_scores) if flat_scores else None comb.is_above_source_scores = bool(flat_scores) and all( comb.score_with_o > score for score in flat_scores ) # 步骤6: 构建 q_list_next(组合 + 高分SUG) print(f"\n[步骤6] 生成下轮输入...") q_list_next: list[Q] = [] # 6.1 添加高增益SUG(满足增益条件),并按分数排序 sug_candidates: list[tuple[Q, Sug]] = [] for sug in all_sugs: if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN: q = Q( text=sug.text, score_with_o=sug.score_with_o, reason=sug.reason, from_source="sug", type_label="" ) sug_candidates.append((q, sug)) sug_candidates.sort(key=lambda item: item[0].score_with_o, reverse=True) q_list_next.extend([item[0] for item in sug_candidates]) high_gain_sugs = [item[1] for item in sug_candidates] print(f" 添加 {len(high_gain_sugs)} 个高增益SUG(增益 ≥ {REQUIRED_SCORE_GAIN:.2f})") # 6.2 添加高分组合(需超过所有来源词得分),并按分数排序 combination_candidates: list[tuple[Q, DomainCombination]] = [] for comb in domain_combinations: if comb.is_above_source_scores and comb.score_with_o > 0: domains_str = ','.join([f'D{d}' for d in comb.domains]) if comb.domains else '' q = Q( text=comb.text, score_with_o=comb.score_with_o, reason=comb.reason, from_source="domain_comb", type_label=comb.type_label, domain_type=domains_str # 添加域信息 ) combination_candidates.append((q, comb)) combination_candidates.sort(key=lambda item: item[0].score_with_o, reverse=True) q_list_next.extend([item[0] for item in combination_candidates]) high_score_combinations = [item[1] for item in combination_candidates] print(f" 添加 {len(high_score_combinations)} 个高分组合(组合得分 > 所有来源词)") # 保存round数据(包含完整帖子信息) search_results_data = [] for search in search_list: search_results_data.append({ "text": search.text, "score_with_o": search.score_with_o, "post_list": [post.model_dump() for post in search.post_list] }) round_data.update({ "input_queries": [{"text": q.text, "score": q.score_with_o, "from_source": q.from_source, "type": "input", "domain_index": q.domain_index, "domain_type": q.domain_type} for q in query_input], "domain_combinations_count": len(domain_combinations), "domain_combinations": [ { "text": comb.text, "type_label": comb.type_label, "score": comb.score_with_o, "reason": comb.reason, "domains": comb.domains, "source_words": comb.source_words, "from_segments": comb.from_segments, "source_word_details": comb.source_word_details, "source_scores": comb.source_scores, "is_above_source_scores": comb.is_above_source_scores, "max_source_score": comb.max_source_score } for comb in domain_combinations ], "high_score_combinations": [ { "text": item[0].text, "score": item[0].score_with_o, "type_label": item[0].type_label, "type": "combination", "is_above_source_scores": item[1].is_above_source_scores } for item in combination_candidates ], "sug_count": len(all_sugs), "sug_details": sug_details, "high_score_sug_count": len(high_score_sugs), "high_gain_sugs": [{"text": q.text, "score": q.score_with_o, "type": "sug"} for q in q_list_next if q.from_source == "sug"], "search_count": len(search_list), "search_results": search_results_data, "q_list_next_size": len(q_list_next), "q_list_next_sections": { "sugs": [ { "text": item[0].text, "score": item[0].score_with_o, "from_source": "sug" } for item in sug_candidates ], "domain_combinations": [ { "text": item[0].text, "score": item[0].score_with_o, "from_source": "domain_comb", "is_above_source_scores": item[1].is_above_source_scores } for item in combination_candidates ] } }) context.rounds.append(round_data) print(f"\nRound {round_num} 总结:") print(f" 输入Query数: {len(query_input)}") print(f" 域组合数: {len(domain_combinations)}") print(f" 高分组合: {len(high_score_combinations)}") print(f" SUG数: {len(all_sugs)}") print(f" 高分SUG数: {len(high_score_sugs)}") print(f" 高增益SUG: {len(high_gain_sugs)}") print(f" 搜索数: {len(search_list)}") # print(f" 提取帖子数: {len(extraction_results)}") # 内容提取流程已断开 print(f" 下轮Query数: {len(q_list_next)}") return q_list_next, search_list # 不再返回提取结果 async def iterative_loop_v2( context: RunContext, max_rounds: int = 4, sug_threshold: float = 0.7, enable_evaluation: bool = False ): """v121 主迭代循环""" print(f"\n{'='*60}") print(f"开始v121迭代循环(语义分段跨域组词版)") print(f"最大轮数: {max_rounds}") print(f"sug阈值: {sug_threshold}") print(f"{'='*60}") # Round 0: 初始化(语义分段 + 拆词) segments = await initialize_v2(context.o, context) # API实例 xiaohongshu_api = XiaohongshuSearchRecommendations() xiaohongshu_search = XiaohongshuSearch() xiaohongshu_detail = XiaohongshuDetail() # 详情API客户端 # 收集所有搜索结果 all_search_list = [] # all_extraction_results = {} # 内容提取流程已断开 # 准备 Round 1 的输入:从 segments 提取所有 words query_input = extract_words_from_segments(segments) print(f"\n提取了 {len(query_input)} 个词作为 Round 1 的输入") # Round 1-N: 迭代循环 num_segments = len(segments) actual_max_rounds = min(max_rounds, num_segments) round_num = 1 while query_input and round_num <= actual_max_rounds: query_input, search_list = await run_round_v2( # 不再接收提取结果 round_num=round_num, query_input=query_input, # 传递上一轮的输出 segments=segments, o=context.o, context=context, xiaohongshu_api=xiaohongshu_api, xiaohongshu_search=xiaohongshu_search, xiaohongshu_detail=xiaohongshu_detail, sug_threshold=sug_threshold, enable_evaluation=enable_evaluation ) all_search_list.extend(search_list) # all_extraction_results.update(extraction_results) # 内容提取流程已断开 # 如果没有新的query,提前结束 if not query_input: print(f"\n第{round_num}轮后无新query生成,提前结束迭代") break round_num += 1 print(f"\n{'='*60}") print(f"迭代完成") print(f" 实际轮数: {round_num}") print(f" 总搜索次数: {len(all_search_list)}") print(f" 总帖子数: {sum(len(s.post_list) for s in all_search_list)}") # print(f" 提取帖子数: {len(all_extraction_results)}") # 内容提取流程已断开 print(f"{'='*60}") return all_search_list # 不再返回提取结果 # ============================================================================ # 主函数 # ============================================================================ async def main(input_dir: str, max_rounds: int = 2, sug_threshold: float = 0.7, visualize: bool = False, enable_evaluation: bool = False): """主函数""" current_time, log_url = set_trace() # 读取输入 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') c = read_file_as_string(input_context_file) # 原始需求 o = read_file_as_string(input_q_file) # 原始问题 # 版本信息 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 日志目录 log_dir = os.path.join(input_dir, "output", version_name, current_time) # 创建运行上下文 run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, c=c, o=o, log_dir=log_dir, log_url=log_url, ) # 创建日志目录 os.makedirs(run_context.log_dir, exist_ok=True) # 配置日志文件 log_file_path = os.path.join(run_context.log_dir, "run.log") log_file = open(log_file_path, 'w', encoding='utf-8') # 重定向stdout到TeeLogger(同时输出到控制台和文件) original_stdout = sys.stdout sys.stdout = TeeLogger(original_stdout, log_file) try: print(f"📝 日志文件: {log_file_path}") print(f"{'='*60}\n") # 执行迭代 (v121: 使用新架构) all_search_list = await iterative_loop_v2( # 不再接收提取结果 run_context, max_rounds=max_rounds, sug_threshold=sug_threshold, enable_evaluation=enable_evaluation ) # 格式化输出 output = f"原始需求:{run_context.c}\n" output += f"原始问题:{run_context.o}\n" output += f"总搜索次数:{len(all_search_list)}\n" output += f"总帖子数:{sum(len(s.post_list) for s in all_search_list)}\n" # output += f"提取帖子数:{len(all_extraction_results)}\n" # 内容提取流程已断开 output += "\n" + "="*60 + "\n" if all_search_list: output += "【搜索结果】\n\n" for idx, search in enumerate(all_search_list, 1): output += f"{idx}. 搜索词: {search.text} (分数: {search.score_with_o:.2f})\n" output += f" 帖子数: {len(search.post_list)}\n" if search.post_list: for post_idx, post in enumerate(search.post_list[:3], 1): # 只显示前3个 output += f" {post_idx}) {post.title}\n" output += f" URL: {post.note_url}\n" output += "\n" else: output += "未找到搜索结果\n" run_context.final_output = output print(f"\n{'='*60}") print("最终结果") print(f"{'='*60}") print(output) # 保存上下文文件 context_file_path = os.path.join(run_context.log_dir, "run_context.json") context_dict = run_context.model_dump() with open(context_file_path, "w", encoding="utf-8") as f: json.dump(context_dict, f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") # 保存详细的搜索结果 search_results_path = os.path.join(run_context.log_dir, "search_results.json") search_results_data = [s.model_dump() for s in all_search_list] with open(search_results_path, "w", encoding="utf-8") as f: json.dump(search_results_data, f, ensure_ascii=False, indent=2) print(f"Search results saved to: {search_results_path}") # # 🆕 保存图片提取结果 - 内容提取流程已断开 # if all_extraction_results: # extraction_path = os.path.join(run_context.log_dir, "search_extract.json") # extraction_data = { # note_id: extraction.model_dump() # for note_id, extraction in all_extraction_results.items() # } # with open(extraction_path, "w", encoding="utf-8") as f: # json.dump(extraction_data, f, ensure_ascii=False, indent=2) # print(f"Image extractions saved to: {extraction_path}") # print(f" 提取了 {len(all_extraction_results)} 个帖子的图片内容") # 可视化 if visualize: import subprocess output_html = os.path.join(run_context.log_dir, "visualization.html") print(f"\n🎨 生成可视化HTML...") # 获取绝对路径 abs_context_file = os.path.abspath(context_file_path) abs_output_html = os.path.abspath(output_html) # 运行可视化脚本 result = subprocess.run([ "node", "visualization/knowledge_search_traverse/index.js", abs_context_file, abs_output_html ]) if result.returncode == 0: print(f"✅ 可视化已生成: {output_html}") else: print(f"❌ 可视化生成失败") finally: # 恢复stdout sys.stdout = original_stdout log_file.close() print(f"\n📝 运行日志已保存: {log_file_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.1.2.121 语义分段跨域组词版") parser.add_argument( "--input-dir", type=str, default="input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?", help="输入目录路径,默认: input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?" ) parser.add_argument( "--max-rounds", type=int, default=4, help="最大轮数,默认: 4" ) parser.add_argument( "--sug-threshold", type=float, default=0.7, help="suggestion阈值,默认: 0.7" ) parser.add_argument( "--visualize", action="store_true", default=True, help="运行完成后自动生成可视化HTML" ) parser.add_argument( "--enable-evaluation", action="store_true", default=False, help="是否启用实时评估功能,默认: 关闭" ) args = parser.parse_args() asyncio.run(main(args.input_dir, max_rounds=args.max_rounds, sug_threshold=args.sug_threshold, visualize=args.visualize, enable_evaluation=args.enable_evaluation))