#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enhanced Search System V2

Full pipeline with LLM evaluation and extended searches.
"""
import json
import logging
import copy
import time
import os
import argparse
import subprocess
from typing import Dict, List, Any, Optional, Set, Tuple
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import combinations

from openrouter_client import OpenRouterClient
from llm_evaluator import LLMEvaluator
from xiaohongshu_search import XiaohongshuSearch

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.FileHandler('enhanced_search_v2.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class EnhancedSearchV2:
    """Enhanced search system V2."""

    def __init__(
        self,
        how_json_path: str,
        dimension_associations_path: str,
        optimized_clustered_data_path: str,
        openrouter_api_key: Optional[str] = None,
        output_dir: str = "output_v2",
        top_n: int = 10,
        max_total_searches: Optional[int] = None,
        search_max_workers: int = 3
    ):
        """
        Initialize the system.

        Args:
            how_json_path: Path to the "how" deconstruction file
            dimension_associations_path: Path to the dimension-association file
            optimized_clustered_data_path: Path to the persona feature library
            openrouter_api_key: OpenRouter API key
            output_dir: Output directory
            top_n: Keep the N highest-scored search words per original feature (default 10)
            max_total_searches: Global cap on the number of searches (default None, unlimited)
            search_max_workers: Search concurrency (default 3)
        """
        self.how_json_path = how_json_path
        self.dimension_associations_path = dimension_associations_path
        self.optimized_clustered_data_path = optimized_clustered_data_path
        self.output_dir = output_dir
        self.top_n = top_n
        self.max_total_searches = max_total_searches
        self.search_max_workers = search_max_workers

        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)

        # Load data
        logger.info("Loading data files...")
        self.how_data = self._load_json(how_json_path)
        self.dimension_associations = self._load_json(dimension_associations_path)
        self.optimized_clustered_data = self._load_json(optimized_clustered_data_path)

        # Initialize components
        logger.info("Initializing components...")
        self.openrouter_client = OpenRouterClient(
            api_key=openrouter_api_key,
            model="google/gemini-2.5-flash",
            retry_delay=5  # longer retry delay to avoid rate limiting
        )
        self.llm_evaluator = LLMEvaluator(self.openrouter_client)
        self.search_client = XiaohongshuSearch()
        logger.info("System initialized")

    def _load_json(self, file_path: str) -> Any:
        """Load a JSON file."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Failed to load {file_path}: {e}")
            raise

    def _save_json(self, data: Any, file_path: str):
        """Save data to a JSON file."""
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"Saved: {file_path}")
        except Exception as e:
            logger.error(f"Failed to save {file_path}: {e}")
            raise

    # ========== Stage 1: filter features with 0.5 <= similarity < 0.8 ==========

    def stage1_filter_features(self) -> List[Dict[str, Any]]:
        """
        Stage 1: filter medium-confidence features.

        Selection criterion: 0.5 <= highest similarity < 0.8

        Returns:
            The filtered feature list
        """
        logger.info("=" * 60)
        logger.info("Stage 1: filter medium-confidence features (0.5 <= similarity < 0.8)")
        logger.info("=" * 60)

        results = []
        how_result = self.how_data.get('how解构结果', {})

        total_features = 0
        filtered_out_low = 0   # similarity < 0.5
        filtered_out_high = 0  # similarity >= 0.8
        selected_count = 0

        # Walk the three levels
        for level_name, level_list in how_result.items():
            if not isinstance(level_list, list):
                continue
            logger.info(f"\nProcessing {level_name}...")
            for item_idx, item in enumerate(level_list):
                item_name = item.get('名称', f'未命名-{item_idx}')
                how_steps = item.get('how步骤列表', [])
                for step in how_steps:
                    features = step.get('特征列表', [])
                    for feature in features:
                        feature_name = feature.get('特征名称', '')
                        match_results = feature.get('匹配结果', [])
                        total_features += 1
                        if not match_results:
                            continue

                        # Find the highest similarity
                        max_similarity = max(
                            (m.get('匹配结果', {}).get('相似度', 0) for m in match_results),
                            default=0
                        )

                        # Apply the similarity band
                        if max_similarity < 0.5:
                            filtered_out_low += 1
                            continue
                        elif max_similarity >= 0.8:
                            filtered_out_high += 1
                            continue

                        # 0.5 <= max_similarity < 0.8: keep it
                        best_match = max(
                            match_results,
                            key=lambda x: x.get('匹配结果', {}).get('相似度', 0)
                        )

                        # Decide whether the match is a classification or a feature
                        feature_classification = best_match.get('特征分类', [])
                        classification_path = self._build_classification_path(feature_classification)

                        # If the path is empty and the match is a classification, search for the full path
                        if not classification_path and best_match.get('特征类型') == '分类':
                            feature_name_to_search = best_match.get('人设特征名称', '')
                            classification_path = self._search_classification_path(feature_name_to_search)

                        is_classification = self._is_classification(best_match.get('人设特征名称', ''), classification_path)

                        result_item = {
                            '原始特征名称': feature_name,
                            '来源层级': level_name,
                            '权重': feature.get('权重', 0),
                            '所属点名称': item_name,
                            '最高匹配信息': {
                                '人设特征名称': best_match.get('人设特征名称'),
                                '人设特征层级': best_match.get('人设特征层级'),
                                '特征类型': best_match.get('特征类型'),
                                '特征分类': feature_classification,
                                '相似度': best_match.get('匹配结果', {}).get('相似度', 0),
                                '匹配说明': best_match.get('匹配结果', {}).get('说明', ''),
                                '是分类': is_classification,
                                '所属分类路径': classification_path
                            }
                        }
                        results.append(result_item)
                        selected_count += 1
                        logger.info(f"  ✓ {feature_name} → {best_match.get('人设特征名称')} "
                                    f"(similarity: {max_similarity:.3f}, "
                                    f"{'classification' if is_classification else 'feature'})")

        # Summary
        logger.info("\n" + "=" * 60)
        logger.info("Stage 1 complete")
        logger.info(f"  Total features: {total_features}")
        logger.info(f"  Dropped (<0.5): {filtered_out_low}")
        logger.info(f"  Dropped (>=0.8): {filtered_out_high}")
        logger.info(f"  Kept (0.5-0.8): {selected_count}")
        logger.info("=" * 60)

        # Save results
        output_path = os.path.join(self.output_dir, "stage1_filtered_features.json")
        self._save_json(results, output_path)
        return results

    def _build_classification_path(self, feature_classification: List[str]) -> str:
        """
        Build a classification path.

        Args:
            feature_classification: The feature-classification array

        Returns:
            The classification path
        """
        if not feature_classification:
            return ""

        # Step 1: strip the "实质" suffix from middle elements
        cleaned = []
        for i, item in enumerate(feature_classification):
            if i == len(feature_classification) - 1:  # keep the last element
                cleaned.append(item)
            elif item.endswith("实质") and i != 0:  # strip "实质" from middle elements
                cleaned.append(item[:-2])
            else:
                cleaned.append(item)

        # Step 2: reverse the array
        reversed_list = list(reversed(cleaned))

        # Step 3: join into a path
        path = "/".join(reversed_list)
        return path
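
    # Worked example for the three steps above (hypothetical values):
    #   feature_classification = ['高端口粮', '实体物品实质', '实质']
    #   step 1 (strip '实质' from middle elements) -> ['高端口粮', '实体物品', '实质']
    #   step 2 (reverse)                           -> ['实质', '实体物品', '高端口粮']
    #   step 3 (join with '/')                     -> '实质/实体物品/高端口粮'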

    def _is_classification(self, persona_feature_name: str, classification_path: str) -> bool:
        """
        Decide whether a name is a classification or a feature.

        Args:
            persona_feature_name: The persona feature name
            classification_path: The classification path

        Returns:
            True if it is a classification, False if it is a feature
        """
        # Look it up in optimized_clustered_data:
        # if it appears in a '特征列表', it is a feature;
        # if it exists as a node with children, it is a classification.

        # Navigate to the node
        node = self._navigate_to_node(classification_path)
        if not node:
            return False

        # Check the feature list
        features = node.get('特征列表', [])
        for f in features:
            if f.get('特征名称') == persona_feature_name:
                return False  # found in the feature list: a feature

        # Check whether it exists as a child node
        if persona_feature_name in node:
            sub_node = node[persona_feature_name]
            if isinstance(sub_node, dict):
                return True  # a child node: a classification

        return False  # default to feature

    def _navigate_to_node(self, path: str) -> Optional[Dict[str, Any]]:
        """
        Navigate to the node at the given path.

        Args:
            path: A path such as "实质/猫咪宠物"

        Returns:
            The node, or None if not found
        """
        if not path:
            return None

        parts = path.split('/')
        first_part = parts[0]

        # Determine the top-level key
        top_level_map = {
            '意图': '目的点',
            '要素': '目的点',
            '实质': None,
            '形式': None,
            '场景': None
        }
        top_keys = []
        if first_part in top_level_map:
            mapped = top_level_map[first_part]
            if mapped:
                top_keys.append(mapped)
        if not top_keys:
            top_keys = ['灵感点列表', '关键点列表', '目的点']

        # Try each top-level section
        for top_key in top_keys:
            current = self.optimized_clustered_data.get(top_key)
            if not current:
                continue
            # Descend level by level
            found = True
            for part in parts:
                if isinstance(current, dict) and part in current:
                    current = current[part]
                else:
                    found = False
                    break
            if found and isinstance(current, dict):
                return current

        return None
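
    # Example (hypothetical data): for path '实质/猫咪宠物' the lookup tries
    # optimized_clustered_data['灵感点列表']['实质']['猫咪宠物'], then the same
    # chain under '关键点列表' and '目的点', returning the first dict node found.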

    def _recursive_search(
        self,
        obj: Dict[str, Any],
        target_name: str,
        current_path: str = ""
    ) -> Optional[str]:
        """
        Recursively search for a classification node.

        Args:
            obj: The object currently being searched
            target_name: The target classification name
            current_path: The current path

        Returns:
            The full path if found, otherwise None
        """
        if not isinstance(obj, dict):
            return None

        # Walk all keys
        for key in obj.keys():
            # Skip metadata and feature lists
            if key in ['_meta', '特征列表']:
                continue
            # Check for a match
            if target_name in key or key in target_name:
                # Match found: return the path
                if current_path:
                    return f"{current_path}/{key}"
                else:
                    return key
            # Recurse into child nodes
            if isinstance(obj[key], dict):
                next_path = f"{current_path}/{key}" if current_path else key
                result = self._recursive_search(obj[key], target_name, next_path)
                if result:
                    return result

        return None
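
    # The match test above is a bidirectional substring check (illustrative
    # values): a target of '实体物品' matches the key '实体物品实质' (target in
    # key), and a target of '实体物品实质分类' would also match that key
    # (key in target).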

    def _search_classification_path(self, classification_name: str) -> str:
        """
        Search optimized_clustered_data for a classification node's path.

        Args:
            classification_name: The classification name, e.g. "实体物品实质"

        Returns:
            The full path, e.g. "实质/实体物品"; an empty string if not found
        """
        if not classification_name:
            return ""

        # Clean the name: strip common suffixes
        clean_name = classification_name
        for suffix in ['实质', '意图', '形式', '要素']:
            if clean_name.endswith(suffix) and len(clean_name) > len(suffix):
                clean_name = clean_name[:-len(suffix)]
                break
        logger.info(f"  Searching classification: {classification_name} → cleaned to: {clean_name}")

        # Search the three top-level sections
        for top_key in ['灵感点列表', '关键点列表', '目的点']:
            top_data = self.optimized_clustered_data.get(top_key, {})
            if not top_data:
                continue
            # Recursive search
            path = self._recursive_search(top_data, clean_name, "")
            if path:
                logger.info(f"  ✓ Found path: {path}")
                return path

        logger.warning(f"  ✗ Classification path not found: {classification_name}")
        return ""

    # ========== Stage 2: collect associated classifications + tags + sub-classifications ==========

    def stage2_find_associations(self, filtered_features: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Stage 2: find associated classifications; collect names, tags, and sub-classifications.

        Args:
            filtered_features: Features filtered in Stage 1

        Returns:
            The feature list with association info attached
        """
        logger.info("=" * 60)
        logger.info("Stage 2: find associated classifications")
        logger.info("=" * 60)

        for idx, feature in enumerate(filtered_features, 1):
            logger.info(f"\n[{idx}/{len(filtered_features)}] Processing: {feature['原始特征名称']}")
            match_info = feature['最高匹配信息']
            is_classification = match_info['是分类']
            classification_path = match_info['所属分类路径']
            source_level = match_info['人设特征层级']

            if is_classification:
                # The match is a classification: use its path directly
                search_path = classification_path
                logger.info(f"  Matched a classification: {search_path}")
            else:
                # The match is a feature: use the path of its parent classification
                search_path = classification_path
                logger.info(f"  Matched a feature; using its parent classification: {search_path}")

            # Find associations
            associations = self._find_associations(search_path, source_level)

            # Collect association info
            feature['找到的关联'] = []
            for assoc in associations:
                target_path = assoc['目标分类']
                logger.info(f"  Processing association: {target_path}")
                # Collect classification info
                classification_info = self._collect_classification_info(target_path)
                if classification_info:
                    feature['找到的关联'].append({
                        '来源方向': assoc['来源方向'],
                        '关联类型': assoc['关联类型'],
                        '目标分类路径': target_path,
                        '共同帖子数': assoc['共同帖子数'],
                        'Jaccard相似度': assoc['Jaccard相似度'],
                        '分类名称': classification_info['classification_name'],
                        '标签列表': classification_info['tags'],
                        '子分类列表': classification_info['sub_classifications']
                    })
            logger.info(f"  Found {len(feature['找到的关联'])} associations")

        # Save results
        output_path = os.path.join(self.output_dir, "stage2_associations.json")
        self._save_json(filtered_features, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 2 complete")
        logger.info("=" * 60)
        return filtered_features

    def _find_associations(self, classification_path: str, source_level: str) -> List[Dict[str, Any]]:
        """
        Find associated nodes.

        Args:
            classification_path: The classification path
            source_level: The source level

        Returns:
            A list of associated nodes
        """
        associations = []

        # Determine the dimension name
        if '灵感点' in source_level:
            dimension_key = '灵感点维度'
        elif '关键点' in source_level:
            dimension_key = '关键点维度'
        elif '目的点' in source_level:
            dimension_key = '目的点维度'
        else:
            return associations

        # Fetch the dimension data
        single_dim = self.dimension_associations.get('单维度关联分析', {})
        dimension_data = single_dim.get(dimension_key, {})
        if not dimension_data:
            return associations

        # Walk all directions
        for direction_key, direction_data in dimension_data.items():
            if direction_key == '说明':
                continue
            # Look up the source classification
            if classification_path in direction_data:
                source_data = direction_data[classification_path]
                # Collect associated nodes
                for assoc_key in source_data.keys():
                    if assoc_key.startswith('与') and assoc_key.endswith('的关联'):
                        assoc_list = source_data[assoc_key]
                        for assoc_item in assoc_list:
                            associations.append({
                                '来源方向': direction_key,
                                '关联类型': assoc_key,
                                '目标分类': assoc_item.get('目标分类'),
                                '目标层级': assoc_item.get('目标层级'),
                                '共同帖子数': assoc_item.get('共同帖子数'),
                                'Jaccard相似度': assoc_item.get('Jaccard相似度'),
                                '共同帖子ID': assoc_item.get('共同帖子ID', [])
                            })

        return associations
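
    # Shape of dimension_associations expected by the lookups above (inferred
    # from the code; concrete key names other than the literals are examples):
    #   {"单维度关联分析": {
    #       "灵感点维度": {
    #           "<direction>": {
    #               "<classification_path>": {
    #                   "与XX的关联": [
    #                       {"目标分类": "...", "目标层级": "...", "共同帖子数": 0,
    #                        "Jaccard相似度": 0.0, "共同帖子ID": []}]}}}}}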

    def _collect_classification_info(self, classification_path: str) -> Optional[Dict[str, Any]]:
        """
        Collect classification info: name + tags + sub-classifications.

        Args:
            classification_path: The classification path

        Returns:
            The classification info
        """
        node = self._navigate_to_node(classification_path)
        if not node:
            return None

        # Classification name (last path segment)
        classification_name = classification_path.split('/')[-1]

        # Tags (the node's feature list)
        tags = [f.get('特征名称', '') for f in node.get('特征列表', [])]

        # Sub-classifications (child nodes, excluding _meta and 特征列表)
        sub_classifications = [
            key for key in node.keys()
            if isinstance(node[key], dict) and key not in ['_meta', '特征列表']
        ]

        return {
            'classification_name': classification_name,
            'tags': tags,
            'sub_classifications': sub_classifications
        }
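
    # Example return value (hypothetical data) for path '实质/猫咪宠物':
    #   {'classification_name': '猫咪宠物',
    #    'tags': ['布偶猫', '猫粮测评'],
    #    'sub_classifications': ['猫咪饮食', '猫咪用品']}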

    # ========== Stage 3: filter high-similarity matches (>0.8) ==========

    def stage3_filter_high_similarity_matches(self, associations_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Stage 3: filter high-similarity matches (>0.8).

        Walk every original feature in the "how" deconstruction and keep
        high-quality matches whose similarity exceeds 0.8 and whose persona
        feature name falls within the Stage 2 association scope.

        Args:
            associations_data: Association data from Stage 2

        Returns:
            The data with high-similarity candidates attached
        """
        logger.info("=" * 60)
        logger.info("Stage 3: filter high-similarity matches (>0.8)")
        logger.info("=" * 60)

        for idx, feature_result in enumerate(associations_data, 1):
            original_feature_name = feature_result['原始特征名称']
            logger.info(f"\n[{idx}/{len(associations_data)}] Processing: {original_feature_name}")

            # Step 1: collect the Stage 2 scope (classification names + tags)
            stage2_scope = self._collect_stage2_scope(feature_result)
            logger.info(f"  Stage 2 scope contains {len(stage2_scope)} classifications/tags")

            # Step 2: walk all original features in the "how" deconstruction
            high_sim_candidates = []
            total_checked = 0
            high_sim_found = 0
            how_result = self.how_data.get('how解构结果', {})
            for level_name, level_list in how_result.items():
                if not isinstance(level_list, list):
                    continue
                for item in level_list:
                    for step in item.get('how步骤列表', []):
                        for feature in step.get('特征列表', []):
                            # All matches for this feature
                            matches = feature.get('匹配结果', [])
                            total_checked += len(matches)
                            # Keep matches with similarity > 0.8 inside the Stage 2 scope
                            for match in matches:
                                sim = match.get('匹配结果', {}).get('相似度', 0)
                                persona_feature_name = match.get('人设特征名称', '')
                                if sim > 0.8 and persona_feature_name in stage2_scope:
                                    high_sim_found += 1
                                    # Record provenance
                                    high_sim_candidates.append({
                                        '人设特征名称': persona_feature_name,
                                        '相似度': sim,
                                        '特征类型': match.get('特征类型', ''),
                                        '特征分类': match.get('特征分类', []),
                                        '人设特征层级': match.get('人设特征层级', ''),
                                        '来源路径': self._build_classification_path(match.get('特征分类', [])),
                                        '匹配说明': match.get('匹配结果', {}).get('说明', ''),
                                        '来源原始特征': feature.get('特征名称', '')  # which original feature it came from
                                    })

            logger.info(f"  Checked {total_checked} matches")
            logger.info(f"  Found {high_sim_found} matches with similarity > 0.8")

            # Sort by similarity (descending) and deduplicate
            # (keep only the top score per persona feature name)
            seen_names = set()
            unique_candidates = []
            high_sim_candidates.sort(key=lambda x: x['相似度'], reverse=True)
            for candidate in high_sim_candidates:
                name = candidate['人设特征名称']
                if name not in seen_names:
                    seen_names.add(name)
                    unique_candidates.append(candidate)

            # Attach to the result
            feature_result['高相似度候选'] = unique_candidates
            logger.info(f"  {len(unique_candidates)} high-similarity candidates after deduplication")

            # Show the top 5
            if unique_candidates:
                logger.info("  Top 5:")
                for c in unique_candidates[:5]:
                    logger.info(f"    • {c['人设特征名称']} ({c['相似度']:.3f}) ← from \"{c['来源原始特征']}\"")

        # Save results
        output_path = os.path.join(self.output_dir, "stage3_high_similarity.json")
        self._save_json(associations_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 3 complete")
        logger.info("=" * 60)
        return associations_data

    def _collect_stage2_scope(self, feature_result: Dict[str, Any]) -> Set[str]:
        """
        Collect every classification name and tag found in Stage 2 into a scope set.

        Args:
            feature_result: The feature result data

        Returns:
            A set containing all classification names and tags
        """
        scope = set()
        for assoc in feature_result.get('找到的关联', []):
            # Add the classification name
            scope.add(assoc['分类名称'])
            # Add all tags
            tags = assoc.get('标签列表', [])
            scope.update(tags)
        return scope
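
    # Example (hypothetical): an association with 分类名称='猫咪宠物' and
    # 标签列表=['布偶猫', '猫粮'] contributes {'猫咪宠物', '布偶猫', '猫粮'} to the scope.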

    def _find_features_by_path(self, target_classification: str) -> List[Dict[str, Any]]:
        """
        Look up a feature list by path.

        Args:
            target_classification: The target classification path

        Returns:
            The feature list
        """
        node = self._navigate_to_node(target_classification)
        if not node:
            return []
        features = node.get('特征列表', [])
        # Deep copy so callers can modify the result safely
        return copy.deepcopy(features)

    # ========== Stage 4: multi-word combinations + LLM evaluation ==========

    def stage4_generate_and_evaluate_search_words(
        self,
        features_data: List[Dict[str, Any]],
        max_workers: int = 4,
        max_candidates: int = 20,
        max_combo_length: int = 4
    ) -> List[Dict[str, Any]]:
        """
        Stage 4: multi-word combinations + LLM evaluation.

        Using the base word from Stage 1 and the high-similarity candidates
        from Stage 3, generate all 2-to-N word combinations and pick the
        top 10 via LLM evaluation.

        Args:
            features_data: Stage 3 data (including high-similarity candidates)
            max_workers: Number of original features evaluated concurrently (default 4)
            max_candidates: Maximum number of candidate words used in combinations (default 20)
            max_combo_length: Maximum combination length in words (default 4, i.e. base word + 3 candidates)

        Returns:
            The data with LLM evaluations attached
        """
        logger.info("=" * 60)
        logger.info("Stage 4: multi-word combinations + LLM evaluation")
        logger.info(f"  Max candidate words: {max_candidates}")
        logger.info(f"  Max combination length: {max_combo_length} words")
        logger.info(f"  Concurrency: {max_workers} original features")
        logger.info("=" * 60)

        total_features = len(features_data)

        # Process different original features in parallel
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            futures = []
            for idx, feature_result in enumerate(features_data, 1):
                future = executor.submit(
                    self._process_single_feature_combinations,
                    idx,
                    total_features,
                    feature_result,
                    max_candidates,
                    max_combo_length
                )
                futures.append((future, feature_result))

            # Wait for completion and collect results
            for future, feature_result in futures:
                try:
                    _ = future.result()  # results are written back into feature_result
                except Exception as e:
                    logger.error(f"  Evaluation failed: {feature_result['原始特征名称']}, error: {e}")

        # Save results
        output_path = os.path.join(self.output_dir, "stage4_combinations_evaluated.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 4 complete")
        logger.info("=" * 60)
        return features_data

    def _process_single_feature_combinations(
        self,
        idx: int,
        total: int,
        feature_result: Dict[str, Any],
        max_candidates: int,
        max_combo_length: int
    ) -> None:
        """
        Generate and evaluate combinations for one original feature.

        Steps:
            1. Get the base word from Stage 1's 最高匹配信息
            2. Get candidates from Stage 3's 高相似度候选 (top max_candidates)
            3. Generate 2-to-N word combinations
            4. LLM batch evaluation
            5. Select the top 10 and write them back

        Args:
            idx: Feature index
            total: Total number of features
            feature_result: The feature result data
            max_candidates: Maximum number of candidate words used in combinations
            max_combo_length: Maximum combination length in words
        """
        original_feature = feature_result['原始特征名称']
        logger.info(f"\n[{idx}/{total}] Processing: {original_feature}")

        # Step 1: get the base word
        base_word = feature_result.get('最高匹配信息', {}).get('人设特征名称', '')
        if not base_word:
            logger.info("  No base word, skipping")
            feature_result['组合评估结果'] = []
            return
        logger.info(f"  Base word: {base_word}")

        # Step 2: get candidate words (from the high-similarity candidates)
        high_sim_candidates = feature_result.get('高相似度候选', [])
        # Cap the number of candidates
        candidates = high_sim_candidates[:max_candidates]
        candidate_words = [c['人设特征名称'] for c in candidates]
        if not candidate_words:
            logger.info("  No candidate words, skipping")
            feature_result['组合评估结果'] = []
            return
        logger.info(f"  Candidate words: {len(candidate_words)} (cap: {max_candidates})")

        # Step 3: generate all combinations
        all_combinations = []
        # Candidate combinations of 1 to max_combo_length-1 words (the base word is prepended)
        for length in range(1, min(max_combo_length, len(candidate_words) + 1)):
            for combo in combinations(candidate_words, length):
                # Search phrase = base word + candidate combination
                search_phrase = base_word + ' ' + ' '.join(combo)
                all_combinations.append({
                    'search_word': search_phrase,
                    'base_word': base_word,
                    'candidate_words': list(combo),
                    'combo_length': length + 1  # +1 because the base word is included
                })
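
        # Illustration (hypothetical values): with base_word='露营',
        # candidate_words=['帐篷', '咖啡'] and max_combo_length=3, the loops yield
        # '露营 帐篷', '露营 咖啡', and '露营 帐篷 咖啡'. In general this produces
        # sum(C(n, k) for k in 1..max_combo_length-1) phrases for n candidates.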
- logger.info(f" 生成 {len(all_combinations)} 个组合")
- # 步骤4: LLM批量评估
- logger.info(f" 开始LLM评估...")
- evaluated = self.llm_evaluator.evaluate_search_words_in_batches(
- original_feature=original_feature,
- search_words=[c['search_word'] for c in all_combinations],
- batch_size=50
- )
- # 步骤5: 选出Top 10
- top_10 = evaluated[:10]
- # 写回结果
- feature_result['组合评估结果'] = top_10
- max_score = top_10[0]['score'] if top_10 else 0.0
- logger.info(f" 评估完成,Top 10 最高分: {max_score:.3f}")

    # ========== Stage 5: execute searches ==========

    def _execute_single_search(
        self,
        idx: int,
        total: int,
        search_word: str,
        feature_ref: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Execute a single search task (for concurrent execution).

        Args:
            idx: Search index
            total: Total number of searches
            search_word: The search word
            feature_ref: Reference to the feature dict (results are written into it)

        Returns:
            Search result info
        """
        logger.info(f"[{idx}/{total}] Searching: {search_word}")
        try:
            result = self.search_client.search(
                keyword=search_word,
                content_type='不限',
                sort_type='综合',
                max_retries=3,
                use_cache=True  # enable the search cache
            )
            note_count = len(result.get('data', {}).get('data', []))
            logger.info(f"  ✓ Success, fetched {note_count} notes")

            # Write the result back
            feature_ref['search_result'] = result
            feature_ref['search_metadata'] = {
                'searched_at': datetime.now().isoformat(),
                'status': 'success',
                'note_count': note_count,
                'search_params': {
                    'keyword': search_word,
                    'content_type': '不限',  # must mirror the actual request parameter
                    'sort_type': '综合'
                }
            }
            return {'status': 'success', 'search_word': search_word, 'note_count': note_count}
        except Exception as e:
            logger.error(f"  ✗ Failed: {e}")
            feature_ref['search_result'] = None
            feature_ref['search_metadata'] = {
                'searched_at': datetime.now().isoformat(),
                'status': 'failed',
                'note_count': 0,
                'error': str(e)
            }
            return {'status': 'failed', 'search_word': search_word, 'error': str(e)}

    def stage5_execute_searches(
        self,
        features_data: List[Dict[str, Any]],
        search_delay: float = 2.0,
        top_n: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Stage 5: run the Xiaohongshu searches.

        Args:
            features_data: Stage 4 data
            search_delay: Delay between searches (currently unused; searches run concurrently)
            top_n: Keep the N highest-scored search words per original feature

        Returns:
            The data with search results attached
        """
        logger.info("=" * 60)
        logger.info("Stage 5: run Xiaohongshu searches")
        logger.info("=" * 60)

        # Group search words by original feature (read from Stage 4's 组合评估结果)
        feature_search_groups = {}
        for feature_result in features_data:
            original_feature = feature_result['原始特征名称']
            if original_feature not in feature_search_groups:
                feature_search_groups[original_feature] = []
            # Read from Stage 4's evaluation results
            for eval_item in feature_result.get('组合评估结果', []):
                sw = eval_item.get('search_word')
                if not sw:
                    continue
                score = eval_item.get('score', 0.0)
                feature_search_groups[original_feature].append({
                    'search_word': sw,
                    'score': score,
                    'feature_ref': eval_item  # reference to the evaluation item; search results are written into it
                })

        # Keep the top N per group
        all_searches = []
        total_before_filter = 0
        total_filtered = 0
        for original_feature, search_list in feature_search_groups.items():
            total_before_filter += len(search_list)
            # Sort by score, descending
            sorted_list = sorted(search_list, key=lambda x: x['score'], reverse=True)
            # Take the top_n entries
            selected = sorted_list[:top_n]
            all_searches.extend(selected)
            filtered = len(sorted_list) - len(selected)
            total_filtered += filtered
            logger.info(f"  {original_feature}: selected top {len(selected)} of {len(sorted_list)} search words (dropped {filtered})")

        # Apply the global search cap
        if self.max_total_searches and len(all_searches) > self.max_total_searches:
            logger.info(f"  Applying global cap: reducing from {len(all_searches)} to {self.max_total_searches}")
            all_searches = all_searches[:self.max_total_searches]

        logger.info(f"\n{len(all_searches)} search tasks in total (before filtering: {total_before_filter}, dropped: {total_filtered})")
        logger.info(f"  Running searches concurrently (workers: {self.search_max_workers})")

        # Run the searches with a thread pool
        with ThreadPoolExecutor(max_workers=self.search_max_workers) as executor:
            # Submit all search tasks
            futures = []
            for idx, item in enumerate(all_searches, 1):
                future = executor.submit(
                    self._execute_single_search,
                    idx,
                    len(all_searches),
                    item['search_word'],
                    item['feature_ref']
                )
                futures.append(future)

            # Wait for all searches to finish
            for future in as_completed(futures):
                try:
                    future.result()  # results are already written into feature_ref
                except Exception as e:
                    logger.error(f"  Search task failed: {e}")

        # Save results
        output_path = os.path.join(self.output_dir, "stage5_with_search_results.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 5 complete")
        logger.info("=" * 60)
        return features_data

    # ========== Stage 6: LLM evaluation of search results ==========

    def stage6_evaluate_search_results(
        self,
        features_data: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Stage 6: evaluate search results with the LLM (multimodal).

        Args:
            features_data: Stage 5 data

        Returns:
            The data with result evaluations attached
        """
        logger.info("=" * 60)
        logger.info("Stage 6: LLM evaluation of search results")
        logger.info("=" * 60)

        # Collect all feature nodes that need evaluation
        features_to_evaluate = []
        for feature_result in features_data:
            original_feature = feature_result['原始特征名称']
            for assoc in feature_result.get('找到的关联', []):
                for feature in assoc.get('特征列表', []):
                    if feature.get('search_result') and feature['search_metadata']['status'] == 'success':
                        features_to_evaluate.append({
                            'original_feature': original_feature,
                            'feature_node': feature
                        })

        logger.info(f"{len(features_to_evaluate)} search results to evaluate")

        # Evaluate in parallel (modest concurrency)
        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = []
            for item in features_to_evaluate:
                future = executor.submit(
                    self._evaluate_single_search_result,
                    item['original_feature'],
                    item['feature_node']
                )
                futures.append((future, item))

            # Collect results
            for idx, (future, item) in enumerate(futures, 1):
                try:
                    evaluation = future.result()
                    item['feature_node']['result_evaluation'] = evaluation
                    logger.info(f"  [{idx}/{len(futures)}] {item['feature_node']['search_word']}: "
                                f"relevance={evaluation['overall_relevance']:.3f}")
                except Exception as e:
                    logger.error(f"  Evaluation failed: {item['feature_node']['search_word']}, error: {e}")
                    item['feature_node']['result_evaluation'] = None

        # Save results
        output_path = os.path.join(self.output_dir, "stage6_with_evaluations.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 6 complete")
        logger.info("=" * 60)
        return features_data

    def _evaluate_single_search_result(
        self,
        original_feature: str,
        feature_node: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Evaluate a single search result (using parallel evaluation).

        Args:
            original_feature: The original feature
            feature_node: The feature node

        Returns:
            The evaluation result
        """
        search_word = feature_node.get('search_word', '')
        notes = feature_node['search_result'].get('data', {}).get('data', [])
        return self.llm_evaluator.evaluate_search_results_parallel(
            original_feature=original_feature,
            search_word=search_word,
            notes=notes,
            max_notes=20,
            max_workers=20  # evaluate up to 20 notes concurrently
        )

    # ========== Stage 7: extended searches ==========

    def stage7_extended_searches(
        self,
        features_data: List[Dict[str, Any]],
        search_delay: float = 2.0
    ) -> List[Dict[str, Any]]:
        """
        Stage 7: run extended searches based on the evaluation results.

        Args:
            features_data: Stage 6 data
            search_delay: Delay between searches

        Returns:
            The data with extended searches attached
        """
        logger.info("=" * 60)
        logger.info("Stage 7: extended searches")
        logger.info("=" * 60)

        # Collect the extended-search tasks
        extension_tasks = []
        for feature_result in features_data:
            original_feature = feature_result['原始特征名称']
            for assoc in feature_result.get('找到的关联', []):
                for feature in assoc.get('特征列表', []):
                    result_eval = feature.get('result_evaluation')
                    if not result_eval:
                        continue
                    extracted_elements = result_eval.get('extracted_elements', [])
                    if not extracted_elements:
                        continue
                    # Create one extended search per extracted element
                    base_search_word = feature.get('search_word', '')
                    for element in extracted_elements:
                        extended_keyword = f"{base_search_word} {element}"
                        extension_tasks.append({
                            'extended_keyword': extended_keyword,
                            'original_feature': original_feature,
                            'feature_node': feature,
                            'element': element
                        })

        logger.info(f"{len(extension_tasks)} extended-search tasks in total")

        # Run the extended searches
        for idx, task in enumerate(extension_tasks, 1):
            extended_kw = task['extended_keyword']
            logger.info(f"[{idx}/{len(extension_tasks)}] Extended search: {extended_kw}")
            try:
                result = self.search_client.search(
                    keyword=extended_kw,
                    content_type='不限',
                    sort_type='综合',
                    max_retries=3,
                    use_cache=True  # enable the search cache
                )
                note_count = len(result.get('data', {}).get('data', []))
                logger.info(f"  ✓ Success, fetched {note_count} notes")

                # Evaluate the extended search result
                logger.info("  Evaluating extended search result...")
                evaluation = self.llm_evaluator.evaluate_search_results(
                    original_feature=task['original_feature'],
                    search_word=extended_kw,
                    notes=result.get('data', {}).get('data', []),
                    max_notes=20,
                    max_images_per_note=2
                )

                # Store the extended search result
                feature_node = task['feature_node']
                if 'extended_searches' not in feature_node:
                    feature_node['extended_searches'] = []
                feature_node['extended_searches'].append({
                    'extended_keyword': extended_kw,
                    'based_on_element': task['element'],
                    'search_result': result,
                    'search_metadata': {
                        'searched_at': datetime.now().isoformat(),
                        'status': 'success',
                        'note_count': note_count
                    },
                    'result_evaluation': evaluation
                })
                logger.info(f"  Evaluation done, relevance={evaluation['overall_relevance']:.3f}")
            except Exception as e:
                logger.error(f"  ✗ Failed: {e}")

            # Delay between searches
            if idx < len(extension_tasks):
                time.sleep(search_delay)

        # Save results
        output_path = os.path.join(self.output_dir, "stage7_final_results.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 7 complete")
        logger.info("=" * 60)
        return features_data

    # ========== Main pipeline ==========

    def run_full_pipeline(self):
        """Run the full pipeline."""
        logger.info("\n" + "=" * 60)
        logger.info("Starting the full pipeline")
        logger.info("=" * 60)
        try:
            # Stage 1
            stage1_results = self.stage1_filter_features()
            # Stage 2
            stage2_results = self.stage2_find_associations(stage1_results)
            # Stage 3 - new approach: filter high-similarity matches
            stage3_results = self.stage3_filter_high_similarity_matches(stage2_results)
            # Stage 4
            stage4_results = self.stage4_generate_and_evaluate_search_words(
                stage3_results,
                max_workers=8,       # raised concurrency from 4 to 8
                max_combo_length=3   # lowered combination length from 4 to 3
            )
            # Stage 5
            stage5_results = self.stage5_execute_searches(stage4_results, search_delay=2.0, top_n=self.top_n)
            # Stage 6 - currently disabled (code kept)
            # stage6_results = self.stage6_evaluate_search_results(stage5_results)
            # Stage 7 - currently disabled (code kept)
            # final_results = self.stage7_extended_searches(stage6_results, search_delay=2.0)

            logger.info("\n" + "=" * 60)
            logger.info("✓ Full pipeline complete (Stages 1-5)")
            logger.info("=" * 60)

            # Generate the visualization automatically
            logger.info("\n" + "=" * 60)
            logger.info("Generating visualization...")
            logger.info("=" * 60)
            try:
                result = subprocess.run(
                    ['python3', 'visualize_stage5_results.py'],
                    capture_output=True,
                    text=True,
                    timeout=60
                )
                if result.returncode == 0:
                    logger.info("✓ Visualization generated")
                    logger.info(result.stdout)
                else:
                    logger.error(f"Visualization failed: {result.stderr}")
            except subprocess.TimeoutExpired:
                logger.error("Visualization timed out")
            except Exception as e:
                logger.error(f"Visualization error: {e}")

            return stage5_results
        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            raise


def main():
    """Entry point."""
    parser = argparse.ArgumentParser(description='Enhanced search system V2')
    parser.add_argument(
        '--how-json',
        default='69114f150000000007001f30_how copy.json',
        help='Path to the "how" deconstruction file'
    )
    parser.add_argument(
        '--dimension-associations',
        default='dimension_associations_analysis.json',
        help='Path to the dimension-association file'
    )
    parser.add_argument(
        '--optimized-clustered',
        default='optimized_clustered_data_gemini-3-pro-preview.json',
        help='Path to the persona feature library'
    )
    parser.add_argument(
        '--api-key',
        default=None,
        help='OpenRouter API key (read from the environment by default)'
    )
    parser.add_argument(
        '--output-dir',
        default='output_v2',
        help='Output directory'
    )
    parser.add_argument(
        '--top-n',
        type=int,
        default=10,
        help='Keep the N highest-scored search words per original feature (default 10)'
    )
    parser.add_argument(
        '--max-total-searches',
        type=int,
        default=None,
        help='Global cap on the number of searches (default None, unlimited)'
    )
    parser.add_argument(
        '--search-workers',
        type=int,
        default=3,
        help='Search concurrency (default 3)'
    )
    args = parser.parse_args()

    # Build the system
    system = EnhancedSearchV2(
        how_json_path=args.how_json,
        dimension_associations_path=args.dimension_associations,
        optimized_clustered_data_path=args.optimized_clustered,
        openrouter_api_key=args.api_key,
        output_dir=args.output_dir,
        top_n=args.top_n,
        max_total_searches=args.max_total_searches,
        search_max_workers=args.search_workers
    )

    # Run the full pipeline
    system.run_full_pipeline()


if __name__ == '__main__':
    main()
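
# Example invocation (illustrative paths; assumes the file is saved as
# enhanced_search_v2.py, and defaults come from the argparse definitions above):
#   python3 enhanced_search_v2.py \
#       --how-json my_how.json \
#       --dimension-associations dimension_associations_analysis.json \
#       --optimized-clustered optimized_clustered_data_gemini-3-pro-preview.json \
#       --top-n 5 --max-total-searches 50 --search-workers 3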
|