- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 增强搜索系统 V2
- 支持LLM评估和扩展搜索的完整流程
- """
- import json
- import logging
- import copy
- import time
- import os
- import argparse
- import subprocess
- from typing import Dict, List, Any, Optional, Set
- from datetime import datetime
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from openrouter_client import OpenRouterClient
- from llm_evaluator import LLMEvaluator
- from xiaohongshu_search import XiaohongshuSearch
- from stage7_analyzer import Stage7DeconstructionAnalyzer
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S',
- handlers=[
- logging.FileHandler('enhanced_search_v2.log', encoding='utf-8'),
- logging.StreamHandler()
- ]
- )
- logger = logging.getLogger(__name__)
- class EnhancedSearchV2:
- """增强搜索系统V2"""
- def __init__(
- self,
- how_json_path: str,
- dimension_associations_path: str,
- intra_associations_path: str,
- optimized_clustered_data_path: str,
- openrouter_api_key: Optional[str] = None,
- output_dir: str = "output_v2",
- top_n: int = 10,
- max_total_searches: Optional[int] = None,
- search_max_workers: int = 3,
- max_searches_per_feature: Optional[int] = None,
- max_searches_per_base_word: Optional[int] = None,
- combination_source: str = "how_based",
- enable_stage6: bool = False,
- stage6_max_workers: int = 10,
- stage6_max_notes: int = 20,
- enable_stage7: bool = False,
- stage7_only: bool = False,
- stage7_max_workers: int = 5,
- stage7_max_notes: Optional[int] = None,
- stage7_skip: int = 0,
- stage7_sort_by: str = 'score',
- stage7_api_url: str = "http://192.168.245.150:7000/what/analysis/single",
- stage7_min_score: float = 0.8
- ):
- """
- 初始化系统
- Args:
- how_json_path: How解构文件路径
- dimension_associations_path: 维度关联文件路径
- intra_associations_path: 维度内关联文件路径
- optimized_clustered_data_path: 人设特征库路径
- openrouter_api_key: OpenRouter API密钥
- output_dir: 输出目录
- top_n: 每个原始特征取评分最高的N个搜索词(默认10)
- max_total_searches: 全局最大搜索次数限制(默认None不限制)
- search_max_workers: 搜索并发数(默认3)
- max_searches_per_feature: 每个原始特征的最大搜索次数(默认None不限制)
- max_searches_per_base_word: 每个base_word的最大搜索次数(默认None不限制)
- combination_source: 组合词来源方式(默认how_based)
- - "how_based": 从how文件提取相似度>=0.8的候选词(新方式,默认)
- - "association": 基于关联分析提取候选词(旧方式)
- enable_stage6: 是否启用Stage 6评估(默认False)
- stage6_max_workers: Stage 6并发评估数(默认10)
- stage6_max_notes: 每个搜索结果评估的最大帖子数(默认20)
- enable_stage7: 是否启用Stage 7深度解构(默认False)
- stage7_only: 只运行Stage 7(从Stage 6结果开始,默认False)
- stage7_max_workers: Stage 7并发数(默认5)
- stage7_max_notes: Stage 7最多处理多少个帖子(默认None不限制)
- stage7_skip: Stage 7跳过前N个帖子(默认0)
- stage7_sort_by: Stage 7排序方式:score/time/engagement(默认score)
- stage7_api_url: Stage 7解构API地址
- stage7_min_score: Stage 7处理的最低分数阈值(默认0.8,0-1分制)
- """
- self.how_json_path = how_json_path
- self.dimension_associations_path = dimension_associations_path
- self.intra_associations_path = intra_associations_path
- self.optimized_clustered_data_path = optimized_clustered_data_path
- self.output_dir = output_dir
- self.top_n = top_n
- self.max_total_searches = max_total_searches
- self.search_max_workers = search_max_workers
- self.max_searches_per_feature = max_searches_per_feature
- self.max_searches_per_base_word = max_searches_per_base_word
- self.combination_source = combination_source
- self.enable_stage6 = enable_stage6
- self.stage6_max_workers = stage6_max_workers
- self.stage6_max_notes = stage6_max_notes
- self.enable_stage7 = enable_stage7
- self.stage7_only = stage7_only
- # 创建输出目录
- os.makedirs(output_dir, exist_ok=True)
- # 加载数据
- logger.info("加载数据文件...")
- self.how_data = self._load_json(how_json_path)
- self.dimension_associations = self._load_json(dimension_associations_path)
- self.intra_associations = self._load_json(intra_associations_path)
- self.optimized_clustered_data = self._load_json(optimized_clustered_data_path)
- # 初始化组件
- logger.info("初始化组件...")
- self.openrouter_client = OpenRouterClient(
- api_key=openrouter_api_key,
- model="google/gemini-2.5-flash",
- retry_delay=5 # 增加重试延迟避免限流
- )
- self.llm_evaluator = LLMEvaluator(self.openrouter_client)
- self.search_client = XiaohongshuSearch()
- # 初始化 Stage 7 分析器
- self.stage7_analyzer = Stage7DeconstructionAnalyzer(
- api_url=stage7_api_url,
- max_workers=stage7_max_workers,
- max_notes=stage7_max_notes,
- min_score=stage7_min_score,
- skip_count=stage7_skip,
- sort_by=stage7_sort_by,
- output_dir=output_dir,
- enable_image_download=False, # 直接使用原始图片URL,不做代理
- image_server_url="http://localhost:8765", # 图片服务器URL(已弃用)
- image_download_dir="downloaded_images" # 图片下载目录(已弃用)
- )
- logger.info("系统初始化完成")
- def _load_json(self, file_path: str) -> Any:
- """加载JSON文件"""
- try:
- with open(file_path, 'r', encoding='utf-8') as f:
- return json.load(f)
- except Exception as e:
- logger.error(f"加载文件失败 {file_path}: {e}")
- raise
- def _save_json(self, data: Any, file_path: str):
- """保存JSON文件"""
- try:
- with open(file_path, 'w', encoding='utf-8') as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- logger.info(f"已保存: {file_path}")
- except Exception as e:
- logger.error(f"保存文件失败 {file_path}: {e}")
- raise
- # ========== 阶段1:筛选 0.5 <= 相似度 < 0.8 的特征 ==========
- def stage1_filter_features(self) -> List[Dict[str, Any]]:
- """
- 阶段1:筛选中等匹配度特征
- 筛选条件:0.5 <= 最高相似度 < 0.8
- Returns:
- 筛选后的特征列表
- """
- logger.info("=" * 60)
- logger.info("阶段1:筛选中等匹配度特征 (0.5 <= 相似度 < 0.8)")
- logger.info("=" * 60)
- results = []
- how_result = self.how_data.get('how解构结果', {})
- total_features = 0
- filtered_out_low = 0 # < 0.5
- filtered_out_high = 0 # >= 0.8
- selected_count = 0
- # 遍历三个维度
- for level_name, level_list in how_result.items():
- if not isinstance(level_list, list):
- continue
- logger.info(f"\n处理 {level_name}...")
- for item_idx, item in enumerate(level_list):
- item_name = item.get('名称', f'未命名-{item_idx}')
- how_steps = item.get('how步骤列表', [])
- for step in how_steps:
- features = step.get('特征列表', [])
- for feature in features:
- feature_name = feature.get('特征名称', '')
- match_results = feature.get('匹配结果', [])
- total_features += 1
- if not match_results:
- continue
- # 找到最高相似度
- max_similarity = max(
- (m.get('匹配结果', {}).get('相似度', 0) for m in match_results),
- default=0
- )
- # 筛选条件
- if max_similarity < 0.5:
- filtered_out_low += 1
- continue
- elif max_similarity >= 0.8:
- filtered_out_high += 1
- continue
- # 0.5 <= max_similarity < 0.8,保留
- # 按相似度降序排序,取前3个
- sorted_matches = sorted(
- match_results,
- key=lambda x: x.get('匹配结果', {}).get('相似度', 0),
- reverse=True
- )
- top3_matches = sorted_matches[:3] # 取前3个
- # 构建top3匹配信息列表
- top3_match_info = []
- for match in top3_matches:
- feature_classification = match.get('特征分类', [])
- classification_path = self._build_classification_path(feature_classification)
- # 如果路径为空且是分类类型,搜索补全路径
- if not classification_path and match.get('特征类型') == '分类':
- feature_name_to_search = match.get('人设特征名称', '')
- classification_path = self._search_classification_path(feature_name_to_search)
- is_classification = self._is_classification(match.get('人设特征名称', ''), classification_path)
- top3_match_info.append({
- '人设特征名称': match.get('人设特征名称'),
- '人设特征层级': match.get('人设特征层级'),
- '特征类型': match.get('特征类型'),
- '特征分类': feature_classification,
- '相似度': match.get('匹配结果', {}).get('相似度', 0),
- '匹配说明': match.get('匹配结果', {}).get('说明', ''),
- '是分类': is_classification,
- '所属分类路径': classification_path
- })
- result_item = {
- '原始特征名称': feature_name,
- '来源层级': level_name,
- '权重': feature.get('权重', 0),
- '所属点名称': item_name,
- '最高匹配信息': top3_match_info[0], # 保留第1个用于Stage2
- 'top3匹配信息': top3_match_info # 新增字段
- }
- results.append(result_item)
- selected_count += 1
- # 显示top3匹配信息
- top3_names = [m['人设特征名称'] for m in top3_match_info]
- logger.info(f" ✓ {feature_name} → Top{len(top3_match_info)}: {', '.join(top3_names)}")
- # 统计信息
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段1完成")
- logger.info(f" 总特征数: {total_features}")
- logger.info(f" 过滤掉(<0.5): {filtered_out_low}")
- logger.info(f" 过滤掉(>=0.8): {filtered_out_high}")
- logger.info(f" 保留(0.5-0.8): {selected_count}")
- logger.info("=" * 60)
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage1_filtered_features.json")
- self._save_json(results, output_path)
- return results
- def _build_classification_path(self, feature_classification: List[str]) -> str:
- """
- 构建分类路径
- Args:
- feature_classification: 特征分类数组
- Returns:
- 分类路径
- """
- if not feature_classification:
- return ""
- # 步骤1: 去掉中间元素的"实质"后缀
- cleaned = []
- for i, item in enumerate(feature_classification):
- if i == len(feature_classification) - 1: # 最后一个保留
- cleaned.append(item)
- elif item.endswith("实质") and i != 0: # 中间的去掉"实质"
- cleaned.append(item[:-2])
- else:
- cleaned.append(item)
- # 步骤2: 反转数组
- reversed_list = list(reversed(cleaned))
- # 步骤3: 拼接路径
- path = "/".join(reversed_list)
- return path
- def _is_classification(self, persona_feature_name: str, classification_path: str) -> bool:
- """
- 判断是分类还是特征
- Args:
- persona_feature_name: 人设特征名称
- classification_path: 分类路径
- Returns:
- True: 是分类, False: 是特征
- """
- # 在optimized_clustered_data中查找
- # 如果在特征列表中找到,就是特征
- # 如果作为节点存在且有子节点,就是分类
- # 导航到节点
- node = self._navigate_to_node(classification_path)
- if not node:
- return False
- # 检查是否在特征列表中
- features = node.get('特征列表', [])
- for f in features:
- if f.get('特征名称') == persona_feature_name:
- return False # 在特征列表中,是特征
- # 检查是否作为子节点存在
- if persona_feature_name in node:
- sub_node = node[persona_feature_name]
- if isinstance(sub_node, dict):
- return True # 是子节点,是分类
- return False # 默认是特征
- def _navigate_to_node(self, path: str) -> Optional[Dict[str, Any]]:
- """
- 导航到指定路径的节点
- Args:
- path: 路径,如 "实质/猫咪宠物"
- Returns:
- 节点,未找到返回None
- """
- if not path:
- return None
- parts = path.split('/')
- first_part = parts[0]
- # 确定顶层key
- top_level_map = {
- '意图': '目的点',
- '要素': '目的点',
- '实质': None,
- '形式': None,
- '场景': None
- }
- top_keys = []
- if first_part in top_level_map:
- mapped = top_level_map[first_part]
- if mapped:
- top_keys.append(mapped)
- if not top_keys:
- top_keys = ['灵感点列表', '关键点列表', '目的点']
- # 尝试在每个顶层中查找
- for top_key in top_keys:
- current = self.optimized_clustered_data.get(top_key)
- if not current:
- continue
- # 逐层导航
- found = True
- for part in parts:
- if isinstance(current, dict) and part in current:
- current = current[part]
- else:
- found = False
- break
- if found and isinstance(current, dict):
- return current
- return None
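- # Example: _navigate_to_node('实质/猫咪宠物') finds no top-level mapping for
- # '实质', so it falls back to trying all three top keys and then walks
- # '实质' → '猫咪宠物' inside whichever tree contains that chain.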
- def _recursive_search(
- self,
- obj: Dict[str, Any],
- target_name: str,
- current_path: str = ""
- ) -> Optional[str]:
- """
- 递归搜索分类节点
- Args:
- obj: 当前搜索的对象
- target_name: 目标分类名称
- current_path: 当前路径
- Returns:
- 找到的完整路径,未找到返回None
- """
- if not isinstance(obj, dict):
- return None
- # 遍历所有键
- for key in obj.keys():
- # 跳过元数据和特征列表
- if key in ['_meta', '特征列表']:
- continue
- # 检查是否匹配
- if target_name in key or key in target_name:
- # 找到匹配,返回路径
- if current_path:
- return f"{current_path}/{key}"
- else:
- return key
- # 递归搜索子节点
- if isinstance(obj[key], dict):
- next_path = f"{current_path}/{key}" if current_path else key
- result = self._recursive_search(obj[key], target_name, next_path)
- if result:
- return result
- return None
- def _search_classification_path(self, classification_name: str) -> str:
- """
- 在optimized_clustered_data中搜索分类节点路径
- Args:
- classification_name: 分类名称,如"实体物品实质"
- Returns:
- 完整路径,如"实质/实体物品",未找到返回空字符串
- """
- if not classification_name:
- return ""
- # 清理名称:去掉常见后缀
- clean_name = classification_name
- for suffix in ['实质', '意图', '形式', '要素']:
- if clean_name.endswith(suffix) and len(clean_name) > len(suffix):
- clean_name = clean_name[:-len(suffix)]
- break
- logger.info(f" 搜索分类: {classification_name} → 清理为: {clean_name}")
- # 在三个顶级列表中搜索
- for top_key in ['灵感点列表', '关键点列表', '目的点']:
- top_data = self.optimized_clustered_data.get(top_key, {})
- if not top_data:
- continue
- # 递归搜索
- path = self._recursive_search(top_data, clean_name, "")
- if path:
- logger.info(f" ✓ 找到路径: {path}")
- return path
- logger.warning(f" ✗ 未找到分类路径: {classification_name}")
- return ""
- # ========== 阶段2:收集关联分类+标签+子分类 ==========
- def stage2_find_associations(self, filtered_features: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """
- 阶段2:查找关联分类,收集分类名称、标签、子分类
- 改进: 为top3的每个base_word都查找关联
- Args:
- filtered_features: 阶段1筛选的特征
- Returns:
- 带关联信息的特征列表
- """
- logger.info("=" * 60)
- logger.info("阶段2:查找关联分类(为每个base_word)")
- logger.info("=" * 60)
- for idx, feature in enumerate(filtered_features, 1):
- logger.info(f"\n[{idx}/{len(filtered_features)}] 处理: {feature['原始特征名称']}")
- # 获取top3 base_words
- top3_info = feature.get('top3匹配信息', [])
- if not top3_info:
- logger.warning(f" 无top3匹配信息,跳过")
- feature['找到的关联_按base_word'] = {}
- continue
- logger.info(f" 找到 {len(top3_info)} 个base_word")
- # 为每个base_word查找关联
- associations_by_base_word = {}
- for base_idx, base_info in enumerate(top3_info, 1):
- base_word = base_info.get('人设特征名称', '')
- is_classification = base_info['是分类']
- classification_path = base_info['所属分类路径']
- source_level = base_info['人设特征层级']
- logger.info(f" [{base_idx}/{len(top3_info)}] Base Word: {base_word}")
- if is_classification:
- search_path = classification_path
- logger.info(f" 匹配到分类: {search_path}")
- else:
- search_path = classification_path
- logger.info(f" 匹配到特征,使用所属分类: {search_path}")
- # 查找跨维度关联
- associations = self._find_associations(search_path, source_level)
- logger.info(f" 找到 {len(associations)} 个跨维度关联")
- # 查找维度内关联
- intra_associations = self._find_intra_dimension_associations(search_path, source_level)
- logger.info(f" 找到 {len(intra_associations)} 个维度内关联")
- # 合并两种关联
- all_associations = associations + intra_associations
- # 收集关联信息
- base_word_associations = []
- for assoc in all_associations:
- target_path = assoc['目标分类']
- # 收集分类信息
- classification_info = self._collect_classification_info(target_path)
- if classification_info:
- # 检查是否为维度内关联
- is_intra = assoc['关联类型'] == '维度内组合关联'
- base_word_associations.append({
- '来源方向': assoc['来源方向'],
- '关联类型': assoc['关联类型'],
- '目标分类路径': target_path,
- '共同帖子数': assoc.get('点数', assoc.get('共同帖子数', 0)),
- 'Jaccard相似度': assoc.get('Jaccard相似度', 0.0) if not is_intra else 0.0,
- '分类名称': classification_info['classification_name'],
- '标签列表': classification_info['tags'],
- '子分类列表': classification_info['sub_classifications']
- })
- associations_by_base_word[base_word] = base_word_associations
- logger.info(f" 总计 {len(base_word_associations)} 个关联(跨维度: {len(associations)}, 维度内: {len(intra_associations)})")
- # 保存结果
- feature['找到的关联_按base_word'] = associations_by_base_word
- # 向后兼容:保留基于最高匹配信息的关联(即第1个base_word的关联)
- first_base_word = top3_info[0].get('人设特征名称', '')
- feature['找到的关联'] = associations_by_base_word.get(first_base_word, [])
- total_associations = sum(len(v) for v in associations_by_base_word.values())
- logger.info(f" 总共找到 {total_associations} 个关联({len(associations_by_base_word)} 个base_word)")
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage2_associations.json")
- self._save_json(filtered_features, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段2完成")
- logger.info("=" * 60)
- return filtered_features
- def _find_associations(self, classification_path: str, source_level: str) -> List[Dict[str, Any]]:
- """
- 查找关联节点
- Args:
- classification_path: 分类路径
- source_level: 源层级
- Returns:
- 关联节点列表
- """
- associations = []
- # 确定维度名称
- if '灵感点' in source_level:
- dimension_key = '灵感点维度'
- elif '关键点' in source_level:
- dimension_key = '关键点维度'
- elif '目的点' in source_level:
- dimension_key = '目的点维度'
- else:
- return associations
- # 获取维度数据
- single_dim = self.dimension_associations.get('单维度关联分析', {})
- dimension_data = single_dim.get(dimension_key, {})
- if not dimension_data:
- return associations
- # 遍历所有方向
- for direction_key, direction_data in dimension_data.items():
- if direction_key == '说明':
- continue
- # 查找源分类
- if classification_path in direction_data:
- source_data = direction_data[classification_path]
- # 获取关联节点
- for assoc_key in source_data.keys():
- if assoc_key.startswith('与') and assoc_key.endswith('的关联'):
- assoc_list = source_data[assoc_key]
- for assoc_item in assoc_list:
- associations.append({
- '来源方向': direction_key,
- '关联类型': assoc_key,
- '目标分类': assoc_item.get('目标分类'),
- '目标层级': assoc_item.get('目标层级'),
- '共同帖子数': assoc_item.get('共同帖子数'),
- 'Jaccard相似度': assoc_item.get('Jaccard相似度'),
- '共同帖子ID': assoc_item.get('共同帖子ID', [])
- })
- return associations
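- # Assumed layout of dimension_associations, inferred from the lookups above:
- #   {'单维度关联分析': {'灵感点维度': {<方向>: {<分类路径>:
- #       {'与X的关联': [{'目标分类': ..., '共同帖子数': ..., 'Jaccard相似度': ...}]}}}}}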
- def _find_intra_dimension_associations(
- self,
- classification_path: str,
- source_level: str
- ) -> List[Dict[str, Any]]:
- """
- 查找维度内关联
- 在同一维度内,查找叶子分类的组合关联。
- 例如:如果A和B经常在同一帖子中出现,它们就有维度内关联。
- Args:
- classification_path: 分类路径,如 "实质/身份与情绪/生理状态与行为/疲惫与熬夜状态"
- source_level: 源层级,如 "关键点列表"
- Returns:
- 关联列表,每个关联包含:
- - 来源方向: 维度-维度内
- - 关联类型: 维度内组合关联
- - 目标分类: 关联分类的完整路径
- - 组合键: 组合的唯一标识(如 "夸张极致表现|疲惫与熬夜状态")
- - 点数: 该组合出现的次数
- - 目标层级: 目标层级(与源层级相同)
- """
- if not self.intra_associations:
- return []
- associations = []
- # 步骤1: 提取叶子分类名称(路径最后一段)
- if not classification_path:
- return []
- leaf_name = classification_path.split('/')[-1]
- # 步骤2: 确定维度
- dimension = None
- if '灵感点' in source_level:
- dimension = '灵感点'
- elif '关键点' in source_level:
- dimension = '关键点'
- elif '目的点' in source_level:
- dimension = '目的点'
- if not dimension:
- return []
- # 步骤3: 查找组合
- clusters = self.intra_associations.get('叶子分类组合聚类', {}).get(dimension, {})
- if not clusters:
- return []
- # 步骤4: 遍历所有组合,找到包含当前叶子分类的组合
- for combo_key, cluster in clusters.items():
- combo_parts = combo_key.split('|')
- # 如果当前叶子分类在组合中
- if leaf_name not in combo_parts:
- continue
- # 提取点详情中的特征信息
- for point in cluster.get('点详情列表', []):
- for feature in point.get('特征列表', []):
- other_leaf = feature.get('叶子分类', '')
- other_path = feature.get('完整路径', '')
- # 跳过自己
- if other_leaf == leaf_name or not other_path:
- continue
- # 添加维度内关联(保持与跨维度关联相同的结构)
- associations.append({
- '来源方向': f'{dimension}-维度内',
- '关联类型': '维度内组合关联',
- '目标分类': other_path, # 使用'目标分类'保持与跨维度关联一致
- '组合键': combo_key,
- '点数': cluster.get('点数', 0),
- '目标层级': source_level # 同一维度内的关联,层级相同
- })
- return associations
- def _collect_classification_info(self, classification_path: str) -> Optional[Dict[str, Any]]:
- """
- 收集分类信息:分类名 + 标签 + 子分类
- Args:
- classification_path: 分类路径
- Returns:
- 分类信息
- """
- node = self._navigate_to_node(classification_path)
- if not node:
- return None
- # 分类名称(路径最后一段)
- classification_name = classification_path.split('/')[-1]
- # 标签(特征列表)
- tags = [f.get('特征名称', '') for f in node.get('特征列表', [])]
- # 子分类(子节点,排除_meta和特征列表)
- sub_classifications = [
- key for key in node.keys()
- if isinstance(node[key], dict) and key not in ['_meta', '特征列表']
- ]
- return {
- 'classification_name': classification_name,
- 'tags': tags,
- 'sub_classifications': sub_classifications
- }
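- # Example return value (shape only; the names are illustrative):
- #   {'classification_name': '猫咪宠物', 'tags': ['特征A', '特征B'],
- #    'sub_classifications': ['子分类A']}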
- # ========== 阶段3:筛选高相似度匹配(>0.8) ==========
- def stage3_filter_high_similarity_matches(self, associations_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """
- 阶段3:筛选高相似度匹配(>0.8)
- 改进:为每个base_word独立筛选候选词
- 基于该base_word的关联范围,在how解构中找出相似度>0.8的匹配
- Args:
- associations_data: 阶段2的关联数据
- Returns:
- 带高相似度候选的数据
- """
- logger.info("=" * 60)
- logger.info("阶段3:筛选高相似度匹配(>0.8,为每个base_word)")
- logger.info("=" * 60)
- for idx, feature_result in enumerate(associations_data, 1):
- original_feature_name = feature_result['原始特征名称']
- logger.info(f"\n[{idx}/{len(associations_data)}] 处理: {original_feature_name}")
- # 获取top3 base_words
- top3_info = feature_result.get('top3匹配信息', [])
- associations_by_base_word = feature_result.get('找到的关联_按base_word', {})
- if not top3_info or not associations_by_base_word:
- logger.warning(f" 无top3匹配信息或关联数据,跳过")
- feature_result['高相似度候选_按base_word'] = {}
- continue
- logger.info(f" 找到 {len(top3_info)} 个base_word")
- # 为每个base_word独立筛选候选词
- candidates_by_base_word = {}
- for base_idx, base_info in enumerate(top3_info, 1):
- base_word = base_info.get('人设特征名称', '')
- logger.info(f" [{base_idx}/{len(top3_info)}] Base Word: {base_word}")
- # 步骤1: 收集该base_word的关联范围
- base_word_associations = associations_by_base_word.get(base_word, [])
- base_word_scope = self._collect_scope_from_associations(base_word_associations)
- logger.info(f" 关联范围包含 {len(base_word_scope)} 个分类/标签")
- if not base_word_scope:
- logger.warning(f" 无关联范围,跳过")
- candidates_by_base_word[base_word] = []
- continue
- # 步骤2: 遍历how解构,找出高相似度匹配
- high_sim_candidates = []
- total_checked = 0
- high_sim_found = 0
- how_result = self.how_data.get('how解构结果', {})
- for level_name, level_list in how_result.items():
- if not isinstance(level_list, list):
- continue
- for item in level_list:
- for step in item.get('how步骤列表', []):
- for feature in step.get('特征列表', []):
- matches = feature.get('匹配结果', [])
- total_checked += len(matches)
- # 筛选相似度>0.8且在该base_word的范围内的匹配
- for match in matches:
- sim = match.get('匹配结果', {}).get('相似度', 0)
- persona_feature_name = match.get('人设特征名称', '')
- if sim > 0.8 and persona_feature_name in base_word_scope:
- high_sim_found += 1
- high_sim_candidates.append({
- '人设特征名称': persona_feature_name,
- '相似度': sim,
- '特征类型': match.get('特征类型', ''),
- '特征分类': match.get('特征分类', []),
- '人设特征层级': match.get('人设特征层级', ''),
- '来源路径': self._build_classification_path(match.get('特征分类', [])),
- '匹配说明': match.get('匹配结果', {}).get('说明', ''),
- '来源原始特征': feature.get('特征名称', '')
- })
- logger.info(f" 检查了 {total_checked} 个匹配")
- logger.info(f" 找到 {high_sim_found} 个相似度>0.8的匹配")
- # 按相似度降序排序并去重
- seen_names = set()
- unique_candidates = []
- high_sim_candidates.sort(key=lambda x: x['相似度'], reverse=True)
- for candidate in high_sim_candidates:
- name = candidate['人设特征名称']
- if name not in seen_names:
- seen_names.add(name)
- unique_candidates.append(candidate)
- candidates_by_base_word[base_word] = unique_candidates
- logger.info(f" 去重后筛选出 {len(unique_candidates)} 个候选")
- # 显示前5个
- if unique_candidates:
- logger.info(f" Top 5:")
- for c in unique_candidates[:5]:
- logger.info(f" • {c['人设特征名称']} ({c['相似度']:.3f}) ← 来自\"{c['来源原始特征']}\"")
- # 保存结果
- feature_result['高相似度候选_按base_word'] = candidates_by_base_word
- # 向后兼容:保留第1个base_word的候选
- first_base_word = top3_info[0].get('人设特征名称', '')
- feature_result['高相似度候选'] = candidates_by_base_word.get(first_base_word, [])
- total_candidates = sum(len(v) for v in candidates_by_base_word.values())
- logger.info(f" 总共筛选出 {total_candidates} 个候选({len(candidates_by_base_word)} 个base_word)")
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage3_high_similarity.json")
- self._save_json(associations_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段3完成")
- logger.info("=" * 60)
- return associations_data
- def _collect_scope_from_associations(self, associations: List[Dict[str, Any]]) -> Set[str]:
- """
- 从关联列表中收集所有分类名和标签,形成范围集合
- Args:
- associations: 关联列表
- Returns:
- 包含所有分类名和标签的集合
- """
- scope = set()
- for assoc in associations:
- # 添加分类名
- scope.add(assoc['分类名称'])
- # 添加所有标签
- tags = assoc.get('标签列表', [])
- scope.update(tags)
- return scope
- def stage23_extract_candidates_from_how(self, filtered_features: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """
- 新方式:从how文件提取相似度>=0.8的候选词
- 替代 Stage 2-3,但构造相同的数据结构
- 处理流程:
- 1. 遍历 how_data['how解构结果'] 所有特征的匹配结果
- 2. 筛选 相似度 >= 0.8 的人设特征名称
- 3. 去重(按最高相似度保留)
- 4. 按相似度降序排序
- 5. 为每个中心词复制相同的候选词列表
- 6. 构造 '高相似度候选_按base_word' 结构
- Args:
- filtered_features: Stage 1筛选的特征列表
- Returns:
- 与Stage 3输出结构完全一致的特征列表
- """
- logger.info("=" * 60)
- logger.info("Stage 2-3 (新方式): 从how文件提取高相似度候选词")
- logger.info("=" * 60)
- # Step 1: 从整个how文件提取候选词
- candidates_dict = {} # {人设特征名称: {候选词信息}}
- how_result = self.how_data.get('how解构结果', {})
- # 遍历三个维度
- for dimension in ['灵感点列表', '关键点列表', '目的点列表']:
- features_list = how_result.get(dimension, [])
- for item in features_list:
- item_name = item.get('名称', '')
- how_steps = item.get('how步骤列表', [])
- for step in how_steps:
- for feature in step.get('特征列表', []):
- feature_name = feature.get('特征名称', '')
- matches = feature.get('匹配结果', [])
- for match in matches:
- # 获取相似度(从匹配结果的嵌套结构中)
- similarity = match.get('匹配结果', {}).get('相似度', 0)
- persona_feature_name = match.get('人设特征名称', '')
- # 筛选相似度 >= 0.8
- if similarity >= 0.8 and persona_feature_name:
- # 去重逻辑:保留最高相似度
- if persona_feature_name not in candidates_dict or \
- similarity > candidates_dict[persona_feature_name]['相似度']:
- candidates_dict[persona_feature_name] = {
- '人设特征名称': persona_feature_name,
- '相似度': similarity,
- '特征类型': match.get('特征类型', ''),
- '特征分类': match.get('特征分类', []),
- '人设特征层级': match.get('人设特征层级', ''),
- '来源路径': self._build_classification_path(match.get('特征分类', [])),
- '匹配说明': match.get('匹配结果', {}).get('说明', ''),
- '来源原始特征': feature_name
- }
- # Step 2: 转为列表并按相似度降序排序
- global_candidates = sorted(
- candidates_dict.values(),
- key=lambda x: x['相似度'],
- reverse=True
- )
- logger.info(f"从how文件提取到 {len(global_candidates)} 个唯一的高相似度候选词")
- # 显示Top 10候选词
- if global_candidates:
- logger.info("Top 10 候选词:")
- for i, candidate in enumerate(global_candidates[:10], 1):
- logger.info(f" {i}. {candidate['人设特征名称']} (相似度: {candidate['相似度']:.3f})")
- # Step 3: 为每个特征构造输出结构
- results = []
- for idx, feature_data in enumerate(filtered_features, 1):
- original_feature_name = feature_data.get('原始特征名称', '')
- logger.info(f"\n[{idx}/{len(filtered_features)}] 处理: {original_feature_name}")
- top3_matches = feature_data.get('top3匹配信息', [])
- # 提取3个中心词
- base_words = [match.get('人设特征名称', '') for match in top3_matches[:3]]
- logger.info(f" 中心词: {', '.join(base_words)}")
- # 所有中心词共享相同的候选词列表
- high_similarity_by_base = {}
- for base_word in base_words:
- if base_word:
- high_similarity_by_base[base_word] = global_candidates.copy()
- logger.info(f" 每个中心词分配 {len(global_candidates)} 个候选词")
- result = {
- '原始特征名称': original_feature_name,
- '来源层级': feature_data.get('来源层级', ''), # 保留元数据
- '权重': feature_data.get('权重', 0), # 保留元数据
- 'top3匹配信息': top3_matches,
- '找到的关联_按base_word': {}, # 新方式不需要关联分析
- '高相似度候选_按base_word': high_similarity_by_base
- }
- results.append(result)
- # 保存结果
- output_path = os.path.join(self.output_dir, 'stage3_high_similarity_how_based.json')
- self._save_json(results, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"Stage 2-3 (新方式) 完成")
- logger.info(f" 提取候选词: {len(global_candidates)} 个")
- logger.info(f" 处理特征: {len(results)} 个")
- logger.info("=" * 60)
- return results
- def _collect_stage2_scope(self, feature_result: Dict[str, Any]) -> Set[str]:
- """
- 收集Stage2找到的所有分类名和标签,形成范围集合(兼容旧方法)
- Args:
- feature_result: 特征结果数据
- Returns:
- 包含所有分类名和标签的集合
- """
- associations = feature_result.get('找到的关联', [])
- return self._collect_scope_from_associations(associations)
- def _find_features_by_path(self, target_classification: str) -> List[Dict[str, Any]]:
- """
- 根据路径查找特征列表
- Args:
- target_classification: 目标分类路径
- Returns:
- 特征列表
- """
- node = self._navigate_to_node(target_classification)
- if not node:
- return []
- features = node.get('特征列表', [])
- # 深拷贝
- return copy.deepcopy(features)
- # ========== 阶段4:多词组合 + LLM评估 ==========
- def stage4_generate_and_evaluate_search_words(
- self,
- features_data: List[Dict[str, Any]],
- max_workers: int = 4,
- max_candidates: int = 20,
- max_combo_length: int = 4
- ) -> List[Dict[str, Any]]:
- """
- 阶段4:多词组合 + LLM评估
- 基于Stage1的基础词和Stage3的高相似度候选,
- 由LLM为每个base_word直接生成搜索query并选出Top10(不再枚举2-N词组合)
- Args:
- features_data: 阶段3的数据(包含高相似度候选)
- max_workers: 并发评估的原始特征数(默认4)
- max_candidates: 参与组合的最大候选词数(默认20)
- max_combo_length: 最大组合词数(默认4;保留参数,当前LLM直接生成query时未使用)
- Returns:
- 带LLM评估的数据
- """
- logger.info("=" * 60)
- logger.info("阶段4:多词组合 + LLM评估")
- logger.info(f" 最大候选词数: {max_candidates}")
- logger.info(f" 最大组合长度: {max_combo_length} 词")
- logger.info(f" 并发数: {max_workers} 个原始特征")
- logger.info("=" * 60)
- total_features = len(features_data)
- # 使用ThreadPoolExecutor并行处理不同的原始特征
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # 提交所有任务
- futures = []
- for idx, feature_result in enumerate(features_data, 1):
- future = executor.submit(
- self._process_single_feature_combinations,
- idx,
- total_features,
- feature_result,
- max_candidates,
- max_combo_length
- )
- futures.append((future, feature_result))
- # 等待所有任务完成并收集结果
- for future, feature_result in futures:
- try:
- _ = future.result() # 等待完成,结果已经写回到feature_result中
- except Exception as e:
- logger.error(f" 评估失败: {feature_result['原始特征名称']}, 错误: {e}")
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage4_combinations_evaluated.json")
- self._save_json(features_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段4完成")
- logger.info("=" * 60)
- return features_data
- def _process_single_feature_combinations(
- self,
- idx: int,
- total: int,
- feature_result: Dict[str, Any],
- max_candidates: int,
- max_combo_length: int
- ) -> None:
- """
- 处理单个原始特征的组合生成和评估
- 改进: 每个base_word使用自己的候选词(而不是共享)
- Steps:
- 1. Get top3 base_words from Stage1's top3匹配信息
- 2. For each base_word:
- a. Get candidates from Stage3's 高相似度候选_按base_word
- b. Generate combinations
- c. LLM evaluation
- d. Select Top 10
- 3. Save grouped results
- Args:
- idx: 特征索引
- total: 总特征数
- feature_result: 特征结果数据
- max_candidates: 参与组合的最大候选词数
- max_combo_length: 最大组合词数
- """
- original_feature = feature_result['原始特征名称']
- logger.info(f"\n[{idx}/{total}] 处理: {original_feature}")
- # 步骤1: 获取top3基础词
- top3_info = feature_result.get('top3匹配信息', [])
- if not top3_info:
- logger.info(f" 无top3匹配信息,跳过")
- feature_result['组合评估结果_分组'] = []
- return
- logger.info(f" 找到 {len(top3_info)} 个base_word")
- # 步骤2: 获取按base_word分组的候选词
- candidates_by_base_word = feature_result.get('高相似度候选_按base_word', {})
- if not candidates_by_base_word:
- logger.warning(f" 无按base_word分组的候选词,跳过")
- feature_result['组合评估结果_分组'] = []
- return
- # 步骤3: 为每个base_word独立处理
- grouped_results = []
- for base_idx, base_info in enumerate(top3_info, 1):
- base_word = base_info.get('人设特征名称', '')
- base_similarity = base_info.get('相似度', 0)
- if not base_word:
- continue
- logger.info(f" [{base_idx}/{len(top3_info)}] Base Word: {base_word} (相似度: {base_similarity:.3f})")
- # 获取该base_word的候选词
- base_candidates = candidates_by_base_word.get(base_word, [])
- candidates = base_candidates[:max_candidates]
- candidate_words = [c['人设特征名称'] for c in candidates]
- if not candidate_words:
- logger.warning(f" 该base_word无候选词,跳过")
- grouped_results.append({
- 'base_word': base_word,
- 'base_word_similarity': base_similarity,
- 'base_word_info': base_info,
- 'top10_searches': [],
- 'available_words': []
- })
- continue
- logger.info(f" 候选词数量: {len(candidate_words)} (限制: {max_candidates})")
- # LLM生成query(新方式:直接让LLM基于候选词生成query)
- logger.info(f" 使用LLM生成query(中心词: {base_word})...")
- evaluated = self.llm_evaluator.generate_queries_from_candidates(
- original_feature=original_feature,
- base_word=base_word,
- candidate_words=candidate_words,
- max_queries=10
- )
- # 选出Top 10(已经由LLM生成方法控制数量)
- top_10 = evaluated[:10]
- logger.info(f" 生成完成,共 {len(top_10)} 个query")
- # 保存分组结果 - 每个base_word有自己的available_words
- grouped_results.append({
- 'base_word': base_word,
- 'base_word_similarity': base_similarity,
- 'base_word_info': base_info,
- 'top10_searches': top_10,
- 'available_words': candidate_words # 该base_word自己的候选词
- })
- # 写回结果
- feature_result['组合评估结果_分组'] = grouped_results
- total_searches = sum(len(g['top10_searches']) for g in grouped_results)
- logger.info(f" 完成!共 {len(grouped_results)} 个base_word,{total_searches} 个搜索词")
- # ========== 阶段5:执行搜索 ==========
- def _execute_single_search(
- self,
- idx: int,
- total: int,
- search_word: str,
- feature_ref: Dict[str, Any]
- ) -> Dict[str, Any]:
- """
- 执行单个搜索任务(用于并发执行)
- Args:
- idx: 搜索索引
- total: 总搜索数
- search_word: 搜索词
- feature_ref: 特征引用(用于写入结果)
- Returns:
- 搜索结果信息
- """
- logger.info(f"[{idx}/{total}] 搜索: {search_word}")
- try:
- result = self.search_client.search(
- keyword=search_word,
- content_type='不限',
- sort_type='综合',
- max_retries=3,
- use_cache=True # 启用搜索缓存
- )
- note_count = len(result.get('data', {}).get('data', []))
- logger.info(f" ✓ 成功,获取 {note_count} 条帖子")
- # 写入结果
- feature_ref['search_result'] = result
- feature_ref['search_metadata'] = {
- 'searched_at': datetime.now().isoformat(),
- 'status': 'success',
- 'note_count': note_count,
- 'search_params': {
- 'keyword': search_word,
- 'content_type': '不限',
- 'sort_type': '综合'
- }
- }
- return {'status': 'success', 'search_word': search_word, 'note_count': note_count}
- except Exception as e:
- logger.error(f" ✗ 失败: {e}")
- feature_ref['search_result'] = None
- feature_ref['search_metadata'] = {
- 'searched_at': datetime.now().isoformat(),
- 'status': 'failed',
- 'note_count': 0,
- 'error': str(e)
- }
- return {'status': 'failed', 'search_word': search_word, 'error': str(e)}
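- # Concurrency note: each submitted task writes results into its own
- # feature_ref dict, so no lock is needed around those writes; the shared
- # self.search_client is assumed to be thread-safe.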
- def stage5_execute_searches(
- self,
- features_data: List[Dict[str, Any]],
- search_delay: float = 2.0,
- top_n: int = 10
- ) -> List[Dict[str, Any]]:
- """
- 阶段5:执行小红书搜索
- Args:
- features_data: 阶段4的数据
- search_delay: 搜索延迟(保留参数;并发执行模式下未使用)
- top_n: 每个原始特征取评分最高的N个搜索词(保留参数;当前执行全部搜索词)
- Returns:
- 带搜索结果的数据
- """
- logger.info("=" * 60)
- logger.info("阶段5:执行小红书搜索")
- logger.info("=" * 60)
- # 按原始特征分组收集搜索词(从Stage4的组合评估结果_分组读取)
- feature_search_groups = {}
- for feature_result in features_data:
- original_feature = feature_result['原始特征名称']
- if original_feature not in feature_search_groups:
- feature_search_groups[original_feature] = []
- # 从Stage4的组合评估结果_分组读取(新结构)
- grouped_results = feature_result.get('组合评估结果_分组', [])
- if grouped_results:
- # 使用分组结构:每个base_word的top10都执行
- for group in grouped_results:
- base_word = group.get('base_word', '')
- base_similarity = group.get('base_word_similarity', 0)
- base_word_searches = []
- for eval_item in group.get('top10_searches', []):
- sw = eval_item.get('search_word')
- if not sw:
- continue
- score = eval_item.get('score', 0.0)
- base_word_searches.append({
- 'search_word': sw,
- 'score': score,
- 'base_word': base_word,
- 'base_word_similarity': base_similarity,
- 'feature_ref': eval_item # 引用评估项,用于写入搜索结果
- })
- # 应用每个base_word的搜索次数限制
- if self.max_searches_per_base_word and len(base_word_searches) > self.max_searches_per_base_word:
- logger.info(f" 应用base_word限制: {base_word} 从 {len(base_word_searches)} 减少到 {self.max_searches_per_base_word}")
- base_word_searches = base_word_searches[:self.max_searches_per_base_word]
- feature_search_groups[original_feature].extend(base_word_searches)
- else:
- # 兼容旧结构(组合评估结果)
- for eval_item in feature_result.get('组合评估结果', []):
- sw = eval_item.get('search_word')
- if not sw:
- continue
- score = eval_item.get('score', 0.0)
- feature_search_groups[original_feature].append({
- 'search_word': sw,
- 'score': score,
- 'feature_ref': eval_item
- })
- # 应用每个原始特征的搜索次数限制
- if self.max_searches_per_feature and len(feature_search_groups[original_feature]) > self.max_searches_per_feature:
- logger.info(f" 应用特征限制: {original_feature} 从 {len(feature_search_groups[original_feature])} 减少到 {self.max_searches_per_feature}")
- feature_search_groups[original_feature] = feature_search_groups[original_feature][:self.max_searches_per_feature]
- # 收集所有搜索任务(分组结构下执行所有base_word的top10,不再过滤)
- all_searches = []
- total_count = 0
- for original_feature, search_list in feature_search_groups.items():
- total_count += len(search_list)
- all_searches.extend(search_list)
- logger.info(f" {original_feature}: {len(search_list)} 个搜索词")
- # 应用全局搜索次数限制
- if self.max_total_searches and len(all_searches) > self.max_total_searches:
- logger.info(f" 应用全局限制:从 {len(all_searches)} 个减少到 {self.max_total_searches} 个")
- all_searches = all_searches[:self.max_total_searches]
- logger.info(f"\n共 {len(all_searches)} 个搜索任务")
- logger.info(f" 并发执行搜索(并发数: {self.search_max_workers})")
- # 使用ThreadPoolExecutor并发执行搜索
- with ThreadPoolExecutor(max_workers=self.search_max_workers) as executor:
- # 提交所有搜索任务
- futures = []
- for idx, item in enumerate(all_searches, 1):
- future = executor.submit(
- self._execute_single_search,
- idx,
- len(all_searches),
- item['search_word'],
- item['feature_ref']
- )
- futures.append(future)
- # 等待所有搜索完成
- for future in as_completed(futures):
- try:
- future.result()
- # 结果已经写入feature_ref,无需额外处理
- except Exception as e:
- logger.error(f" 搜索任务失败: {e}")
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage5_with_search_results.json")
- self._save_json(features_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段5完成")
- logger.info("=" * 60)
- return features_data
- # ========== 阶段6:LLM评估搜索结果 ==========
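- # NOTE: stage6_evaluate_search_results() below is the older single-pass
- # evaluator, kept for reference; run_full_pipeline() only ever calls
- # stage6_evaluate_search_results_with_filter() further down.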
- def stage6_evaluate_search_results(
- self,
- features_data: List[Dict[str, Any]]
- ) -> List[Dict[str, Any]]:
- """
- 阶段6:用LLM评估搜索结果(多模态)
- Args:
- features_data: 阶段5的数据
- Returns:
- 带结果评估的数据
- """
- logger.info("=" * 60)
- logger.info("阶段6:LLM评估搜索结果")
- logger.info("=" * 60)
- # 收集所有需要评估的特征节点
- features_to_evaluate = []
- for feature_result in features_data:
- original_feature = feature_result['原始特征名称']
- for assoc in feature_result.get('找到的关联', []):
- for feature in assoc.get('特征列表', []):
- if feature.get('search_result') and feature['search_metadata']['status'] == 'success':
- features_to_evaluate.append({
- 'original_feature': original_feature,
- 'feature_node': feature
- })
- logger.info(f"共 {len(features_to_evaluate)} 个搜索结果需要评估")
- # 并行评估(并发数较低)
- with ThreadPoolExecutor(max_workers=8) as executor:
- futures = []
- for item in features_to_evaluate:
- future = executor.submit(
- self._evaluate_single_search_result,
- item['original_feature'],
- item['feature_node']
- )
- futures.append((future, item))
- # 收集结果
- for idx, (future, item) in enumerate(futures, 1):
- try:
- evaluation = future.result()
- item['feature_node']['result_evaluation'] = evaluation
- logger.info(f" [{idx}/{len(futures)}] {item['feature_node']['search_word']}: "
- f"relevance={evaluation['overall_relevance']:.3f}")
- except Exception as e:
- logger.error(f" 评估失败: {item['feature_node']['search_word']}, 错误: {e}")
- item['feature_node']['result_evaluation'] = None
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage6_with_evaluations.json")
- self._save_json(features_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段6完成")
- logger.info("=" * 60)
- return features_data
- def _evaluate_single_search_result(
- self,
- original_feature: str,
- feature_node: Dict[str, Any]
- ) -> Dict[str, Any]:
- """
- 评估单个搜索结果(使用并行评估)
- Args:
- original_feature: 原始特征
- feature_node: 特征节点
- Returns:
- 评估结果
- """
- search_word = feature_node.get('search_word', '')
- notes = feature_node['search_result'].get('data', {}).get('data', [])
- return self.llm_evaluator.evaluate_search_results_parallel(
- original_feature=original_feature,
- search_word=search_word,
- notes=notes,
- max_notes=20,
- max_workers=20 # 20个并发评估每个帖子
- )
- def stage6_evaluate_search_results_with_filter(
- self,
- features_data: List[Dict[str, Any]]
- ) -> List[Dict[str, Any]]:
- """
- 阶段6:用LLM评估搜索结果(使用两层过滤评估)
- 遍历所有搜索结果,使用两层评估机制:
- 1. 第一层:过滤与搜索Query无关的结果
- 2. 第二层:评估与目标特征的匹配度(0.8-1.0/0.6-0.79/0.5-0.59/≤0.4)
- Args:
- features_data: 阶段5的数据
- Returns:
- 带评估结果的数据
- """
- logger.info("=" * 60)
- logger.info("阶段6:LLM评估搜索结果(两层过滤评估)")
- logger.info(f" 并发数: {self.stage6_max_workers}")
- logger.info(f" 每个搜索最多评估: {self.stage6_max_notes} 个帖子")
- logger.info("=" * 60)
- # 收集所有需要评估的搜索项
- search_items_to_evaluate = []
- for feature_result in features_data:
- original_feature = feature_result['原始特征名称']
- # 从组合评估结果_分组中读取搜索结果
- grouped_results = feature_result.get('组合评估结果_分组', [])
- if grouped_results:
- for group in grouped_results:
- for eval_item in group.get('top10_searches', []):
- # 检查是否有搜索结果
- if eval_item.get('search_result') and eval_item.get('search_metadata', {}).get('status') == 'success':
- search_items_to_evaluate.append({
- 'original_feature': original_feature,
- 'search_item': eval_item,
- 'base_word': group.get('base_word', '')
- })
- else:
- # 兼容旧结构
- for eval_item in feature_result.get('组合评估结果', []):
- if eval_item.get('search_result') and eval_item.get('search_metadata', {}).get('status') == 'success':
- search_items_to_evaluate.append({
- 'original_feature': original_feature,
- 'search_item': eval_item,
- 'base_word': ''
- })
- logger.info(f"共 {len(search_items_to_evaluate)} 个搜索结果需要评估")
- # 并行评估所有搜索结果
- with ThreadPoolExecutor(max_workers=self.stage6_max_workers) as executor:
- futures = []
- for idx, item in enumerate(search_items_to_evaluate, 1):
- future = executor.submit(
- self._evaluate_single_search_with_filter,
- idx,
- len(search_items_to_evaluate),
- item['original_feature'],
- item['search_item'],
- item['base_word']
- )
- futures.append((future, item))
- # 收集结果
- success_count = 0
- failed_count = 0
- for future, item in futures:
- try:
- evaluation = future.result()
- item['search_item']['evaluation_with_filter'] = evaluation
- success_count += 1
- except Exception as e:
- logger.error(f" 评估失败: {item['search_item'].get('search_word', 'unknown')}, 错误: {e}")
- item['search_item']['evaluation_with_filter'] = None
- failed_count += 1
- logger.info(f"\n评估完成: 成功 {success_count}, 失败 {failed_count}")
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage6_with_evaluations.json")
- self._save_json(features_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段6完成")
- logger.info("=" * 60)
- return features_data
- def _evaluate_single_search_with_filter(
- self,
- idx: int,
- total: int,
- original_feature: str,
- search_item: Dict[str, Any],
- base_word: str
- ) -> Dict[str, Any]:
- """
- 评估单个搜索结果(使用两层过滤)
- Args:
- idx: 索引
- total: 总数
- original_feature: 原始特征
- search_item: 搜索项(包含search_word和search_result)
- base_word: 基础词
- Returns:
- 评估结果
- """
- search_word = search_item.get('search_word', '')
- notes = search_item['search_result'].get('data', {}).get('data', [])
- logger.info(f"[{idx}/{total}] 评估: {search_word} (帖子数: {len(notes)})")
- # 调用LLM评估器的批量评估方法
- evaluation = self.llm_evaluator.batch_evaluate_notes_with_filter(
- search_query=search_word,
- target_feature=original_feature,
- notes=notes,
- max_notes=self.stage6_max_notes,
- max_workers=self.stage6_max_workers
- )
- # 统计信息
- filtered_count = evaluation.get('filtered_count', 0)
- evaluated_count = evaluation.get('evaluated_count', 0)
- match_dist = evaluation.get('match_distribution', {})
- logger.info(f" ✓ 完成: 过滤 {filtered_count}, 评估 {evaluated_count}, "
- f"完全匹配 {match_dist.get('完全匹配(0.8-1.0)', 0)}, "
- f"相似匹配 {match_dist.get('相似匹配(0.6-0.79)', 0)}")
- return evaluation
- # ========== 阶段7:扩展搜索 ==========
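- # NOTE: legacy extended-search stage. run_full_pipeline() does not call this
- # method; Stage 7 is handled by Stage7DeconstructionAnalyzer
- # (self.stage7_analyzer) instead.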
- def stage7_extended_searches(
- self,
- features_data: List[Dict[str, Any]],
- search_delay: float = 2.0
- ) -> List[Dict[str, Any]]:
- """
- 阶段7:基于评估结果扩展搜索(多个)
- Args:
- features_data: 阶段6的数据
- search_delay: 搜索延迟
- Returns:
- 带扩展搜索的数据
- """
- logger.info("=" * 60)
- logger.info("阶段7:扩展搜索")
- logger.info("=" * 60)
- # 收集需要扩展搜索的任务
- extension_tasks = []
- for feature_result in features_data:
- original_feature = feature_result['原始特征名称']
- for assoc in feature_result.get('找到的关联', []):
- for feature in assoc.get('特征列表', []):
- result_eval = feature.get('result_evaluation')
- if not result_eval:
- continue
- extracted_elements = result_eval.get('extracted_elements', [])
- if not extracted_elements:
- continue
- # 为每个提取的元素创建扩展搜索
- base_search_word = feature.get('search_word', '')
- for element in extracted_elements:
- extended_keyword = f"{base_search_word} {element}"
- extension_tasks.append({
- 'extended_keyword': extended_keyword,
- 'original_feature': original_feature,
- 'feature_node': feature,
- 'element': element
- })
- logger.info(f"共 {len(extension_tasks)} 个扩展搜索任务")
- # 执行扩展搜索
- for idx, task in enumerate(extension_tasks, 1):
- extended_kw = task['extended_keyword']
- logger.info(f"[{idx}/{len(extension_tasks)}] 扩展搜索: {extended_kw}")
- try:
- result = self.search_client.search(
- keyword=extended_kw,
- content_type='不限',
- sort_type='综合',
- max_retries=3,
- use_cache=True # 启用搜索缓存
- )
- note_count = len(result.get('data', {}).get('data', []))
- logger.info(f" ✓ 成功,获取 {note_count} 条帖子")
- # 评估扩展搜索结果
- logger.info(f" 评估扩展搜索结果...")
- evaluation = self.llm_evaluator.evaluate_search_results(
- original_feature=task['original_feature'],
- search_word=extended_kw,
- notes=result.get('data', {}).get('data', []),
- max_notes=20,
- max_images_per_note=2
- )
- # 存储扩展搜索结果
- feature_node = task['feature_node']
- if 'extended_searches' not in feature_node:
- feature_node['extended_searches'] = []
- feature_node['extended_searches'].append({
- 'extended_keyword': extended_kw,
- 'based_on_element': task['element'],
- 'search_result': result,
- 'search_metadata': {
- 'searched_at': datetime.now().isoformat(),
- 'status': 'success',
- 'note_count': note_count
- },
- 'result_evaluation': evaluation
- })
- logger.info(f" 评估完成,relevance={evaluation['overall_relevance']:.3f}")
- except Exception as e:
- logger.error(f" ✗ 失败: {e}")
- # 延迟
- if idx < len(extension_tasks):
- time.sleep(search_delay)
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage7_final_results.json")
- self._save_json(features_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段7完成")
- logger.info("=" * 60)
- return features_data
- # ========== 主流程 ==========
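- # Pipeline overview: Stage 1 filters features with 0.5 <= similarity < 0.8;
- # Stage 2-3 collect high-similarity candidates (how_based or association);
- # Stage 4 generates queries via LLM; Stage 5 runs Xiaohongshu searches;
- # Stage 6 (optional) evaluates results; Stage 7 (optional) runs deep
- # deconstruction via Stage7DeconstructionAnalyzer.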
- def run_full_pipeline(self):
- """执行完整流程"""
- logger.info("\n" + "=" * 60)
- logger.info("开始执行完整流程")
- logger.info("=" * 60)
- try:
- # Stage 7 Only 模式:只运行 Stage 7
- if self.stage7_only:
- logger.info("运行模式: Stage 7 Only (从 Stage 6 结果开始)")
- stage6_path = os.path.join(self.output_dir, "stage6_with_evaluations.json")
- if not os.path.exists(stage6_path):
- raise FileNotFoundError(f"Stage 6 结果不存在: {stage6_path}")
- with open(stage6_path, 'r', encoding='utf-8') as f:
- stage6_results = json.load(f)
- stage7_results = self.stage7_analyzer.run(stage6_results)
- return stage7_results
- # 正常流程:从 Stage 1 开始
- # 阶段1
- stage1_results = self.stage1_filter_features()
- # 阶段2-3:根据 combination_source 选择方式
- if self.combination_source == "how_based":
- # 新方式:直接从how文件提取候选词(跳过Stage 2,直接生成Stage 3格式)
- logger.info(f"\n使用组合词来源方式: {self.combination_source} (新方式)")
- stage3_results = self.stage23_extract_candidates_from_how(stage1_results)
- else:
- # 旧方式:基于关联分析(association)
- logger.info(f"\n使用组合词来源方式: {self.combination_source} (旧方式)")
- # 阶段2
- stage2_results = self.stage2_find_associations(stage1_results)
- # 阶段3
- stage3_results = self.stage3_filter_high_similarity_matches(stage2_results)
- # 阶段4
- stage4_results = self.stage4_generate_and_evaluate_search_words(
- stage3_results,
- max_workers=8, # 提高并发从4到8
- max_combo_length=3 # 降低组合长度从4到3(保留参数,当前未使用)
- )
- # 阶段5
- stage5_results = self.stage5_execute_searches(stage4_results, search_delay=2.0, top_n=self.top_n)
- # 阶段6 - 条件执行(使用新的两层过滤评估)
- if self.enable_stage6:
- stage6_results = self.stage6_evaluate_search_results_with_filter(stage5_results)
- else:
- stage6_results = stage5_results
- logger.info("\n" + "=" * 60)
- logger.info("阶段6:跳过(未启用)")
- logger.info("=" * 60)
- # 阶段7 - 深度解构分析(条件执行)
- if self.enable_stage7:
- stage7_results = self.stage7_analyzer.run(stage6_results)
- final_results = stage7_results
- else:
- final_results = stage6_results
- logger.info("\n" + "=" * 60)
- if self.enable_stage7:
- logger.info("✓ 完整流程执行完成(Stage1-7)")
- elif self.enable_stage6:
- logger.info("✓ 完整流程执行完成(Stage1-6)")
- else:
- logger.info("✓ 完整流程执行完成(Stage1-5)")
- logger.info("=" * 60)
- # 自动执行可视化
- logger.info("\n" + "=" * 60)
- logger.info("开始生成可视化...")
- logger.info("=" * 60)
- try:
- # 根据是否启用stage6选择不同的可视化脚本
- viz_script = 'visualize_stage6_results.py' if self.enable_stage6 else 'visualize_stage5_results.py'
- logger.info(f" 使用可视化脚本: {viz_script}")
- result = subprocess.run(
- ['python3', viz_script],
- capture_output=True,
- text=True,
- timeout=60
- )
- if result.returncode == 0:
- logger.info("✓ 可视化生成成功")
- logger.info(result.stdout)
- else:
- logger.error(f"可视化生成失败: {result.stderr}")
- except subprocess.TimeoutExpired:
- logger.error("可视化生成超时")
- except Exception as e:
- logger.error(f"可视化生成异常: {e}")
- return final_results
- except Exception as e:
- logger.error(f"流程执行失败: {e}")
- raise
- def main():
- """主函数"""
- parser = argparse.ArgumentParser(description='增强搜索系统V2')
- parser.add_argument(
- '--how-json',
- default='69114f150000000007001f30_how copy.json',
- help='How解构文件路径'
- )
- parser.add_argument(
- '--dimension-associations',
- default='dimension_associations_analysis.json',
- help='维度关联文件路径'
- )
- parser.add_argument(
- '--intra-associations',
- default='intra_dimension_associations_analysis.json',
- help='维度内关联文件路径'
- )
- parser.add_argument(
- '--optimized-clustered',
- default='optimized_clustered_data_gemini-3-pro-preview.json',
- help='人设特征库路径'
- )
- parser.add_argument(
- '--api-key',
- default=None,
- help='OpenRouter API密钥(默认从环境变量读取)'
- )
- parser.add_argument(
- '--output-dir',
- default='output_v2',
- help='输出目录'
- )
- parser.add_argument(
- '--top-n',
- type=int,
- default=10,
- help='每个原始特征取评分最高的N个搜索词(默认10)'
- )
- parser.add_argument(
- '--max-total-searches',
- type=int,
- default=None,
- help='全局最大搜索次数限制(默认None不限制)'
- )
- parser.add_argument(
- '--search-workers',
- type=int,
- default=3,
- help='搜索并发数(默认3)'
- )
- parser.add_argument(
- '--max-searches-per-feature',
- type=int,
- default=None,
- help='每个原始特征的最大搜索次数(默认None不限制)'
- )
- parser.add_argument(
- '--max-searches-per-base-word',
- type=int,
- default=None,
- help='每个base_word的最大搜索次数(默认None不限制)'
- )
- parser.add_argument(
- '--combination-source',
- type=str,
- choices=['how_based', 'association'],
- default='how_based',
- help='组合词来源方式(默认how_based):how_based=从how文件提取相似度>=0.8的候选词(新方式),association=基于关联分析提取候选词(旧方式)'
- )
- parser.add_argument(
- '--enable-stage6',
- action='store_true',
- help='启用Stage 6评估(默认False)'
- )
- parser.add_argument(
- '--stage6-max-workers',
- type=int,
- default=10,
- help='Stage 6并发评估数(默认10)'
- )
- parser.add_argument(
- '--stage6-max-notes',
- type=int,
- default=20,
- help='每个搜索结果评估的最大帖子数(默认20)'
- )
- parser.add_argument(
- '--enable-stage7',
- action='store_true',
- help='启用 Stage 7 深度解构分析'
- )
- parser.add_argument(
- '--stage7-only',
- action='store_true',
- help='只运行 Stage 7(从 Stage 6 结果开始)'
- )
- parser.add_argument(
- '--stage7-max-workers',
- type=int,
- default=5,
- help='Stage 7 并发数(默认5)'
- )
- parser.add_argument(
- '--stage7-max-notes',
- type=int,
- default=None,
- help='Stage 7 最多处理多少个完全匹配的帖子(默认None不限制)'
- )
- parser.add_argument(
- '--stage7-skip',
- type=int,
- default=0,
- help='Stage 7 跳过前 N 个完全匹配的帖子(默认0)'
- )
- parser.add_argument(
- '--stage7-sort-by',
- type=str,
- choices=['score', 'time', 'engagement'],
- default='score',
- help='Stage 7 排序方式: score(评分), time(时间), engagement(互动量)'
- )
- parser.add_argument(
- '--stage7-api-url',
- type=str,
- default='http://192.168.245.150:7000/what/analysis/single',
- help='Stage 7 解构 API 地址'
- )
- parser.add_argument(
- '--stage7-min-score',
- type=float,
- default=0.8,
- help='Stage 7 处理的最低分数阈值(默认0.8,0-1分制)'
- )
- args = parser.parse_args()
- # 创建系统实例
- system = EnhancedSearchV2(
- how_json_path=args.how_json,
- dimension_associations_path=args.dimension_associations,
- intra_associations_path=args.intra_associations,
- optimized_clustered_data_path=args.optimized_clustered,
- openrouter_api_key=args.api_key,
- output_dir=args.output_dir,
- top_n=args.top_n,
- max_total_searches=args.max_total_searches,
- search_max_workers=args.search_workers,
- max_searches_per_feature=args.max_searches_per_feature,
- max_searches_per_base_word=args.max_searches_per_base_word,
- enable_stage6=args.enable_stage6,
- stage6_max_workers=args.stage6_max_workers,
- stage6_max_notes=args.stage6_max_notes,
- enable_stage7=args.enable_stage7,
- stage7_only=args.stage7_only,
- stage7_max_workers=args.stage7_max_workers,
- stage7_max_notes=args.stage7_max_notes,
- stage7_skip=args.stage7_skip,
- stage7_sort_by=args.stage7_sort_by,
- stage7_api_url=args.stage7_api_url,
- stage7_min_score=args.stage7_min_score
- )
- # 执行完整流程
- system.run_full_pipeline()
- if __name__ == '__main__':
- main()