import asyncio import json import os import sys import argparse from datetime import datetime from typing import Literal from agents import Agent, Runner from lib.my_trace import set_trace from pydantic import BaseModel, Field from lib.utils import read_file_as_string from lib.client import get_model MODEL_NAME = "google/gemini-2.5-flash" from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations from script.search.xiaohongshu_search import XiaohongshuSearch # ============================================================================ # 数据模型 # ============================================================================ class Seg(BaseModel): """分词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_o: str = "" # 原始问题 class Word(BaseModel): """词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 from_o: str = "" # 原始问题 class QFromQ(BaseModel): """Q来源信息(用于Sug中记录)""" text: str score_with_o: float = 0.0 class Q(BaseModel): """查询""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_source: str = "" # seg/sug/add(加词) class Sug(BaseModel): """建议词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 reason: str = "" # 评分理由 from_q: QFromQ | None = None # 来自的q class Seed(BaseModel): """种子""" text: str added_words: list[str] = Field(default_factory=list) # 已经增加的words from_type: str = "" # seg/sug/add score_with_o: float = 0.0 # 与原始问题的评分 class Post(BaseModel): """帖子""" title: str = "" body_text: str = "" type: str = "normal" # video/normal images: list[str] = Field(default_factory=list) # 图片url列表,第一张为封面 video: str = "" # 视频url interact_info: dict = Field(default_factory=dict) # 互动信息 note_id: str = "" note_url: str = "" class Search(Sug): """搜索结果(继承Sug)""" post_list: list[Post] = Field(default_factory=list) # 搜索得到的帖子列表 class RunContext(BaseModel): """运行上下文""" version: str input_files: dict[str, str] c: str # 原始需求 o: str # 原始问题 log_url: str log_dir: str # 每轮的数据 rounds: list[dict] = Field(default_factory=list) # 每轮的详细数据 # 最终结果 final_output: str | None = None # 评估缓存:避免重复评估相同文本 evaluation_cache: dict[str, tuple[float, str]] = Field(default_factory=dict) # key: 文本, value: (score, reason) # ============================================================================ # Agent 定义 # ============================================================================ # Agent 1: 分词专家 class WordSegmentation(BaseModel): """分词结果""" words: list[str] = Field(..., description="分词结果列表") reasoning: str = Field(..., description="分词理由") word_segmentation_instructions = """ 你是分词专家。给定一个query,将其拆分成有意义的最小单元。 ## 分词原则 1. 保留有搜索意义的词汇 2. 拆分成独立的概念 3. 保留专业术语的完整性 4. 去除虚词(的、吗、呢等) ## 输出要求 返回分词列表和分词理由。 """.strip() word_segmenter = Agent[None]( name="分词专家", instructions=word_segmentation_instructions, model=get_model(MODEL_NAME), output_type=WordSegmentation, ) # Agent 2: 动机维度评估专家 + 品类维度评估专家(两阶段评估) # 动机评估的嵌套模型 class CoreMotivationExtraction(BaseModel): """核心动机提取""" 简要说明核心动机: str = Field(..., description="核心动机说明") class MotivationEvaluation(BaseModel): """动机维度评估""" 原始问题核心动机提取: CoreMotivationExtraction = Field(..., description="原始问题核心动机提取") 动机维度得分: float = Field(..., description="动机维度得分 -1~1") 简要说明动机维度相关度理由: str = Field(..., description="动机维度相关度理由") class CategoryEvaluation(BaseModel): """品类维度评估""" 品类维度得分: float = Field(..., description="品类维度得分 -1~1") 简要说明品类维度相关度理由: str = Field(..., description="品类维度相关度理由") # 动机评估 prompt motivation_evaluation_instructions = """ #角色 你是一个 **专业的语言专家和语义相关性评判专家**。你的任务是:判断我给你的 <平台sug词条> 与 <原始问题> 的需求动机匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 核心概念与方法论 ## 评估维度 本评估系统围绕 **动机维度** 进行: ### 1. 动机维度 **定义:** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 --- ## 如何识别原始问题的核心动机 **核心动机必须是动词**,识别方法如下: ### 方法1: 显性动词直接提取 当原始问题明确包含动词时,直接提取 示例: "如何获取素材" → 核心动机 = "获取" "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习") "制作视频教程" → 核心动机 = "制作" ### 方法2: 隐性动词语义推理 当原始问题没有显性动词时,需要结合上下文推理 示例: 例: "川西秋天风光摄影" → 隐含动作="拍摄" → 需结合上下文判断 如果原始问题是纯名词短语,无任何动作线索: → 核心动机 = 无法识别 → 在此情况下,动机维度得分应为 0。 示例: "摄影" → 无法识别动机,动机维度得分 = 0 "川西风光" → 无法识别动机,动机维度得分 = 0 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<平台sug词条>**:平台推荐的词条列表,每个词条需要单独评估。 #判定流程 #评估架构 输入: <原始问题> + <平台sug词条> ↓ 【动机维度相关性判定】 ├→ 步骤1: 评估与<原始问题>的需求动机匹配度 └→ 输出: -1到1之间的数值 + 判定依据 相关度评估维度详解 维度1: 动机维度评估 评估对象: <平台sug词条> 与 <原始问题> 的需求动机匹配度 说明: 核心动作是用户需求的第一优先级,决定了推荐的基本有效性 评分标准: 【正向匹配】 +0.95~1.0: 核心动作完全一致 - 例: 原始问题"如何获取素材" vs sug词"素材获取方法" - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致 · 例: 原始问题"扣除猫咪主体的方法" vs sug词"扣除猫咪眼睛的方法"(子集但目的一致) +0.75~0.95: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs sug词"如何下载素材" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 +0.5~0.75: 核心动作相关但非直接对应(相关实现路径) - 例: 原始问题"如何获取素材" vs sug词"素材管理整理" +0.2~0.45: 核心动作弱相关(同领域不同动作) - 例: 原始问题"如何拍摄风光" vs sug词"风光摄影欣赏" 【中性/无关】 0: 没有明确目的,动作意图无明确关联 - 例: 原始问题"如何获取素材" vs sug词"摄影器材推荐" - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0 - 如果原始问题无法识别动机,则动机维度得分为0。 【负向偏离】 -0.2~-0.05: 动作意图轻度冲突或误导 - 例: 原始问题"如何获取素材" vs sug词"素材版权保护须知" -0.5~-0.25: 动作意图明显对立 - 例: 原始问题"如何获取免费素材" vs sug词"如何售卖素材" -1.0~-0.55: 动作意图完全相反或产生严重负面引导 - 例: 原始问题"免费素材获取" vs sug词"付费素材强制推销" --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "原始问题核心动机提取": { "简要说明核心动机": "" }, "动机维度得分": "-1到1之间的小数", "简要说明动机维度相关度理由": "评估该sug词条与原始问题动机匹配程度的理由" } ``` **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明动机维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 #注意事项: 始终围绕动机维度:所有评估都基于"动机"维度,不偏离 核心动机必须是动词:在评估前,必须先提取原始问题的核心动机(动词),这是整个评估的基础 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当sug词条对原始问题动机产生误导、冲突或有害引导时给予负分 零分使用原则:当sug词条与原始问题动机无明确关联,既不相关也不冲突时给予零分,或原始问题无法识别动机时。 """.strip() # 品类评估 prompt category_evaluation_instructions = """ #角色 你是一个 **专业的语言专家和语义相关性评判专家**。你的任务是:判断我给你的 <平台sug词条> 与 <原始问题> 的内容主体和限定词匹配度,给出 **-1 到 1 之间** 的数值评分。 --- # 核心概念与方法论 ## 评估维度 本评估系统围绕 **品类维度** 进行: ### 2. 品类维度 **定义:** 用户"关于什么内容",即原始问题的主题对象和限定词 - 核心是 **名词+限定词**:川西秋季风光摄影素材 - 包括:核心主体 + 地域限定 + 时间限定 + 质量限定等 --- # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<平台sug词条>**:平台推荐的词条列表,每个词条需要单独评估。 #判定流程 #评估架构 输入: <原始问题> + <平台sug词条> ↓ 【品类维度相关性判定】 ├→ 步骤1: 评估与<原始问题>的内容主体和限定词匹配度 └→ 输出: -1到1之间的数值 + 判定依据 相关度评估维度详解 维度2: 品类维度评估 评估对象: <平台sug词条> 与 <原始问题> 的内容主体和限定词匹配度 评分标准: 【正向匹配】 +0.95~1.0: 核心主体+所有关键限定词完全匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西秋季风光摄影作品" +0.75~0.95: 核心主体匹配,存在限定词匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西风光摄影素材"(缺失"秋季") +0.5~0.75: 核心主体匹配,无限定词匹配或合理泛化 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"四川风光摄影" +0.2~0.5: 主体词不匹配,限定词缺失或错位 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"风光摄影入门" +0.05~0.2: 主体词不匹配,品类不同 - 例: 原始问题"风光摄影素材" vs sug词"人文摄影素材" 【中性/无关】 0: 类别明显不同,没有明确目的,无明确关联 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"人像摄影素材" - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0 【负向偏离】 -0.2~-0.05: 主体词或限定词存在误导性 - 例: 原始问题"免费摄影素材" vs sug词"付费摄影素材库" -0.5~-0.25: 主体词明显错位或品类冲突 - 例: 原始问题"风光摄影素材" vs sug词"人像修图教程" -1.0~-0.55: 完全错误的品类或有害引导 - 例: 原始问题"正版素材获取" vs sug词"盗版素材下载" --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: ```json { "品类维度得分": "-1到1之间的小数", "简要说明品类维度相关度理由": "评估该sug词条与原始问题品类匹配程度的理由" } ``` **输出约束(非常重要)**: 1. **字符串长度限制**:\"简要说明品类维度相关度理由\"字段必须控制在**150字以内** 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号 --- #注意事项: 始终围绕品类维度:所有评估都基于"品类"维度,不偏离 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当sug词条对原始问题品类产生误导、冲突或有害引导时给予负分 零分使用原则:当sug词条与原始问题品类无明确关联,既不相关也不冲突时给予零分 """.strip() # 创建两个评估 Agent motivation_evaluator = Agent[None]( name="动机维度评估专家", instructions=motivation_evaluation_instructions, model=get_model(MODEL_NAME), output_type=MotivationEvaluation, ) category_evaluator = Agent[None]( name="品类维度评估专家", instructions=category_evaluation_instructions, model=get_model(MODEL_NAME), output_type=CategoryEvaluation, ) # Agent 3: 加词选择专家 class WordCombination(BaseModel): """单个词组合""" selected_word: str = Field(..., description="选择的词") combined_query: str = Field(..., description="组合后的新query") reasoning: str = Field(..., description="选择理由") class WordSelectionTop5(BaseModel): """加词选择结果(Top 5)""" combinations: list[WordCombination] = Field( ..., description="选择的Top 5组合(不足5个则返回所有)", min_items=1, max_items=5 ) overall_reasoning: str = Field(..., description="整体选择思路") word_selection_instructions = """ 你是加词组合专家。 ## 任务 从候选词列表中选择5个最合适的词,分别与当前seed组合成新的query。如果候选词不足5个,则返回所有。 ## 选择原则 1. **相关性**:选择与当前seed最相关的词 2. **语义通顺**:组合后的query要符合搜索习惯 3. **扩展范围**:优先选择能扩展搜索范围的词 4. **多样性**:5个词应该覆盖不同的方面(如:时间、地点、类型、用途等) ## 组合约束 1. **只能使用seed和word的原始文本** 2. **不能添加任何连接词**(如"的"、"和"、"与"、"在"等) 3. **不能添加任何额外的词** 4. **组合方式**:seed+word 或 word+seed,或者word插入seed中间,选择更符合搜索习惯的顺序 ## 错误示例 ✗ "川西" + "秋季" → "川西的秋季"(加了"的") ✗ "川西" + "秋季" → "川西秋季风光"(加了额外的"风光") ✗ "摄影" + "技巧" → "摄影拍摄技巧"(加了"拍摄") ## 输出要求 - 最多返回5个组合(如果候选词不足5个,返回所有) - 每个组合包含: * selected_word: 选择的词(必须在候选词列表中) * combined_query: 组合后的新query(只包含seed和word的原始文本) * reasoning: 选择理由(说明为什么选这个词) - overall_reasoning: 整体选择思路(说明这5个词的选择逻辑) ## JSON输出规范 1. **格式要求**:必须输出标准的、完整的JSON格式 2. **字符限制**:不要在JSON中使用任何不可见的特殊字符或控制字符 3. **引号规范**:字符串中如需表达引用或强调,使用书名号《》或单书名号「」,不要使用英文引号或中文引号"" 4. **编码规范**:所有文本使用UTF-8编码,不要包含二进制或转义序列 5. **完整性**:确保JSON的开始和结束括号完整匹配,所有字段都正确闭合 """.strip() word_selector = Agent[None]( name="加词组合专家", instructions=word_selection_instructions, model=get_model(MODEL_NAME), output_type=WordSelectionTop5, ) # ============================================================================ # 辅助函数 # ============================================================================ def calculate_final_score(motivation_score: float, category_score: float) -> float: """ 应用依存性规则计算最终得分 步骤1: 基础加权计算 base_score = motivation_score * 0.7 + category_score * 0.3 步骤2: 极值保护规则 Args: motivation_score: 动机维度得分 -1~1 category_score: 品类维度得分 -1~1 Returns: 最终得分 -1~1 """ # 基础加权得分 base_score = motivation_score * 0.7 + category_score * 0.3 # 规则C: 动机负向决定机制(最高优先级) if motivation_score < 0: return 0.0 # 规则A: 动机高分保护机制 if motivation_score >= 0.8: # 当目的高度一致时,品类的泛化不应导致"弱相关" return max(base_score, 0.7) # 规则B: 动机低分限制机制 if motivation_score <= 0.2: # 目的不符时,品类匹配的价值有限 return min(base_score, 0.5) # 无规则调整,返回基础得分 return base_score def clean_json_string(text: str) -> str: """清理JSON中的非法控制字符(保留 \t \n \r)""" import re # 移除除了 \t(09) \n(0A) \r(0D) 之外的所有控制字符 return re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', text) def process_note_data(note: dict) -> Post: """处理搜索接口返回的帖子数据""" note_card = note.get("note_card", {}) image_list = note_card.get("image_list", []) interact_info = note_card.get("interact_info", {}) user_info = note_card.get("user", {}) # ========== 调试日志 START ========== note_id = note.get("id", "") raw_title = note_card.get("display_title") # 不提供默认值 raw_body = note_card.get("desc") raw_type = note_card.get("type") # 打印原始值类型和内容 print(f"\n[DEBUG] 处理帖子 {note_id}:") print(f" raw_title 类型: {type(raw_title).__name__}, 值: {repr(raw_title)}") print(f" raw_body 类型: {type(raw_body).__name__}, 值: {repr(raw_body)[:100] if raw_body else repr(raw_body)}") print(f" raw_type 类型: {type(raw_type).__name__}, 值: {repr(raw_type)}") # 检查是否为 None if raw_title is None: print(f" ⚠️ WARNING: display_title 是 None!") if raw_body is None: print(f" ⚠️ WARNING: desc 是 None!") if raw_type is None: print(f" ⚠️ WARNING: type 是 None!") # ========== 调试日志 END ========== # 提取图片URL - 使用新的字段名 image_url images = [] for img in image_list: if isinstance(img, dict): # 尝试新字段名 image_url,如果不存在则尝试旧字段名 url_default img_url = img.get("image_url") or img.get("url_default") if img_url: images.append(img_url) # 判断类型 note_type = note_card.get("type", "normal") video_url = "" if note_type == "video": video_info = note_card.get("video", {}) if isinstance(video_info, dict): # 尝试获取视频URL video_url = video_info.get("media", {}).get("stream", {}).get("h264", [{}])[0].get("master_url", "") return Post( note_id=note.get("id") or "", title=note_card.get("display_title") or "", body_text=note_card.get("desc") or "", type=note_type, images=images, video=video_url, interact_info={ "liked_count": interact_info.get("liked_count", 0), "collected_count": interact_info.get("collected_count", 0), "comment_count": interact_info.get("comment_count", 0), "shared_count": interact_info.get("shared_count", 0) }, note_url=f"https://www.xiaohongshu.com/explore/{note.get('id', '')}" ) async def evaluate_with_o(text: str, o: str, cache: dict[str, tuple[float, str]] | None = None) -> tuple[float, str]: """评估文本与原始问题o的相关度 采用两阶段评估 + 代码计算规则: 1. 动机维度评估(权重70%) 2. 品类维度评估(权重30%) 3. 应用规则A/B/C调整得分 Args: text: 待评估的文本 o: 原始问题 cache: 评估缓存(可选),用于避免重复评估 Returns: tuple[float, str]: (最终相关度分数, 综合评估理由) """ # 检查缓存 if cache is not None and text in cache: cached_score, cached_reason = cache[text] print(f" ⚡ 缓存命中: {text} -> {cached_score:.2f}") return cached_score, cached_reason # 准备输入 eval_input = f""" <原始问题> {o} <平台sug词条> {text} 请评估平台sug词条与原始问题的匹配度。 """ # 添加重试机制 max_retries = 2 last_error = None for attempt in range(max_retries): try: # 并发调用两个评估器 motivation_task = Runner.run(motivation_evaluator, eval_input) category_task = Runner.run(category_evaluator, eval_input) motivation_result, category_result = await asyncio.gather( motivation_task, category_task ) # 获取评估结果 motivation_eval: MotivationEvaluation = motivation_result.final_output category_eval: CategoryEvaluation = category_result.final_output # 提取得分 motivation_score = motivation_eval.动机维度得分 category_score = category_eval.品类维度得分 # 计算基础得分 base_score = motivation_score * 0.7 + category_score * 0.3 # 应用规则计算最终得分 final_score = calculate_final_score(motivation_score, category_score) # 组合评估理由 core_motivation = motivation_eval.原始问题核心动机提取.简要说明核心动机 motivation_reason = motivation_eval.简要说明动机维度相关度理由 category_reason = category_eval.简要说明品类维度相关度理由 combined_reason = ( f"【核心动机】{core_motivation}\n" f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n" f"【品类维度 {category_score:.2f}】{category_reason}\n" f"【基础得分 {base_score:.2f}】= 动机({motivation_score:.2f})*0.7 + 品类({category_score:.2f})*0.3\n" f"【最终得分 {final_score:.2f}】" ) # 如果应用了规则,添加规则说明 if final_score != base_score: if motivation_score < 0: combined_reason += "(应用规则C:动机负向决定机制)" elif motivation_score >= 0.8: combined_reason += "(应用规则A:动机高分保护机制)" elif motivation_score <= 0.2: combined_reason += "(应用规则B:动机低分限制机制)" # 存入缓存 if cache is not None: cache[text] = (final_score, combined_reason) return final_score, combined_reason except Exception as e: last_error = e error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}") print(f" 正在重试...") await asyncio.sleep(1) # 等待1秒后重试 else: print(f" ❌ 评估失败 (已达最大重试次数): {error_msg[:150]}") # 所有重试失败后,返回默认值 fallback_reason = f"评估失败(重试{max_retries}次): {str(last_error)[:200]}" print(f" 使用默认值: score=0.0, reason={fallback_reason[:100]}...") return 0.0, fallback_reason # ============================================================================ # 核心流程函数 # ============================================================================ async def initialize(o: str, context: RunContext) -> tuple[list[Seg], list[Word], list[Q], list[Seed]]: """ 初始化阶段 Returns: (seg_list, word_list_1, q_list_1, seed_list) """ print(f"\n{'='*60}") print(f"初始化阶段") print(f"{'='*60}") # 1. 分词:原始问题(o) ->分词-> seg_list print(f"\n[步骤1] 分词...") result = await Runner.run(word_segmenter, o) segmentation: WordSegmentation = result.final_output seg_list = [] for word in segmentation.words: seg_list.append(Seg(text=word, from_o=o)) print(f"分词结果: {[s.text for s in seg_list]}") print(f"分词理由: {segmentation.reasoning}") # 2. 分词评估:seg_list -> 每个seg与o进行评分(使用信号量限制并发数) print(f"\n[步骤2] 评估每个分词与原始问题的相关度...") MAX_CONCURRENT_SEG_EVALUATIONS = 5 seg_semaphore = asyncio.Semaphore(MAX_CONCURRENT_SEG_EVALUATIONS) async def evaluate_seg(seg: Seg) -> Seg: async with seg_semaphore: seg.score_with_o, seg.reason = await evaluate_with_o(seg.text, o, context.evaluation_cache) return seg if seg_list: print(f" 开始评估 {len(seg_list)} 个分词(并发限制: {MAX_CONCURRENT_SEG_EVALUATIONS})...") eval_tasks = [evaluate_seg(seg) for seg in seg_list] await asyncio.gather(*eval_tasks) for seg in seg_list: print(f" {seg.text}: {seg.score_with_o:.2f}") # 3. 构建word_list_1: seg_list -> word_list_1(固定词库) print(f"\n[步骤3] 构建word_list_1(固定词库)...") word_list_1 = [] for seg in seg_list: word_list_1.append(Word( text=seg.text, score_with_o=seg.score_with_o, from_o=o )) print(f"word_list_1(固定): {[w.text for w in word_list_1]}") # 4. 构建q_list_1:seg_list 作为 q_list_1 print(f"\n[步骤4] 构建q_list_1...") q_list_1 = [] for seg in seg_list: q_list_1.append(Q( text=seg.text, score_with_o=seg.score_with_o, reason=seg.reason, from_source="seg" )) print(f"q_list_1: {[q.text for q in q_list_1]}") # 5. 构建seed_list: seg_list -> seed_list print(f"\n[步骤5] 构建seed_list...") seed_list = [] for seg in seg_list: seed_list.append(Seed( text=seg.text, added_words=[], from_type="seg", score_with_o=seg.score_with_o )) print(f"seed_list: {[s.text for s in seed_list]}") return seg_list, word_list_1, q_list_1, seed_list async def run_round( round_num: int, q_list: list[Q], word_list_1: list[Word], seed_list: list[Seed], o: str, context: RunContext, xiaohongshu_api: XiaohongshuSearchRecommendations, xiaohongshu_search: XiaohongshuSearch, sug_threshold: float = 0.7 ) -> tuple[list[Q], list[Seed], list[Search]]: """ 运行一轮 Args: round_num: 轮次编号 q_list: 当前轮的q列表 word_list_1: 固定的词库(第0轮分词结果) seed_list: 当前的seed列表 o: 原始问题 context: 运行上下文 xiaohongshu_api: 建议词API xiaohongshu_search: 搜索API sug_threshold: suggestion的阈值 Returns: (q_list_next, seed_list_next, search_list) """ print(f"\n{'='*60}") print(f"第{round_num}轮") print(f"{'='*60}") round_data = { "round_num": round_num, "input_q_list": [{"text": q.text, "score": q.score_with_o, "type": "query"} for q in q_list], "input_word_list_1_size": len(word_list_1), "input_seed_list_size": len(seed_list) } # 1. 请求sug:q_list -> 每个q请求sug接口 -> sug_list_list print(f"\n[步骤1] 为每个q请求建议词...") sug_list_list = [] # list of list for q in q_list: print(f"\n 处理q: {q.text}") suggestions = xiaohongshu_api.get_recommendations(keyword=q.text) q_sug_list = [] if suggestions: print(f" 获取到 {len(suggestions)} 个建议词") for sug_text in suggestions: sug = Sug( text=sug_text, from_q=QFromQ(text=q.text, score_with_o=q.score_with_o) ) q_sug_list.append(sug) else: print(f" 未获取到建议词") sug_list_list.append(q_sug_list) # 2. sug评估:sug_list_list -> 每个sug与o进行评分(并发) print(f"\n[步骤2] 评估每个建议词与原始问题的相关度...") # 2.1 收集所有需要评估的sug,并记录它们所属的q all_sugs = [] sug_to_q_map = {} # 记录每个sug属于哪个q for i, q_sug_list in enumerate(sug_list_list): if q_sug_list: q_text = q_list[i].text for sug in q_sug_list: all_sugs.append(sug) sug_to_q_map[id(sug)] = q_text # 2.2 并发评估所有sug(使用信号量限制并发数) # 每个 evaluate_sug 内部会并发调用 2 个 LLM,所以这里限制为 5,实际并发 LLM 请求为 10 MAX_CONCURRENT_EVALUATIONS = 5 semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS) async def evaluate_sug(sug: Sug) -> Sug: async with semaphore: # 限制并发数 sug.score_with_o, sug.reason = await evaluate_with_o(sug.text, o, context.evaluation_cache) return sug if all_sugs: print(f" 开始评估 {len(all_sugs)} 个建议词(并发限制: {MAX_CONCURRENT_EVALUATIONS})...") eval_tasks = [evaluate_sug(sug) for sug in all_sugs] await asyncio.gather(*eval_tasks) # 2.3 打印结果并组织到sug_details sug_details = {} # 保存每个Q对应的sug列表 for i, q_sug_list in enumerate(sug_list_list): if q_sug_list: q_text = q_list[i].text print(f"\n 来自q '{q_text}' 的建议词:") sug_details[q_text] = [] for sug in q_sug_list: print(f" {sug.text}: {sug.score_with_o:.2f}") # 保存到sug_details sug_details[q_text].append({ "text": sug.text, "score": sug.score_with_o, "reason": sug.reason, "type": "sug" }) # 3. search_list构建 print(f"\n[步骤3] 构建search_list(阈值>{sug_threshold})...") search_list = [] high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold] if high_score_sugs: print(f" 找到 {len(high_score_sugs)} 个高分建议词") # 并发搜索 async def search_for_sug(sug: Sug) -> Search: print(f" 搜索: {sug.text}") try: search_result = xiaohongshu_search.search(keyword=sug.text) result_str = search_result.get("result", "{}") if isinstance(result_str, str): result_data = json.loads(result_str) else: result_data = result_str notes = result_data.get("data", {}).get("data", []) post_list = [] for note in notes[:10]: # 只取前10个 post = process_note_data(note) post_list.append(post) print(f" → 找到 {len(post_list)} 个帖子") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=post_list ) except Exception as e: print(f" ✗ 搜索失败: {e}") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=[] ) search_tasks = [search_for_sug(sug) for sug in high_score_sugs] search_list = await asyncio.gather(*search_tasks) else: print(f" 没有高分建议词,search_list为空") # 4. 构建q_list_next print(f"\n[步骤4] 构建q_list_next...") q_list_next = [] existing_q_texts = set() # 用于去重 add_word_details = {} # 保存每个seed对应的组合词列表 all_seed_combinations = [] # 保存本轮所有seed的组合词(用于后续构建seed_list_next) # 4.1 对于seed_list中的每个seed,从word_list_1中选词组合,产生Top 5 print(f"\n 4.1 为每个seed加词(产生Top 5组合)...") for seed in seed_list: print(f"\n 处理seed: {seed.text}") # 从固定词库word_list_1筛选候选词 candidate_words = [] for word in word_list_1: # 检查词是否已在seed中 if word.text in seed.text: continue # 检查词是否已被添加过 if word.text in seed.added_words: continue candidate_words.append(word) if not candidate_words: print(f" 没有可用的候选词") continue print(f" 候选词数量: {len(candidate_words)}") # 调用Agent一次性选择并组合Top 5(添加重试机制) candidate_words_text = ', '.join([w.text for w in candidate_words]) selection_input = f""" <原始问题> {o} <当前Seed> {seed.text} <候选词列表> {candidate_words_text} 请从候选词列表中选择最多5个最合适的词,分别与当前seed组合成新的query。 """ # 重试机制 max_retries = 2 selection_result = None for attempt in range(max_retries): try: result = await Runner.run(word_selector, selection_input) selection_result = result.final_output break # 成功则跳出 except Exception as e: error_msg = str(e) if attempt < max_retries - 1: print(f" ⚠️ 选词失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:100]}") await asyncio.sleep(1) else: print(f" ❌ 选词失败,跳过该seed: {error_msg[:100]}") break if selection_result is None: print(f" 跳过seed: {seed.text}") continue print(f" Agent选择了 {len(selection_result.combinations)} 个组合") print(f" 整体选择思路: {selection_result.overall_reasoning}") # 并发评估所有组合的相关度 async def evaluate_combination(comb: WordCombination) -> dict: score, reason = await evaluate_with_o(comb.combined_query, o, context.evaluation_cache) return { 'word': comb.selected_word, 'query': comb.combined_query, 'score': score, 'reason': reason, 'reasoning': comb.reasoning } eval_tasks = [evaluate_combination(comb) for comb in selection_result.combinations] top_5 = await asyncio.gather(*eval_tasks) print(f" 评估完成,得到 {len(top_5)} 个组合") # 将Top 5全部加入q_list_next(去重检查) for comb in top_5: # 去重检查 if comb['query'] in existing_q_texts: print(f" ⊗ 跳过重复: {comb['query']}") continue print(f" ✓ {comb['query']} (分数: {comb['score']:.2f})") new_q = Q( text=comb['query'], score_with_o=comb['score'], reason=comb['reason'], from_source="add" ) q_list_next.append(new_q) existing_q_texts.add(comb['query']) # 记录到去重集合 # 记录已添加的词 seed.added_words.append(comb['word']) # 保存到add_word_details add_word_details[seed.text] = [ { "text": comb['query'], "score": comb['score'], "reason": comb['reason'], "selected_word": comb['word'], "seed_score": seed.score_with_o, # 添加原始种子的得分 "type": "add" } for comb in top_5 ] # 保存到all_seed_combinations(用于构建seed_list_next) all_seed_combinations.extend(top_5) # 4.2 对于sug_list_list中,每个sug大于来自的query分数,加到q_list_next(去重检查) print(f"\n 4.2 将高分sug加入q_list_next...") for sug in all_sugs: if sug.from_q and sug.score_with_o > sug.from_q.score_with_o: # 去重检查 if sug.text in existing_q_texts: print(f" ⊗ 跳过重复: {sug.text}") continue new_q = Q( text=sug.text, score_with_o=sug.score_with_o, reason=sug.reason, from_source="sug" ) q_list_next.append(new_q) existing_q_texts.add(sug.text) # 记录到去重集合 print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} > {sug.from_q.score_with_o:.2f})") # 5. 构建seed_list_next(关键修改:不保留上一轮的seed) print(f"\n[步骤5] 构建seed_list_next(不保留上轮seed)...") seed_list_next = [] existing_seed_texts = set() # 5.1 加入本轮所有组合词 print(f" 5.1 加入本轮所有组合词...") for comb in all_seed_combinations: if comb['query'] not in existing_seed_texts: new_seed = Seed( text=comb['query'], added_words=[], # 新seed的added_words清空 from_type="add", score_with_o=comb['score'] ) seed_list_next.append(new_seed) existing_seed_texts.add(comb['query']) print(f" ✓ {comb['query']} (分数: {comb['score']:.2f})") # 5.2 加入高分sug print(f" 5.2 加入高分sug...") for sug in all_sugs: # sug分数 > 对应query分数 if sug.from_q and sug.score_with_o > sug.from_q.score_with_o and sug.text not in existing_seed_texts: new_seed = Seed( text=sug.text, added_words=[], from_type="sug", score_with_o=sug.score_with_o ) seed_list_next.append(new_seed) existing_seed_texts.add(sug.text) print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} > 来源query: {sug.from_q.score_with_o:.2f})") # 序列化搜索结果数据(包含帖子详情) search_results_data = [] for search in search_list: search_results_data.append({ "text": search.text, "score_with_o": search.score_with_o, "post_list": [ { "note_id": post.note_id, "note_url": post.note_url, "title": post.title, "body_text": post.body_text, "images": post.images, "interact_info": post.interact_info } for post in search.post_list ] }) # 记录本轮数据 round_data.update({ "sug_count": len(all_sugs), "high_score_sug_count": len(high_score_sugs), "search_count": len(search_list), "total_posts": sum(len(s.post_list) for s in search_list), "q_list_next_size": len(q_list_next), "seed_list_next_size": len(seed_list_next), "total_combinations": len(all_seed_combinations), "output_q_list": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "from": q.from_source, "type": "query"} for q in q_list_next], "seed_list_next": [{"text": seed.text, "from": seed.from_type, "score": seed.score_with_o} for seed in seed_list_next], "sug_details": sug_details, "add_word_details": add_word_details, "search_results": search_results_data }) context.rounds.append(round_data) print(f"\n本轮总结:") print(f" 建议词数量: {len(all_sugs)}") print(f" 高分建议词: {len(high_score_sugs)}") print(f" 搜索数量: {len(search_list)}") print(f" 帖子总数: {sum(len(s.post_list) for s in search_list)}") print(f" 组合词数量: {len(all_seed_combinations)}") print(f" 下轮q数量: {len(q_list_next)}") print(f" 下轮seed数量: {len(seed_list_next)}") return q_list_next, seed_list_next, search_list async def iterative_loop( context: RunContext, max_rounds: int = 2, sug_threshold: float = 0.7 ): """主迭代循环""" print(f"\n{'='*60}") print(f"开始迭代循环") print(f"最大轮数: {max_rounds}") print(f"sug阈值: {sug_threshold}") print(f"{'='*60}") # 初始化 seg_list, word_list_1, q_list, seed_list = await initialize(context.o, context) # API实例 xiaohongshu_api = XiaohongshuSearchRecommendations() xiaohongshu_search = XiaohongshuSearch() # 保存初始化数据 context.rounds.append({ "round_num": 0, "type": "initialization", "seg_list": [{"text": s.text, "score": s.score_with_o, "reason": s.reason, "type": "seg"} for s in seg_list], "word_list_1": [{"text": w.text, "score": w.score_with_o} for w in word_list_1], "q_list_1": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "type": "query"} for q in q_list], "seed_list": [{"text": s.text, "from_type": s.from_type, "score": s.score_with_o, "type": "seed"} for s in seed_list] }) # 收集所有搜索结果 all_search_list = [] # 迭代 round_num = 1 while q_list and round_num <= max_rounds: q_list, seed_list, search_list = await run_round( round_num=round_num, q_list=q_list, word_list_1=word_list_1, # 传递固定词库 seed_list=seed_list, o=context.o, context=context, xiaohongshu_api=xiaohongshu_api, xiaohongshu_search=xiaohongshu_search, sug_threshold=sug_threshold ) all_search_list.extend(search_list) round_num += 1 print(f"\n{'='*60}") print(f"迭代完成") print(f" 总轮数: {round_num - 1}") print(f" 总搜索次数: {len(all_search_list)}") print(f" 总帖子数: {sum(len(s.post_list) for s in all_search_list)}") print(f"{'='*60}") return all_search_list # ============================================================================ # 主函数 # ============================================================================ async def main(input_dir: str, max_rounds: int = 2, sug_threshold: float = 0.7, visualize: bool = False): """主函数""" current_time, log_url = set_trace() # 读取输入 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') c = read_file_as_string(input_context_file) # 原始需求 o = read_file_as_string(input_q_file) # 原始问题 # 版本信息 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 日志目录 log_dir = os.path.join(input_dir, "output", version_name, current_time) # 创建运行上下文 run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, c=c, o=o, log_dir=log_dir, log_url=log_url, ) # 执行迭代 all_search_list = await iterative_loop( run_context, max_rounds=max_rounds, sug_threshold=sug_threshold ) # 格式化输出 output = f"原始需求:{run_context.c}\n" output += f"原始问题:{run_context.o}\n" output += f"总搜索次数:{len(all_search_list)}\n" output += f"总帖子数:{sum(len(s.post_list) for s in all_search_list)}\n" output += "\n" + "="*60 + "\n" if all_search_list: output += "【搜索结果】\n\n" for idx, search in enumerate(all_search_list, 1): output += f"{idx}. 搜索词: {search.text} (分数: {search.score_with_o:.2f})\n" output += f" 帖子数: {len(search.post_list)}\n" if search.post_list: for post_idx, post in enumerate(search.post_list[:3], 1): # 只显示前3个 output += f" {post_idx}) {post.title}\n" output += f" URL: {post.note_url}\n" output += "\n" else: output += "未找到搜索结果\n" run_context.final_output = output print(f"\n{'='*60}") print("最终结果") print(f"{'='*60}") print(output) # 保存日志 os.makedirs(run_context.log_dir, exist_ok=True) context_file_path = os.path.join(run_context.log_dir, "run_context.json") context_dict = run_context.model_dump() with open(context_file_path, "w", encoding="utf-8") as f: json.dump(context_dict, f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") # 保存详细的搜索结果 search_results_path = os.path.join(run_context.log_dir, "search_results.json") search_results_data = [s.model_dump() for s in all_search_list] with open(search_results_path, "w", encoding="utf-8") as f: json.dump(search_results_data, f, ensure_ascii=False, indent=2) print(f"Search results saved to: {search_results_path}") # 可视化 if visualize: import subprocess output_html = os.path.join(run_context.log_dir, "visualization.html") print(f"\n🎨 生成可视化HTML...") # 获取绝对路径 abs_context_file = os.path.abspath(context_file_path) abs_output_html = os.path.abspath(output_html) # 运行可视化脚本 result = subprocess.run([ "node", "visualization/sug_v6_1_2_8/index.js", abs_context_file, abs_output_html ]) if result.returncode == 0: print(f"✅ 可视化已生成: {output_html}") else: print(f"❌ 可视化生成失败") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.1.2.115 广度遍历版") parser.add_argument( "--input-dir", type=str, default="input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?", help="输入目录路径,默认: input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?" ) parser.add_argument( "--max-rounds", type=int, default=4, help="最大轮数,默认: 4" ) parser.add_argument( "--sug-threshold", type=float, default=0.7, help="suggestion阈值,默认: 0.7" ) parser.add_argument( "--visualize", action="store_true", default=True, help="运行完成后自动生成可视化HTML" ) args = parser.parse_args() asyncio.run(main(args.input_dir, max_rounds=args.max_rounds, sug_threshold=args.sug_threshold, visualize=args.visualize))