| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193 |
- import asyncio
- import json
- import os
- import sys
- import argparse
- from datetime import datetime
- from typing import Literal
- from agents import Agent, Runner
- from lib.my_trace import set_trace
- from pydantic import BaseModel, Field
- from lib.utils import read_file_as_string
- from lib.client import get_model
- MODEL_NAME = "google/gemini-2.5-flash"
- from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
- from script.search.xiaohongshu_search import XiaohongshuSearch
- # ============================================================================
- # 数据模型
- # ============================================================================
- class Seg(BaseModel):
- """分词"""
- text: str
- score_with_o: float = 0.0 # 与原始问题的评分
- reason: str = "" # 评分理由
- from_o: str = "" # 原始问题
- class Word(BaseModel):
- """词"""
- text: str
- score_with_o: float = 0.0 # 与原始问题的评分
- from_o: str = "" # 原始问题
- class QFromQ(BaseModel):
- """Q来源信息(用于Sug中记录)"""
- text: str
- score_with_o: float = 0.0
- class Q(BaseModel):
- """查询"""
- text: str
- score_with_o: float = 0.0 # 与原始问题的评分
- reason: str = "" # 评分理由
- from_source: str = "" # seg/sug/add(加词)
- class Sug(BaseModel):
- """建议词"""
- text: str
- score_with_o: float = 0.0 # 与原始问题的评分
- reason: str = "" # 评分理由
- from_q: QFromQ | None = None # 来自的q
- class Seed(BaseModel):
- """种子"""
- text: str
- added_words: list[str] = Field(default_factory=list) # 已经增加的words
- from_type: str = "" # seg/sug/add
- score_with_o: float = 0.0 # 与原始问题的评分
- class Post(BaseModel):
- """帖子"""
- title: str = ""
- body_text: str = ""
- type: str = "normal" # video/normal
- images: list[str] = Field(default_factory=list) # 图片url列表,第一张为封面
- video: str = "" # 视频url
- interact_info: dict = Field(default_factory=dict) # 互动信息
- note_id: str = ""
- note_url: str = ""
- class Search(Sug):
- """搜索结果(继承Sug)"""
- post_list: list[Post] = Field(default_factory=list) # 搜索得到的帖子列表
- class RunContext(BaseModel):
- """运行上下文"""
- version: str
- input_files: dict[str, str]
- c: str # 原始需求
- o: str # 原始问题
- log_url: str
- log_dir: str
- # 每轮的数据
- rounds: list[dict] = Field(default_factory=list) # 每轮的详细数据
- # 最终结果
- final_output: str | None = None
- # ============================================================================
- # Agent 定义
- # ============================================================================
- # Agent 1: 分词专家
- class WordSegmentation(BaseModel):
- """分词结果"""
- words: list[str] = Field(..., description="分词结果列表")
- reasoning: str = Field(..., description="分词理由")
- word_segmentation_instructions = """
- 你是分词专家。给定一个query,将其拆分成有意义的最小单元。
- ## 分词原则
- 1. 保留有搜索意义的词汇
- 2. 拆分成独立的概念
- 3. 保留专业术语的完整性
- 4. 去除虚词(的、吗、呢等)
- ## 输出要求
- 返回分词列表和分词理由。
- """.strip()
- word_segmenter = Agent[None](
- name="分词专家",
- instructions=word_segmentation_instructions,
- model=get_model(MODEL_NAME),
- output_type=WordSegmentation,
- )
- # Agent 2: 相关度评估专家
- class RelevanceEvaluation(BaseModel):
- """相关度评估"""
- reason: str = Field(..., description="评估理由")
- relevance_score: float = Field(..., description="相关性分数 -1~1")
- relevance_evaluation_instructions = """
- # 角色定义
- 你是一个 **专业的语言专家和语义相关性评判专家**。你的任务是:判断我给你的 <平台sug词条> 与 <原始问题> 的相关度满足度,给出 **-1 到 1 之间** 的数值评分。
- ---
- # 核心概念与方法论
- ## 两大评估维度
- 本评估系统始终围绕 **两个核心维度** 进行:
- ### 1. 动机维度(权重70%)
- **定义:** 用户"想要做什么",即原始问题的行为意图和目的
- - 核心是 **动词**:获取、学习、拍摄、制作、寻找等
- - 包括:核心动作 + 使用场景 + 最终目的
- ### 2. 品类维度(权重30%)
- **定义:** 用户"关于什么内容",即原始问题的主题对象和限定词
- - 核心是 **名词+限定词**:川西秋季风光摄影素材
- - 包括:核心主体 + 地域限定 + 时间限定 + 质量限定等
- ---
- ## 如何识别原始问题的核心动机
- **核心动机必须是动词**,识别方法如下:
- ### 方法1: 显性动词直接提取
- 当原始问题明确包含动词时,直接提取
- 示例:
- "如何获取素材" → 核心动机 = "获取"
- "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习")
- "制作视频教程" → 核心动机 = "制作"
- ### 方法2: 隐性动词语义推理
- 当原始问题没有显性动词时,需要结合上下文推理
- 示例:
- 例: "川西秋天风光摄影" → 隐含动作="拍摄"
- → 需结合上下文判断
- 如果原始问题是纯名词短语,无任何动作线索:
- → 核心动机 = 无法识别
- → 初始权重 = 0
- → 相关度评估以品类匹配为主
- 示例:
- "摄影" → 无法识别动机,初始权重=0
- "川西风光" → 无法识别动机,初始权重=0
- # 输入信息
- 你将接收到以下输入:
- - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。
- - **<平台sug词条>**:平台推荐的词条列表,每个词条需要单独评估。
- #判定流程
- #评估架构
- 输入: <原始问题> + <平台sug词条>
- ↓
- 【综合相关性判定】
- ├→ 步骤1: 评估<sug词条>与<原始问题>的相关度
- └→ 输出: -1到1之间的数值 + 分维度得分 + 判定依据
- 相关度评估维度详解
- 维度1: 动机维度评估(权重70%)
- 评估对象: <平台sug词条> 与 <原始问题> 的需求动机匹配度
- 说明: 核心动作是用户需求的第一优先级,决定了推荐的基本有效性
- 评分标准:
- 【正向匹配】
- +1.0: 核心动作完全一致
- - 例: 原始问题"如何获取素材" vs sug词"素材获取方法"
- - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致
- · 例: 原始问题"扣除猫咪主体的方法" vs sug词"扣除猫咪眼睛的方法"(子集但目的一致)
- +0.8~0.95: 核心动作语义相近或为同义表达
- - 例: 原始问题"如何获取素材" vs sug词"素材下载教程"
- - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略
- +0.5~0.75: 核心动作相关但非直接对应(相关实现路径)
- - 例: 原始问题"如何获取素材" vs sug词"素材管理整理"
- +0.2~0.45: 核心动作弱相关(同领域不同动作)
- - 例: 原始问题"如何拍摄风光" vs sug词"风光摄影欣赏"
- 【中性/无关】
- 0: 没有明确目的,动作意图无明确关联
- - 例: 原始问题"如何获取素材" vs sug词"摄影器材推荐"
- - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0
- 【负向偏离】
- -0.2~-0.05: 动作意图轻度冲突或误导
- - 例: 原始问题"如何获取素材" vs sug词"素材版权保护须知"
- -0.5~-0.25: 动作意图明显对立
- - 例: 原始问题"如何获取免费素材" vs sug词"如何售卖素材"
- -1.0~-0.55: 动作意图完全相反或产生严重负面引导
- - 例: 原始问题"免费素材获取" vs sug词"付费素材强制推销"
- 维度2: 品类维度评估(权重30%)
- 评估对象: <平台sug词条> 与 <原始问题> 的内容主体和限定词匹配度
- 评分标准:
- 【正向匹配】
- +1.0: 核心主体+所有关键限定词完全匹配
- - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西秋季风光摄影作品"
- +0.75~0.95: 核心主体匹配,大部分限定词匹配
- - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西风光摄影素材"(缺失"秋季")
- +0.5~0.7: 核心主体匹配,少量限定词匹配或合理泛化
- - 例: 原始问题"川西秋季风光摄影素材" vs sug词"四川风光摄影"
- +0.2~0.45: 仅主体词匹配,限定词全部缺失或错位
- - 例: 原始问题"川西秋季风光摄影素材" vs sug词"风光摄影入门"
- +0.05~0.15: 主题领域相关但品类不同
- - 例: 原始问题"风光摄影素材" vs sug词"人文摄影素材"
- 【中性/无关】
- 0: 主体词部分相关但类别明显不同
- - 例: 原始问题"川西秋季风光摄影素材" vs sug词"人像摄影素材"
- 【负向偏离】
- -0.2~-0.05: 主体词或限定词存在误导性
- - 例: 原始问题"免费摄影素材" vs sug词"付费摄影素材库"
- -0.5~-0.25: 主体词明显错位或品类冲突
- - 例: 原始问题"风光摄影素材" vs sug词"人像修图教程"
- -1.0~-0.55: 完全错误的品类或有害引导
- - 例: 原始问题"正版素材获取" vs sug词"盗版素材下载"
- 综合得分计算与规则调整
- 步骤1: 应用依存性规则
- 规则A: 动机高分保护机制
- 如果 动机维度得分 ≥ 0.8:
- → 品类得分即使为0或轻微负向(-0.2~0)
- → 最终得分 = max(初步得分, 0.55)
- 解释: 当目的高度一致时,品类的泛化不应导致"弱相关"
- 规则B: 动机低分限制机制
- 如果 动机维度得分 ≤ 0.2:
- → 无论品类得分多高
- → 最终得分 = min(初步得分, 0.4)
- 解释: 目的不符时,品类匹配的价值有限
- 规则C: 动机负向决定机制
- 如果 动机维度得分 < 0:
- → 最终得分 = min(初步得分, 0)
- 解释: 动作意图冲突时,推荐具有误导性,不应为正相关
- 步骤3: 输出最终得分
- #基础加权计算
- 应用规则后的调整得分 = 目的动机维度得分 × 0.7 + 品类维度得分 × 0.3
- 取值范围: -1.0 ~ +1.0
- ---
- # 得分档位解释
- 高度相关】+0.8 ~ +1.0
- 相关性高度契合,用户可直接使用
- 动机和品类均高度匹配
- 典型场景: 动机≥0.85 且 品类≥0.7
- 【中高相关】+0.6 ~ +0.79
- 相关性较好,用户基本满意
- 动机匹配但品类有泛化,或反之
- 典型场景: 动机≥0.8 且 品类≥0.3
- 【中度相关】+0.3 ~ +0.59
- 部分相关,用户需要调整搜索策略
- 动机或品类存在一定偏差
- 典型场景: 动机0.4-0.7 且 品类0.3-0.7
- 【弱相关】+0.01 ~ +0.29
- 关联微弱,参考价值有限
- 仅有表层词汇重叠
- 【无关】0
- 无明确关联
- 原始问题无法识别动机 且 sug词无明确动作
- 没有目的性且没有品类匹配
- 【轻度负向】-0.29 ~ -0.01
- 产生轻微误导或干扰
- 【中度负向】-0.69 ~ -0.3
- 存在明显冲突或误导
- 【严重负向】-1.0 ~ -0.7
- 完全违背意图或产生有害引导
- ---
- # 输出要求
- 输出结果必须为一个 **JSON 格式**,包含以下内容:
- #注意事项:
- 始终围绕两个核心维度:所有评估都基于"动机"和"品类"两个维度,不偏离
- 核心动机必须是动词:在评估前,必须先提取原始问题的核心动机(动词),这是整个评估的基础
- 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移
- 负分使用原则:仅当sug词条对原始问题产生误导、冲突或有害引导时给予负分
- 零分使用原则:当sug词条与原始问题无明确关联,既不相关也不冲突时给予零分
- 分维度独立评分:
- 先提取原始问题核心动机
- 分别计算动机维度(含两个子维度)和品类维度得分
- 按70:30加权得到初步得分
- 应用规则调整得到最终得分
- 动机优先原则:当动机高度一致时,品类的合理泛化或具体化不应导致低评分
- 技巧类需求特殊对待:包含"技巧/方法/教程"等词的需求,对动作一致性要求更严格
- ## 输出格式(严格遵守)
- 必须输出标准 JSON 格式,包含以下两个字段:
- ```json
- {
- "reason": "详细的评估理由说明",
- "relevance_score": 0.85
- }
- ```
- ### 字段说明
- - **reason** (string): 详细的评估理由,说明动机维度和品类维度的匹配情况
- - **relevance_score** (number): -1.0 到 1.0 之间的数值,保留2位小数
- ### 重要约束
- 1. **reason 字段内容规则**:
- - 禁止使用英文双引号 `"`,必须使用替代符号
- - 推荐使用中文书名号:《》或【】
- - 或使用中文引号:「」或『』
- - 示例:使用"核心动机是《制作攻略图》"而不是"核心动机是"制作攻略图""
- 2. **JSON 格式要求**:
- - 必须是合法的 JSON 格式
- - relevance_score 必须是数字类型,不能是字符串
- - 字段名必须用双引号包裹
- ### 输出示例
- ✅ **正确示例**:
- ```json
- {
- "reason": "原始问题的核心动机是《制作攻略图》,包含【拼接】图片等操作。sug词条【拼接】与原始问题中的具体操作完全一致,动机维度匹配度为中等。品类维度相关性较弱。",
- "relevance_score": 0.22
- }
- ```
- ❌ **错误示例**(会导致解析失败):
- ```json
- {
- "reason": "原始问题的核心动机是"制作攻略图",包含"拼接"图片",
- "relevance_score": "0.22"
- }
- ```
- 错误原因:reason 中使用了未转义的英文双引号,relevance_score 是字符串而非数字
- ## 关键要求(务必遵守)
- **无论reason多长,必须确保输出完整的JSON格式:**
- 1. reason字段必须有结束的双引号 `"`
- 2. 必须包含 `relevance_score` 字段
- 3. JSON必须有完整的闭合大括号 `}`
- 4. 不能中途截断输出
- **示例:即使reason很长也要完整输出**
- ```json
- {
- "reason": "这里可以写很长的详细推理过程,包括动机维度的详细分析、品类维度的详细分析、规则应用过程等等...",
- "relevance_score": 0.45
- }
- ```
- """.strip()
- relevance_evaluator = Agent[None](
- name="相关度评估专家",
- instructions=relevance_evaluation_instructions,
- model=get_model(MODEL_NAME),
- output_type=RelevanceEvaluation,
- )
- # Agent 3: 加词选择专家
- class WordCombination(BaseModel):
- """单个词组合"""
- selected_word: str = Field(..., description="选择的词")
- combined_query: str = Field(..., description="组合后的新query")
- reasoning: str = Field(..., description="选择理由")
- class WordSelectionTop5(BaseModel):
- """加词选择结果(Top 5)"""
- combinations: list[WordCombination] = Field(
- ...,
- description="选择的Top 5组合(不足5个则返回所有)",
- min_items=1,
- max_items=5
- )
- overall_reasoning: str = Field(..., description="整体选择思路")
- word_selection_instructions = """
- 你是加词组合专家。
- ## 任务
- 从候选词列表中选择5个最合适的词,分别与当前seed组合成新的query。如果候选词不足5个,则返回所有。
- ## 选择原则
- 1. **相关性**:选择与当前seed最相关的词
- 2. **语义通顺**:组合后的query要符合搜索习惯
- 3. **扩展范围**:优先选择能扩展搜索范围的词
- 4. **多样性**:5个词应该覆盖不同的方面(如:时间、地点、类型、用途等)
- ## 组合约束
- 1. **只能使用seed和word的原始文本**
- 2. **不能添加任何连接词**(如"的"、"和"、"与"、"在"等)
- 3. **不能添加任何额外的词**
- 4. **组合方式**:seed+word 或 word+seed,选择更符合搜索习惯的顺序
- 5. **简单拼接**:直接将两个词拼接,不做任何修饰
- ## 错误示例
- ✗ "川西" + "秋季" → "川西的秋季"(加了"的")
- ✗ "川西" + "秋季" → "川西秋季风光"(加了额外的"风光")
- ✗ "摄影" + "技巧" → "摄影拍摄技巧"(加了"拍摄")
- ## 输出要求
- - 最多返回5个组合(如果候选词不足5个,返回所有)
- - 每个组合包含:
- * selected_word: 选择的词(必须在候选词列表中)
- * combined_query: 组合后的新query(只包含seed和word的原始文本)
- * reasoning: 选择理由(说明为什么选这个词)
- - overall_reasoning: 整体选择思路(说明这5个词的选择逻辑)
- """.strip()
- word_selector = Agent[None](
- name="加词组合专家",
- instructions=word_selection_instructions,
- model=get_model(MODEL_NAME),
- output_type=WordSelectionTop5,
- )
- # ============================================================================
- # 辅助函数
- # ============================================================================
- def process_note_data(note: dict) -> Post:
- """处理搜索接口返回的帖子数据"""
- note_card = note.get("note_card", {})
- image_list = note_card.get("image_list", [])
- interact_info = note_card.get("interact_info", {})
- user_info = note_card.get("user", {})
- # ========== 调试日志 START ==========
- note_id = note.get("id", "")
- raw_title = note_card.get("display_title") # 不提供默认值
- raw_body = note_card.get("desc")
- raw_type = note_card.get("type")
- # 打印原始值类型和内容
- print(f"\n[DEBUG] 处理帖子 {note_id}:")
- print(f" raw_title 类型: {type(raw_title).__name__}, 值: {repr(raw_title)}")
- print(f" raw_body 类型: {type(raw_body).__name__}, 值: {repr(raw_body)[:100] if raw_body else repr(raw_body)}")
- print(f" raw_type 类型: {type(raw_type).__name__}, 值: {repr(raw_type)}")
- # 检查是否为 None
- if raw_title is None:
- print(f" ⚠️ WARNING: display_title 是 None!")
- if raw_body is None:
- print(f" ⚠️ WARNING: desc 是 None!")
- if raw_type is None:
- print(f" ⚠️ WARNING: type 是 None!")
- # ========== 调试日志 END ==========
- # 提取图片URL - 使用新的字段名 image_url
- images = []
- for img in image_list:
- if isinstance(img, dict):
- # 尝试新字段名 image_url,如果不存在则尝试旧字段名 url_default
- img_url = img.get("image_url") or img.get("url_default")
- if img_url:
- images.append(img_url)
- # 判断类型
- note_type = note_card.get("type", "normal")
- video_url = ""
- if note_type == "video":
- video_info = note_card.get("video", {})
- if isinstance(video_info, dict):
- # 尝试获取视频URL
- video_url = video_info.get("media", {}).get("stream", {}).get("h264", [{}])[0].get("master_url", "")
- return Post(
- note_id=note.get("id") or "",
- title=note_card.get("display_title") or "",
- body_text=note_card.get("desc") or "",
- type=note_type,
- images=images,
- video=video_url,
- interact_info={
- "liked_count": interact_info.get("liked_count", 0),
- "collected_count": interact_info.get("collected_count", 0),
- "comment_count": interact_info.get("comment_count", 0),
- "shared_count": interact_info.get("shared_count", 0)
- },
- note_url=f"https://www.xiaohongshu.com/explore/{note.get('id', '')}"
- )
- async def evaluate_with_o(text: str, o: str) -> tuple[float, str]:
- """评估文本与原始问题o的相关度
- Returns:
- tuple[float, str]: (相关度分数, 评估理由)
- """
- eval_input = f"""
- <原始问题>
- {o}
- </原始问题>
- <当前文本>
- {text}
- </当前文本>
- 请评估当前文本与原始问题的相关度。
- """
- result = await Runner.run(relevance_evaluator, eval_input)
- evaluation: RelevanceEvaluation = result.final_output
- return evaluation.relevance_score, evaluation.reason
- # ============================================================================
- # 核心流程函数
- # ============================================================================
- async def initialize(o: str, context: RunContext) -> tuple[list[Seg], list[Word], list[Q], list[Seed]]:
- """
- 初始化阶段
- Returns:
- (seg_list, word_list_1, q_list_1, seed_list)
- """
- print(f"\n{'='*60}")
- print(f"初始化阶段")
- print(f"{'='*60}")
- # 1. 分词:原始问题(o) ->分词-> seg_list
- print(f"\n[步骤1] 分词...")
- result = await Runner.run(word_segmenter, o)
- segmentation: WordSegmentation = result.final_output
- seg_list = []
- for word in segmentation.words:
- seg_list.append(Seg(text=word, from_o=o))
- print(f"分词结果: {[s.text for s in seg_list]}")
- print(f"分词理由: {segmentation.reasoning}")
- # 2. 分词评估:seg_list -> 每个seg与o进行评分(并发)
- print(f"\n[步骤2] 评估每个分词与原始问题的相关度...")
- async def evaluate_seg(seg: Seg) -> Seg:
- seg.score_with_o, seg.reason = await evaluate_with_o(seg.text, o)
- return seg
- if seg_list:
- eval_tasks = [evaluate_seg(seg) for seg in seg_list]
- await asyncio.gather(*eval_tasks)
- for seg in seg_list:
- print(f" {seg.text}: {seg.score_with_o:.2f}")
- # 3. 构建word_list_1: seg_list -> word_list_1(固定词库)
- print(f"\n[步骤3] 构建word_list_1(固定词库)...")
- word_list_1 = []
- for seg in seg_list:
- word_list_1.append(Word(
- text=seg.text,
- score_with_o=seg.score_with_o,
- from_o=o
- ))
- print(f"word_list_1(固定): {[w.text for w in word_list_1]}")
- # 4. 构建q_list_1:seg_list 作为 q_list_1
- print(f"\n[步骤4] 构建q_list_1...")
- q_list_1 = []
- for seg in seg_list:
- q_list_1.append(Q(
- text=seg.text,
- score_with_o=seg.score_with_o,
- reason=seg.reason,
- from_source="seg"
- ))
- print(f"q_list_1: {[q.text for q in q_list_1]}")
- # 5. 构建seed_list: seg_list -> seed_list
- print(f"\n[步骤5] 构建seed_list...")
- seed_list = []
- for seg in seg_list:
- seed_list.append(Seed(
- text=seg.text,
- added_words=[],
- from_type="seg",
- score_with_o=seg.score_with_o
- ))
- print(f"seed_list: {[s.text for s in seed_list]}")
- return seg_list, word_list_1, q_list_1, seed_list
- async def run_round(
- round_num: int,
- q_list: list[Q],
- word_list_1: list[Word],
- seed_list: list[Seed],
- o: str,
- context: RunContext,
- xiaohongshu_api: XiaohongshuSearchRecommendations,
- xiaohongshu_search: XiaohongshuSearch,
- sug_threshold: float = 0.7
- ) -> tuple[list[Q], list[Seed], list[Search]]:
- """
- 运行一轮
- Args:
- round_num: 轮次编号
- q_list: 当前轮的q列表
- word_list_1: 固定的词库(第0轮分词结果)
- seed_list: 当前的seed列表
- o: 原始问题
- context: 运行上下文
- xiaohongshu_api: 建议词API
- xiaohongshu_search: 搜索API
- sug_threshold: suggestion的阈值
- Returns:
- (q_list_next, seed_list_next, search_list)
- """
- print(f"\n{'='*60}")
- print(f"第{round_num}轮")
- print(f"{'='*60}")
- round_data = {
- "round_num": round_num,
- "input_q_list": [{"text": q.text, "score": q.score_with_o} for q in q_list],
- "input_word_list_1_size": len(word_list_1),
- "input_seed_list_size": len(seed_list)
- }
- # 1. 请求sug:q_list -> 每个q请求sug接口 -> sug_list_list
- print(f"\n[步骤1] 为每个q请求建议词...")
- sug_list_list = [] # list of list
- for q in q_list:
- print(f"\n 处理q: {q.text}")
- suggestions = xiaohongshu_api.get_recommendations(keyword=q.text)
- q_sug_list = []
- if suggestions:
- print(f" 获取到 {len(suggestions)} 个建议词")
- for sug_text in suggestions:
- sug = Sug(
- text=sug_text,
- from_q=QFromQ(text=q.text, score_with_o=q.score_with_o)
- )
- q_sug_list.append(sug)
- else:
- print(f" 未获取到建议词")
- sug_list_list.append(q_sug_list)
- # 2. sug评估:sug_list_list -> 每个sug与o进行评分(并发)
- print(f"\n[步骤2] 评估每个建议词与原始问题的相关度...")
- # 2.1 收集所有需要评估的sug,并记录它们所属的q
- all_sugs = []
- sug_to_q_map = {} # 记录每个sug属于哪个q
- for i, q_sug_list in enumerate(sug_list_list):
- if q_sug_list:
- q_text = q_list[i].text
- for sug in q_sug_list:
- all_sugs.append(sug)
- sug_to_q_map[id(sug)] = q_text
- # 2.2 并发评估所有sug
- async def evaluate_sug(sug: Sug) -> Sug:
- sug.score_with_o, sug.reason = await evaluate_with_o(sug.text, o)
- return sug
- if all_sugs:
- eval_tasks = [evaluate_sug(sug) for sug in all_sugs]
- await asyncio.gather(*eval_tasks)
- # 2.3 打印结果并组织到sug_details
- sug_details = {} # 保存每个Q对应的sug列表
- for i, q_sug_list in enumerate(sug_list_list):
- if q_sug_list:
- q_text = q_list[i].text
- print(f"\n 来自q '{q_text}' 的建议词:")
- sug_details[q_text] = []
- for sug in q_sug_list:
- print(f" {sug.text}: {sug.score_with_o:.2f}")
- # 保存到sug_details
- sug_details[q_text].append({
- "text": sug.text,
- "score": sug.score_with_o,
- "reason": sug.reason
- })
- # 3. search_list构建
- print(f"\n[步骤3] 构建search_list(阈值>{sug_threshold})...")
- search_list = []
- high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold]
- if high_score_sugs:
- print(f" 找到 {len(high_score_sugs)} 个高分建议词")
- # 并发搜索
- async def search_for_sug(sug: Sug) -> Search:
- print(f" 搜索: {sug.text}")
- try:
- search_result = xiaohongshu_search.search(keyword=sug.text)
- result_str = search_result.get("result", "{}")
- if isinstance(result_str, str):
- result_data = json.loads(result_str)
- else:
- result_data = result_str
- notes = result_data.get("data", {}).get("data", [])
- post_list = []
- for note in notes[:10]: # 只取前10个
- post = process_note_data(note)
- post_list.append(post)
- print(f" → 找到 {len(post_list)} 个帖子")
- return Search(
- text=sug.text,
- score_with_o=sug.score_with_o,
- from_q=sug.from_q,
- post_list=post_list
- )
- except Exception as e:
- print(f" ✗ 搜索失败: {e}")
- return Search(
- text=sug.text,
- score_with_o=sug.score_with_o,
- from_q=sug.from_q,
- post_list=[]
- )
- search_tasks = [search_for_sug(sug) for sug in high_score_sugs]
- search_list = await asyncio.gather(*search_tasks)
- else:
- print(f" 没有高分建议词,search_list为空")
- # 4. 构建q_list_next
- print(f"\n[步骤4] 构建q_list_next...")
- q_list_next = []
- add_word_details = {} # 保存每个seed对应的组合词列表
- all_seed_combinations = [] # 保存本轮所有seed的组合词(用于后续构建seed_list_next)
- # 4.1 对于seed_list中的每个seed,从word_list_1中选词组合,产生Top 5
- print(f"\n 4.1 为每个seed加词(产生Top 5组合)...")
- for seed in seed_list:
- print(f"\n 处理seed: {seed.text}")
- # 从固定词库word_list_1筛选候选词
- candidate_words = []
- for word in word_list_1:
- # 检查词是否已在seed中
- if word.text in seed.text:
- continue
- # 检查词是否已被添加过
- if word.text in seed.added_words:
- continue
- candidate_words.append(word)
- if not candidate_words:
- print(f" 没有可用的候选词")
- continue
- print(f" 候选词数量: {len(candidate_words)}")
- # 调用Agent一次性选择并组合Top 5
- candidate_words_text = ', '.join([w.text for w in candidate_words])
- selection_input = f"""
- <原始问题>
- {o}
- </原始问题>
- <当前Seed>
- {seed.text}
- </当前Seed>
- <候选词列表>
- {candidate_words_text}
- </候选词列表>
- 请从候选词列表中选择最多5个最合适的词,分别与当前seed组合成新的query。
- """
- result = await Runner.run(word_selector, selection_input)
- selection_result: WordSelectionTop5 = result.final_output
- print(f" Agent选择了 {len(selection_result.combinations)} 个组合")
- print(f" 整体选择思路: {selection_result.overall_reasoning}")
- # 并发评估所有组合的相关度
- async def evaluate_combination(comb: WordCombination) -> dict:
- score, reason = await evaluate_with_o(comb.combined_query, o)
- return {
- 'word': comb.selected_word,
- 'query': comb.combined_query,
- 'score': score,
- 'reason': reason,
- 'reasoning': comb.reasoning
- }
- eval_tasks = [evaluate_combination(comb) for comb in selection_result.combinations]
- top_5 = await asyncio.gather(*eval_tasks)
- print(f" 评估完成,得到 {len(top_5)} 个组合")
- # 将Top 5全部加入q_list_next
- for comb in top_5:
- print(f" ✓ {comb['query']} (分数: {comb['score']:.2f})")
- new_q = Q(
- text=comb['query'],
- score_with_o=comb['score'],
- reason=comb['reason'],
- from_source="add"
- )
- q_list_next.append(new_q)
- # 记录已添加的词
- seed.added_words.append(comb['word'])
- # 保存到add_word_details
- add_word_details[seed.text] = [
- {
- "text": comb['query'],
- "score": comb['score'],
- "reason": comb['reason'],
- "selected_word": comb['word']
- }
- for comb in top_5
- ]
- # 保存到all_seed_combinations(用于构建seed_list_next)
- all_seed_combinations.extend(top_5)
- # 4.2 对于sug_list_list中,每个sug大于来自的query分数,加到q_list_next
- print(f"\n 4.2 将高分sug加入q_list_next...")
- for sug in all_sugs:
- if sug.from_q and sug.score_with_o > sug.from_q.score_with_o:
- new_q = Q(
- text=sug.text,
- score_with_o=sug.score_with_o,
- reason=sug.reason,
- from_source="sug"
- )
- q_list_next.append(new_q)
- print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} > {sug.from_q.score_with_o:.2f})")
- # 5. 构建seed_list_next(关键修改:不保留上一轮的seed)
- print(f"\n[步骤5] 构建seed_list_next(不保留上轮seed)...")
- seed_list_next = []
- existing_seed_texts = set()
- # 5.1 加入本轮所有组合词
- print(f" 5.1 加入本轮所有组合词...")
- for comb in all_seed_combinations:
- if comb['query'] not in existing_seed_texts:
- new_seed = Seed(
- text=comb['query'],
- added_words=[], # 新seed的added_words清空
- from_type="add",
- score_with_o=comb['score']
- )
- seed_list_next.append(new_seed)
- existing_seed_texts.add(comb['query'])
- print(f" ✓ {comb['query']} (分数: {comb['score']:.2f})")
- # 5.2 加入高分sug
- print(f" 5.2 加入高分sug...")
- for sug in all_sugs:
- # sug分数 > 对应query分数
- if sug.from_q and sug.score_with_o > sug.from_q.score_with_o and sug.text not in existing_seed_texts:
- new_seed = Seed(
- text=sug.text,
- added_words=[],
- from_type="sug",
- score_with_o=sug.score_with_o
- )
- seed_list_next.append(new_seed)
- existing_seed_texts.add(sug.text)
- print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} > 来源query: {sug.from_q.score_with_o:.2f})")
- # 序列化搜索结果数据(包含帖子详情)
- search_results_data = []
- for search in search_list:
- search_results_data.append({
- "text": search.text,
- "score_with_o": search.score_with_o,
- "post_list": [
- {
- "note_id": post.note_id,
- "note_url": post.note_url,
- "title": post.title,
- "body_text": post.body_text,
- "images": post.images,
- "interact_info": post.interact_info
- }
- for post in search.post_list
- ]
- })
- # 记录本轮数据
- round_data.update({
- "sug_count": len(all_sugs),
- "high_score_sug_count": len(high_score_sugs),
- "search_count": len(search_list),
- "total_posts": sum(len(s.post_list) for s in search_list),
- "q_list_next_size": len(q_list_next),
- "seed_list_next_size": len(seed_list_next),
- "total_combinations": len(all_seed_combinations),
- "output_q_list": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "from": q.from_source} for q in q_list_next],
- "seed_list_next": [{"text": seed.text, "from": seed.from_type, "score": seed.score_with_o} for seed in seed_list_next],
- "sug_details": sug_details,
- "add_word_details": add_word_details,
- "search_results": search_results_data
- })
- context.rounds.append(round_data)
- print(f"\n本轮总结:")
- print(f" 建议词数量: {len(all_sugs)}")
- print(f" 高分建议词: {len(high_score_sugs)}")
- print(f" 搜索数量: {len(search_list)}")
- print(f" 帖子总数: {sum(len(s.post_list) for s in search_list)}")
- print(f" 组合词数量: {len(all_seed_combinations)}")
- print(f" 下轮q数量: {len(q_list_next)}")
- print(f" 下轮seed数量: {len(seed_list_next)}")
- return q_list_next, seed_list_next, search_list
- async def iterative_loop(
- context: RunContext,
- max_rounds: int = 2,
- sug_threshold: float = 0.7
- ):
- """主迭代循环"""
- print(f"\n{'='*60}")
- print(f"开始迭代循环")
- print(f"最大轮数: {max_rounds}")
- print(f"sug阈值: {sug_threshold}")
- print(f"{'='*60}")
- # 初始化
- seg_list, word_list_1, q_list, seed_list = await initialize(context.o, context)
- # API实例
- xiaohongshu_api = XiaohongshuSearchRecommendations()
- xiaohongshu_search = XiaohongshuSearch()
- # 保存初始化数据
- context.rounds.append({
- "round_num": 0,
- "type": "initialization",
- "seg_list": [{"text": s.text, "score": s.score_with_o, "reason": s.reason} for s in seg_list],
- "word_list_1": [{"text": w.text, "score": w.score_with_o} for w in word_list_1],
- "q_list_1": [{"text": q.text, "score": q.score_with_o, "reason": q.reason} for q in q_list],
- "seed_list": [{"text": s.text, "from_type": s.from_type, "score": s.score_with_o} for s in seed_list]
- })
- # 收集所有搜索结果
- all_search_list = []
- # 迭代
- round_num = 1
- while q_list and round_num <= max_rounds:
- q_list, seed_list, search_list = await run_round(
- round_num=round_num,
- q_list=q_list,
- word_list_1=word_list_1, # 传递固定词库
- seed_list=seed_list,
- o=context.o,
- context=context,
- xiaohongshu_api=xiaohongshu_api,
- xiaohongshu_search=xiaohongshu_search,
- sug_threshold=sug_threshold
- )
- all_search_list.extend(search_list)
- round_num += 1
- print(f"\n{'='*60}")
- print(f"迭代完成")
- print(f" 总轮数: {round_num - 1}")
- print(f" 总搜索次数: {len(all_search_list)}")
- print(f" 总帖子数: {sum(len(s.post_list) for s in all_search_list)}")
- print(f"{'='*60}")
- return all_search_list
- # ============================================================================
- # 主函数
- # ============================================================================
- async def main(input_dir: str, max_rounds: int = 2, sug_threshold: float = 0.7, visualize: bool = False):
- """主函数"""
- current_time, log_url = set_trace()
- # 读取输入
- input_context_file = os.path.join(input_dir, 'context.md')
- input_q_file = os.path.join(input_dir, 'q.md')
- c = read_file_as_string(input_context_file) # 原始需求
- o = read_file_as_string(input_q_file) # 原始问题
- # 版本信息
- version = os.path.basename(__file__)
- version_name = os.path.splitext(version)[0]
- # 日志目录
- log_dir = os.path.join(input_dir, "output", version_name, current_time)
- # 创建运行上下文
- run_context = RunContext(
- version=version,
- input_files={
- "input_dir": input_dir,
- "context_file": input_context_file,
- "q_file": input_q_file,
- },
- c=c,
- o=o,
- log_dir=log_dir,
- log_url=log_url,
- )
- # 执行迭代
- all_search_list = await iterative_loop(
- run_context,
- max_rounds=max_rounds,
- sug_threshold=sug_threshold
- )
- # 格式化输出
- output = f"原始需求:{run_context.c}\n"
- output += f"原始问题:{run_context.o}\n"
- output += f"总搜索次数:{len(all_search_list)}\n"
- output += f"总帖子数:{sum(len(s.post_list) for s in all_search_list)}\n"
- output += "\n" + "="*60 + "\n"
- if all_search_list:
- output += "【搜索结果】\n\n"
- for idx, search in enumerate(all_search_list, 1):
- output += f"{idx}. 搜索词: {search.text} (分数: {search.score_with_o:.2f})\n"
- output += f" 帖子数: {len(search.post_list)}\n"
- if search.post_list:
- for post_idx, post in enumerate(search.post_list[:3], 1): # 只显示前3个
- output += f" {post_idx}) {post.title}\n"
- output += f" URL: {post.note_url}\n"
- output += "\n"
- else:
- output += "未找到搜索结果\n"
- run_context.final_output = output
- print(f"\n{'='*60}")
- print("最终结果")
- print(f"{'='*60}")
- print(output)
- # 保存日志
- os.makedirs(run_context.log_dir, exist_ok=True)
- context_file_path = os.path.join(run_context.log_dir, "run_context.json")
- context_dict = run_context.model_dump()
- with open(context_file_path, "w", encoding="utf-8") as f:
- json.dump(context_dict, f, ensure_ascii=False, indent=2)
- print(f"\nRunContext saved to: {context_file_path}")
- # 保存详细的搜索结果
- search_results_path = os.path.join(run_context.log_dir, "search_results.json")
- search_results_data = [s.model_dump() for s in all_search_list]
- with open(search_results_path, "w", encoding="utf-8") as f:
- json.dump(search_results_data, f, ensure_ascii=False, indent=2)
- print(f"Search results saved to: {search_results_path}")
- # 可视化
- if visualize:
- import subprocess
- output_html = os.path.join(run_context.log_dir, "visualization.html")
- print(f"\n🎨 生成可视化HTML...")
- # 获取绝对路径
- abs_context_file = os.path.abspath(context_file_path)
- abs_output_html = os.path.abspath(output_html)
- # 运行可视化脚本
- result = subprocess.run([
- "node",
- "visualization/sug_v6_1_2_8/index.js",
- abs_context_file,
- abs_output_html
- ])
- if result.returncode == 0:
- print(f"✅ 可视化已生成: {output_html}")
- else:
- print(f"❌ 可视化生成失败")
- if __name__ == "__main__":
- parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.1.2.115 广度遍历版")
- parser.add_argument(
- "--input-dir",
- type=str,
- default="input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?",
- help="输入目录路径,默认: input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?"
- )
- parser.add_argument(
- "--max-rounds",
- type=int,
- default=4,
- help="最大轮数,默认: 4"
- )
- parser.add_argument(
- "--sug-threshold",
- type=float,
- default=0.7,
- help="suggestion阈值,默认: 0.7"
- )
- parser.add_argument(
- "--visualize",
- action="store_true",
- default=True,
- help="运行完成后自动生成可视化HTML"
- )
- args = parser.parse_args()
- asyncio.run(main(args.input_dir, max_rounds=args.max_rounds, sug_threshold=args.sug_threshold, visualize=args.visualize))
|