import asyncio import json import os import sys import argparse from datetime import datetime from typing import Literal from agents import Agent, Runner from lib.my_trace import set_trace from pydantic import BaseModel, Field from lib.utils import read_file_as_string from lib.client import get_model MODEL_NAME = "google/gemini-2.5-flash" from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations from script.search.xiaohongshu_search import XiaohongshuSearch # ============================================================================ # 数据模型 # ============================================================================ class Seg(BaseModel): """分词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 from_o: str = "" # 原始问题 class Word(BaseModel): """词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 from_o: str = "" # 原始问题 class QFromQ(BaseModel): """Q来源信息(用于Sug中记录)""" text: str score_with_o: float = 0.0 class Q(BaseModel): """查询""" text: str score_with_o: float = 0.0 # 与原始问题的评分 from_source: str = "" # seg/sug/add(加词) class Sug(BaseModel): """建议词""" text: str score_with_o: float = 0.0 # 与原始问题的评分 from_q: QFromQ | None = None # 来自的q class Seed(BaseModel): """种子""" text: str added_words: list[str] = Field(default_factory=list) # 已经增加的words from_type: str = "" # seg/sug score_with_o: float = 0.0 # 与原始问题的评分 class Post(BaseModel): """帖子""" title: str = "" body_text: str = "" type: str = "normal" # video/normal images: list[str] = Field(default_factory=list) # 图片url列表,第一张为封面 video: str = "" # 视频url interact_info: dict = Field(default_factory=dict) # 互动信息 note_id: str = "" note_url: str = "" class Search(Sug): """搜索结果(继承Sug)""" post_list: list[Post] = Field(default_factory=list) # 搜索得到的帖子列表 class RunContext(BaseModel): """运行上下文""" version: str input_files: dict[str, str] c: str # 原始需求 o: str # 原始问题 log_url: str log_dir: str # 每轮的数据 rounds: list[dict] = Field(default_factory=list) # 每轮的详细数据 # 最终结果 final_output: str | None = None # ============================================================================ # Agent 定义 # ============================================================================ # Agent 1: 分词专家 class WordSegmentation(BaseModel): """分词结果""" words: list[str] = Field(..., description="分词结果列表") reasoning: str = Field(..., description="分词理由") word_segmentation_instructions = """ 你是分词专家。给定一个query,将其拆分成有意义的最小单元。 ## 分词原则 1. 保留有搜索意义的词汇 2. 拆分成独立的概念 3. 保留专业术语的完整性 4. 去除虚词(的、吗、呢等) ## 输出要求 返回分词列表和分词理由。 """.strip() word_segmenter = Agent[None]( name="分词专家", instructions=word_segmentation_instructions, model=get_model(MODEL_NAME), output_type=WordSegmentation, ) # Agent 2: 相关度评估专家 class RelevanceEvaluation(BaseModel): """相关度评估""" reason: str = Field(..., description="评估理由") relevance_score: float = Field(..., description="相关性分数 -1~1") relevance_evaluation_instructions = """ # 角色定义 你是一个 **专业的语言专家和语义相关性评判专家**。你的任务是:判断我给你的 <平台sug词条> 与 <原始问题> 的相关度满足度,给出 **-1 到 1 之间** 的数值评分。 --- # 核心概念与方法论 ## 两大评估维度 本评估系统始终围绕 **两个核心维度** 进行: ### 1. 动机维度(权重70%) **定义:** 用户"想要做什么",即原始问题的行为意图和目的 - 核心是 **动词**:获取、学习、拍摄、制作、寻找等 - 包括:核心动作 + 使用场景 + 最终目的 ### 2. 品类维度(权重30%) **定义:** 用户"关于什么内容",即原始问题的主题对象和限定词 - 核心是 **名词+限定词**:川西秋季风光摄影素材 - 包括:核心主体 + 地域限定 + 时间限定 + 质量限定等 --- ## 如何识别原始问题的核心动机 **核心动机必须是动词**,识别方法如下: ### 方法1: 显性动词直接提取 当原始问题明确包含动词时,直接提取 示例: "如何获取素材" → 核心动机 = "获取" "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习") "制作视频教程" → 核心动机 = "制作" ### 方法2: 隐性动词语义推理 当原始问题没有显性动词时,需要结合上下文推理 示例: 例: "川西秋天风光摄影" → 隐含动作="拍摄" → 需结合上下文判断 如果原始问题是纯名词短语,无任何动作线索: → 核心动机 = 无法识别 → 初始权重 = 0 → 相关度评估以品类匹配为主 示例: "摄影" → 无法识别动机,初始权重=0 "川西风光" → 无法识别动机,初始权重=0 # 输入信息 你将接收到以下输入: - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。 - **<平台sug词条>**:平台推荐的词条列表,每个词条需要单独评估。 #判定流程 #评估架构 输入: <原始问题> + <平台sug词条> ↓ 【综合相关性判定】 ├→ 步骤1: 评估与<原始问题>的相关度 └→ 输出: -1到1之间的数值 + 分维度得分 + 判定依据 相关度评估维度详解 维度1: 动机维度评估(权重70%) 评估对象: <平台sug词条> 与 <原始问题> 的需求动机匹配度 说明: 核心动作是用户需求的第一优先级,决定了推荐的基本有效性 评分标准: 【正向匹配】 +1.0: 核心动作完全一致 - 例: 原始问题"如何获取素材" vs sug词"素材获取方法" - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致 · 例: 原始问题"扣除猫咪主体的方法" vs sug词"扣除猫咪眼睛的方法"(子集但目的一致) +0.8~0.95: 核心动作语义相近或为同义表达 - 例: 原始问题"如何获取素材" vs sug词"素材下载教程" - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略 +0.5~0.75: 核心动作相关但非直接对应(相关实现路径) - 例: 原始问题"如何获取素材" vs sug词"素材管理整理" +0.2~0.45: 核心动作弱相关(同领域不同动作) - 例: 原始问题"如何拍摄风光" vs sug词"风光摄影欣赏" 【中性/无关】 0: 没有明确目的,动作意图无明确关联 - 例: 原始问题"如何获取素材" vs sug词"摄影器材推荐" - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0 【负向偏离】 -0.2~-0.05: 动作意图轻度冲突或误导 - 例: 原始问题"如何获取素材" vs sug词"素材版权保护须知" -0.5~-0.25: 动作意图明显对立 - 例: 原始问题"如何获取免费素材" vs sug词"如何售卖素材" -1.0~-0.55: 动作意图完全相反或产生严重负面引导 - 例: 原始问题"免费素材获取" vs sug词"付费素材强制推销" 维度2: 品类维度评估(权重30%) 评估对象: <平台sug词条> 与 <原始问题> 的内容主体和限定词匹配度 评分标准: 【正向匹配】 +1.0: 核心主体+所有关键限定词完全匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西秋季风光摄影作品" +0.75~0.95: 核心主体匹配,大部分限定词匹配 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西风光摄影素材"(缺失"秋季") +0.5~0.7: 核心主体匹配,少量限定词匹配或合理泛化 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"四川风光摄影" +0.2~0.45: 仅主体词匹配,限定词全部缺失或错位 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"风光摄影入门" +0.05~0.15: 主题领域相关但品类不同 - 例: 原始问题"风光摄影素材" vs sug词"人文摄影素材" 【中性/无关】 0: 主体词部分相关但类别明显不同 - 例: 原始问题"川西秋季风光摄影素材" vs sug词"人像摄影素材" 【负向偏离】 -0.2~-0.05: 主体词或限定词存在误导性 - 例: 原始问题"免费摄影素材" vs sug词"付费摄影素材库" -0.5~-0.25: 主体词明显错位或品类冲突 - 例: 原始问题"风光摄影素材" vs sug词"人像修图教程" -1.0~-0.55: 完全错误的品类或有害引导 - 例: 原始问题"正版素材获取" vs sug词"盗版素材下载" 综合得分计算与规则调整 步骤1: 应用依存性规则 规则A: 动机高分保护机制 如果 动机维度得分 ≥ 0.8: → 品类得分即使为0或轻微负向(-0.2~0) → 最终得分 = max(初步得分, 0.55) 解释: 当目的高度一致时,品类的泛化不应导致"弱相关" 规则B: 动机低分限制机制 如果 动机维度得分 ≤ 0.2: → 无论品类得分多高 → 最终得分 = min(初步得分, 0.4) 解释: 目的不符时,品类匹配的价值有限 规则C: 动机负向决定机制 如果 动机维度得分 < 0: → 最终得分 = min(初步得分, 0) 解释: 动作意图冲突时,推荐具有误导性,不应为正相关 步骤3: 输出最终得分 #基础加权计算 应用规则后的调整得分 = 目的动机维度得分 × 0.7 + 品类维度得分 × 0.3 取值范围: -1.0 ~ +1.0 --- # 得分档位解释 高度相关】+0.8 ~ +1.0 相关性高度契合,用户可直接使用 动机和品类均高度匹配 典型场景: 动机≥0.85 且 品类≥0.7 【中高相关】+0.6 ~ +0.79 相关性较好,用户基本满意 动机匹配但品类有泛化,或反之 典型场景: 动机≥0.8 且 品类≥0.3 【中度相关】+0.3 ~ +0.59 部分相关,用户需要调整搜索策略 动机或品类存在一定偏差 典型场景: 动机0.4-0.7 且 品类0.3-0.7 【弱相关】+0.01 ~ +0.29 关联微弱,参考价值有限 仅有表层词汇重叠 【无关】0 无明确关联 原始问题无法识别动机 且 sug词无明确动作 没有目的性且没有品类匹配 【轻度负向】-0.29 ~ -0.01 产生轻微误导或干扰 【中度负向】-0.69 ~ -0.3 存在明显冲突或误导 【严重负向】-1.0 ~ -0.7 完全违背意图或产生有害引导 --- # 输出要求 输出结果必须为一个 **JSON 格式**,包含以下内容: #注意事项: 始终围绕两个核心维度:所有评估都基于"动机"和"品类"两个维度,不偏离 核心动机必须是动词:在评估前,必须先提取原始问题的核心动机(动词),这是整个评估的基础 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移 负分使用原则:仅当sug词条对原始问题产生误导、冲突或有害引导时给予负分 零分使用原则:当sug词条与原始问题无明确关联,既不相关也不冲突时给予零分 分维度独立评分: 先提取原始问题核心动机 分别计算动机维度(含两个子维度)和品类维度得分 按70:30加权得到初步得分 应用规则调整得到最终得分 动机优先原则:当动机高度一致时,品类的合理泛化或具体化不应导致低评分 技巧类需求特殊对待:包含"技巧/方法/教程"等词的需求,对动作一致性要求更严格 ## 输出 - reason: 详细理由 - relevance_score: 0-1的相关性分数 """.strip() relevance_evaluator = Agent[None]( name="相关度评估专家", instructions=relevance_evaluation_instructions, model=get_model(MODEL_NAME), output_type=RelevanceEvaluation, ) # Agent 3: 加词选择专家 class WordSelection(BaseModel): """加词选择结果""" selected_word: str = Field(..., description="选择的词") combined_query: str = Field(..., description="组合后的新query") reasoning: str = Field(..., description="选择理由") word_selection_instructions = """ 你是加词选择专家。 ## 任务 从候选词列表中选择一个最合适的词,与当前seed组合成新的query。 ## 原则 1. 选择与当前seed最相关的词 2. 组合后的query要语义通顺 3. 符合搜索习惯 4. 优先选择能扩展搜索范围的词 ## 输出 - selected_word: 选中的词 - combined_query: 组合后的新query - reasoning: 选择理由 """.strip() word_selector = Agent[None]( name="加词选择专家", instructions=word_selection_instructions, model=get_model(MODEL_NAME), output_type=WordSelection, ) # ============================================================================ # 辅助函数 # ============================================================================ def process_note_data(note: dict) -> Post: """处理搜索接口返回的帖子数据""" note_card = note.get("note_card", {}) image_list = note_card.get("image_list", []) interact_info = note_card.get("interact_info", {}) user_info = note_card.get("user", {}) # 提取图片URL images = [] for img in image_list: if isinstance(img, dict) and "url_default" in img: images.append(img["url_default"]) # 判断类型 note_type = note_card.get("type", "normal") video_url = "" if note_type == "video": video_info = note_card.get("video", {}) if isinstance(video_info, dict): # 尝试获取视频URL video_url = video_info.get("media", {}).get("stream", {}).get("h264", [{}])[0].get("master_url", "") return Post( note_id=note.get("id", ""), title=note_card.get("display_title", ""), body_text=note_card.get("desc", ""), type=note_type, images=images, video=video_url, interact_info={ "liked_count": interact_info.get("liked_count", 0), "collected_count": interact_info.get("collected_count", 0), "comment_count": interact_info.get("comment_count", 0), "shared_count": interact_info.get("shared_count", 0) }, note_url=f"https://www.xiaohongshu.com/explore/{note.get('id', '')}" ) async def evaluate_with_o(text: str, o: str) -> float: """评估文本与原始问题o的相关度""" eval_input = f""" <原始问题> {o} <当前文本> {text} 请评估当前文本与原始问题的相关度。 """ result = await Runner.run(relevance_evaluator, eval_input) evaluation: RelevanceEvaluation = result.final_output return evaluation.relevance_score # ============================================================================ # 核心流程函数 # ============================================================================ async def initialize(o: str, context: RunContext) -> tuple[list[Seg], list[Word], list[Q], list[Seed]]: """ 初始化阶段 Returns: (seg_list, word_list_1, q_list_1, seed_list) """ print(f"\n{'='*60}") print(f"初始化阶段") print(f"{'='*60}") # 1. 分词:原始问题(o) ->分词-> seg_list print(f"\n[步骤1] 分词...") result = await Runner.run(word_segmenter, o) segmentation: WordSegmentation = result.final_output seg_list = [] for word in segmentation.words: seg_list.append(Seg(text=word, from_o=o)) print(f"分词结果: {[s.text for s in seg_list]}") print(f"分词理由: {segmentation.reasoning}") # 2. 分词评估:seg_list -> 每个seg与o进行评分(并发) print(f"\n[步骤2] 评估每个分词与原始问题的相关度...") async def evaluate_seg(seg: Seg) -> Seg: seg.score_with_o = await evaluate_with_o(seg.text, o) return seg if seg_list: eval_tasks = [evaluate_seg(seg) for seg in seg_list] await asyncio.gather(*eval_tasks) for seg in seg_list: print(f" {seg.text}: {seg.score_with_o:.2f}") # 3. 构建word_list_1: seg_list -> word_list_1 print(f"\n[步骤3] 构建word_list_1...") word_list_1 = [] for seg in seg_list: word_list_1.append(Word( text=seg.text, score_with_o=seg.score_with_o, from_o=o )) print(f"word_list_1: {[w.text for w in word_list_1]}") # 4. 构建q_list_1:seg_list 作为 q_list_1 print(f"\n[步骤4] 构建q_list_1...") q_list_1 = [] for seg in seg_list: q_list_1.append(Q( text=seg.text, score_with_o=seg.score_with_o, from_source="seg" )) print(f"q_list_1: {[q.text for q in q_list_1]}") # 5. 构建seed_list: seg_list -> seed_list print(f"\n[步骤5] 构建seed_list...") seed_list = [] for seg in seg_list: seed_list.append(Seed( text=seg.text, added_words=[], from_type="seg", score_with_o=seg.score_with_o )) print(f"seed_list: {[s.text for s in seed_list]}") return seg_list, word_list_1, q_list_1, seed_list async def run_round( round_num: int, q_list: list[Q], word_list: list[Word], seed_list: list[Seed], o: str, context: RunContext, xiaohongshu_api: XiaohongshuSearchRecommendations, xiaohongshu_search: XiaohongshuSearch, sug_threshold: float = 0.7 ) -> tuple[list[Word], list[Q], list[Seed], list[Search]]: """ 运行一轮 Args: round_num: 轮次编号 q_list: 当前轮的q列表 word_list: 当前的word列表 seed_list: 当前的seed列表 o: 原始问题 context: 运行上下文 xiaohongshu_api: 建议词API xiaohongshu_search: 搜索API sug_threshold: suggestion的阈值 Returns: (word_list_next, q_list_next, seed_list_next, search_list) """ print(f"\n{'='*60}") print(f"第{round_num}轮") print(f"{'='*60}") round_data = { "round_num": round_num, "input_q_list": [{"text": q.text, "score": q.score_with_o} for q in q_list], "input_word_list_size": len(word_list), "input_seed_list_size": len(seed_list) } # 1. 请求sug:q_list -> 每个q请求sug接口 -> sug_list_list print(f"\n[步骤1] 为每个q请求建议词...") sug_list_list = [] # list of list for q in q_list: print(f"\n 处理q: {q.text}") suggestions = xiaohongshu_api.get_recommendations(keyword=q.text) q_sug_list = [] if suggestions: print(f" 获取到 {len(suggestions)} 个建议词") for sug_text in suggestions: sug = Sug( text=sug_text, from_q=QFromQ(text=q.text, score_with_o=q.score_with_o) ) q_sug_list.append(sug) else: print(f" 未获取到建议词") sug_list_list.append(q_sug_list) # 2. sug评估:sug_list_list -> 每个sug与o进评分(并发) print(f"\n[步骤2] 评估每个建议词与原始问题的相关度...") # 2.1 收集所有需要评估的sug,并记录它们所属的q all_sugs = [] sug_to_q_map = {} # 记录每个sug属于哪个q for i, q_sug_list in enumerate(sug_list_list): if q_sug_list: q_text = q_list[i].text for sug in q_sug_list: all_sugs.append(sug) sug_to_q_map[id(sug)] = q_text # 2.2 并发评估所有sug async def evaluate_sug(sug: Sug) -> Sug: sug.score_with_o = await evaluate_with_o(sug.text, o) return sug if all_sugs: eval_tasks = [evaluate_sug(sug) for sug in all_sugs] await asyncio.gather(*eval_tasks) # 2.3 打印结果并组织到sug_details sug_details = {} # 保存每个Q对应的sug列表 for i, q_sug_list in enumerate(sug_list_list): if q_sug_list: q_text = q_list[i].text print(f"\n 来自q '{q_text}' 的建议词:") sug_details[q_text] = [] for sug in q_sug_list: print(f" {sug.text}: {sug.score_with_o:.2f}") # 保存到sug_details sug_details[q_text].append({ "text": sug.text, "score": sug.score_with_o }) # 3. search_list构建 print(f"\n[步骤3] 构建search_list(阈值>{sug_threshold})...") search_list = [] high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold] if high_score_sugs: print(f" 找到 {len(high_score_sugs)} 个高分建议词") # 并发搜索 async def search_for_sug(sug: Sug) -> Search: print(f" 搜索: {sug.text}") try: search_result = xiaohongshu_search.search(keyword=sug.text) result_str = search_result.get("result", "{}") if isinstance(result_str, str): result_data = json.loads(result_str) else: result_data = result_str notes = result_data.get("data", {}).get("data", []) post_list = [] for note in notes[:10]: # 只取前10个 post = process_note_data(note) post_list.append(post) print(f" → 找到 {len(post_list)} 个帖子") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=post_list ) except Exception as e: print(f" ✗ 搜索失败: {e}") return Search( text=sug.text, score_with_o=sug.score_with_o, from_q=sug.from_q, post_list=[] ) search_tasks = [search_for_sug(sug) for sug in high_score_sugs] search_list = await asyncio.gather(*search_tasks) else: print(f" 没有高分建议词,search_list为空") # 4. 构建word_list_next: word_list -> word_list_next(先直接复制) print(f"\n[步骤4] 构建word_list_next(暂时直接复制)...") word_list_next = word_list.copy() # 5. 构建q_list_next print(f"\n[步骤5] 构建q_list_next...") q_list_next = [] add_word_details = {} # 保存每个seed对应的组合词列表 # 5.1 对于seed_list中的每个seed,从word_list_next中选一个未加过的词 print(f"\n 5.1 为每个seed加词...") for seed in seed_list: print(f"\n 处理seed: {seed.text}") # 简单过滤:找出不在seed.text中且未被添加过的词 candidate_words = [] for word in word_list_next: # 检查词是否已在seed中 if word.text in seed.text: continue # 检查词是否已被添加过 if word.text in seed.added_words: continue candidate_words.append(word) if not candidate_words: print(f" 没有可用的候选词") continue print(f" 候选词: {[w.text for w in candidate_words]}") # 使用Agent选择最合适的词 selection_input = f""" <原始问题> {o} <当前Seed> {seed.text} <候选词列表> {', '.join([w.text for w in candidate_words])} 请从候选词中选择一个最合适的词,与当前seed组合成新的query。 """ result = await Runner.run(word_selector, selection_input) selection: WordSelection = result.final_output # 验证选择的词是否在候选列表中 if selection.selected_word not in [w.text for w in candidate_words]: print(f" ✗ Agent选择的词 '{selection.selected_word}' 不在候选列表中,跳过") continue print(f" ✓ 选择词: {selection.selected_word}") print(f" ✓ 新query: {selection.combined_query}") print(f" 理由: {selection.reasoning}") # 评估新query new_q_score = await evaluate_with_o(selection.combined_query, o) print(f" 新query评分: {new_q_score:.2f}") # 创建新的q new_q = Q( text=selection.combined_query, score_with_o=new_q_score, from_source="add" ) q_list_next.append(new_q) # 更新seed的added_words seed.added_words.append(selection.selected_word) # 保存到add_word_details if seed.text not in add_word_details: add_word_details[seed.text] = [] add_word_details[seed.text].append({ "text": selection.combined_query, "score": new_q_score, "selected_word": selection.selected_word }) # 5.2 对于sug_list_list中,每个sug大于来自的query分数,加到q_list_next print(f"\n 5.2 将高分sug加入q_list_next...") for sug in all_sugs: if sug.from_q and sug.score_with_o > sug.from_q.score_with_o: new_q = Q( text=sug.text, score_with_o=sug.score_with_o, from_source="sug" ) q_list_next.append(new_q) print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} > {sug.from_q.score_with_o:.2f})") # 6. 更新seed_list print(f"\n[步骤6] 更新seed_list...") seed_list_next = seed_list.copy() # 保留原有的seed # 对于sug_list_list中,每个sug分数大于来源query分数的,且没在seed_list中出现过的,加入 existing_seed_texts = {seed.text for seed in seed_list_next} for sug in all_sugs: # 新逻辑:sug分数 > 对应query分数 if sug.from_q and sug.score_with_o > sug.from_q.score_with_o and sug.text not in existing_seed_texts: new_seed = Seed( text=sug.text, added_words=[], from_type="sug", score_with_o=sug.score_with_o ) seed_list_next.append(new_seed) existing_seed_texts.add(sug.text) print(f" ✓ 新seed: {sug.text} (分数: {sug.score_with_o:.2f} > 来源query: {sug.from_q.score_with_o:.2f})") # 记录本轮数据 round_data.update({ "sug_count": len(all_sugs), "high_score_sug_count": len(high_score_sugs), "search_count": len(search_list), "total_posts": sum(len(s.post_list) for s in search_list), "q_list_next_size": len(q_list_next), "seed_list_next_size": len(seed_list_next), "word_list_next_size": len(word_list_next), "output_q_list": [{"text": q.text, "score": q.score_with_o, "from": q.from_source} for q in q_list_next], "seed_list_next": [{"text": seed.text, "from": seed.from_type, "score": seed.score_with_o} for seed in seed_list_next], # 下一轮种子列表 "sug_details": sug_details, # 每个Q对应的sug列表 "add_word_details": add_word_details # 每个seed对应的组合词列表 }) context.rounds.append(round_data) print(f"\n本轮总结:") print(f" 建议词数量: {len(all_sugs)}") print(f" 高分建议词: {len(high_score_sugs)}") print(f" 搜索数量: {len(search_list)}") print(f" 帖子总数: {sum(len(s.post_list) for s in search_list)}") print(f" 下轮q数量: {len(q_list_next)}") print(f" seed数量: {len(seed_list_next)}") return word_list_next, q_list_next, seed_list_next, search_list async def iterative_loop( context: RunContext, max_rounds: int = 2, sug_threshold: float = 0.7 ): """主迭代循环""" print(f"\n{'='*60}") print(f"开始迭代循环") print(f"最大轮数: {max_rounds}") print(f"sug阈值: {sug_threshold}") print(f"{'='*60}") # 初始化 seg_list, word_list, q_list, seed_list = await initialize(context.o, context) # API实例 xiaohongshu_api = XiaohongshuSearchRecommendations() xiaohongshu_search = XiaohongshuSearch() # 保存初始化数据 context.rounds.append({ "round_num": 0, "type": "initialization", "seg_list": [{"text": s.text, "score": s.score_with_o} for s in seg_list], "word_list_1": [{"text": w.text, "score": w.score_with_o} for w in word_list], "q_list_1": [{"text": q.text, "score": q.score_with_o} for q in q_list], "seed_list": [{"text": s.text, "from_type": s.from_type, "score": s.score_with_o} for s in seed_list] }) # 收集所有搜索结果 all_search_list = [] # 迭代 round_num = 1 while q_list and round_num <= max_rounds: word_list, q_list, seed_list, search_list = await run_round( round_num=round_num, q_list=q_list, word_list=word_list, seed_list=seed_list, o=context.o, context=context, xiaohongshu_api=xiaohongshu_api, xiaohongshu_search=xiaohongshu_search, sug_threshold=sug_threshold ) all_search_list.extend(search_list) round_num += 1 print(f"\n{'='*60}") print(f"迭代完成") print(f" 总轮数: {round_num - 1}") print(f" 总搜索次数: {len(all_search_list)}") print(f" 总帖子数: {sum(len(s.post_list) for s in all_search_list)}") print(f"{'='*60}") return all_search_list # ============================================================================ # 主函数 # ============================================================================ async def main(input_dir: str, max_rounds: int = 2, sug_threshold: float = 0.7, visualize: bool = False): """主函数""" current_time, log_url = set_trace() # 读取输入 input_context_file = os.path.join(input_dir, 'context.md') input_q_file = os.path.join(input_dir, 'q.md') c = read_file_as_string(input_context_file) # 原始需求 o = read_file_as_string(input_q_file) # 原始问题 # 版本信息 version = os.path.basename(__file__) version_name = os.path.splitext(version)[0] # 日志目录 log_dir = os.path.join(input_dir, "output", version_name, current_time) # 创建运行上下文 run_context = RunContext( version=version, input_files={ "input_dir": input_dir, "context_file": input_context_file, "q_file": input_q_file, }, c=c, o=o, log_dir=log_dir, log_url=log_url, ) # 执行迭代 all_search_list = await iterative_loop( run_context, max_rounds=max_rounds, sug_threshold=sug_threshold ) # 格式化输出 output = f"原始需求:{run_context.c}\n" output += f"原始问题:{run_context.o}\n" output += f"总搜索次数:{len(all_search_list)}\n" output += f"总帖子数:{sum(len(s.post_list) for s in all_search_list)}\n" output += "\n" + "="*60 + "\n" if all_search_list: output += "【搜索结果】\n\n" for idx, search in enumerate(all_search_list, 1): output += f"{idx}. 搜索词: {search.text} (分数: {search.score_with_o:.2f})\n" output += f" 帖子数: {len(search.post_list)}\n" if search.post_list: for post_idx, post in enumerate(search.post_list[:3], 1): # 只显示前3个 output += f" {post_idx}) {post.title}\n" output += f" URL: {post.note_url}\n" output += "\n" else: output += "未找到搜索结果\n" run_context.final_output = output print(f"\n{'='*60}") print("最终结果") print(f"{'='*60}") print(output) # 保存日志 os.makedirs(run_context.log_dir, exist_ok=True) context_file_path = os.path.join(run_context.log_dir, "run_context.json") context_dict = run_context.model_dump() with open(context_file_path, "w", encoding="utf-8") as f: json.dump(context_dict, f, ensure_ascii=False, indent=2) print(f"\nRunContext saved to: {context_file_path}") # 保存详细的搜索结果 search_results_path = os.path.join(run_context.log_dir, "search_results.json") search_results_data = [s.model_dump() for s in all_search_list] with open(search_results_path, "w", encoding="utf-8") as f: json.dump(search_results_data, f, ensure_ascii=False, indent=2) print(f"Search results saved to: {search_results_path}") # 可视化 if visualize: import subprocess output_html = os.path.join(run_context.log_dir, "visualization.html") print(f"\n🎨 生成可视化HTML...") # 获取绝对路径 abs_context_file = os.path.abspath(context_file_path) abs_output_html = os.path.abspath(output_html) # 运行可视化脚本 result = subprocess.run([ "node", "visualization/sug_v6_1_2_8/index.js", abs_context_file, abs_output_html ]) if result.returncode == 0: print(f"✅ 可视化已生成: {output_html}") else: print(f"❌ 可视化生成失败") if __name__ == "__main__": parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.1.2.8 轮次迭代版") parser.add_argument( "--input-dir", type=str, default="input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?", help="输入目录路径,默认: input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?" ) parser.add_argument( "--max-rounds", type=int, default=3, help="最大轮数,默认: 2" ) parser.add_argument( "--sug-threshold", type=float, default=0.7, help="suggestion阈值,默认: 0.7" ) parser.add_argument( "--visualize", action="store_true", default=True, help="运行完成后自动生成可视化HTML" ) args = parser.parse_args() asyncio.run(main(args.input_dir, max_rounds=args.max_rounds, sug_threshold=args.sug_threshold, visualize=args.visualize))