| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
2772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344 |
- import asyncio
- import json
- import os
- import sys
- import argparse
- from datetime import datetime
- from typing import Literal
- from agents import Agent, Runner, ModelSettings
- from lib.my_trace import set_trace
- from pydantic import BaseModel, Field
- from lib.utils import read_file_as_string
- from lib.client import get_model
# Model identifier handed to get_model() when the agents below are constructed.
MODEL_NAME = "google/gemini-2.5-flash"
# Score-gain threshold: a sug or a combined query must beat the score of the
# query it came from by at least this margin to advance to the next round.
REQUIRED_SCORE_GAIN = 0.05
- from script.search_recommendations.xiaohongshu_search_recommendations import XiaohongshuSearchRecommendations
- from script.search.xiaohongshu_search import XiaohongshuSearch
- # ============================================================================
- # 日志工具类
- # ============================================================================
class TeeLogger:
    """Stream wrapper that mirrors everything written to it into a log file.

    Intended as a stand-in for a text stream such as ``sys.stdout``: each
    ``write`` goes to the wrapped stream and to ``log_file``.

    Args:
        stdout: The stream to keep writing to (normally the real stdout).
        log_file: An open, writable text file that receives a copy.
    """

    def __init__(self, stdout, log_file):
        self.stdout = stdout
        self.log_file = log_file

    def write(self, message):
        """Write ``message`` to both targets and return the characters written."""
        written = self.stdout.write(message)
        self.log_file.write(message)
        # Flush on every write so the log survives an abrupt exit.
        self.log_file.flush()
        # Text streams report the number of characters written; mirror that
        # contract so callers that check the return value keep working.
        return len(message) if written is None else written

    def flush(self):
        """Flush both the wrapped stream and the log file."""
        self.stdout.flush()
        self.log_file.flush()

    def __getattr__(self, name):
        """Delegate any other stream attribute (isatty, encoding, ...) to stdout."""
        # Only reached when normal lookup fails. Guard our own fields so a
        # half-initialised instance cannot recurse forever.
        if name in ("stdout", "log_file"):
            raise AttributeError(name)
        return getattr(self.stdout, name)
- # ============================================================================
- # 数据模型
- # ============================================================================
class Seg(BaseModel):
    """Word segment (legacy) - used by v120."""
    text: str
    score_with_o: float = 0.0  # relevance score against the original question
    reason: str = ""  # rationale for the score
    from_o: str = ""  # the original question
- # ============================================================================
- # 新架构数据模型 (v121)
- # ============================================================================
class Segment(BaseModel):
    """Semantic segment (result of the Round 0 semantic segmentation)."""
    text: str  # segment text
    # Semantic type label; the documented values are 疑问标记/核心动作/修饰短语/中心名词/逻辑连接
    # (question marker / core action / modifier phrase / head noun / logical connective).
    type: str
    score_with_o: float = 0.0  # relevance score against the original question
    reason: str = ""  # rationale for the score
    from_o: str = ""  # the original question
    words: list[str] = Field(default_factory=list)  # words this segment was split into (Round 0 word splitting)
    word_scores: dict[str, float] = Field(default_factory=dict)  # per-word scores {word: score}
    word_reasons: dict[str, str] = Field(default_factory=dict)  # per-word score rationales {word: reason}
class DomainCombination(BaseModel):
    """Domain combination (one N-domain combination produced in Round N)."""
    text: str  # the combined text
    domains: list[int] = Field(default_factory=list)  # indices of the participating domains (indices into segments)
    type_label: str = ""  # type label, e.g. [疑问标记+核心动作+中心名词]
    # Source words, one inner list per domain, e.g. [["猫咪"], ["梗图"]].
    source_words: list[list[str]] = Field(default_factory=list)
    score_with_o: float = 0.0  # relevance score against the original question
    reason: str = ""  # rationale for the score
    from_segments: list[str] = Field(default_factory=list)  # texts of the source segments
- # ============================================================================
- # 旧架构数据模型(保留但不使用)
- # ============================================================================
- # class Word(BaseModel):
- # """词(旧版)- v120使用,v121不再使用"""
- # text: str
- # score_with_o: float = 0.0 # 与原始问题的评分
- # from_o: str = "" # 原始问题
class Word(BaseModel):
    """A single word."""
    text: str
    score_with_o: float = 0.0  # relevance score against the original question
    from_o: str = ""  # the original question
class QFromQ(BaseModel):
    """Provenance of a query (recorded inside a Sug)."""
    text: str
    score_with_o: float = 0.0
class Q(BaseModel):
    """A query."""
    text: str
    score_with_o: float = 0.0  # relevance score against the original question
    reason: str = ""  # rationale for the score
    # Origin tag. v120: seg/sug/add; added in v121: segment/domain_comb/sug.
    # extract_words_from_segments() in this file also sets it to "word".
    from_source: str = ""
    type_label: str = ""  # added in v121: domain type label (only for the domain_comb source)
class Sug(BaseModel):
    """A suggestion term."""
    text: str
    score_with_o: float = 0.0  # relevance score against the original question
    reason: str = ""  # rationale for the score
    from_q: QFromQ | None = None  # the query this suggestion came from
class Seed(BaseModel):
    """Seed (legacy) - used by v120, no longer used by v121."""
    text: str
    added_words: list[str] = Field(default_factory=list)  # words that have already been appended
    from_type: str = ""  # seg/sug/add
    score_with_o: float = 0.0  # relevance score against the original question
class Post(BaseModel):
    """A post (note) returned by search."""
    title: str = ""
    body_text: str = ""
    type: str = "normal"  # video/normal
    images: list[str] = Field(default_factory=list)  # image URLs; the first one is the cover
    video: str = ""  # video URL
    interact_info: dict = Field(default_factory=dict)  # interaction counters
    note_id: str = ""
    note_url: str = ""
class Search(Sug):
    """A search result (extends Sug with the posts that were found)."""
    post_list: list[Post] = Field(default_factory=list)  # posts returned by the search
class RunContext(BaseModel):
    """Run context: inputs, per-round data and caches for one run."""
    version: str
    input_files: dict[str, str]
    c: str  # original requirement
    o: str  # original question
    log_url: str
    log_dir: str
    # Added in v121: semantic segmentation result.
    segments: list[dict] = Field(default_factory=list)  # Round 0 semantic segmentation result
    # Per-round data.
    rounds: list[dict] = Field(default_factory=list)  # detailed data of every round
    # Final result.
    final_output: str | None = None
    # Evaluation cache: avoids re-evaluating the same text.
    # key: text, value: (score, reason)
    evaluation_cache: dict[str, tuple[float, str]] = Field(default_factory=dict)
- # ============================================================================
- # Agent 定义
- # ============================================================================
- # ============================================================================
- # v121 新增 Agent
- # ============================================================================
- # Agent: 语义分段专家 (Prompt1)
# One semantic segment as emitted by the semantic-segmentation agent.
# NOTE(review): pydantic exposes the class docstring and the Field descriptions
# in the generated JSON schema, which is presumably forwarded to the model, so
# they are runtime text and are intentionally left untranslated.
class SemanticSegment(BaseModel):
    """单个语义片段"""
    segment_text: str = Field(..., description="片段文本")  # segment text
    segment_type: str = Field(..., description="语义类型(疑问标记/核心动作/修饰短语/中心名词/逻辑连接)")  # semantic type label
    reasoning: str = Field(..., description="分段理由")  # why the segment was cut this way
# Structured output of the semantic-segmentation agent (used as its output_type).
# NOTE(review): docstring and Field descriptions end up in the pydantic JSON
# schema, so they are left untranslated.
class SemanticSegmentation(BaseModel):
    """语义分段结果"""
    segments: list[SemanticSegment] = Field(..., description="语义片段列表")  # list of semantic segments
    overall_reasoning: str = Field(..., description="整体分段思路")  # overall segmentation rationale
- semantic_segmentation_instructions = """
- 你是语义分段专家。给定一个搜索query,将其拆分成不同语义类型的片段。
- ## 语义类型定义(5种)
- 1. **疑问标记**:如何、怎么、什么、哪里等疑问词
- 2. **核心动作**:关键动词,如获取、制作、拍摄、寻找等
- 3. **修饰短语**:形容词、副词等修饰成分,如高质量、能体现...特色等
- 4. **中心名词**:核心名词,如素材、梗图、攻略等
- 5. **逻辑连接**:并且、或者、以及等连接词(较少出现)
- ## 分段原则
- 1. **语义完整性**:每个片段应该是一个完整的语义单元
- 2. **类型互斥**:每个片段只能属于一种类型
- 3. **保留原文**:片段文本必须保留原query中的字符,不得改写
- 4. **顺序保持**:片段顺序应与原query一致
- ## 示例
- **Query**: "如何获取能体现川西秋季特色的高质量风光摄影素材?"
- **分段结果**:
- 1. "如何" - 疑问标记
- 2. "获取" - 核心动作
- 3. "能体现川西秋季特色的" - 修饰短语
- 4. "高质量" - 修饰短语
- 5. "风光摄影素材" - 中心名词
- ## 输出要求
- - segments: 片段列表
- - segment_text: 片段文本(必须来自原query)
- - segment_type: 语义类型(从5种类型中选择)
- - reasoning: 为什么这样分段
- - overall_reasoning: 整体分段思路
- ## JSON输出规范
- 1. **格式要求**:必须输出标准JSON格式
- 2. **引号规范**:字符串中如需表达引用,使用书名号《》或「」,不要使用英文引号或中文引号""
- """.strip()
# Semantic-segmentation agent: driven by semantic_segmentation_instructions,
# with its final output typed as SemanticSegmentation.
semantic_segmenter = Agent[None](
    name="语义分段专家",
    instructions=semantic_segmentation_instructions,
    model=get_model(MODEL_NAME),
    output_type=SemanticSegmentation,
)
- # ============================================================================
- # v120 保留 Agent
- # ============================================================================
- # Agent 1: 分词专家(v121用于Round 0拆词)
# Structured output of the word-segmentation agent (used as its output_type).
# NOTE(review): docstring and Field descriptions end up in the pydantic JSON
# schema, so they are left untranslated.
class WordSegmentation(BaseModel):
    """分词结果"""
    words: list[str] = Field(..., description="分词结果列表")  # the resulting word list
    reasoning: str = Field(..., description="分词理由")  # rationale for the split
- word_segmentation_instructions = """
- 你是分词专家。给定一个query,将其拆分成有意义的最小单元。
- ## 分词原则
- 1. 保留有搜索意义的词汇
- 2. 拆分成独立的概念
- 3. 保留专业术语的完整性
- 4. 去除虚词(的、吗、呢等)
- ## 输出要求
- 返回分词列表和分词理由。
- """.strip()
# Word-segmentation agent: driven by word_segmentation_instructions, with its
# final output typed as WordSegmentation.
word_segmenter = Agent[None](
    name="分词专家",
    instructions=word_segmentation_instructions,
    model=get_model(MODEL_NAME),
    output_type=WordSegmentation,
)
- # Agent 2: 动机维度评估专家 + 品类维度评估专家(两阶段评估)
- # 动机评估的嵌套模型
# Nested part of MotivationEvaluation: the core motivation extracted from the
# original question.
# NOTE(review): the field name, docstring and description are part of the
# schema exchanged with the model, so they are left untranslated.
class CoreMotivationExtraction(BaseModel):
    """核心动机提取"""
    简要说明核心动机: str = Field(..., description="核心动机说明")  # short statement of the core motivation
# Structured output of the motivation-dimension evaluator agent.
# NOTE(review): field names, docstring and descriptions are part of the schema
# exchanged with the model, so they are left untranslated.
class MotivationEvaluation(BaseModel):
    """动机维度评估"""
    原始问题核心动机提取: CoreMotivationExtraction = Field(..., description="原始问题核心动机提取")  # extracted core motivation
    动机维度得分: float = Field(..., description="动机维度得分 -1~1")  # motivation score; the description states the range -1~1
    简要说明动机维度相关度理由: str = Field(..., description="动机维度相关度理由")  # rationale for the motivation score
# Structured output of the category-dimension evaluator agent.
# NOTE(review): field names, docstring and descriptions are part of the schema
# exchanged with the model, so they are left untranslated.
class CategoryEvaluation(BaseModel):
    """品类维度评估"""
    品类维度得分: float = Field(..., description="品类维度得分 -1~1")  # category score; the description states the range -1~1
    简要说明品类维度相关度理由: str = Field(..., description="品类维度相关度理由")  # rationale for the category score
- # 动机评估 prompt - 第一轮版本(来自 sug_v6_1_2_115.py)
- motivation_evaluation_instructions_round1 = """
- #角色
- 你是一个 **专业的语言专家和语义相关性评判专家**。你的任务是:判断我给你的 <平台sug词条> 与 <原始问题> 的需求动机匹配度,给出 **-1 到 1 之间** 的数值评分。
- ---
- # 核心概念与方法论
- ## 评估维度
- 本评估系统围绕 **动机维度** 进行:
- ### 1. 动机维度
- **定义:** 用户"想要做什么",即原始问题的行为意图和目的
- - 核心是 **动词**:获取、学习、拍摄、制作、寻找等
- - 包括:核心动作 + 使用场景 + 最终目的
- ---
- ## 如何识别原始问题的核心动机
- **核心动机必须是动词**,识别方法如下:
- ### 方法1: 显性动词直接提取
- 当原始问题明确包含动词时,直接提取
- 示例:
- "如何获取素材" → 核心动机 = "获取"
- "寻找拍摄技巧" → 核心动机 = "寻找"(或"学习")
- "制作视频教程" → 核心动机 = "制作"
- ### 方法2: 隐性动词语义推理
- 当原始问题没有显性动词时,需要结合上下文推理
- 示例:
- 例: "川西秋天风光摄影" → 隐含动作="拍摄"
- → 需结合上下文判断
- 如果原始问题是纯名词短语,无任何动作线索:
- → 核心动机 = 无法识别
- → 在此情况下,动机维度得分应为 0。
- 示例:
- "摄影" → 无法识别动机,动机维度得分 = 0
- "川西风光" → 无法识别动机,动机维度得分 = 0
- ---
- # 输入信息
- 你将接收到以下输入:
- - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。
- - **<平台sug词条>**:平台推荐的词条列表,每个词条需要单独评估。
- #判定流程
- #评估架构
- 输入: <原始问题> + <平台sug词条>
- ↓
- 【动机维度相关性判定】
- ├→ 步骤1: 评估<sug词条>与<原始问题>的需求动机匹配度
- └→ 输出: -1到1之间的数值 + 判定依据
- 相关度评估维度详解
- 维度1: 动机维度评估
- 评估对象: <平台sug词条> 与 <原始问题> 的需求动机匹配度
- 说明: 核心动作是用户需求的第一优先级,决定了推荐的基本有效性
- 评分标准:
- 【正向匹配】
- +0.95~1.0: 核心动作完全一致
- - 例: 原始问题"如何获取素材" vs sug词"素材获取方法"
- - 特殊规则: 如果sug词的核心动作是原始问题动作的**具体化子集**,也判定为完全一致
- · 例: 原始问题"扣除猫咪主体的方法" vs sug词"扣除猫咪眼睛的方法"(子集但目的一致)
- +0.75~0.95: 核心动作语义相近或为同义表达
- - 例: 原始问题"如何获取素材" vs sug词"如何下载素材"
- - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略
- +0.5~0.75: 核心动作相关但非直接对应(相关实现路径)
- - 例: 原始问题"如何获取素材" vs sug词"素材管理整理"
- +0.2~0.45: 核心动作弱相关(同领域不同动作)
- - 例: 原始问题"如何拍摄风光" vs sug词"风光摄影欣赏"
- 【中性/无关】
- 0: 没有明确目的,动作意图无明确关联
- - 例: 原始问题"如何获取素材" vs sug词"摄影器材推荐"
- - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0
- - 如果原始问题无法识别动机,则动机维度得分为0。
- 【负向偏离】
- -0.2~-0.05: 动作意图轻度冲突或误导
- - 例: 原始问题"如何获取素材" vs sug词"素材版权保护须知"
- -0.5~-0.25: 动作意图明显对立
- - 例: 原始问题"如何获取免费素材" vs sug词"如何售卖素材"
- -1.0~-0.55: 动作意图完全相反或产生严重负面引导
- - 例: 原始问题"免费素材获取" vs sug词"付费素材强制推销"
- ---
- # 输出要求
- 输出结果必须为一个 **JSON 格式**,包含以下内容:
- ```json
- {
- "原始问题核心动机提取": {
- "简要说明核心动机": ""
- },
- "动机维度得分": "-1到1之间的小数",
- "简要说明动机维度相关度理由": "评估该sug词条与原始问题动机匹配程度的理由"
- }
- **输出约束(非常重要)**:
- 1. **字符串长度限制**:\"简要说明动机维度相关度理由\"字段必须控制在**150字以内**
- 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合
- 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号
- #注意事项:
- 始终围绕动机维度:所有评估都基于"动机"维度,不偏离
- 核心动机必须是动词:在评估前,必须先提取原始问题的核心动机(动词),这是整个评估的基础
- 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移
- 负分使用原则:仅当sug词条对原始问题动机产生误导、冲突或有害引导时给予负分
- 零分使用原则:当sug词条与原始问题动机无明确关联,既不相关也不冲突时给予零分,或原始问题无法识别动机时。
- """.strip()
- # 动机评估 prompt - 后续轮次版本(当前 116 版本)
- motivation_evaluation_instructions = """
- #角色
- 你是一个 **专业的语言专家和语义相关性评判专家**。你的任务是:判断我给你的 <平台sug词条> 与 <原始问题> 的需求动机匹配度,给出 **-1 到 1 之间** 的数值评分。
- ---
- # 动机评估核心原则(必读)
- ### 动机 = 动作 + 对象 + 场景
- 评估时必须同时考虑三要素,不能只看动词:
- - **动作**:制定、规划、获取、拍摄等
- - **对象**:旅行行程 vs 每日计划、风光照片 vs 证件照
- - **场景**:旅游 vs 日常、摄影 vs 办公
- ### 关键判断:动词相同 ≠ 动机匹配
- 错误:只看动词相同就给高分
- - "制定旅行行程" vs "制定每日计划" → 给0.95 错误
- - "拍摄风光" vs "拍摄证件照" → 给0.95 错误
- 正确:检查对象和场景是否匹配
- - 对象不同领域 → 降至0.3左右
- - 场景不同 → 降至0.3左右
- # 核心概念与方法论
- ## 评估维度
- 本评估系统围绕 **动机维度** 进行:
- # 维度独立性警告
- 【严格约束】本评估**只评估动机维度**:
- **禁止使用"主题相关"作为评分依据**:评分理由中不得出现"主题"、"内容"、"话题"等词
- ### 1. 动机维度
- **定义:** 用户"想要做什么",即原始问题的行为意图和目的
- - 核心是 **动词**:获取、学习、拍摄、制作、寻找等
- - 包括:核心动作 + 使用场景 + 最终目的
- ---
- 如果原始问题是纯名词短语,无任何动作线索:
- → 核心动机 = 无法识别
- → 在此情况下,动机维度得分应为 0。
- 示例:
- "摄影" → 无法识别动机,动机维度得分 = 0
- "川西风光" → 无法识别动机,动机维度得分 = 0
- ---
- # 输入信息
- 你将接收到以下输入:
- - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。
- - **<平台sug词条>**:平台推荐的词条列表,每个词条需要单独评估。
- #判定流程
- #评估架构
- 输入: <原始问题> + <平台sug词条>
- ↓
- 【动机维度相关性判定】
- ├→ 步骤1: 评估<sug词条>与<原始问题>的需求动机匹配度
- └→ 输出: -1到1之间的数值 + 判定依据
- 相关度评估维度详解
- 维度1: 动机维度评估
- 评估对象: <平台sug词条> 与 <原始问题> 的需求动机匹配度
- 说明: 核心动作是用户需求的第一优先级,决定了推荐的基本有效性
- 评分标准:
- 【正向匹配】
- +0.95~1.0: 动作+对象+场景完全一致
- - 要求:动词、对象、场景都必须匹配,不能只看动词
- - "制定旅行行程" vs "制定每日计划"
- 虽然动词相同,但对象和场景完全不同,不属于高分
- - 特殊规则: 如果sug词的核心动作是原始问题动作在动作+对象+场景一致下的**具体化子集**,也判定为完全一致
- +0.75~0.95: 核心动作语义相近或为同义表达
- - 例: 原始问题"如何获取素材" vs sug词"如何下载素材"
- - 同义词对: 获取≈下载≈寻找, 技巧≈方法≈教程≈攻略
- +0.5~0.75: 核心动作相关但非直接对应(相关实现路径)
- - 例: 原始问题"如何获取素材" vs sug词"素材管理整理"
- +0.25~0.4: 动词相同但对象或场景明显不同(弱相关)
- - 判断要点:动词一致,但对象不同领域或场景不同
- - 关键:不要因为动词相同就给0.95,必须检查对象!
- 【中性/无关】
- 0: 没有明确目的,动作意图无明确关联
- - 例: 原始问题"如何获取素材" vs sug词"摄影器材推荐"
- - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0
- - 如果原始问题无法识别动机,则动机维度得分为0
- 特别注意 - 禁止的错误理由:
- - 禁止: "虽然没有动作,但主题相关,所以给0.2"
- - 禁止:"内容有参考价值,所以给0.15"
- - 禁止: "都提到了XX(名词),所以不是完全无关"
- - 正确理由:"sug词条无动作意图,与原始问题的'XX'动机完全无关"
- 【负向偏离】
- -0.2~-0.05: 动作意图轻度冲突或误导
- - 例: 原始问题"如何获取素材" vs sug词"素材版权保护须知"
- -0.5~-0.25: 动作意图明显对立
- - 例: 原始问题"如何获取免费素材" vs sug词"如何售卖素材"
- -1.0~-0.55: 动作意图完全相反或产生严重负面引导
- - 例: 原始问题"免费素材获取" vs sug词"付费素材强制推销"
- ---
- # 输出要求
- 输出结果必须为一个 **JSON 格式**,包含以下内容:
- ```json
- {
- "原始问题核心动机提取": {
- "简要说明核心动机": ""
- },
- "动机维度得分": "-1到1之间的小数",
- "简要说明动机维度相关度理由": "评估该sug词条与原始问题动机匹配程度的理由"
- }
- **输出约束(非常重要)**:
- 1. **字符串长度限制**:\"简要说明动机维度相关度理由\"字段必须控制在**150字以内**
- 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合
- 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号
- #注意事项:
- 始终围绕动机维度:所有评估都基于"动机"维度,不偏离
- 核心动机必须是动词:在评估前,必须先提取原始问题的核心动机(动词),这是整个评估的基础
- 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移
- 负分使用原则:仅当sug词条对原始问题动机产生误导、冲突或有害引导时给予负分
- 零分使用原则:当sug词条与原始问题动机无明确关联,既不相关也不冲突时给予零分,或原始问题无法识别动机时。
- """.strip()
- # 品类评估 prompt
- category_evaluation_instructions = """
- #角色
- 你是一个 **专业的语言专家和语义相关性评判专家**。你的任务是:判断我给你的 <平台sug词条> 与 <原始问题> 的内容主体和限定词匹配度,给出 **-1 到 1 之间** 的数值评分。
- ---
- # 核心概念与方法论
- ## 评估维度
- 本评估系统围绕 **品类维度** 进行:
- # 维度独立性警告
- 【严格约束】本评估**只评估品类维度**,,必须遵守以下规则:
- 1. **只看名词和限定词**:评估时只考虑主体、限定词的匹配度
- 2. **完全忽略动词**:动作意图、目的等动机信息对本维度评分无影响
- ### 品类维度
- **定义:** 用户"关于什么内容",即原始问题的主题对象和限定词
- - 核心是 **名词+限定词**:川西秋季风光摄影素材
- - 包括:核心主体 + 地域限定 + 时间限定 + 质量限定等
- ## ⚠️ 品类评估核心原则(必读)
- ### 原则1:只看词条表面,禁止联想推演
- - 只能基于sug词实际包含的词汇评分
- - 禁止推测"可能包含"、"可以理解为"
- **错误示例:**
- 原始问题:"川西旅行行程" vs sug词:"每日计划"
- - 错误 "每日计划可以包含旅行规划,所以有关联" → 这是不允许的联想
- - 正确: "sug词只有'每日计划',无'旅行'字眼,品类不匹配" → 正确判断
- ### 原则2:通用概念 ≠ 特定概念
- - **通用**:计划、方法、技巧、素材(无领域限定)
- - **特定**:旅行行程、摄影技巧、烘焙方法(有明确领域)
- IF sug词是通用 且 原始问题是特定:
- → 品类不匹配 → 评分0.05~0.1
- 关键:通用概念不等于特定概念,不能因为"抽象上都是规划"就给分
- ---
- # 输入信息
- 你将接收到以下输入:
- - **<原始问题>**:用户的初始查询问题,代表用户的真实需求意图。
- - **<平台sug词条>**:平台推荐的词条列表,每个词条需要单独评估。
- #判定流程
- #评估架构
- 输入: <原始问题> + <平台sug词条>
- ↓
- 【品类维度相关性判定】
- ├→ 步骤1: 评估<sug词条>与<原始问题>的内容主体和限定词匹配度
- └→ 输出: -1到1之间的数值 + 判定依据
- 相关度评估维度详解
- 维度2: 品类维度评估
- 评估对象: <平台sug词条> 与 <原始问题> 的内容主体和限定词匹配度
- 评分标准:
- 【正向匹配】
- +0.95~1.0: 核心主体+所有关键限定词完全匹配
- - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西秋季风光摄影作品"
- +0.75~0.95: 核心主体匹配,存在限定词匹配
- - 例: 原始问题"川西秋季风光摄影素材" vs sug词"川西风光摄影素材"(缺失"秋季")
- +0.5~0.75: 核心主体匹配,无限定词匹配或合理泛化
- - 例: 原始问题"川西秋季风光摄影素材" vs sug词"四川风光摄影"
- +0.3~0.5: 核心主体匹配,但限定词缺失或存在语义错位
- - 特别注意"语义身份"差异,主体词出现但上下文语义不同
- - 例:
- · "猫咪的XX行为"(猫咪是行为者)
- · vs "用猫咪表达XX的梗图"(猫咪是媒介)
- · 虽都含"猫咪+XX",但语义角色不同
- +0.2~0.3: 主体词不匹配,限定词缺失或错位
- - 例: 原始问题"川西秋季风光摄影素材" vs sug词"风光摄影入门"
- +0.05~0.2: 主体词过度泛化或仅抽象相似
- - 例: sug词是通用概念,原始问题是特定概念
- sug词"每日计划"(通用)vs 原始问题 "川西旅行行程"(特定)
- → 评分:0.08
- 【中性/无关】
- 0: 类别明显不同,没有明确目的,无明确关联
- - 例: 原始问题"川西秋季风光摄影素材" vs sug词"人像摄影素材"
- - 例: 原始问题无法识别动机 且 sug词也无明确动作 → 0
- 【负向偏离】
- -0.2~-0.05: 主体词或限定词存在误导性
- - 例: 原始问题"免费摄影素材" vs sug词"付费摄影素材库"
- -0.5~-0.25: 主体词明显错位或品类冲突
- - 例: 原始问题"风光摄影素材" vs sug词"人像修图教程"
- -1.0~-0.55: 完全错误的品类或有害引导
- - 例: 原始问题"正版素材获取" vs sug词"盗版素材下载"
- ---
- # 输出要求
- 输出结果必须为一个 **JSON 格式**,包含以下内容:
- ```json
- {
- "品类维度得分": "-1到1之间的小数",
- "简要说明品类维度相关度理由": "评估该sug词条与原始问题品类匹配程度的理由"
- }
- ---
- **输出约束(非常重要)**:
- 1. **字符串长度限制**:\"简要说明品类维度相关度理由\"字段必须控制在**150字以内**
- 2. **JSON格式规范**:必须生成完整的JSON格式,确保字符串用双引号包裹且正确闭合
- 3. **引号使用**:字符串中如需表达引用,请使用《》或「」代替单引号或双引号
- ---
- #注意事项:
- 始终围绕品类维度:所有评估都基于"品类"维度,不偏离
- 严格标准一致性:对所有用例使用相同的评估标准,避免评分飘移
- 负分使用原则:仅当sug词条对原始问题品类产生误导、冲突或有害引导时给予负分
- 零分使用原则:当sug词条与原始问题品类无明确关联,既不相关也不冲突时给予零分
- """.strip()
- # 创建评估 Agent
# Motivation-dimension evaluator: uses motivation_evaluation_instructions (the
# later-rounds prompt) and returns a MotivationEvaluation.
motivation_evaluator = Agent[None](
    name="动机维度评估专家(后续轮次)",
    instructions=motivation_evaluation_instructions,
    model=get_model(MODEL_NAME),
    output_type=MotivationEvaluation)
# Category-dimension evaluator: uses category_evaluation_instructions and
# returns a CategoryEvaluation.
category_evaluator = Agent[None](
    name="品类维度评估专家",
    instructions=category_evaluation_instructions,
    model=get_model(MODEL_NAME),
    output_type=CategoryEvaluation
)
- # ============================================================================
- # v120 保留但不使用的 Agent(v121不再使用)
- # ============================================================================
- # # Agent 3: 加词选择专家(旧版 - v120使用,v121不再使用)
- # class WordCombination(BaseModel):
- # """单个词组合"""
- # selected_word: str = Field(..., description="选择的词")
- # combined_query: str = Field(..., description="组合后的新query")
- # reasoning: str = Field(..., description="选择理由")
- # class WordSelectionTop5(BaseModel):
- # """加词选择结果(Top 5)"""
- # combinations: list[WordCombination] = Field(
- # ...,
- # description="选择的Top 5组合(不足5个则返回所有)",
- # min_items=1,
- # max_items=5
- # )
- # overall_reasoning: str = Field(..., description="整体选择思路")
- # word_selection_instructions 已删除 (v121不再使用)
- # word_selector = Agent[None](
- # name="加词组合专家",
- # instructions=word_selection_instructions,
- # model=get_model(MODEL_NAME),
- # output_type=WordSelectionTop5,
- # model_settings=ModelSettings(temperature=0.2),
- # )
- # ============================================================================
- # 辅助函数
- # ============================================================================
- # ============================================================================
- # v121 新增辅助函数
- # ============================================================================
def get_ordered_subsets(words: list[str], min_len: int = 1) -> list[list[str]]:
    """
    Enumerate every order-preserving subset of ``words``.

    Elements may be skipped but never reordered. Subsets are emitted grouped
    by size (smallest first) and, within a size, in the order
    ``itertools.combinations`` yields them.

    Args:
        words: The word list.
        min_len: Minimum subset length.

    Returns:
        All ordered subsets whose length is between ``min_len`` and
        ``len(words)``.

    Example:
        words = ["川西", "秋季", "风光"]
        - length 1: ["川西"], ["秋季"], ["风光"]
        - length 2: ["川西", "秋季"], ["川西", "风光"], ["秋季", "风光"]
        - length 3: ["川西", "秋季", "风光"]
        C(3,1) + C(3,2) + C(3,3) = 3 + 3 + 1 = 7 subsets in total.
    """
    from itertools import combinations

    # combinations() picks by position, so every tuple keeps the input order.
    return [
        list(picked)
        for size in range(min_len, len(words) + 1)
        for picked in combinations(words, size)
    ]
def generate_domain_combinations(segments: list[Segment], n_domains: int) -> list[DomainCombination]:
    """
    Build every N-domain combination.

    Steps:
    1. Choose ``n_domains`` of the ``len(segments)`` domains (a combination,
       original order kept).
    2. For each chosen domain, take all ordered subsets of its ``words``.
    3. Take the Cartesian product across the chosen domains and concatenate
       the words of each product element into one text.

    A domain selection is dropped entirely when any chosen domain has no words.

    Args:
        segments: The semantic segments (one per domain).
        n_domains: How many domains take part in each combination.

    Returns:
        All resulting combinations; an empty list when ``n_domains`` is less
        than 1 or greater than ``len(segments)``.

    Example:
        With 4 domains [疑问标记, 核心动作, 修饰短语, 中心名词] and n_domains=2
        there are C(4,2) = 6 domain selections. For [核心动作, 中心名词]:
        - 核心动作 words ["获取"] -> 1 subset
        - 中心名词 words ["风光", "摄影", "素材"] -> 7 subsets
        giving 1 * 7 = 7 combinations for that selection.
    """
    from itertools import combinations, product

    total = len(segments)
    if not 1 <= n_domains <= total:
        return []

    results = []
    for picked_idx in combinations(range(total), n_domains):
        chosen = [segments[pos] for pos in picked_idx]

        # A domain without words cannot contribute anything: skip the selection.
        if any(len(seg.words) == 0 for seg in chosen):
            continue

        per_domain_subsets = [get_ordered_subsets(seg.words, min_len=1) for seg in chosen]
        # The label depends only on the selected domains, not on the words picked.
        label = "[" + "+".join(seg.type for seg in chosen) + "]"

        # Each product element is a tuple with one word list per domain,
        # e.g. (["获取"], ["风光", "摄影"]).
        for word_groups in product(*per_domain_subsets):
            joined = "".join("".join(group) for group in word_groups)
            results.append(
                DomainCombination(
                    text=joined,
                    domains=list(picked_idx),
                    type_label=label,
                    source_words=[list(group) for group in word_groups],
                    from_segments=[seg.text for seg in chosen],
                )
            )

    return results
def extract_words_from_segments(segments: list[Segment]) -> list[Q]:
    """
    Flatten the words of all segments into a list of ``Q`` objects.

    Used as the Round 1 input: turns the Round 0 words into queries that can
    be used to request SUGs.

    Args:
        segments: The Round 0 semantic segments.

    Returns:
        list[Q]: One ``Q`` per word, in segment order then word order. Score
        and reason come from the segment's ``word_scores`` / ``word_reasons``
        (defaulting to 0.0 / ""), ``from_source`` is "word" and ``type_label``
        carries the segment type in brackets.
    """
    return [
        Q(
            text=token,
            score_with_o=seg.word_scores.get(token, 0.0),
            reason=seg.word_reasons.get(token, ""),
            from_source="word",  # mark the origin as a word
            type_label=f"[{seg.type}]",  # keep the domain information
        )
        for seg in segments
        for token in seg.words
    ]
- # ============================================================================
- # v120 保留辅助函数
- # ============================================================================
def calculate_final_score(motivation_score: float, category_score: float) -> float:
    """
    Combine the two dimension scores into the final score.

    The weighted base is ``motivation_score * 0.7 + category_score * 0.3``;
    it is then adjusted by the first rule that applies:

    - Rule C (highest priority): negative motivation -> 0.0.
    - Rule A: motivation >= 0.8 -> at least 0.7.
    - Rule B: motivation <= 0.2 -> at most 0.5.
    - Otherwise the weighted base is returned unchanged.

    Args:
        motivation_score: Motivation-dimension score.
        category_score: Category-dimension score.

    Returns:
        The final score.
    """
    # Rule C: a negative motivation decides the outcome on its own.
    if motivation_score < 0:
        return 0.0

    weighted = motivation_score * 0.7 + category_score * 0.3

    if motivation_score >= 0.8:
        # Rule A: when the intent matches strongly, a generic category must
        # not drag the result down to "weakly related".
        adjusted = max(weighted, 0.7)
    elif motivation_score <= 0.2:
        # Rule B: when the intent does not match, a category match is of
        # limited value.
        adjusted = min(weighted, 0.5)
    else:
        adjusted = weighted
    return adjusted
def clean_json_string(text: str) -> str:
    """Remove the control characters 0x00-0x1F from ``text``, keeping tab, LF and CR."""
    # Everything below 0x20 except \t (0x09), \n (0x0A) and \r (0x0D).
    doomed = [code for code in range(0x20) if code not in (0x09, 0x0A, 0x0D)]
    # Mapping a code point to None makes str.translate() delete it.
    return text.translate(dict.fromkeys(doomed))
def _extract_master_url(video_info) -> str:
    """Return video["media"]["stream"]["h264"][0]["master_url"], or "" if any level is missing or malformed."""
    node = video_info
    for key in ("media", "stream", "h264"):
        if not isinstance(node, dict):
            return ""
        node = node.get(key)
    # h264 must be a non-empty list whose first entry is a dict.
    if not isinstance(node, list) or not node or not isinstance(node[0], dict):
        return ""
    return node[0].get("master_url") or ""


def process_note_data(note: dict) -> Post:
    """Convert one raw note dict returned by the search API into a ``Post``.

    Missing keys and keys that are present but null are both replaced by safe
    defaults ("" for strings, "normal" for the type, 0 for counters), so a
    sparse payload never fails ``Post`` validation or raises while the video
    URL is being read.

    Args:
        note: Raw note dict. Reads ``id`` and ``note_card`` (``display_title``,
            ``desc``, ``type``, ``image_list``, ``interact_info``, ``video``).

    Returns:
        The populated ``Post``.
    """
    # `or {}` / `or []` also covers keys whose value is null, which a plain
    # .get(key, default) would pass through as None.
    note_card = note.get("note_card") or {}
    image_list = note_card.get("image_list") or []
    interact_info = note_card.get("interact_info") or {}

    # ========== debug logging START ==========
    debug_id = note.get("id", "")
    raw_title = note_card.get("display_title")  # deliberately no default
    raw_body = note_card.get("desc")
    raw_type = note_card.get("type")

    # Print the type and content of the raw values.
    print(f"\n[DEBUG] 处理帖子 {debug_id}:")
    print(f" raw_title 类型: {type(raw_title).__name__}, 值: {repr(raw_title)}")
    print(f" raw_body 类型: {type(raw_body).__name__}, 值: {repr(raw_body)[:100] if raw_body else repr(raw_body)}")
    print(f" raw_type 类型: {type(raw_type).__name__}, 值: {repr(raw_type)}")

    # Flag values that came back as None.
    if raw_title is None:
        print(f" ⚠️ WARNING: display_title 是 None!")
    if raw_body is None:
        print(f" ⚠️ WARNING: desc 是 None!")
    if raw_type is None:
        print(f" ⚠️ WARNING: type 是 None!")
    # ========== debug logging END ==========

    # Image URLs: prefer the new field image_url, fall back to the old
    # field url_default.
    images = []
    for img in image_list:
        if isinstance(img, dict):
            img_url = img.get("image_url") or img.get("url_default")
            if img_url:
                images.append(img_url)

    # A null type falls back to "normal" so Post(type=...) always gets a str.
    note_type = raw_type or "normal"
    video_url = _extract_master_url(note_card.get("video")) if note_type == "video" else ""

    # Normalise the id once so note_id and note_url agree (no ".../explore/None").
    note_id = note.get("id") or ""

    return Post(
        note_id=note_id,
        title=raw_title or "",
        body_text=raw_body or "",
        type=note_type,
        images=images,
        video=video_url,
        interact_info={
            "liked_count": interact_info.get("liked_count", 0),
            "collected_count": interact_info.get("collected_count", 0),
            "comment_count": interact_info.get("comment_count", 0),
            "shared_count": interact_info.get("shared_count", 0)
        },
        note_url=f"https://www.xiaohongshu.com/explore/{note_id}"
    )
- async def evaluate_with_o(text: str, o: str, cache: dict[str, tuple[float, str]] | None = None) -> tuple[float, str]:
- """评估文本与原始问题o的相关度
- 采用两阶段评估 + 代码计算规则:
- 1. 动机维度评估(权重70%)
- 2. 品类维度评估(权重30%)
- 3. 应用规则A/B/C调整得分
- Args:
- text: 待评估的文本
- o: 原始问题
- cache: 评估缓存(可选),用于避免重复评估
- Returns:
- tuple[float, str]: (最终相关度分数, 综合评估理由)
- """
- # 检查缓存
- if cache is not None and text in cache:
- cached_score, cached_reason = cache[text]
- print(f" ⚡ 缓存命中: {text} -> {cached_score:.2f}")
- return cached_score, cached_reason
- # 准备输入
- eval_input = f"""
- <原始问题>
- {o}
- </原始问题>
- <平台sug词条>
- {text}
- </平台sug词条>
- 请评估平台sug词条与原始问题的匹配度。
- """
- # 添加重试机制
- max_retries = 2
- last_error = None
- for attempt in range(max_retries):
- try:
- # 并发调用两个评估器(统一使用标准评估策略)
- motivation_task = Runner.run(motivation_evaluator, eval_input)
- category_task = Runner.run(category_evaluator, eval_input)
- motivation_result, category_result = await asyncio.gather(
- motivation_task,
- category_task
- )
- # 获取评估结果
- motivation_eval: MotivationEvaluation = motivation_result.final_output
- category_eval: CategoryEvaluation = category_result.final_output
- # 提取得分
- motivation_score = motivation_eval.动机维度得分
- category_score = category_eval.品类维度得分
- # 计算基础得分
- base_score = motivation_score * 0.7 + category_score * 0.3
- # 应用规则计算最终得分
- final_score = calculate_final_score(motivation_score, category_score)
- # 组合评估理由
- core_motivation = motivation_eval.原始问题核心动机提取.简要说明核心动机
- motivation_reason = motivation_eval.简要说明动机维度相关度理由
- category_reason = category_eval.简要说明品类维度相关度理由
- combined_reason = (
- f"【核心动机】{core_motivation}\n"
- f"【动机维度 {motivation_score:.2f}】{motivation_reason}\n"
- f"【品类维度 {category_score:.2f}】{category_reason}\n"
- f"【基础得分 {base_score:.2f}】= 动机({motivation_score:.2f})*0.7 + 品类({category_score:.2f})*0.3\n"
- f"【最终得分 {final_score:.2f}】"
- )
- # 如果应用了规则,添加规则说明
- if final_score != base_score:
- if motivation_score < 0:
- combined_reason += "(应用规则C:动机负向决定机制)"
- elif motivation_score >= 0.8:
- combined_reason += "(应用规则A:动机高分保护机制)"
- elif motivation_score <= 0.2:
- combined_reason += "(应用规则B:动机低分限制机制)"
- # 存入缓存
- if cache is not None:
- cache[text] = (final_score, combined_reason)
- return final_score, combined_reason
- except Exception as e:
- last_error = e
- error_msg = str(e)
- if attempt < max_retries - 1:
- print(f" ⚠️ 评估失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:150]}")
- print(f" 正在重试...")
- await asyncio.sleep(1) # 等待1秒后重试
- else:
- print(f" ❌ 评估失败 (已达最大重试次数): {error_msg[:150]}")
- # 所有重试失败后,返回默认值
- fallback_reason = f"评估失败(重试{max_retries}次): {str(last_error)[:200]}"
- print(f" 使用默认值: score=0.0, reason={fallback_reason[:100]}...")
- return 0.0, fallback_reason
- # ============================================================================
- # 核心流程函数
- # ============================================================================
async def initialize(o: str, context: RunContext) -> tuple[list[Seg], list[Word], list[Q], list[Seed]]:
    """Run the initialization phase for the original question ``o``.

    Steps:
        1. Segment ``o`` with the ``word_segmenter`` agent.
        2. Score every segment against ``o`` (at most 5 concurrent evaluations).
        3. Build the fixed word list, the first query list and the first seed
           list, each with one entry per segment.

    Args:
        o: Original question.
        context: Run context; its ``evaluation_cache`` is passed to the evaluator.

    Returns:
        ``(seg_list, word_list_1, q_list_1, seed_list)``.
    """
    print(f"\n{'='*60}")
    print(f"初始化阶段")
    print(f"{'='*60}")

    # 1. Segmentation: original question -> seg_list
    print(f"\n[步骤1] 分词...")
    result = await Runner.run(word_segmenter, o)
    segmentation: WordSegmentation = result.final_output
    seg_list = [Seg(text=word, from_o=o) for word in segmentation.words]
    print(f"分词结果: {[s.text for s in seg_list]}")
    print(f"分词理由: {segmentation.reasoning}")

    # 2. Score each segment against the original question, bounded concurrency.
    print(f"\n[步骤2] 评估每个分词与原始问题的相关度...")
    MAX_CONCURRENT_SEG_EVALUATIONS = 5
    seg_semaphore = asyncio.Semaphore(MAX_CONCURRENT_SEG_EVALUATIONS)

    async def evaluate_seg(seg: Seg) -> Seg:
        async with seg_semaphore:
            # No round-specific argument is passed: evaluate_with_o runs the
            # same evaluators for every round.
            seg.score_with_o, seg.reason = await evaluate_with_o(seg.text, o, context.evaluation_cache)
        return seg

    if seg_list:
        print(f" 开始评估 {len(seg_list)} 个分词(并发限制: {MAX_CONCURRENT_SEG_EVALUATIONS})...")
        await asyncio.gather(*(evaluate_seg(seg) for seg in seg_list))
    for seg in seg_list:
        print(f" {seg.text}: {seg.score_with_o:.2f}")

    # 3. Fixed word list: one Word per segment.
    print(f"\n[步骤3] 构建word_list_1(固定词库)...")
    word_list_1 = [
        Word(text=seg.text, score_with_o=seg.score_with_o, from_o=o)
        for seg in seg_list
    ]
    print(f"word_list_1(固定): {[w.text for w in word_list_1]}")

    # 4. First query list: every segment becomes a query.
    print(f"\n[步骤4] 构建q_list_1...")
    q_list_1 = [
        Q(
            text=seg.text,
            score_with_o=seg.score_with_o,
            reason=seg.reason,
            from_source="seg"
        )
        for seg in seg_list
    ]
    print(f"q_list_1: {[q.text for q in q_list_1]}")

    # 5. First seed list: every segment becomes a seed with no added words.
    print(f"\n[步骤5] 构建seed_list...")
    seed_list = [
        Seed(
            text=seg.text,
            added_words=[],
            from_type="seg",
            score_with_o=seg.score_with_o
        )
        for seg in seg_list
    ]
    print(f"seed_list: {[s.text for s in seed_list]}")

    return seg_list, word_list_1, q_list_1, seed_list
async def run_round(
    round_num: int,
    q_list: list[Q],
    word_list_1: list[Word],
    seed_list: list[Seed],
    o: str,
    context: RunContext,
    xiaohongshu_api: XiaohongshuSearchRecommendations,
    xiaohongshu_search: XiaohongshuSearch,
    sug_threshold: float = 0.7
) -> tuple[list[Q], list[Seed], list[Search]]:
    """Run one round of the iterative expansion.

    Steps:
        1. Request suggestions (SUG) for every query in ``q_list``.
        2. Score every suggestion against ``o``.
        3. Search for suggestions scoring above ``sug_threshold``.
        4. Build the next query list from seed+word combinations and from
           suggestions that improve on their source query.
        5. Build the next seed list (seeds of this round are not carried over).

    A summary of the round is appended to ``context.rounds``.

    Args:
        round_num: Round number.
        q_list: Queries for this round.
        word_list_1: Fixed word list produced by the initialization phase.
        seed_list: Seeds for this round.
        o: Original question.
        context: Run context (evaluation cache and per-round records).
        xiaohongshu_api: Suggestion API client.
        xiaohongshu_search: Search API client.
        sug_threshold: Score a suggestion must exceed to be searched.

    Returns:
        ``(q_list_next, seed_list_next, search_list)``.
    """
    print(f"\n{'='*60}")
    print(f"第{round_num}轮")
    print(f"{'='*60}")

    round_data = {
        "round_num": round_num,
        "input_q_list": [{"text": q.text, "score": q.score_with_o, "type": "query"} for q in q_list],
        "input_word_list_1_size": len(word_list_1),
        "input_seed_list_size": len(seed_list)
    }

    # 1. Request suggestions: one (possibly empty) list per query, same order as q_list.
    print(f"\n[步骤1] 为每个q请求建议词...")
    sug_list_list = []
    for q in q_list:
        print(f"\n 处理q: {q.text}")
        suggestions = xiaohongshu_api.get_recommendations(keyword=q.text)
        q_sug_list = []
        if suggestions:
            print(f" 获取到 {len(suggestions)} 个建议词")
            for sug_text in suggestions:
                q_sug_list.append(Sug(
                    text=sug_text,
                    from_q=QFromQ(text=q.text, score_with_o=q.score_with_o)
                ))
        else:
            print(f" 未获取到建议词")
        sug_list_list.append(q_sug_list)

    # 2. Score every suggestion against the original question.
    print(f"\n[步骤2] 评估每个建议词与原始问题的相关度...")

    # 2.1 Flatten the per-query lists.
    all_sugs = [sug for q_sug_list in sug_list_list for sug in q_sug_list]

    # 2.2 Bounded concurrency: each evaluation runs two evaluator calls
    # concurrently, so 5 slots means up to 10 in-flight evaluator requests.
    MAX_CONCURRENT_EVALUATIONS = 5
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS)

    async def evaluate_sug(sug: Sug) -> Sug:
        async with semaphore:
            sug.score_with_o, sug.reason = await evaluate_with_o(sug.text, o, context.evaluation_cache)
        return sug

    if all_sugs:
        print(f" 开始评估 {len(all_sugs)} 个建议词(并发限制: {MAX_CONCURRENT_EVALUATIONS})...")
        await asyncio.gather(*(evaluate_sug(sug) for sug in all_sugs))

    # 2.3 Print the results and group them by source query.
    sug_details = {}
    for q, q_sug_list in zip(q_list, sug_list_list):
        if not q_sug_list:
            continue
        q_text = q.text
        print(f"\n 来自q '{q_text}' 的建议词:")
        sug_details[q_text] = []
        for sug in q_sug_list:
            print(f" {sug.text}: {sug.score_with_o:.2f}")
            sug_details[q_text].append({
                "text": sug.text,
                "score": sug.score_with_o,
                "reason": sug.reason,
                "type": "sug"
            })

    # 2.4 Pruning. Deliberately disabled (all branches are kept); the logic
    # is retained behind a flag so it can be re-enabled for rounds >= 2.
    PRUNING_ENABLED = False
    pruned_query_texts = set()
    if PRUNING_ENABLED and round_num >= 2:
        print(f"\n[剪枝判断] 第{round_num}轮开始应用剪枝策略...")
        for q, q_sug_list in zip(q_list, sug_list_list):
            if not q_sug_list:
                continue  # no suggestions -> nothing to judge, do not prune
            # Condition 1: every suggestion scores below its query.
            all_lower_than_query = all(sug.score_with_o < q.score_with_o for sug in q_sug_list)
            # Condition 2: every suggestion scores below 0.5.
            all_below_threshold = all(sug.score_with_o < 0.5 for sug in q_sug_list)
            if all_lower_than_query and all_below_threshold:
                pruned_query_texts.add(q.text)
                max_sug_score = max(sug.score_with_o for sug in q_sug_list)
                print(f" 🔪 剪枝: {q.text} (query分数:{q.score_with_o:.2f}, sug最高分:{max_sug_score:.2f}, 全部<0.5)")
        if pruned_query_texts:
            print(f" 本轮共剪枝 {len(pruned_query_texts)} 个query")
        else:
            print(f" 本轮无query被剪枝")
    else:
        print(f"\n[剪枝判断] 剪枝功能已禁用,保留所有分支")

    # 3. Search for the high-scoring suggestions.
    print(f"\n[步骤3] 构建search_list(阈值>{sug_threshold})...")
    search_list = []
    high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold]

    if high_score_sugs:
        print(f" 找到 {len(high_score_sugs)} 个高分建议词")

        async def search_for_sug(sug: Sug) -> Search:
            print(f" 搜索: {sug.text}")
            try:
                search_result = xiaohongshu_search.search(keyword=sug.text)
                # "result" may be a JSON string or an already-decoded object.
                result_str = search_result.get("result", "{}")
                if isinstance(result_str, str):
                    result_data = json.loads(result_str)
                else:
                    result_data = result_str
                notes = result_data.get("data", {}).get("data", [])
                # Keep only the first 10 notes.
                post_list = [process_note_data(note) for note in notes[:10]]
                print(f" → 找到 {len(post_list)} 个帖子")
                return Search(
                    text=sug.text,
                    score_with_o=sug.score_with_o,
                    from_q=sug.from_q,
                    post_list=post_list
                )
            except Exception as e:
                # Best effort: a failed search yields an empty result.
                print(f" ✗ 搜索失败: {e}")
                return Search(
                    text=sug.text,
                    score_with_o=sug.score_with_o,
                    from_q=sug.from_q,
                    post_list=[]
                )

        search_list = await asyncio.gather(*(search_for_sug(sug) for sug in high_score_sugs))
    else:
        print(f" 没有高分建议词,search_list为空")

    # 4. Build q_list_next.
    print(f"\n[步骤4] 构建q_list_next...")
    q_list_next = []
    existing_q_texts = set()  # de-duplication of next-round queries
    add_word_details = {}  # seed text -> evaluated combinations
    all_seed_combinations = []  # every combination of this round (used in step 5)

    # 4.1 For every seed, let the agent pick words from the fixed word list
    # and combine them with the seed (up to 5 combinations).
    print(f"\n 4.1 为每个seed加词(产生Top 5组合)...")
    for seed in seed_list:
        print(f"\n 处理seed: {seed.text}")

        # Skip seeds whose query was pruned.
        if seed.text in pruned_query_texts:
            print(f" ⊗ 跳过被剪枝的seed: {seed.text}")
            continue

        # Candidate words: not already contained in the seed text and not
        # already added to this seed.
        candidate_words = [
            word for word in word_list_1
            if word.text not in seed.text and word.text not in seed.added_words
        ]
        if not candidate_words:
            print(f" 没有可用的候选词")
            continue
        print(f" 候选词数量: {len(candidate_words)}")

        candidate_words_text = ', '.join([w.text for w in candidate_words])
        selection_input = f"""
<原始问题>
{o}
</原始问题>
<当前Seed>
{seed.text}
</当前Seed>
<候选词列表>
{candidate_words_text}
</候选词列表>
请从候选词列表中选择最多5个最合适的词,分别与当前seed组合成新的query。
"""

        # Ask the selector agent, retrying once on failure.
        max_retries = 2
        selection_result = None
        for attempt in range(max_retries):
            try:
                result = await Runner.run(word_selector, selection_input)
                selection_result = result.final_output
                break
            except Exception as e:
                error_msg = str(e)
                if attempt < max_retries - 1:
                    print(f" ⚠️ 选词失败 (尝试 {attempt+1}/{max_retries}): {error_msg[:100]}")
                    await asyncio.sleep(1)
                else:
                    print(f" ❌ 选词失败,跳过该seed: {error_msg[:100]}")
                    break

        if selection_result is None:
            print(f" 跳过seed: {seed.text}")
            continue

        print(f" Agent选择了 {len(selection_result.combinations)} 个组合")
        print(f" 整体选择思路: {selection_result.overall_reasoning}")

        # Evaluate all combinations concurrently. The closure reads `seed`
        # and is awaited within this same loop iteration.
        async def evaluate_combination(comb: WordCombination) -> dict:
            combined = comb.combined_query

            # Validation: every character of the seed and of the selected
            # word must appear somewhere in the combined query.
            seed_chars_in_combined = all(char in combined for char in seed.text)
            word_chars_in_combined = all(char in combined for char in comb.selected_word)

            if not seed_chars_in_combined or not word_chars_in_combined:
                print(f" ⚠️ 警告:组合不完整")
                print(f" Seed: {seed.text}")
                print(f" Word: {comb.selected_word}")
                print(f" 组合: {combined}")
                print(f" 包含完整seed? {seed_chars_in_combined}")
                print(f" 包含完整word? {word_chars_in_combined}")
                # Very low score so this combination is never selected.
                return {
                    'word': comb.selected_word,
                    'query': combined,
                    'score': -1.0,
                    'reason': f"组合不完整:缺少seed或word的部分内容",
                    'reasoning': comb.reasoning
                }

            score, reason = await evaluate_with_o(combined, o, context.evaluation_cache)
            return {
                'word': comb.selected_word,
                'query': combined,
                'score': score,
                'reason': reason,
                'reasoning': comb.reasoning
            }

        top_5 = await asyncio.gather(
            *(evaluate_combination(comb) for comb in selection_result.combinations)
        )
        print(f" 评估完成,得到 {len(top_5)} 个组合")

        # Add combinations to q_list_next (score filter + de-duplication).
        for comb in top_5:
            # A combination must beat its seed by at least REQUIRED_SCORE_GAIN.
            if comb['score'] < seed.score_with_o + REQUIRED_SCORE_GAIN:
                print(f" ⊗ 跳过低分: {comb['query']} (分数{comb['score']:.2f} < 种子{seed.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})")
                continue
            if comb['query'] in existing_q_texts:
                print(f" ⊗ 跳过重复: {comb['query']}")
                continue

            print(f" ✓ {comb['query']} (分数: {comb['score']:.2f} > 种子: {seed.score_with_o:.2f})")
            q_list_next.append(Q(
                text=comb['query'],
                score_with_o=comb['score'],
                reason=comb['reason'],
                from_source="add"
            ))
            existing_q_texts.add(comb['query'])
            # Remember which word was added to this seed.
            seed.added_words.append(comb['word'])

        # Record every evaluated combination for this seed (including rejected ones).
        add_word_details[seed.text] = [
            {
                "text": comb['query'],
                "score": comb['score'],
                "reason": comb['reason'],
                "selected_word": comb['word'],
                "seed_score": seed.score_with_o,
                "type": "add"
            }
            for comb in top_5
        ]

        # Attach the seed score so step 5 can apply the same gain filter.
        for comb in top_5:
            comb['seed_score'] = seed.score_with_o
        all_seed_combinations.extend(top_5)

    # 4.2 Add suggestions that improve on their source query.
    print(f"\n 4.2 将高分sug加入q_list_next...")
    for sug in all_sugs:
        if sug.from_q and sug.from_q.text in pruned_query_texts:
            print(f" ⊗ 跳过来自被剪枝query的sug: {sug.text} (来源: {sug.from_q.text})")
            continue

        # The suggestion must beat its source query by at least REQUIRED_SCORE_GAIN.
        if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN:
            if sug.text in existing_q_texts:
                print(f" ⊗ 跳过重复: {sug.text}")
                continue
            q_list_next.append(Q(
                text=sug.text,
                score_with_o=sug.score_with_o,
                reason=sug.reason,
                from_source="sug"
            ))
            existing_q_texts.add(sug.text)
            print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} >= 来源query: {sug.from_q.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})")

    # 5. Build seed_list_next. Seeds of the current round are not kept.
    print(f"\n[步骤5] 构建seed_list_next(不保留上轮seed)...")
    seed_list_next = []
    existing_seed_texts = set()

    # 5.1 Combinations of this round that improved on their seed.
    print(f" 5.1 加入本轮所有组合词(得分过滤)...")
    for comb in all_seed_combinations:
        seed_score = comb.get('seed_score', 0)
        if comb['score'] < seed_score + REQUIRED_SCORE_GAIN:
            print(f" ⊗ 跳过低分: {comb['query']} (分数{comb['score']:.2f} < 种子{seed_score:.2f} + {REQUIRED_SCORE_GAIN:.2f})")
            continue
        if comb['query'] not in existing_seed_texts:
            seed_list_next.append(Seed(
                text=comb['query'],
                added_words=[],  # a new seed starts with no added words
                from_type="add",
                score_with_o=comb['score']
            ))
            existing_seed_texts.add(comb['query'])
            print(f" ✓ {comb['query']} (分数: {comb['score']:.2f} >= 种子: {seed_score:.2f} + {REQUIRED_SCORE_GAIN:.2f})")

    # 5.2 Suggestions that improved on their source query.
    print(f" 5.2 加入高分sug...")
    for sug in all_sugs:
        if sug.from_q and sug.from_q.text in pruned_query_texts:
            continue
        if sug.from_q and sug.score_with_o >= sug.from_q.score_with_o + REQUIRED_SCORE_GAIN and sug.text not in existing_seed_texts:
            seed_list_next.append(Seed(
                text=sug.text,
                added_words=[],
                from_type="sug",
                score_with_o=sug.score_with_o
            ))
            existing_seed_texts.add(sug.text)
            print(f" ✓ {sug.text} (分数: {sug.score_with_o:.2f} >= 来源query: {sug.from_q.score_with_o:.2f} + {REQUIRED_SCORE_GAIN:.2f})")

    # Serialize search results, including post details.
    search_results_data = [
        {
            "text": search.text,
            "score_with_o": search.score_with_o,
            "post_list": [
                {
                    "note_id": post.note_id,
                    "note_url": post.note_url,
                    "title": post.title,
                    "body_text": post.body_text,
                    "images": post.images,
                    "interact_info": post.interact_info
                }
                for post in search.post_list
            ]
        }
        for search in search_list
    ]

    # Record this round in the run context.
    total_posts = sum(len(s.post_list) for s in search_list)
    round_data.update({
        "sug_count": len(all_sugs),
        "high_score_sug_count": len(high_score_sugs),
        "search_count": len(search_list),
        "total_posts": total_posts,
        "q_list_next_size": len(q_list_next),
        "seed_list_next_size": len(seed_list_next),
        "total_combinations": len(all_seed_combinations),
        "pruned_query_count": len(pruned_query_texts),
        "pruned_queries": list(pruned_query_texts),
        "output_q_list": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "from": q.from_source, "type": "query"} for q in q_list_next],
        "seed_list_next": [{"text": seed.text, "from": seed.from_type, "score": seed.score_with_o} for seed in seed_list_next],
        "sug_details": sug_details,
        "add_word_details": add_word_details,
        "search_results": search_results_data
    })
    context.rounds.append(round_data)

    print(f"\n本轮总结:")
    print(f" 建议词数量: {len(all_sugs)}")
    print(f" 高分建议词: {len(high_score_sugs)}")
    print(f" 搜索数量: {len(search_list)}")
    print(f" 帖子总数: {total_posts}")
    print(f" 组合词数量: {len(all_seed_combinations)}")
    print(f" 下轮q数量: {len(q_list_next)}")
    print(f" 下轮seed数量: {len(seed_list_next)}")

    return q_list_next, seed_list_next, search_list
async def iterative_loop(
    context: RunContext,
    max_rounds: int = 2,
    sug_threshold: float = 0.7
):
    """Drive the initialization phase followed by up to ``max_rounds`` rounds.

    Rounds stop early as soon as a round produces no queries for the next one.
    The initialization data is appended to ``context.rounds`` as round 0.

    Returns:
        The search results of all rounds, concatenated in round order.
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print(f"开始迭代循环")
    print(f"最大轮数: {max_rounds}")
    print(f"sug阈值: {sug_threshold}")
    print(f"{banner}")

    # Initialization phase.
    seg_list, word_list_1, q_list, seed_list = await initialize(context.o, context)

    # API clients shared by every round.
    xiaohongshu_api = XiaohongshuSearchRecommendations()
    xiaohongshu_search = XiaohongshuSearch()

    # Persist the initialization data as round 0.
    init_record = {
        "round_num": 0,
        "type": "initialization",
        "seg_list": [{"text": s.text, "score": s.score_with_o, "reason": s.reason, "type": "seg"} for s in seg_list],
        "word_list_1": [{"text": w.text, "score": w.score_with_o} for w in word_list_1],
        "q_list_1": [{"text": q.text, "score": q.score_with_o, "reason": q.reason, "type": "query"} for q in q_list],
        "seed_list": [{"text": s.text, "from_type": s.from_type, "score": s.score_with_o, "type": "seed"} for s in seed_list]
    }
    context.rounds.append(init_record)

    # Accumulated search results across rounds.
    all_search_list = []

    completed_rounds = 0
    while q_list and completed_rounds < max_rounds:
        current_round = completed_rounds + 1
        q_list, seed_list, round_searches = await run_round(
            round_num=current_round,
            q_list=q_list,
            word_list_1=word_list_1,  # the fixed word list never changes
            seed_list=seed_list,
            o=context.o,
            context=context,
            xiaohongshu_api=xiaohongshu_api,
            xiaohongshu_search=xiaohongshu_search,
            sug_threshold=sug_threshold
        )
        all_search_list.extend(round_searches)
        completed_rounds = current_round

    post_total = sum(len(s.post_list) for s in all_search_list)
    print(f"\n{banner}")
    print(f"迭代完成")
    print(f" 总轮数: {completed_rounds}")
    print(f" 总搜索次数: {len(all_search_list)}")
    print(f" 总帖子数: {post_total}")
    print(f"{banner}")

    return all_search_list
- # ============================================================================
- # v121 新架构核心流程函数
- # ============================================================================
async def initialize_v2(o: str, context: RunContext) -> list[Segment]:
    """v121 Round 0: semantic segmentation plus word splitting.

    Flow:
        1. Call ``semantic_segmenter`` to split the original question into
           semantic segments.
        2. Call ``word_segmenter`` on every segment to split it into words.
        3. Score every segment and every word against the original question.
        4. No combination is done in Round 0.

    The result is stored on ``context.segments`` and appended to
    ``context.rounds`` as round 0.

    Returns:
        The list of semantic segments (``Segment``).
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f"Round 0: 初始化阶段(语义分段 + 拆词)")
    print(f"{rule}")

    # 1. Semantic segmentation.
    print(f"\n[步骤1] 语义分段...")
    seg_run = await Runner.run(semantic_segmenter, o)
    segmentation: SemanticSegmentation = seg_run.final_output

    print(f"语义分段结果: {len(segmentation.segments)} 个片段")
    print(f"整体分段思路: {segmentation.overall_reasoning}")

    segment_list = []
    for item in segmentation.segments:
        new_segment = Segment(
            text=item.segment_text,
            type=item.segment_type,
            from_o=o
        )
        segment_list.append(new_segment)
        print(f" - [{new_segment.type}] {new_segment.text}")

    # 2. Split each segment into words and score everything.
    print(f"\n[步骤2] 对每个segment拆词并评估...")

    MAX_CONCURRENT_EVALUATIONS = 5
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS)

    async def score_word(w: str) -> tuple[str, float, str]:
        """Score a single word against the original question."""
        word_score, word_reason = await evaluate_with_o(w, o, context.evaluation_cache)
        return w, word_score, word_reason

    async def process_segment(segment: Segment) -> Segment:
        """Handle one segment: split into words, score the segment, score the words."""
        async with semaphore:
            # 2.1 Word splitting.
            split_run = await Runner.run(word_segmenter, segment.text)
            word_segmentation: WordSegmentation = split_run.final_output
            segment.words = word_segmentation.words

            # 2.2 Segment relevance.
            segment.score_with_o, segment.reason = await evaluate_with_o(
                segment.text, o, context.evaluation_cache
            )

            # 2.3 Word relevance, all words of the segment concurrently.
            scored_words = await asyncio.gather(*[score_word(w) for w in segment.words])
            for w, word_score, word_reason in scored_words:
                segment.word_scores[w] = word_score
                segment.word_reasons[w] = word_reason

            return segment

    if segment_list:
        print(f" 开始处理 {len(segment_list)} 个segment(并发限制: {MAX_CONCURRENT_EVALUATIONS})...")
        await asyncio.gather(*[process_segment(seg) for seg in segment_list])

    # Print the segmentation / word-splitting results.
    print(f"\n[步骤1: 分段及拆词 结果]")
    for segment in segment_list:
        print(f" [{segment.type}] {segment.text} (分数: {segment.score_with_o:.2f})")
        print(f" 拆词: {segment.words}")
        for w in segment.words:
            print(f" - {w}: {segment.word_scores.get(w, 0.0):.2f}")

    # Legacy format, kept on the context for compatibility.
    legacy_segments = []
    for seg in segment_list:
        legacy_segments.append({
            "text": seg.text,
            "type": seg.type,
            "score": seg.score_with_o,
            "reason": seg.reason,
            "words": seg.words,
            "word_scores": seg.word_scores,
            "word_reasons": seg.word_reasons
        })
    context.segments = legacy_segments

    # New format (used for visualization), stored as round 0.
    round0_segments = []
    for idx, seg in enumerate(segment_list):
        word_entries = [
            {
                "text": w,
                "score": seg.word_scores.get(w, 0.0),
                "reason": seg.word_reasons.get(w, "")
            }
            for w in seg.words
        ]
        round0_segments.append({
            "text": seg.text,
            "type": seg.type,
            "domain_index": idx,
            "score": seg.score_with_o,
            "reason": seg.reason,
            "words": word_entries
        })
    context.rounds.append({
        "round_num": 0,
        "type": "initialization",
        "segments": round0_segments
    })

    print(f"\n[Round 0 完成]")
    print(f" 分段数: {len(segment_list)}")
    total_words = sum(len(seg.words) for seg in segment_list)
    print(f" 总词数: {total_words}")

    return segment_list
async def run_round_v2(
    round_num: int,
    query_input: list[Q],
    segments: list[Segment],
    o: str,
    context: RunContext,
    xiaohongshu_api: XiaohongshuSearchRecommendations,
    xiaohongshu_search: XiaohongshuSearch,
    sug_threshold: float = 0.7
) -> tuple[list[Q], list[Search]]:
    """Run one v121 round (round N builds N-domain combinations).

    Flow:
        1. Request suggestions (SUG) for every query in ``query_input``.
        2. Score the suggestions.
        3. Search for suggestions scoring above ``sug_threshold``.
        4. Generate N-domain combinations from ``segments``.
        5. Score the combinations.
        6. Build ``q_list_next`` from high-scoring combinations plus
           suggestions that improve on their source query.

    Queries in ``q_list_next`` are de-duplicated by text (first occurrence
    wins), so the next round does not request, evaluate and search the same
    text several times. A summary is appended to ``context.rounds``.

    Args:
        round_num: Round number; also the number of domains to combine.
        query_input: Input queries (Round 1: words; later rounds: previous output).
        segments: Semantic segments used to generate combinations.
        o: Original question.
        context: Run context.
        xiaohongshu_api: Suggestion API client.
        xiaohongshu_search: Search API client.
        sug_threshold: Score a suggestion must exceed to be searched.

    Returns:
        ``(q_list_next, search_list)``.
    """
    print(f"\n{'='*60}")
    print(f"Round {round_num}: {round_num}域组合")
    print(f"{'='*60}")

    round_data = {
        "round_num": round_num,
        "n_domains": round_num,
        "input_query_count": len(query_input)
    }

    MAX_CONCURRENT_EVALUATIONS = 5
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_EVALUATIONS)

    # Step 1: request suggestions for every input query.
    print(f"\n[步骤1] 为{len(query_input)}个输入query请求SUG...")
    all_sugs = []
    sug_details = {}

    for q in query_input:
        suggestions = xiaohongshu_api.get_recommendations(keyword=q.text)
        if suggestions:
            print(f" {q.text}: 获取到 {len(suggestions)} 个SUG")
            for sug_text in suggestions:
                all_sugs.append(Sug(
                    text=sug_text,
                    from_q=QFromQ(text=q.text, score_with_o=q.score_with_o)
                ))
        else:
            print(f" {q.text}: 未获取到SUG")

    print(f" 共获取 {len(all_sugs)} 个SUG")

    # Step 2: score the suggestions.
    if all_sugs:
        print(f"\n[步骤2] 评估{len(all_sugs)}个SUG...")

        async def evaluate_sug(sug: Sug) -> Sug:
            async with semaphore:
                sug.score_with_o, sug.reason = await evaluate_with_o(
                    sug.text, o, context.evaluation_cache
                )
            return sug

        await asyncio.gather(*(evaluate_sug(sug) for sug in all_sugs))

        # Print the results and group them by source query.
        for sug in all_sugs:
            print(f" {sug.text}: {sug.score_with_o:.2f}")
            if sug.from_q:
                sug_details.setdefault(sug.from_q.text, []).append({
                    "text": sug.text,
                    "score": sug.score_with_o,
                    "reason": sug.reason,
                    "type": "sug"
                })

    # Step 3: search for the high-scoring suggestions.
    print(f"\n[步骤3] 搜索高分SUG(阈值 > {sug_threshold})...")
    high_score_sugs = [sug for sug in all_sugs if sug.score_with_o > sug_threshold]
    print(f" 找到 {len(high_score_sugs)} 个高分SUG")

    search_list = []
    if high_score_sugs:
        async def search_for_sug(sug: Sug) -> Search:
            print(f" 搜索: {sug.text}")
            try:
                search_result = xiaohongshu_search.search(keyword=sug.text)
                # "result" may be a JSON string or an already-decoded object.
                result_str = search_result.get("result", "{}")
                if isinstance(result_str, str):
                    result_data = json.loads(result_str)
                else:
                    result_data = result_str
                notes = result_data.get("data", {}).get("data", [])
                # Keep only the first 10 notes.
                post_list = [process_note_data(note) for note in notes[:10]]
                print(f" → 找到 {len(post_list)} 个帖子")
                return Search(
                    text=sug.text,
                    score_with_o=sug.score_with_o,
                    from_q=sug.from_q,
                    post_list=post_list
                )
            except Exception as e:
                # Best effort: a failed search yields an empty result.
                print(f" ✗ 搜索失败: {e}")
                return Search(
                    text=sug.text,
                    score_with_o=sug.score_with_o,
                    from_q=sug.from_q,
                    post_list=[]
                )

        search_list = await asyncio.gather(*(search_for_sug(sug) for sug in high_score_sugs))

    q_list_next = []
    seen_q_texts = set()  # texts already present in q_list_next

    def add_high_gain_sugs() -> list:
        """Append suggestions that beat their source query by REQUIRED_SCORE_GAIN.

        Shared by the early-return and the normal path. Returns the
        suggestions that were actually added (duplicates are skipped).
        """
        added = []
        for sug in all_sugs:
            if not sug.from_q:
                continue
            if sug.score_with_o < sug.from_q.score_with_o + REQUIRED_SCORE_GAIN:
                continue
            if sug.text in seen_q_texts:
                continue
            q_list_next.append(Q(
                text=sug.text,
                score_with_o=sug.score_with_o,
                reason=sug.reason,
                from_source="sug",
                type_label=""
            ))
            seen_q_texts.add(sug.text)
            added.append(sug)
        return added

    # Step 4: generate the N-domain combinations.
    print(f"\n[步骤4] 生成{round_num}域组合...")
    domain_combinations = generate_domain_combinations(segments, round_num)
    print(f" 生成了 {len(domain_combinations)} 个组合")

    if not domain_combinations:
        print(f" 无法生成{round_num}域组合")
        # Even without combinations, high-gain suggestions feed the next round.
        add_high_gain_sugs()

        round_data.update({
            "domain_combinations_count": 0,
            "sug_count": len(all_sugs),
            "high_score_sug_count": len(high_score_sugs),
            "search_count": len(search_list),
            "sug_details": sug_details,
            "q_list_next_size": len(q_list_next)
        })
        context.rounds.append(round_data)
        return q_list_next, search_list

    # Step 5: score every combination.
    print(f"\n[步骤5] 评估{len(domain_combinations)}个组合...")

    async def evaluate_combination(comb: DomainCombination) -> DomainCombination:
        async with semaphore:
            comb.score_with_o, comb.reason = await evaluate_with_o(
                comb.text, o, context.evaluation_cache
            )
        return comb

    await asyncio.gather(*(evaluate_combination(comb) for comb in domain_combinations))

    # Highest score first.
    domain_combinations.sort(key=lambda x: x.score_with_o, reverse=True)

    print(f" 评估完成,Top 10:")
    for i, comb in enumerate(domain_combinations[:10], 1):
        print(f" {i}. {comb.text} {comb.type_label} (分数: {comb.score_with_o:.2f})")

    # Step 6: build q_list_next (combinations first, then suggestions).
    print(f"\n[步骤6] 生成下轮输入...")

    # 6.1 Combinations scoring above REQUIRED_SCORE_GAIN.
    high_score_combinations = []
    for comb in domain_combinations:
        if comb.score_with_o <= REQUIRED_SCORE_GAIN or comb.text in seen_q_texts:
            continue
        q_list_next.append(Q(
            text=comb.text,
            score_with_o=comb.score_with_o,
            reason=comb.reason,
            from_source="domain_comb",
            type_label=comb.type_label
        ))
        seen_q_texts.add(comb.text)
        high_score_combinations.append(comb)

    print(f" 添加 {len(high_score_combinations)} 个高分组合")

    # 6.2 Suggestions that satisfy the gain condition.
    high_gain_sugs = add_high_gain_sugs()

    print(f" 添加 {len(high_gain_sugs)} 个高增益SUG(增益 > {REQUIRED_SCORE_GAIN})")

    # Persist the round data.
    search_results_data = [
        {
            "text": search.text,
            "score_with_o": search.score_with_o,
            "post_count": len(search.post_list)
        }
        for search in search_list
    ]

    round_data.update({
        "input_queries": [{"text": q.text, "score": q.score_with_o, "from_source": q.from_source, "type": "input"} for q in query_input],
        "domain_combinations_count": len(domain_combinations),
        "domain_combinations_top10": [
            {
                "text": comb.text,
                "type_label": comb.type_label,
                "score": comb.score_with_o,
                "reason": comb.reason,
                "domains": comb.domains,
                "source_words": comb.source_words,
                "from_segments": comb.from_segments
            }
            for comb in domain_combinations[:10]
        ],
        "high_score_combinations": [{"text": q.text, "score": q.score_with_o, "type_label": q.type_label, "type": "combination"} for q in q_list_next if q.from_source == "domain_comb"],
        "sug_count": len(all_sugs),
        "sug_details": sug_details,
        "high_score_sug_count": len(high_score_sugs),
        "high_gain_sugs": [{"text": q.text, "score": q.score_with_o, "type": "sug"} for q in q_list_next if q.from_source == "sug"],
        "search_count": len(search_list),
        "search_results": search_results_data,
        "q_list_next_size": len(q_list_next)
    })
    context.rounds.append(round_data)

    print(f"\nRound {round_num} 总结:")
    print(f" 输入Query数: {len(query_input)}")
    print(f" 域组合数: {len(domain_combinations)}")
    print(f" 高分组合: {len(high_score_combinations)}")
    print(f" SUG数: {len(all_sugs)}")
    print(f" 高分SUG数: {len(high_score_sugs)}")
    print(f" 高增益SUG: {len(high_gain_sugs)}")
    print(f" 搜索数: {len(search_list)}")
    print(f" 下轮Query数: {len(q_list_next)}")

    return q_list_next, search_list
async def iterative_loop_v2(
    context: RunContext,
    max_rounds: int = 4,
    sug_threshold: float = 0.7
):
    """v121 main loop: Round 0 initialization followed by rounds 1..N.

    The number of rounds is capped at ``min(max_rounds, number of segments)``.
    Iteration stops early when a round yields no queries for the next one.

    Args:
        context: Run context; ``context.o`` is the original question.
        max_rounds: Upper bound on the number of rounds.
        sug_threshold: Score a suggestion must exceed to be searched.

    Returns:
        The search results of all rounds, concatenated in round order.
    """
    print(f"\n{'='*60}")
    print(f"开始v121迭代循环(语义分段跨域组词版)")
    print(f"最大轮数: {max_rounds}")
    print(f"sug阈值: {sug_threshold}")
    print(f"{'='*60}")

    # Round 0: semantic segmentation + word splitting.
    segments = await initialize_v2(context.o, context)

    # API clients shared by every round.
    xiaohongshu_api = XiaohongshuSearchRecommendations()
    xiaohongshu_search = XiaohongshuSearch()

    # Accumulated search results across rounds.
    all_search_list = []

    # Round 1 input: every word extracted from the segments.
    query_input = extract_words_from_segments(segments)
    print(f"\n提取了 {len(query_input)} 个词作为 Round 1 的输入")

    # Rounds 1..N: never more rounds than there are segments.
    num_segments = len(segments)
    actual_max_rounds = min(max_rounds, num_segments)
    # Counted separately from round_num so the summary is right whether the
    # loop ends by the round cap, by an early break, or never runs.
    rounds_executed = 0
    round_num = 1

    while query_input and round_num <= actual_max_rounds:
        query_input, search_list = await run_round_v2(
            round_num=round_num,
            query_input=query_input,  # output of the previous round
            segments=segments,
            o=context.o,
            context=context,
            xiaohongshu_api=xiaohongshu_api,
            xiaohongshu_search=xiaohongshu_search,
            sug_threshold=sug_threshold
        )
        rounds_executed = round_num
        all_search_list.extend(search_list)

        # No new queries: stop early.
        if not query_input:
            print(f"\n第{round_num}轮后无新query生成,提前结束迭代")
            break

        round_num += 1

    print(f"\n{'='*60}")
    print(f"迭代完成")
    print(f" 实际轮数: {rounds_executed}")
    print(f" 总搜索次数: {len(all_search_list)}")
    print(f" 总帖子数: {sum(len(s.post_list) for s in all_search_list)}")
    print(f"{'='*60}")

    return all_search_list
- # ============================================================================
- # 主函数
- # ============================================================================
def _format_final_output(run_context, all_search_list) -> str:
    """Build the plain-text summary stored in ``run_context.final_output``."""
    total_posts = sum(len(s.post_list) for s in all_search_list)
    # Collect fragments and join once instead of repeated string ``+=``.
    parts = [
        f"原始需求:{run_context.c}\n",
        f"原始问题:{run_context.o}\n",
        f"总搜索次数:{len(all_search_list)}\n",
        f"总帖子数:{total_posts}\n",
        "\n" + "=" * 60 + "\n",
    ]

    if not all_search_list:
        parts.append("未找到搜索结果\n")
        return "".join(parts)

    parts.append("【搜索结果】\n\n")
    for idx, search in enumerate(all_search_list, 1):
        parts.append(f"{idx}. 搜索词: {search.text} (分数: {search.score_with_o:.2f})\n")
        parts.append(f" 帖子数: {len(search.post_list)}\n")
        if search.post_list:
            # Only the first three posts of each search are listed.
            for post_idx, post in enumerate(search.post_list[:3], 1):
                parts.append(f" {post_idx}) {post.title}\n")
                parts.append(f" URL: {post.note_url}\n")
        parts.append("\n")
    return "".join(parts)


async def main(input_dir: str, max_rounds: int = 2, sug_threshold: float = 0.7, visualize: bool = False):
    """Run one complete optimisation pass for the inputs in ``input_dir``.

    Reads ``context.md`` and ``q.md`` from ``input_dir``, runs
    ``iterative_loop_v2`` and writes ``run.log``, ``run_context.json`` and
    ``search_results.json`` into
    ``<input_dir>/output/<script name>/<current_time>``. While the run is in
    progress ``sys.stdout`` is replaced by a ``TeeLogger`` built from the
    original stdout and the log file.

    Args:
        input_dir: Directory containing ``context.md`` and ``q.md``.
        max_rounds: Forwarded to ``iterative_loop_v2``.
        sug_threshold: Forwarded to ``iterative_loop_v2``.
        visualize: When true, run the Node visualisation script on the saved
            run context to produce ``visualization.html``.
    """
    current_time, log_url = set_trace()

    # Read the inputs.
    input_context_file = os.path.join(input_dir, 'context.md')
    input_q_file = os.path.join(input_dir, 'q.md')
    c = read_file_as_string(input_context_file)  # original requirement
    o = read_file_as_string(input_q_file)  # original question

    # Version info is derived from this script's file name.
    version = os.path.basename(__file__)
    version_name = os.path.splitext(version)[0]

    log_dir = os.path.join(input_dir, "output", version_name, current_time)

    run_context = RunContext(
        version=version,
        input_files={
            "input_dir": input_dir,
            "context_file": input_context_file,
            "q_file": input_q_file,
        },
        c=c,
        o=o,
        log_dir=log_dir,
        log_url=log_url,
    )

    os.makedirs(run_context.log_dir, exist_ok=True)

    log_file_path = os.path.join(run_context.log_dir, "run.log")
    original_stdout = sys.stdout
    log_file = open(log_file_path, 'w', encoding='utf-8')
    try:
        # Installed inside the try block so the log file is closed and stdout
        # restored even if setting up the tee fails.
        sys.stdout = TeeLogger(original_stdout, log_file)

        print(f"📝 日志文件: {log_file_path}")
        print(f"{'='*60}\n")

        # Run the iteration (v121 architecture).
        all_search_list = await iterative_loop_v2(
            run_context,
            max_rounds=max_rounds,
            sug_threshold=sug_threshold
        )

        output = _format_final_output(run_context, all_search_list)
        run_context.final_output = output

        print(f"\n{'='*60}")
        print("最终结果")
        print(f"{'='*60}")
        print(output)

        # Persist the run context.
        context_file_path = os.path.join(run_context.log_dir, "run_context.json")
        with open(context_file_path, "w", encoding="utf-8") as f:
            json.dump(run_context.model_dump(), f, ensure_ascii=False, indent=2)
        print(f"\nRunContext saved to: {context_file_path}")

        # Persist the detailed search results.
        search_results_path = os.path.join(run_context.log_dir, "search_results.json")
        with open(search_results_path, "w", encoding="utf-8") as f:
            json.dump(
                [s.model_dump() for s in all_search_list],
                f,
                ensure_ascii=False,
                indent=2,
            )
        print(f"Search results saved to: {search_results_path}")

        if visualize:
            import subprocess

            output_html = os.path.join(run_context.log_dir, "visualization.html")
            print(f"\n🎨 生成可视化HTML...")

            # The script receives absolute paths for its input and output.
            abs_context_file = os.path.abspath(context_file_path)
            abs_output_html = os.path.abspath(output_html)

            try:
                result = subprocess.run([
                    "node",
                    "visualization/sug_v6_1_2_121/index.js",
                    abs_context_file,
                    abs_output_html
                ])
            except OSError as exc:
                # E.g. `node` is not installed. The results are already saved,
                # so report the failure instead of aborting with a traceback.
                print(f"❌ 可视化生成失败: {exc}")
            else:
                if result.returncode == 0:
                    print(f"✅ 可视化已生成: {output_html}")
                else:
                    print(f"❌ 可视化生成失败")
    finally:
        # Restore stdout before closing the file the tee writes to.
        sys.stdout = original_stdout
        log_file.close()
        print(f"\n📝 运行日志已保存: {log_file_path}")
if __name__ == "__main__":
    # Command-line entry point: parse the options and run ``main`` once.
    parser = argparse.ArgumentParser(description="搜索query优化工具 - v6.1.2.121 语义分段跨域组词版")
    parser.add_argument(
        "--input-dir",
        type=str,
        default="input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?",
        help="输入目录路径,默认: input/旅游-逸趣玩旅行/如何获取能体现川西秋季特色的高质量风光摄影素材?"
    )
    parser.add_argument(
        "--max-rounds",
        type=int,
        default=4,
        help="最大轮数,默认: 4"
    )
    parser.add_argument(
        "--sug-threshold",
        type=float,
        default=0.7,
        help="suggestion阈值,默认: 0.7"
    )
    # Visualisation stays on by default; --visualize is kept so existing
    # invocations still work.
    parser.add_argument(
        "--visualize",
        dest="visualize",
        action="store_true",
        default=True,
        help="运行完成后自动生成可视化HTML"
    )
    # With default=True, a store_true flag alone could never switch the
    # feature off; this companion flag makes that possible.
    parser.add_argument(
        "--no-visualize",
        dest="visualize",
        action="store_false",
        help="运行完成后不生成可视化HTML"
    )
    args = parser.parse_args()
    asyncio.run(main(args.input_dir, max_rounds=args.max_rounds, sug_threshold=args.sug_threshold, visualize=args.visualize))
|