import asyncio import json import os import sys import argparse from datetime import datetime from typing import Literal from agents import Agent, Runner, ModelSettings from lib.my_trace import set_trace from pydantic import BaseModel, Field from lib.utils import read_file_as_string from lib.client import get_model MODEL_NAME = "google/gemini-2.5-flash" # 得分提升阈值:sug或组合词必须比来源query提升至少此幅度才能进入下一轮 REQUIRED_SCORE_GAIN = 0.05 from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations from script.search.xiaohongshu_search import XiaohongshuSearch # ============================================================================ # 日志工具类 # ============================================================================ class TeeLogger: """同时输出到控制台和日志文件的工具类""" def __init__(self, stdout, log_file): self.stdout = stdout self.log_file = log_file def write(self, message): self.stdout.write(message) self.log_file.write(message) self.log_file.flush() # 实时写入,避免丢失日志 def flush(self): self.stdout.flush() self.log_file.flush() # ============================================================================ # 数据模型 # ============================================================================ class Seg(BaseModel): """分词(旧版)- v120使用""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_o: str = "" # 原始问题 # ============================================================================ # 新架构数据模型 (v121) # ============================================================================ class Segment(BaseModel): """语义片段(Round 0语义分段结果)""" text: str # 片段文本 type: str # 语义类型: 疑问标记/核心动作/修饰短语/中心名词/逻辑连接 score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_o: str = "" # 原始问题 words: list[str] = Field(default_factory=list) # 该片段拆分出的词列表(Round 0拆词结果) word_scores: dict[str, float] = Field(default_factory=dict) # 词的评分 {word: score} word_reasons: dict[str, str] = Field(default_factory=dict) # 词的评分理由 {word: reason} class DomainCombination(BaseModel): """域组合(Round N的N域组合结果)""" text: str # 组合后的文本 domains: list[int] = Field(default_factory=list) # 参与组合的域索引列表(对应segments的索引) type_label: str = "" # 类型标签,如 [疑问标记+核心动作+中心名词] source_words: list[list[str]] = Field(default_factory=list) # 来源词列表,每个元素是一个域的词列表,如 [["猫咪"], ["梗图"]] score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_segments: list[str] = Field(default_factory=list) # 来源segment的文本列表 # ============================================================================ # 旧架构数据模型(保留但不使用) # ============================================================================ # class Word(BaseModel): # """词(旧版)- v120使用,v121不再使用""" # text: str # score_with_o: float = 0.0 # 与原始问题的评分 # from_o: str = "" # 原始问题 class Word(BaseModel): """词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 from_o: str = "" # 原始问题 class QFromQ(BaseModel): """Q来源信息(用于Sug中记录)""" text: str score_with_o: float = 0.0 class Q(BaseModel): """查询""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_source: str = "" # v120: seg/sug/add; v121新增: segment/domain_comb/sug type_label: str = "" # v121新增:域类型标签(仅用于domain_comb来源) domain_index: int = -1 # v121新增:域索引(word来源时有效,-1表示无域) domain_type: str = "" # v121新增:域类型(word来源时表示所属segment的type,如"中心名词") class Sug(BaseModel): """建议词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_q: QFromQ | None = None # 来自的q class Seed(BaseModel): """种子(旧版)- v120使用,v121不再使用""" text: str added_words: list[str] = Field(default_factory=list) # 已经增加的words from_type: str = "" # seg/sug/add score_with_o: float = 0.0 # 与原始问题的评分 class Post(BaseModel): """帖子""" title: str = "" body_text: str = "" type: str = "normal" # video/normal images: list[str] = Field(default_factory=list) # 图片url列表,第一张为封面 video: str = "" # 视频url interact_info: dict = Field(default_factory=dict) # 互动信息 note_id: str = "" note_url: str = "" class Search(Sug): """搜索结果(继承Sug)""" post_list: list[Post] = Field(default_factory=list) # 搜索得到的帖子列表 class RunContext(BaseModel): """运行上下文""" version: str input_files: dict[str, str] c: str # 原始需求 o: str # 原始问题 log_url: str log_dir: str # v121新增:语义分段结果 segments: list[dict] = Field(default_factory=list) # Round 0的语义分段结果 # 每轮的数据 rounds: list[dict] = Field(default_factory=list) # 每轮的详细数据 # 最终结果 final_output: str | None = None # 评估缓存:避免重复评估相同文本 evaluation_cache: dict[str, tuple[float, str]] = Field(default_factory=dict) # key: 文本, value: (score, reason) # ============================================================================ # Agent 定义 # ============================================================================ # ============================================================================ # v121 新增 Agent # ============================================================================ # Agent: 语义分段专家 (Prompt1) class SemanticSegment(BaseModel): """单个语义片段""" segment_text: str = Field(..., description="片段文本") segment_type: str = Field(..., description="语义类型(疑问标记/核心动作/修饰短语/中心名词/逻辑连接)") reasoning: str = Field(..., description="分段理由") class SemanticSegmentation(BaseModel): """语义分段结果""" segments: list[SemanticSegment] = Field(..., description="语义片段列表") overall_reasoning: str = Field(..., description="整体分段思路") semantic_segmentation_instructions = """ 你是语义分段专家。给定一个搜索query,将其拆分成不同语义类型的片段。 ## 语义类型定义 1. 疑问引导:引导查询意图的元素,如疑问词(原理:表示意图类型,如过程求解或信息查询)。 2. 核心动作:核心动作或关系谓词,如动词(原理:谓词是语义框架的核心,定义动作或状态)。 3. 目标对象:动作的目标或实体中心对象,如名词短语(承载谓词的作用对象助词)。 4. 修饰限定:对目标对象的修饰和限定、对核心动作的限定。 ## 分段原则:严格遵守以下规则 1. **语义完整性**:每个片段应该是一个完整的语义单元 2. **类型互斥**:每个片段只能属于一种类型 3. **保留原文**:片段文本必须保留原query中的字符,不得改写 4. **顺序保持**:片段顺序应与原query一致 5. **修饰限定合并规则** - 定义:在同一个"目标对象"之前的所有"修饰限定"片段,如果它们之间没有插入"疑问引导"、"核心动作"或"目标对象",就必须合并为一个片段 - 判断标准: * 步骤1:找到"目标对象"在哪里 * 步骤2:向前查看,把所有修饰和限定这个目标对象的词都合并,修辞和限定词包括数量词、地域词、时间词、描述词、程度词、方式词、助词等 ## 输出要求 - segments: 片段列表 - segment_text: 片段文本(必须来自原query) - segment_type: 语义类型 - reasoning: 为什么这样分段 - overall_reasoning: 整体分段思路 ## JSON输出规范 1. **格式要求**:必须输出标准JSON格式 2. **引号规范**:字符串中如需表达引用,使用书名号《》或「」,不要使用英文引号或中文引号"" """.strip() semantic_segmenter = Agent[None]( name="语义分段专家", instructions=semantic_segmentation_instructions, model=get_model(MODEL_NAME), output_type=SemanticSegmentation, ) # ============================================================================ # v120 保留 Agent # ============================================================================ # Agent 1: 分词专家(v121用于Round 0拆词) class WordSegmentation(BaseModel): """分词结果""" words: list[str] = Field(..., description="分词结果列表") reasoning: str = Field(..., description="分词理由") word_segmentation_instructions = """ 你是分词专家。给定一个query,将其拆分成有意义的最小单元。 ## 分词原则 1. 保留有搜索意义的词汇 2. 拆分成独立的概念 3. 保留专业术语的完整性 4. 去除虚词(的、吗、呢等),但保留疑问词(如何、为什么、怎样等) ## 输出要求 返回分词列表和分词理由。 """.strip() word_segmenter = Agent[None]( name="分词专家", instructions=word_segmentation_instructions, model=get_model(MODEL_NAME), output_type=WordSegmentation, ) # Agent 2: 动机维度评估专家 + 品类维度评估专家(两阶段评估) # 动机评估的嵌套模型 class CoreMotivationExtraction(BaseModel): """核心动机提取""" 简要说明核心动机: str = Field(..., description="核心动机说明") class MotivationEvaluation(BaseModel): """动机维度评估""" 原始问题核心动机提取: CoreMotivationExtraction = Field(..., description="原始问题核心动机提取") 动机维度得分: float = Field(..., description="动机维度得分 -1~1") 简要说明动机维度相关度理由: str = Field(..., description="动机维度相关度理由") class CategoryEvaluation(BaseModel): """品类维度评估""" 品类维度得分: float = Field(..., description="品类维度得分 -1~1") 简要说明品类维度相关度理由: str = Field(..., description="品类维度相关度理由") # 动机评估 prompt - 第一轮版本(来自 sug_v6_1_2_115.py) motivation_evaluation_instructions_round1 = """ #角色 你是一个 **专业的语言专家和语义相关性评判专家**。你的任务是:判断我给你的 <平台sug词条> 与 <原始问题> 的需求动机匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 核心概念与方法论 ## 评估维度 本评估系统围绕 **动机维度** 进行: ### 1. 动机维度 **定义:** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 --- ## 如何识别原始问题的核心动机 **核心动机必须是动词**,识别方法如下: ### 方法1: 显性动词直接提取 当原始问题明确包含动词时,直接提取 示例: "如何获取素材" → 核心动机 = "获取" "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习") "制作视频教程" → 核心动机 = "制作" ### 方法2: 隐性动词语义推理 当原始问题没有显性动词时,需要结合上下文推理 示例: 例: "川西秋天风光摄影" → 隐含动作="拍摄" → 需结合上下文判断 如果原始问题是纯名词短语,无任何动作线索: → 核心动机 = 无法识别 → 在此情况下,动机维度得分应为 0。 示例: "摄影" → 无法识别动机,动机维度得分 = 0 "川西风光" → 无法识别动机,动机维度得分 = 0 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<平台sug词条>**:平台推荐的词条列表,每个词条需要单独评估。 #判定流程 #评估架构 输入: <原始问题> + <平台sug词条> ↓ 【动机维度相关性判定】 ├→ 步骤1: 评估与<原始问题>的需求动机匹配度 └→ 输出: -1到1之间的数值 + 判定依据 相关度评估维度详解 维度1: 动机维度评估 评估对象: <平台sug词条> 与 <原始问题> 的需求动机匹配度 说明: 核心动作是用户需求的第一优先级,决定了推荐的基本有效性 评分标准: 【正向匹配】 +0.95~1.0: 核心动作完全一致 - 例: 原始问题"如何获取素材" vs sug词"素材获取方法" - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致 · 例: 原始问题"扣除猫咪主体的方法" vs sug词"扣除猫咪眼睛的方法"(子集但目的一致) +0.75~0.95: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs sug词"如何下载素材" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 +0.5~0.75: 核心动作相关但非直接对应(相关实现路径) - 例: 原始问题"如何获取素材" vs sug词"素材管理整理" +0.2~0.45: 核心动作弱相关(同领域不同动作) - 例: 原始问题"如何拍摄风光" vs sug词"风光摄影欣赏" 【中性/无关】 0: 没有明确目的,动作意图无明确关联 - 例: 原始问题"如何获取素材" vs sug词"摄影器材推荐" - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0 - 如果原始问题无法识别动机,则动机维度得分为0。 【负向偏离】 -0.2~-0.05: 动作意图轻度冲突或误导 - 例: 原始问题"如何获取素材" vs sug词"素材版权保护须知" -0.5~-0.25: 动作意图明显对立 - 例: 原始问题"如何获取免费素材" vs sug词"如何售卖素材" -1.0~-0.55: 动作意图完全相反或产生严重负面引导 - 例: 原始问题"免费素材获取" vs sug词"付费素材强制推销" --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "原始问题核心动机提取": { "简要说明核心动机": "" }, "动机维度得分": "-1到1之间的小数", "简要说明动机维度相关度理由": "评估该sug词条与原始问题动机匹配程度的理由" } **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明动机维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 #注意事项: 始终围绕动机维度:所有评估都基于"动机"维度,不偏离 核心动机必须是动词:在评估前,必须先提取原始问题的核心动机(动词),这是整个评估的基础 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当sug词条对原始问题动机产生误导、冲突或有害引导时给予负分 零分使用原则:当sug词条与原始问题动机无明确关联,既不相关也不冲突时给予零分,或原始问题无法识别动机时。 """.strip() # 动机评估 prompt - 后续轮次版本(当前 116 版本) motivation_evaluation_instructions = """ #角色 你是一个 **专业的语言专家和语义相关性评判专家**。你的任务是:判断我给你的 <平台sug词条> 与 <原始问题> 的需求动机匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 动机评估核心原则(必读) ### 动机 = 动作 + 对象 + 场景 评估时必须同时考虑三要素,不能只看动词: - **动作**:制定、规划、获取、拍摄等 - **对象**:旅行行程 vs 每日计划、风光照片 vs 证件照 - **场景**:旅游 vs 日常、摄影 vs 办公 ### 关键判断:动词相同 ≠ 动机匹配 错误:只看动词相同就给高分 - "制定旅行行程" vs "制定每日计划" → 给0.95 错误 - "拍摄风光" vs "拍摄证件照" → 给0.95 错误 正确:检查对象和场景是否匹配 - 对象不同领域 → 降至0.3左右 - 场景不同 → 降至0.3左右 # 核心概念与方法论 ## 评估维度 本评估系统围绕 **动机维度** 进行: # 维度独立性警告 【严格约束】本评估**只评估动机维度**: **禁止使用"主题相关"作为评分依据**:评分理由中不得出现"主题"、"内容"、"话题"等词 ### 1. 动机维度 **定义:** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 --- 如果原始问题是纯名词短语,无任何动作线索: → 核心动机 = 无法识别 → 在此情况下,动机维度得分应为 0。 示例: "摄影" → 无法识别动机,动机维度得分 = 0 "川西风光" → 无法识别动机,动机维度得分 = 0 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<平台sug词条>**:平台推荐的词条列表,每个词条需要单独评估。 #判定流程 #评估架构 输入: <原始问题> + <平台sug词条> ↓ 【动机维度相关性判定】 ├→ 步骤1: 评估与<原始问题>的需求动机匹配度 └→ 输出: -1到1之间的数值 + 判定依据 相关度评估维度详解 维度1: 动机维度评估 评估对象: <平台sug词条> 与 <原始问题> 的需求动机匹配度 说明: 核心动作是用户需求的第一优先级,决定了推荐的基本有效性 评分标准: 【正向匹配】 +0.95~1.0: 动作+对象+场景完全一致 - 要求:动词、对象、场景都必须匹配,不能只看动词 - "制定旅行行程" vs "制定每日计划" 虽然动词相同,但对象和场景完全不同,不属于高分 - 特殊规则: 如果sug词的核心动作是原始问题动作在动作+对象+场景一致下的**具体化子集**,也判定为完全一致 +0.75~0.95: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs sug词"如何下载素材" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 +0.5~0.75: 核心动作相关但非直接对应(相关实现路径) - 例: 原始问题"如何获取素材" vs sug词"素材管理整理" +0.25~0.4: 动词相同但对象或场景明显不同(弱相关) - 判断要点:动词一致,但对象不同领域或场景不同 - 关键:不要因为动词相同就给0.95,必须检查对象! 【中性/无关】 0: 没有明确目的,动作意图无明确关联 - 例: 原始问题"如何获取素材" vs sug词"摄影器材推荐" - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0 - 如果原始问题无法识别动机,则动机维度得分为0 特别注意 - 禁止的错误理由: - 禁止: "虽然没有动作,但主题相关,所以给0.2" - 禁止:"内容有参考价值,所以给0.15" - 禁止: "都提到了XX(名词),所以不是完全无关" - 正确理由:"sug词条无动作意图,与原始问题的'XX'动机完全无关" 【负向偏离】 -0.2~-0.05: 动作意图轻度冲突或误导 - 例: 原始问题"如何获取素材" vs sug词"素材版权保护须知" -0.5~-0.25: 动作意图明显对立 - 例: 原始问题"如何获取免费素材" vs sug词"如何售卖素材" -1.0~-0.55: 动作意图完全相反或产生严重负面引导 - 例: 原始问题"免费素材获取" vs sug词"付费素材强制推销" --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "原始问题核心动机提取": { "简要说明核心动机": "" }, "动机维度得分": "-1到1之间的小数", "简要说明动机维度相关度理由": "评估该sug词条与原始问题动机匹配程度的理由" } **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明动机维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 #注意事项: 始终围绕动机维度:所有评估都基于"动机"维度,不偏离 核心动机必须是动词:在评估前,必须先提取原始问题的核心动机(动词),这是整个评估的基础 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当sug词条对原始问题动机产生误导、冲突或有害引导时给予负分 零分使用原则:当sug词条与原始问题动机无明确关联,既不相关也不冲突时给予零分,或原始问题无法识别动机时。 """.strip() # 品类评估 prompt category_evaluation_instructions = """ #角色 你是一个 **专业的语言专家和语义相关性评判专家**。你的任务是:判断我给你的 <平台sug词条> 与 <原始问题> 的内容主体和限定词匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 核心概念与方法论 ## 评估维度 本评估系统围绕 **品类维度** 进行: # 维度独立性警告 【严格约束】本评估**只评估品类维度**,,必须遵守以下规则: 1. **只看名词和限定词**:评估时只考虑主体、限定词的匹配度 2. **完全忽略动词**:动作意图、目的等动机信息对本维度评分无影响 ### 品类维度 **定义:** 用户"关于什么内容",即原始问题的主题对象和限定词 - 核心是 **名词+限定词**:川西秋季风光摄影素材 - 包括:核心主体 + 地域限定 + 时间限定 + 质量限定等 ## ⚠️ 品类评估核心原则(必读) ### 原则1:只看词条表面,禁止联想推演 - 只能基于sug词实际包含的词汇评分 - 禁止推测"可能包含"、"可以理解为" **错误示例:** 原始问题:"川西旅行行程" vs sug词:"每日计划" - 错误 "每日计划可以包含旅行规划,所以有关联" → 这是不允许的联想 - 正确: "sug词只有'每日计划',无'旅行'字眼,品类不匹配" → 正确判断 ### 原则2:通用概念 ≠ 特定概念 - **通用**:计划、方法、技巧、素材(无领域限定) - **特定**:旅行行程、摄影技巧、烘焙方法(有明确领域) IF sug词是通用 且 原始问题是特定: → 品类不匹配 → 评分0.05~0.1 关键:通用概念不等于特定概念,不能因为"抽象上都是规划"就给分 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<平台sug词条>**:平台推荐的词条列表,每个词条需要单独评估。 #判定流程 #评估架构 输入: <原始问题> + <平台sug词条> ↓ 【品类维度相关性判定】 ├→ 步骤1: 评估与<原始问题>的内容主体和限定词匹配度 └→ 输出: -1到1之间的数值 + 判定依据 相关度评估维度详解 维度2: 品类维度评估 评估对象: <平台sug词条> 与 <原始问题> 的内容主体和限定词匹配度 评分标准: 【正向匹配】 +0.95~1.0: 核心主体+所有关键限定词完全匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西秋季风光摄影作品" +0.75~0.95: 核心主体匹配,存在限定词匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西风光摄影素材"(缺失"秋季") +0.5~0.75: 核心主体匹配,无限定词匹配或合理泛化 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"四川风光摄影" +0.3~0.5: 核心主体匹配,但限定词缺失或存在语义错位 - 特别注意"语义身份"差异,主体词出现但上下文语义不同 - 例: · "猫咪的XX行为"(猫咪是行为者) · vs "用猫咪表达XX的梗图"(猫咪是媒介) · 虽都含"猫咪+XX",但语义角色不同 +0.2~0.3: 主体词不匹配,限定词缺失或错位 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"风光摄影入门" +0.05~0.2: 主体词过度泛化或仅抽象相似 - 例: sug词是通用概念,原始问题是特定概念 sug词"每日计划"(通用)vs 原始问题 "川西旅行行程"(特定) → 评分:0.08 【中性/无关】 0: 类别明显不同,没有明确目的,无明确关联 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"人像摄影素材" - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0 【负向偏离】 -0.2~-0.05: 主体词或限定词存在误导性 - 例: 原始问题"免费摄影素材" vs sug词"付费摄影素材库" -0.5~-0.25: 主体词明显错位或品类冲突 - 例: 原始问题"风光摄影素材" vs sug词"人像修图教程" -1.0~-0.55: 完全错误的品类或有害引导 - 例: 原始问题"正版素材获取" vs sug词"盗版素材下载" --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "品类维度得分": "-1到1之间的小数", "简要说明品类维度相关度理由": "评估该sug词条与原始问题品类匹配程度的理由" } --- **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明品类维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 --- #注意事项: 始终围绕品类维度:所有评估都基于"品类"维度,不偏离 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当sug词条对原始问题品类产生误导、冲突或有害引导时给予负分 零分使用原则:当sug词条与原始问题品类无明确关联,既不相关也不冲突时给予零分 """.strip() # 创建评估 Agent motivation_evaluator = Agent[None]( name="动机维度评估专家(后续轮次)", instructions=motivation_evaluation_instructions, model=get_model(MODEL_NAME), output_type=MotivationEvaluation) category_evaluator = Agent[None]( name="品类维度评估专家", instructions=category_evaluation_instructions, model=get_model(MODEL_NAME), output_type=CategoryEvaluation ) # ============================================================================ # v120 保留但不使用的 Agent(v121不再使用) # ============================================================================ # # Agent 3: 加词选择专家(旧版 - v120使用,v121不再使用) # class WordCombination(BaseModel): # """单个词组合""" # selected_word: str = Field(..., description="选择的词") # combined_query: str = Field(..., description="组合后的新query") # reasoning: str = Field(..., description="选择理由") # class WordSelectionTop5(BaseModel): # """加词选择结果(Top 5)""" # combinations: list[WordCombination] = Field( # ..., # description="选择的Top 5组合(不足5个则返回所有)", # min_items=1, # max_items=5 # ) # overall_reasoning: str = Field(..., description="整体选择思路") # word_selection_instructions 已删除 (v121不再使用) # word_selector = Agent[None]( # name="加词组合专家", # instructions=word_selection_instructions, # model=get_model(MODEL_NAME), # output_type=WordSelectionTop5, # model_settings=ModelSettings(temperature=0.2), # ) # ============================================================================ # 辅助函数 # ============================================================================ # ============================================================================ # v121 新增辅助函数 # ============================================================================ def get_ordered_subsets(words: list[str], min_len: int = 1) -> list[list[str]]: """ 生成words的所有有序子集(可跳过但不可重排) 使用 itertools.combinations 生成索引组合,保持原始顺序 Args: words: 词列表 min_len: 子集最小长度 Returns: 所有可能的有序子集列表 Example: words = ["川西", "秋季", "风光"] 结果: - 长度1: ["川西"], ["秋季"], ["风光"] - 长度2: ["川西", "秋季"], ["川西", "风光"], ["秋季", "风光"] - 长度3: ["川西", "秋季", "风光"] 共 C(3,1) + C(3,2) + C(3,3) = 3 + 3 + 1 = 7种 """ from itertools import combinations subsets = [] n = len(words) # 遍历所有可能的长度(从min_len到n) for r in range(min_len, n + 1): # 生成长度为r的所有索引组合 for indices in combinations(range(n), r): # 按照原始顺序提取词 subset = [words[i] for i in indices] subsets.append(subset) return subsets def generate_domain_combinations(segments: list[Segment], n_domains: int) -> list[DomainCombination]: """ 生成N域组合 步骤: 1. 从len(segments)个域中选择n_domains个域(组合,保持顺序) 2. 对每个选中的域,生成其words的所有有序子集 3. 计算笛卡尔积,生成所有可能的组合 Args: segments: 语义片段列表 n_domains: 参与组合的域数量 Returns: 所有可能的N域组合列表 Example: 有4个域: [疑问标记, 核心动作, 修饰短语, 中心名词] n_domains=2时,选择域的方式: C(4,2) = 6种 假设选中[核心动作, 中心名词]: - 核心动作的words: ["获取"], 子集: ["获取"] - 中心名词的words: ["风光", "摄影", "素材"], 子集: 7种 则该域选择下的组合数: 1 * 7 = 7种 """ from itertools import combinations, product all_combinations = [] n = len(segments) # 检查参数有效性 if n_domains > n or n_domains < 1: return [] # 1. 选择n_domains个域(保持原始顺序) for domain_indices in combinations(range(n), n_domains): selected_segments = [segments[i] for i in domain_indices] # 新增:如果所有域都只有1个词,跳过(单段落单词不组合) if all(len(seg.words) == 1 for seg in selected_segments): continue # 2. 为每个选中的域生成其words的所有有序子集 domain_subsets = [] for seg in selected_segments: if len(seg.words) == 0: # 如果某个域没有词,跳过该域组合 domain_subsets = [] break subsets = get_ordered_subsets(seg.words, min_len=1) domain_subsets.append(subsets) # 如果某个域没有词,跳过 if len(domain_subsets) != n_domains: continue # 3. 计算笛卡尔积 for word_combination in product(*domain_subsets): # word_combination 是一个tuple,每个元素是一个词列表 # 例如: (["获取"], ["风光", "摄影"]) # 计算总词数 total_words = sum(len(words) for words in word_combination) # 如果总词数<=1,跳过(组词必须大于1个词) if total_words <= 1: continue # 将所有词连接成一个字符串 combined_text = "".join(["".join(words) for words in word_combination]) # 生成类型标签 type_labels = [selected_segments[i].type for i in range(n_domains)] type_label = "[" + "+".join(type_labels) + "]" # 创建DomainCombination对象 comb = DomainCombination( text=combined_text, domains=list(domain_indices), type_label=type_label, source_words=[list(words) for words in word_combination], # 保存来源词 from_segments=[seg.text for seg in selected_segments] ) all_combinations.append(comb) return all_combinations def extract_words_from_segments(segments: list[Segment]) -> list[Q]: """ 从 segments 中提取所有 words,转换为 Q 对象列表 用于 Round 1 的输入:将 Round 0 的 words 转换为可用于请求SUG的 query 列表 Args: segments: Round 0 的语义片段列表 Returns: list[Q]: word 列表,每个 word 作为一个 Q 对象 """ q_list = [] for seg_idx, segment in enumerate(segments): for word in segment.words: # 从 segment.word_scores 获取该 word 的评分 word_score = segment.word_scores.get(word, 0.0) word_reason = segment.word_reasons.get(word, "") # 创建 Q 对象 q = Q( text=word, score_with_o=word_score, reason=word_reason, from_source="word", # 标记来源为 word type_label=f"[{segment.type}]", # 保留域信息 domain_index=seg_idx, # 添加域索引 domain_type=segment.type # 添加域类型(如"中心名词"、"核心动作") ) q_list.append(q) return q_list # ============================================================================ # v120 保留辅助函数 # ============================================================================ def calculate_final_score(motivation_score: float, category_score: float) -> float: """ 应用依存性规则计算最终得分 步骤1: 基础加权计算 base_score = motivation_score * 0.7 + category_score * 0.3 步骤2: 极值保护规则 Args: motivation_score: 动机维度得分 -1~1 category_score: 品类维度得分 -1~1 Returns: 最终得分 -1~1 """ # 基础加权得分 base_score = motivation_score * 0.7 + category_score * 0.3 # 规则C: 动机负向决定机制(最高优先级) if motivation_score < 0: return 0.0 # 规则A: 动机高分保护机制 if motivation_score >= 0.8: # 当目的高度一致时,品类的泛化不应导致"弱相关" return max(base_score, 0.7) # 规则B: 动机低分限制机制 if motivation_score <= 0.2: # 目的不符时,品类匹配的价值有限 return min(base_score, 0.5) # 无规则调整,返回基础得分 return base_score def clean_json_string(text: str) -> str: """清理JSON中的非法控制字符(保留 \t \n \r)""" import re # 移除除了 \t(09) \n(0A) \r(0D) 之外的所有控制字符 return re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', text) def process_note_data(note: dict) -> Post: """处理搜索接口返回的帖子数据""" note_card = note.get("note_card", {}) image_list = note_card.get("image_list", []) interact_info = note_card.get("interact_info", {}) user_info = note_card.get("user", {}) # ========== 调试日志 START ========== note_id = note.get("id", "") raw_title = note_card.get("display_title") # 不提供默认值 raw_body = note_card.get("desc") raw_type = note_card.get("type") # 打印原始值类型和内容 print(f"\n[DEBUG] 处理帖子 {note_id}:") print(f" raw_title 类型: {type(raw_title).__name__}, 值: {repr(raw_title)}") print(f" raw_body 类型: {type(raw_body).__name__}, 值: {repr(raw_body)[:100] if raw_body else repr(raw_body)}") print(f" raw_type 类型: {type(raw_type).__name__}, 值: {repr(raw_type)}") # 检查是否为 None if raw_title is None: print(f" ⚠️ WARNING: display_title 是 None!") if raw_body is None: print(f" ⚠️ WARNING: desc 是 None!") if raw_type is None: print(f" ⚠️ WARNING: type 是 None!") # ========== 调试日志 END ========== # 提取图片URL - 使用新的字段名 image_url images = [] for img in image_list: if isinstance(img, dict): # 尝试新字段名 image_url,如果不存在则尝试旧字段名 url_default img_url = img.get("image_url") or img.get("url_default") if img_url: images.append(img_url) # 判断类型 note_type = note_card.get("type", "normal") video_url = "" if note_type == "video": video_info = note_card.get("video", {}) if isinstance(video_info, dict): # 尝试获取视频URL video_url = video_info.get("media", {}).get("stream", {}).get("h264", [{}])[0].get("master_url", "") return Post( note_id=note.get("id") or "", title=note_card.get("display_title") or "", body_text=note_card.get("desc") or "", type=note_type, images=images, video=video_url, interact_info={ "liked_count": interact_info.get("liked_count", 0), "collected_count": interact_info.get("collected_count", 0), "comment_count": interact_info.get("comment_count", 0), "shared_count": interact_info.get("shared_count", 0) }, note_url=f"https://www.xiaohongshu.com/explore/{note.get('id', '')}" ) async def evaluate_with_o(text: str, o: str, cache: dict[str, tuple[float, str]] | None = None) -> tuple[float, str]: """评估文本与原始问题o的相关度 采用两阶段评估 + 代码计算规则: 1. 动机维度评估(权重70%) 2. 品类维度评估(权重30%) 3. 应用规则A/B/C调整得分 Args: text: 待评估的文本 o: 原始问题 cache: 评估缓存(可选),用于避免重复评估 Returns: tuple[float, str]: (最终相关度分数, 综合评估理由) """ # 检查缓存 if cache is not None and text in cache: cached_score, cached_reason = cache[text] print(f" ⚡ 缓存命中: {text} -> {cached_score:.2f}") return cached_score, cached_reason # 准备输入 eval_input = f""" <原始问题> {o} <平台sug词条> {text} 请评估平台sug词条与原始问题的匹配度。 """ # 添加重试机制 max_retries = 2 last_error = None for attempt in range(max_retries): try: # 并发调用两个评估器(统一使用标准评估策略) motivation_task = Runner.run(motivation_evaluator, eval_input) category_task = Runner.run(category_evaluator, eval_input) motivation_result, category_result = await asyncio.gather( motivation_task, category_task ) # 获取评估结果 motivation_eval: MotivationEvaluation = motivation_result.final_output category_eval: CategoryEvaluation = category_result.final_output # 提取得分 motivation_score = motivation_eval.动机维度得分 category_score = category_eval.品类维度得分 # 计算基础得分 base_score = motivation_score * 0.7 + category_score * 0.3 # 应用规则计算最终得分 final_score = calculate_final_score(motivation_score, category_score) # 组合评估理由 core_motivation = motivation_eval.原始问题核心动机提取.简要说明核心动机 motivation_reason = motivation_eval.简要说明动机维度相关度理由 category_reason = category_eval.简要说明品类维度相关度理由 combined_reason = ( f"【核心动机】{core_motivation}\n" f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n" f"【品类维度 {category_score:.2f}】{category_reason}\n" f"【基础得分 {base_score:.2f}】= 动机({motivation_score:.2f})*0.7 + 品类({category_score:.2f})*0.3\n" f"【最终得分 {final_score:.2f}】" ) # 如果应用了规则,添加规则说明 if final_score != base_score: if motivation_score < 0: combined_reason += "(应用规则C:动机负向决定机制)" elif motivation_score >= 0.8: combined_reason += "(应用规则A:动机高分保护机制)" elif motivation_score <= 0.2: combined_reason += "(应用规则B:动机低分限制机制)" # 存入缓存 if cache is not None: cache[text] = (final_score, combined_reason) return final_score, combined_reason except Exception as e: last_error = e error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}") print(f" 正在重试...") await asyncio.sleep(1) # 等待1秒后重试 else: print(f" ❌ 评估失败 (已达最大重试次数): {error_msg[:150]}") # 所有重试失败后,返回默认值 fallback_reason = f"评估失败(重试{max_retries}次): {str(last_error)[:200]}" print(f" 使用默认值: score=0.0, reason={fallback_reason[:100]}...") return 0.0, fallback_reason # ============================================================================ # 核心流程函数 # ============================================================================ async def initialize(o: str, context: RunContext) -> tuple[list[Seg], list[Word], list[Q], list[Seed]]: """ 初始化阶段 Returns: (seg_list, word_list_1, q_list_1, seed_list) """ print(f"\n{'='*60}") print(f"初始化阶段") print(f"{'='*60}") # 1. 分词:原始问题(o) ->分词-> seg_list print(f"\n[步骤1] 分词...") result = await Runner.run(word_segmenter, o) segmentation: WordSegmentation = result.final_output seg_list = [] for word in segmentation.words: seg_list.append(Seg(text=word, from_o=o)) print(f"分词结果: {[s.text for s in seg_list]}") print(f"分词理由: {segmentation.reasoning}") # 2. 分词评估:seg_list -> 每个seg与o进行评分(使用信号量限制并发数) print(f"\n[步骤2] 评估每个分词与原始问题的相关度...") MAX_CONCURRENT_SEG_EVALUATIONS = 10 seg_semaphore = asyncio.Semaphore(MAX_CONCURRENT_SEG_EVALUATIONS) async def evaluate_seg(seg: Seg) -> Seg: async with seg_semaphore: # 初始化阶段的分词评估使用第一轮 prompt (round_num=1) seg.score_with_o, seg.reason = await evaluate_with_o(seg.text, o, context.evaluation_cache, round_num=1) return seg if seg_list: print(f" 开始评估 {len(seg_list)} 个分词(并发限制: {MAX_CONCURRENT_SEG_EVALUATIONS})...") eval_tasks = [evaluate_seg(seg) for seg in seg_list] await asyncio.gather(*eval_tasks) for seg in seg_list: print(f" {seg.text}: {seg.score_with_o:.2f}") # 3. 构建word_list_1: seg_list -> word_list_1(固定词库) print(f"\n[步骤3] 构建word_list_1(固定词库)...") word_list_1 = [] for seg in seg_list: word_list_1.append(Word( text=seg.text, score_with_o=seg.score_with_o, from_o=o )) print(f"word_list_1(固定): {[w.text for w in word_list_1]}") # 4. 构建q_list_1:seg_list 作为 q_list_1 print(f"\n[步骤4] 构建q_list_1...") q_list_1 = [] for seg in seg_list: q_list_1.append(Q( text=seg.text, score_with_o=seg.score_with_o, reason=seg.reason, from_source="seg" )) print(f"q_list_1: {[q.text for q in q_list_1]}") # 5. 构建seed_list: seg_list -> seed_list print(f"\n[步骤5] 构建seed_list...") seed_list = [] for seg in seg_list: seed_list.append(Seed( text=seg.text, added_words=[], from_type="seg", score_with_o=seg.score_with_o )) print(f"seed_list: {[s.text for s in seed_list]}") return seg_list, word_list_1, q_list_1, seed_list async def run_round( round_num: int, q_list: list[Q], word_list_1: list[Word], seed_list: list[Seed], o: str, context: RunContext, xiaohongshu_api: XiaohongshuSearchRecommendations, xiaohongshu_search: XiaohongshuSearch, sug_threshold: float = 0.7 ) -> tuple[list[Q], list[Seed], list[Search]]: """ 运行一轮 Args: round_num: 轮次编号 q_list: 当前轮的q列表 word_list_1: 固定的词库(第0轮分词结果) seed_list: 当前的seed列表 o: 原始问题 context: 运行上下文 xiaohongshu_api: 建议词API xiaohongshu_search: 搜索API sug_threshold: suggestion的阈值 Returns: (q_list_next, seed_list_next, search_list) """ print(f"\n{'='*60}") print(f"第{round_num}轮") print(f"{'='*60}") round_data = { "round_num": round_num, "input_q_list": [{"text": q.text, "score": q.score_with_o, "type": "query"} for q in q_list], "input_word_list_1_size": len(word_list_1), "input_seed_list_size": len(seed_list) } # 1. 请求sug:q_list -> 每个q请求sug接口 -> sug_list_list print(f"\n[步骤1] 为每个q请求建议词...") sug_list_list = [] # list of list for q in q_list: print(f"\n 处理q: {q.text}") suggestions = xiaohongshu_api.get_recommendations(keyword=q.text) q_sug_list = [] if suggestions: print(f" 获取到 {len(suggestions)} 个建议词") for sug_text in suggestions: sug = Sug( text=sug_text, from_q=QFromQ(text=q.text, score_with_o=q.score_with_o) ) q_sug_list.append(sug) else: print(f" 未获取到建议词") sug_list_list.append(q_sug_list) # 2. sug评估:sug_list_list -> 每个sug与o进行评分(并发) print(f"\n[步骤2] 评估每个建议词与原始问题的相关度...") # 2.1 收集所有需要评估的sug,并记录它们所属的q all_sugs = [] sug_to_q_map = {} # 记录每个sug属于哪个q for i, q_sug_list in enumerate(sug_list_list): if q_sug_list: q_text = q_list[i].text for sug in q_sug_list: all_sugs.append(sug) sug_to_q_map[id(sug)] = q_text # 2.2 并发评估所有sug(使用信号量限制并发数) # 每个 evaluate_sug 内部会并发调用 2 个 LLM,所以这里限制为 5,实际并发 LLM 请求为 10 MAX_CONCURRENT_EVALUATIONS = 5 semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS) async def evaluate_sug(sug: Sug) -> Sug: async with semaphore: # 限制并发数 # 根据轮次选择 prompt: 第一轮使用 round1 prompt,后续使用标准 prompt sug.score_with_o, sug.reason = await evaluate_with_o(sug.text, o, context.evaluation_cache, round_num=round_num) return sug if all_sugs: print(f" 开始评估 {len(all_sugs)} 个建议词(并发限制: {MAX_CONCURRENT_EVALUATIONS})...") eval_tasks = [evaluate_sug(sug) for sug in all_sugs] await asyncio.gather(*eval_tasks) # 2.3 打印结果并组织到sug_details sug_details = {} # 保存每个Q对应的sug列表 for i, q_sug_list in enumerate(sug_list_list): if q_sug_list: q_text = q_list[i].text print(f"\n 来自q '{q_text}' 的建议词:") sug_details[q_text] = [] for sug in q_sug_list: print(f" {sug.text}: {sug.score_with_o:.2f}") # 保存到sug_details sug_details[q_text].append({ "text": sug.text, "score": sug.score_with_o, "reason": sug.reason, "type": "sug" }) # 2.4 剪枝判断(已禁用 - 保留所有分支) pruned_query_texts = set() if False: # 原: if round_num >= 2: # 剪枝功能已禁用,保留代码以便后续调整 print(f"\n[剪枝判断] 第{round_num}轮开始应用剪枝策略...") for i, q in enumerate(q_list): q_sug_list = sug_list_list[i] if len(q_sug_list) == 0: continue # 没有sug则不剪枝 # 剪枝条件1: 所有sug分数都低于query分数 all_lower_than_query = all(sug.score_with_o < q.score_with_o for sug in q_sug_list) # 剪枝条件2: 所有sug分数都低于0.5 all_below_threshold = all(sug.score_with_o < 0.5 for sug in q_sug_list) if all_lower_than_query and all_below_threshold: pruned_query_texts.add(q.text) max_sug_score = max(sug.score_with_o for sug in q_sug_list) print(f" 🔪 剪枝: {q.text} (query分数:{q.score_with_o:.2f}, sug最高分:{max_sug_score:.2f}, 全部<0.5)") if pruned_query_texts: print(f" 本轮共剪枝 {len(pruned_query_texts)} 个query") else: print(f" 本轮无query被剪枝") else: print(f"\n[剪枝判断] 剪枝功能已禁用,保留所有分支") # 3. search_list构建 print(f"\n[步骤3] 构建search_list(阈值>{sug_threshold})...") search_list = [] high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold] if high_score_sugs: print(f" 找到 {len(high_score_sugs)} 个高分建议词") # 并发搜索 async def search_for_sug(sug: Sug) -> Search: print(f" 搜索: {sug.text}") try: search_result = xiaohongshu_search.search(keyword=sug.text) result_str = search_result.get("result", "{}") if isinstance(result_str, str): result_data = json.loads(result_str) else: result_data = result_str notes = result_data.get("data", {}).get("data", []) post_list = [] for note in notes[:10]: # 只取前10个 post = process_note_data(note) post_list.append(post) print(f" → 找到 {len(post_list)} 个帖子") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=post_list ) except Exception as e: print(f" ✗ 搜索失败: {e}") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=[] ) search_tasks = [search_for_sug(sug) for sug in high_score_sugs] search_list = await asyncio.gather(*search_tasks) else: print(f" 没有高分建议词,search_list为空") # 4. 构建q_list_next print(f"\n[步骤4] 构建q_list_next...") q_list_next = [] existing_q_texts = set() # 用于去重 add_word_details = {} # 保存每个seed对应的组合词列表 all_seed_combinations = [] # 保存本轮所有seed的组合词(用于后续构建seed_list_next) # 4.1 对于seed_list中的每个seed,从word_list_1中选词组合,产生Top 5 print(f"\n 4.1 为每个seed加词(产生Top 5组合)...") for seed in seed_list: print(f"\n 处理seed: {seed.text}") # 剪枝检查:跳过被剪枝的seed if seed.text in pruned_query_texts: print(f" ⊗ 跳过被剪枝的seed: {seed.text}") continue # 从固定词库word_list_1筛选候选词 candidate_words = [] for word in word_list_1: # 检查词是否已在seed中 if word.text in seed.text: continue # 检查词是否已被添加过 if word.text in seed.added_words: continue candidate_words.append(word) if not candidate_words: print(f" 没有可用的候选词") continue print(f" 候选词数量: {len(candidate_words)}") # 调用Agent一次性选择并组合Top 5(添加重试机制) candidate_words_text = ', '.join([w.text for w in candidate_words]) selection_input = f""" <原始问题> {o} <当前Seed> {seed.text} <候选词列表> {candidate_words_text} 请从候选词列表中选择最多5个最合适的词,分别与当前seed组合成新的query。 """ # 重试机制 max_retries = 2 selection_result = None for attempt in range(max_retries): try: result = await Runner.run(word_selector, selection_input) selection_result = result.final_output break # 成功则跳出 except Exception as e: error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 选词失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:100]}") await asyncio.sleep(1) else: print(f" ❌ 选词失败,跳过该seed: {error_msg[:100]}") break if selection_result is None: print(f" 跳过seed: {seed.text}") continue print(f" Agent选择了 {len(selection_result.combinations)} 个组合") print(f" 整体选择思路: {selection_result.overall_reasoning}") # 并发评估所有组合的相关度 async def evaluate_combination(comb: WordCombination) -> dict: combined = comb.combined_query # 验证:组合结果必须包含完整的seed和word # 检查是否包含seed的所有字符 seed_chars_in_combined = all(char in combined for char in seed.text) # 检查是否包含word的所有字符 word_chars_in_combined = all(char in combined for char in comb.selected_word) if not seed_chars_in_combined or not word_chars_in_combined: print(f" ⚠️ 警告:组合不完整") print(f" Seed: {seed.text}") print(f" Word: {comb.selected_word}") print(f" 组合: {combined}") print(f" 包含完整seed? {seed_chars_in_combined}") print(f" 包含完整word? {word_chars_in_combined}") # 返回极低分数,让这个组合不会被选中 return { 'word': comb.selected_word, 'query': combined, 'score': -1.0, # 极低分数 'reason': f"组合不完整:缺少seed或word的部分内容", 'reasoning': comb.reasoning } # 正常评估,根据轮次选择 prompt score, reason = await evaluate_with_o(combined, o, context.evaluation_cache, round_num=round_num) return { 'word': comb.selected_word, 'query': combined, 'score': score, 'reason': reason, 'reasoning': comb.reasoning } eval_tasks = [evaluate_combination(comb) for comb in selection_result.combinations] top_5 = await asyncio.gather(*eval_tasks) print(f" 评估完成,得到 {len(top_5)} 个组合") # 将Top 5全部加入q_list_next(去重检查 + 得分过滤) for comb in top_5: # 得分过滤:组合词必须比种子提升至少REQUIRED_SCORE_GAIN才能加入下一轮 if comb['score'] < seed.score_with_o + REQUIRED_SCORE_GAIN: print(f" ⊗ 跳过低分: {comb['query']} (分数{comb['score']:.2f} < 种子{seed.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})") continue # 去重检查 if comb['query'] in existing_q_texts: print(f" ⊗ 跳过重复: {comb['query']}") continue print(f" ✓ {comb['query']} (分数: {comb['score']:.2f} > 种子: {seed.score_with_o:.2f})") new_q = Q( text=comb['query'], score_with_o=comb['score'], reason=comb['reason'], from_source="add" ) q_list_next.append(new_q) existing_q_texts.add(comb['query']) # 记录到去重集合 # 记录已添加的词 seed.added_words.append(comb['word']) # 保存到add_word_details add_word_details[seed.text] = [ { "text": comb['query'], "score": comb['score'], "reason": comb['reason'], "selected_word": comb['word'], "seed_score": seed.score_with_o, # 添加原始种子的得分 "type": "add" } for comb in top_5 ] # 保存到all_seed_combinations(用于构建seed_list_next) # 附加seed_score,用于后续过滤 for comb in top_5: comb['seed_score'] = seed.score_with_o all_seed_combinations.extend(top_5) # 4.2 对于sug_list_list中,每个sug大于来自的query分数,加到q_list_next(去重检查) print(f"\n 4.2 将高分sug加入q_list_next...") for sug in all_sugs: # 剪枝检查:跳过来自被剪枝query的sug if sug.from_q and sug.from_q.text in pruned_query_texts: print(f" ⊗ 跳过来自被剪枝query的sug: {sug.text} (来源: {sug.from_q.text})") continue # sug必须比来源query提升至少REQUIRED_SCORE_GAIN才能加入下一轮 if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN: # 去重检查 if sug.text in existing_q_texts: print(f" ⊗ 跳过重复: {sug.text}") continue new_q = Q( text=sug.text, score_with_o=sug.score_with_o, reason=sug.reason, from_source="sug" ) q_list_next.append(new_q) existing_q_texts.add(sug.text) # 记录到去重集合 print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} >= 来源query: {sug.from_q.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})") # 5. 构建seed_list_next(关键修改:不保留上一轮的seed) print(f"\n[步骤5] 构建seed_list_next(不保留上轮seed)...") seed_list_next = [] existing_seed_texts = set() # 5.1 加入本轮所有组合词(只加入得分提升的) print(f" 5.1 加入本轮所有组合词(得分过滤)...") for comb in all_seed_combinations: # 得分过滤:组合词必须比种子提升至少REQUIRED_SCORE_GAIN才作为下一轮种子 seed_score = comb.get('seed_score', 0) if comb['score'] < seed_score + REQUIRED_SCORE_GAIN: print(f" ⊗ 跳过低分: {comb['query']} (分数{comb['score']:.2f} < 种子{seed_score:.2f} + {REQUIRED_SCORE_GAIN:.2f})") continue if comb['query'] not in existing_seed_texts: new_seed = Seed( text=comb['query'], added_words=[], # 新seed的added_words清空 from_type="add", score_with_o=comb['score'] ) seed_list_next.append(new_seed) existing_seed_texts.add(comb['query']) print(f" ✓ {comb['query']} (分数: {comb['score']:.2f} >= 种子: {seed_score:.2f} + {REQUIRED_SCORE_GAIN:.2f})") # 5.2 加入高分sug print(f" 5.2 加入高分sug...") for sug in all_sugs: # 剪枝检查:跳过来自被剪枝query的sug if sug.from_q and sug.from_q.text in pruned_query_texts: continue # sug必须比来源query提升至少REQUIRED_SCORE_GAIN才作为下一轮种子 if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN and sug.text not in existing_seed_texts: new_seed = Seed( text=sug.text, added_words=[], from_type="sug", score_with_o=sug.score_with_o ) seed_list_next.append(new_seed) existing_seed_texts.add(sug.text) print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} >= 来源query: {sug.from_q.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})") # 序列化搜索结果数据(包含帖子详情) search_results_data = [] for search in search_list: search_results_data.append({ "text": search.text, "score_with_o": search.score_with_o, "post_list": [ { "note_id": post.note_id, "note_url": post.note_url, "title": post.title, "body_text": post.body_text, "images": post.images, "interact_info": post.interact_info } for post in search.post_list ] }) # 记录本轮数据 round_data.update({ "sug_count": len(all_sugs), "high_score_sug_count": len(high_score_sugs), "search_count": len(search_list), "total_posts": sum(len(s.post_list) for s in search_list), "q_list_next_size": len(q_list_next), "seed_list_next_size": len(seed_list_next), "total_combinations": len(all_seed_combinations), "pruned_query_count": len(pruned_query_texts), "pruned_queries": list(pruned_query_texts), "output_q_list": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "from": q.from_source, "type": "query"} for q in q_list_next], "seed_list_next": [{"text": seed.text, "from": seed.from_type, "score": seed.score_with_o} for seed in seed_list_next], "sug_details": sug_details, "add_word_details": add_word_details, "search_results": search_results_data }) context.rounds.append(round_data) print(f"\n本轮总结:") print(f" 建议词数量: {len(all_sugs)}") print(f" 高分建议词: {len(high_score_sugs)}") print(f" 搜索数量: {len(search_list)}") print(f" 帖子总数: {sum(len(s.post_list) for s in search_list)}") print(f" 组合词数量: {len(all_seed_combinations)}") print(f" 下轮q数量: {len(q_list_next)}") print(f" 下轮seed数量: {len(seed_list_next)}") return q_list_next, seed_list_next, search_list async def iterative_loop( context: RunContext, max_rounds: int = 2, sug_threshold: float = 0.7 ): """主迭代循环""" print(f"\n{'='*60}") print(f"开始迭代循环") print(f"最大轮数: {max_rounds}") print(f"sug阈值: {sug_threshold}") print(f"{'='*60}") # 初始化 seg_list, word_list_1, q_list, seed_list = await initialize(context.o, context) # API实例 xiaohongshu_api = XiaohongshuSearchRecommendations() xiaohongshu_search = XiaohongshuSearch() # 保存初始化数据 context.rounds.append({ "round_num": 0, "type": "initialization", "seg_list": [{"text": s.text, "score": s.score_with_o, "reason": s.reason, "type": "seg"} for s in seg_list], "word_list_1": [{"text": w.text, "score": w.score_with_o} for w in word_list_1], "q_list_1": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "type": "query"} for q in q_list], "seed_list": [{"text": s.text, "from_type": s.from_type, "score": s.score_with_o, "type": "seed"} for s in seed_list] }) # 收集所有搜索结果 all_search_list = [] # 迭代 round_num = 1 while q_list and round_num <= max_rounds: q_list, seed_list, search_list = await run_round( round_num=round_num, q_list=q_list, word_list_1=word_list_1, # 传递固定词库 seed_list=seed_list, o=context.o, context=context, xiaohongshu_api=xiaohongshu_api, xiaohongshu_search=xiaohongshu_search, sug_threshold=sug_threshold ) all_search_list.extend(search_list) round_num += 1 print(f"\n{'='*60}") print(f"迭代完成") print(f" 总轮数: {round_num - 1}") print(f" 总搜索次数: {len(all_search_list)}") print(f" 总帖子数: {sum(len(s.post_list) for s in all_search_list)}") print(f"{'='*60}") return all_search_list # ============================================================================ # v121 新架构核心流程函数 # ============================================================================ async def initialize_v2(o: str, context: RunContext) -> list[Segment]: """ v121 Round 0 初始化阶段 流程: 1. 语义分段: 调用 semantic_segmenter 将原始问题拆分成语义片段 2. 拆词: 对每个segment调用 word_segmenter 进行拆词 3. 评估: 对每个segment和词进行评估 4. 不进行组合(Round 0只分段和拆词) Returns: 语义片段列表 (Segment) """ print(f"\n{'='*60}") print(f"Round 0: 初始化阶段(语义分段 + 拆词)") print(f"{'='*60}") # 1. 语义分段 print(f"\n[步骤1] 语义分段...") result = await Runner.run(semantic_segmenter, o) segmentation: SemanticSegmentation = result.final_output print(f"语义分段结果: {len(segmentation.segments)} 个片段") print(f"整体分段思路: {segmentation.overall_reasoning}") segment_list = [] for seg_item in segmentation.segments: segment = Segment( text=seg_item.segment_text, type=seg_item.segment_type, from_o=o ) segment_list.append(segment) print(f" - [{segment.type}] {segment.text}") # 2. 对每个segment拆词并评估 print(f"\n[步骤2] 对每个segment拆词并评估...") MAX_CONCURRENT_EVALUATIONS = 5 semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS) async def process_segment(segment: Segment) -> Segment: """处理单个segment: 拆词 + 评估segment + 评估词""" async with semaphore: # 2.1 拆词 word_result = await Runner.run(word_segmenter, segment.text) word_segmentation: WordSegmentation = word_result.final_output segment.words = word_segmentation.words # 2.2 评估segment与原始问题的相关度 segment.score_with_o, segment.reason = await evaluate_with_o( segment.text, o, context.evaluation_cache ) # 2.3 评估每个词与原始问题的相关度 word_eval_tasks = [] for word in segment.words: async def eval_word(w: str) -> tuple[str, float, str]: score, reason = await evaluate_with_o(w, o, context.evaluation_cache) return w, score, reason word_eval_tasks.append(eval_word(word)) word_results = await asyncio.gather(*word_eval_tasks) for word, score, reason in word_results: segment.word_scores[word] = score segment.word_reasons[word] = reason return segment if segment_list: print(f" 开始处理 {len(segment_list)} 个segment(并发限制: {MAX_CONCURRENT_EVALUATIONS})...") process_tasks = [process_segment(seg) for seg in segment_list] await asyncio.gather(*process_tasks) # 打印步骤1结果 print(f"\n[步骤1: 分段及拆词 结果]") for segment in segment_list: print(f" [{segment.type}] {segment.text} (分数: {segment.score_with_o:.2f})") print(f" 拆词: {segment.words}") for word in segment.words: score = segment.word_scores.get(word, 0.0) print(f" - {word}: {score:.2f}") # 保存到context(保留旧格式以兼容) context.segments = [ { "text": seg.text, "type": seg.type, "score": seg.score_with_o, "reason": seg.reason, "words": seg.words, "word_scores": seg.word_scores, "word_reasons": seg.word_reasons } for seg in segment_list ] # 保存 Round 0 到 context.rounds(新格式用于可视化) context.rounds.append({ "round_num": 0, "type": "initialization", "segments": [ { "text": seg.text, "type": seg.type, "domain_index": idx, "score": seg.score_with_o, "reason": seg.reason, "words": [ { "text": word, "score": seg.word_scores.get(word, 0.0), "reason": seg.word_reasons.get(word, "") } for word in seg.words ] } for idx, seg in enumerate(segment_list) ] }) print(f"\n[Round 0 完成]") print(f" 分段数: {len(segment_list)}") total_words = sum(len(seg.words) for seg in segment_list) print(f" 总词数: {total_words}") return segment_list async def run_round_v2( round_num: int, query_input: list[Q], segments: list[Segment], o: str, context: RunContext, xiaohongshu_api: XiaohongshuSearchRecommendations, xiaohongshu_search: XiaohongshuSearch, sug_threshold: float = 0.7 ) -> tuple[list[Q], list[Search]]: """ v121 Round N 执行 正确的流程顺序: 1. 为 query_input 请求SUG 2. 评估SUG 3. 高分SUG搜索 4. N域组合(从segments生成) 5. 评估组合 6. 生成 q_list_next(组合 + 高分SUG) Args: round_num: 轮次编号 (1-4) query_input: 本轮的输入query列表(Round 1是words,Round 2+是上轮输出) segments: 语义片段列表(用于组合) o: 原始问题 context: 运行上下文 xiaohongshu_api: 建议词API xiaohongshu_search: 搜索API sug_threshold: SUG搜索阈值 Returns: (q_list_next, search_list) """ print(f"\n{'='*60}") print(f"Round {round_num}: {round_num}域组合") print(f"{'='*60}") round_data = { "round_num": round_num, "n_domains": round_num, "input_query_count": len(query_input) } MAX_CONCURRENT_EVALUATIONS = 5 semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS) # 步骤1: 为 query_input 请求SUG print(f"\n[步骤1] 为{len(query_input)}个输入query请求SUG...") all_sugs = [] sug_details = {} for q in query_input: suggestions = xiaohongshu_api.get_recommendations(keyword=q.text) if suggestions: print(f" {q.text}: 获取到 {len(suggestions)} 个SUG") for sug_text in suggestions: sug = Sug( text=sug_text, from_q=QFromQ(text=q.text, score_with_o=q.score_with_o) ) all_sugs.append(sug) else: print(f" {q.text}: 未获取到SUG") print(f" 共获取 {len(all_sugs)} 个SUG") # 步骤2: 评估SUG if len(all_sugs) > 0: print(f"\n[步骤2] 评估{len(all_sugs)}个SUG...") async def evaluate_sug(sug: Sug) -> Sug: async with semaphore: sug.score_with_o, sug.reason = await evaluate_with_o( sug.text, o, context.evaluation_cache ) return sug eval_tasks = [evaluate_sug(sug) for sug in all_sugs] await asyncio.gather(*eval_tasks) # 打印结果 for sug in all_sugs: print(f" {sug.text}: {sug.score_with_o:.2f}") if sug.from_q: if sug.from_q.text not in sug_details: sug_details[sug.from_q.text] = [] sug_details[sug.from_q.text].append({ "text": sug.text, "score": sug.score_with_o, "reason": sug.reason, "type": "sug" }) # 步骤3: 搜索高分SUG print(f"\n[步骤3] 搜索高分SUG(阈值 > {sug_threshold})...") high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold] print(f" 找到 {len(high_score_sugs)} 个高分SUG") search_list = [] if len(high_score_sugs) > 0: async def search_for_sug(sug: Sug) -> Search: print(f" 搜索: {sug.text}") try: search_result = xiaohongshu_search.search(keyword=sug.text) result_str = search_result.get("result", "{}") if isinstance(result_str, str): result_data = json.loads(result_str) else: result_data = result_str notes = result_data.get("data", {}).get("data", []) post_list = [] for note in notes[:10]: post = process_note_data(note) post_list.append(post) print(f" → 找到 {len(post_list)} 个帖子") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=post_list ) except Exception as e: print(f" ✗ 搜索失败: {e}") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=[] ) search_tasks = [search_for_sug(sug) for sug in high_score_sugs] search_list = await asyncio.gather(*search_tasks) # 步骤4: 生成N域组合 print(f"\n[步骤4] 生成{round_num}域组合...") domain_combinations = generate_domain_combinations(segments, round_num) print(f" 生成了 {len(domain_combinations)} 个组合") if len(domain_combinations) == 0: print(f" 无法生成{round_num}域组合") # 即使无法组合,也返回高分SUG作为下轮输入 q_list_next = [] for sug in all_sugs: if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN: q = Q( text=sug.text, score_with_o=sug.score_with_o, reason=sug.reason, from_source="sug", type_label="" ) q_list_next.append(q) round_data.update({ "domain_combinations_count": 0, "sug_count": len(all_sugs), "high_score_sug_count": len(high_score_sugs), "search_count": len(search_list), "sug_details": sug_details, "q_list_next_size": len(q_list_next) }) context.rounds.append(round_data) return q_list_next, search_list # 步骤5: 评估所有组合 print(f"\n[步骤5] 评估{len(domain_combinations)}个组合...") async def evaluate_combination(comb: DomainCombination) -> DomainCombination: async with semaphore: comb.score_with_o, comb.reason = await evaluate_with_o( comb.text, o, context.evaluation_cache ) return comb eval_tasks = [evaluate_combination(comb) for comb in domain_combinations] await asyncio.gather(*eval_tasks) # 排序 - 已注释,保持原始顺序 # domain_combinations.sort(key=lambda x: x.score_with_o, reverse=True) # 打印所有组合(保持原始顺序) print(f" 评估完成,共{len(domain_combinations)}个组合:") for i, comb in enumerate(domain_combinations, 1): print(f" {i}. {comb.text} {comb.type_label} (分数: {comb.score_with_o:.2f})") # 步骤6: 构建 q_list_next(组合 + 高分SUG) print(f"\n[步骤6] 生成下轮输入...") q_list_next = [] # 6.1 添加高分组合 high_score_combinations = [comb for comb in domain_combinations if comb.score_with_o > REQUIRED_SCORE_GAIN] for comb in high_score_combinations: # 生成域字符串,如 "D0,D3" domains_str = ','.join([f'D{d}' for d in comb.domains]) if comb.domains else '' q = Q( text=comb.text, score_with_o=comb.score_with_o, reason=comb.reason, from_source="domain_comb", type_label=comb.type_label, domain_type=domains_str # 添加域信息 ) q_list_next.append(q) print(f" 添加 {len(high_score_combinations)} 个高分组合") # 6.2 添加高分SUG(满足增益条件) high_gain_sugs = [] for sug in all_sugs: if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN: q = Q( text=sug.text, score_with_o=sug.score_with_o, reason=sug.reason, from_source="sug", type_label="" ) q_list_next.append(q) high_gain_sugs.append(sug) print(f" 添加 {len(high_gain_sugs)} 个高增益SUG(增益 > {REQUIRED_SCORE_GAIN})") # 保存round数据(包含完整帖子信息) search_results_data = [] for search in search_list: search_results_data.append({ "text": search.text, "score_with_o": search.score_with_o, "post_list": [ { "note_id": post.note_id, "note_url": post.note_url, "title": post.title, "body_text": post.body_text, "images": post.images, "interact_info": post.interact_info } for post in search.post_list ] }) round_data.update({ "input_queries": [{"text": q.text, "score": q.score_with_o, "from_source": q.from_source, "type": "input", "domain_index": q.domain_index, "domain_type": q.domain_type} for q in query_input], "domain_combinations_count": len(domain_combinations), "domain_combinations": [ { "text": comb.text, "type_label": comb.type_label, "score": comb.score_with_o, "reason": comb.reason, "domains": comb.domains, "source_words": comb.source_words, "from_segments": comb.from_segments } for comb in domain_combinations ], "high_score_combinations": [{"text": q.text, "score": q.score_with_o, "type_label": q.type_label, "type": "combination"} for q in q_list_next if q.from_source == "domain_comb"], "sug_count": len(all_sugs), "sug_details": sug_details, "high_score_sug_count": len(high_score_sugs), "high_gain_sugs": [{"text": q.text, "score": q.score_with_o, "type": "sug"} for q in q_list_next if q.from_source == "sug"], "search_count": len(search_list), "search_results": search_results_data, "q_list_next_size": len(q_list_next) }) context.rounds.append(round_data) print(f"\nRound {round_num} 总结:") print(f" 输入Query数: {len(query_input)}") print(f" 域组合数: {len(domain_combinations)}") print(f" 高分组合: {len(high_score_combinations)}") print(f" SUG数: {len(all_sugs)}") print(f" 高分SUG数: {len(high_score_sugs)}") print(f" 高增益SUG: {len(high_gain_sugs)}") print(f" 搜索数: {len(search_list)}") print(f" 下轮Query数: {len(q_list_next)}") return q_list_next, search_list async def iterative_loop_v2( context: RunContext, max_rounds: int = 4, sug_threshold: float = 0.7 ): """v121 主迭代循环""" print(f"\n{'='*60}") print(f"开始v121迭代循环(语义分段跨域组词版)") print(f"最大轮数: {max_rounds}") print(f"sug阈值: {sug_threshold}") print(f"{'='*60}") # Round 0: 初始化(语义分段 + 拆词) segments = await initialize_v2(context.o, context) # API实例 xiaohongshu_api = XiaohongshuSearchRecommendations() xiaohongshu_search = XiaohongshuSearch() # 收集所有搜索结果 all_search_list = [] # 准备 Round 1 的输入:从 segments 提取所有 words query_input = extract_words_from_segments(segments) print(f"\n提取了 {len(query_input)} 个词作为 Round 1 的输入") # Round 1-N: 迭代循环 num_segments = len(segments) actual_max_rounds = min(max_rounds, num_segments) round_num = 1 while query_input and round_num <= actual_max_rounds: query_input, search_list = await run_round_v2( round_num=round_num, query_input=query_input, # 传递上一轮的输出 segments=segments, o=context.o, context=context, xiaohongshu_api=xiaohongshu_api, xiaohongshu_search=xiaohongshu_search, sug_threshold=sug_threshold ) all_search_list.extend(search_list) # 如果没有新的query,提前结束 if not query_input: print(f"\n第{round_num}轮后无新query生成,提前结束迭代") break round_num += 1 print(f"\n{'='*60}") print(f"迭代完成") print(f" 实际轮数: {round_num}") print(f" 总搜索次数: {len(all_search_list)}") print(f" 总帖子数: {sum(len(s.post_list) for s in all_search_list)}") print(f"{'='*60}") return all_search_list # ============================================================================ # 主函数 # ============================================================================ async def main(input_dir: str, max_rounds: int = 2, sug_threshold: float = 0.7, visualize: bool = False): """主函数""" current_time, log_url = set_trace() # 读取输入 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') c = read_file_as_string(input_context_file) # 原始需求 o = read_file_as_string(input_q_file) # 原始问题 # 版本信息 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 日志目录 log_dir = os.path.join(input_dir, "output", version_name, current_time) # 创建运行上下文 run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, c=c, o=o, log_dir=log_dir, log_url=log_url, ) # 创建日志目录 os.makedirs(run_context.log_dir, exist_ok=True) # 配置日志文件 log_file_path = os.path.join(run_context.log_dir, "run.log") log_file = open(log_file_path, 'w', encoding='utf-8') # 重定向stdout到TeeLogger(同时输出到控制台和文件) original_stdout = sys.stdout sys.stdout = TeeLogger(original_stdout, log_file) try: print(f"📝 日志文件: {log_file_path}") print(f"{'='*60}\n") # 执行迭代 (v121: 使用新架构) all_search_list = await iterative_loop_v2( run_context, max_rounds=max_rounds, sug_threshold=sug_threshold ) # 格式化输出 output = f"原始需求:{run_context.c}\n" output += f"原始问题:{run_context.o}\n" output += f"总搜索次数:{len(all_search_list)}\n" output += f"总帖子数:{sum(len(s.post_list) for s in all_search_list)}\n" output += "\n" + "="*60 + "\n" if all_search_list: output += "【搜索结果】\n\n" for idx, search in enumerate(all_search_list, 1): output += f"{idx}. 搜索词: {search.text} (分数: {search.score_with_o:.2f})\n" output += f" 帖子数: {len(search.post_list)}\n" if search.post_list: for post_idx, post in enumerate(search.post_list[:3], 1): # 只显示前3个 output += f" {post_idx}) {post.title}\n" output += f" URL: {post.note_url}\n" output += "\n" else: output += "未找到搜索结果\n" run_context.final_output = output print(f"\n{'='*60}") print("最终结果") print(f"{'='*60}") print(output) # 保存上下文文件 context_file_path = os.path.join(run_context.log_dir, "run_context.json") context_dict = run_context.model_dump() with open(context_file_path, "w", encoding="utf-8") as f: json.dump(context_dict, f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") # 保存详细的搜索结果 search_results_path = os.path.join(run_context.log_dir, "search_results.json") search_results_data = [s.model_dump() for s in all_search_list] with open(search_results_path, "w", encoding="utf-8") as f: json.dump(search_results_data, f, ensure_ascii=False, indent=2) print(f"Search results saved to: {search_results_path}") # 可视化 if visualize: import subprocess output_html = os.path.join(run_context.log_dir, "visualization.html") print(f"\n🎨 生成可视化HTML...") # 获取绝对路径 abs_context_file = os.path.abspath(context_file_path) abs_output_html = os.path.abspath(output_html) # 运行可视化脚本 result = subprocess.run([ "node", "visualization/sug_v6_1_2_121/index.js", abs_context_file, abs_output_html ]) if result.returncode == 0: print(f"✅ 可视化已生成: {output_html}") else: print(f"❌ 可视化生成失败") finally: # 恢复stdout sys.stdout = original_stdout log_file.close() print(f"\n📝 运行日志已保存: {log_file_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.1.2.121 语义分段跨域组词版") parser.add_argument( "--input-dir", type=str, default="input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?", help="输入目录路径,默认: input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?" ) parser.add_argument( "--max-rounds", type=int, default=4, help="最大轮数,默认: 4" ) parser.add_argument( "--sug-threshold", type=float, default=0.7, help="suggestion阈值,默认: 0.7" ) parser.add_argument( "--visualize", action="store_true", default=True, help="运行完成后自动生成可视化HTML" ) args = parser.parse_args() asyncio.run(main(args.input_dir, max_rounds=args.max_rounds, sug_threshold=args.sug_threshold, visualize=args.visualize))