- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 增强搜索系统 V2
- 支持LLM评估和扩展搜索的完整流程
- """
- import json
- import logging
- import copy
- import time
- import os
- import argparse
- import subprocess
- from typing import Dict, List, Any, Optional, Set, Tuple
- from datetime import datetime
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from itertools import combinations
- from openrouter_client import OpenRouterClient
- from llm_evaluator import LLMEvaluator
- from xiaohongshu_search import XiaohongshuSearch
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S',
- handlers=[
- logging.FileHandler('enhanced_search_v2.log', encoding='utf-8'),
- logging.StreamHandler()
- ]
- )
- logger = logging.getLogger(__name__)
- class EnhancedSearchV2:
- """增强搜索系统V2"""
- def __init__(
- self,
- how_json_path: str,
- dimension_associations_path: str,
- optimized_clustered_data_path: str,
- openrouter_api_key: Optional[str] = None,
- output_dir: str = "output_v2",
- top_n: int = 10,
- max_total_searches: Optional[int] = None,
- search_max_workers: int = 3
- ):
- """
- 初始化系统
- Args:
- how_json_path: How解构文件路径
- dimension_associations_path: 维度关联文件路径
- optimized_clustered_data_path: 人设特征库路径
- openrouter_api_key: OpenRouter API密钥
- output_dir: 输出目录
- top_n: 每个原始特征取评分最高的N个搜索词(默认10)
- max_total_searches: 全局最大搜索次数限制(默认None不限制)
- search_max_workers: 搜索并发数(默认3)
- """
- self.how_json_path = how_json_path
- self.dimension_associations_path = dimension_associations_path
- self.optimized_clustered_data_path = optimized_clustered_data_path
- self.output_dir = output_dir
- self.top_n = top_n
- self.max_total_searches = max_total_searches
- self.search_max_workers = search_max_workers
- # 创建输出目录
- os.makedirs(output_dir, exist_ok=True)
- # 加载数据
- logger.info("加载数据文件...")
- self.how_data = self._load_json(how_json_path)
- self.dimension_associations = self._load_json(dimension_associations_path)
- self.optimized_clustered_data = self._load_json(optimized_clustered_data_path)
- # 初始化组件
- logger.info("初始化组件...")
- self.openrouter_client = OpenRouterClient(
- api_key=openrouter_api_key,
- model="google/gemini-2.5-flash",
- retry_delay=5 # 增加重试延迟避免限流
- )
- self.llm_evaluator = LLMEvaluator(self.openrouter_client)
- self.search_client = XiaohongshuSearch()
- logger.info("系统初始化完成")
- def _load_json(self, file_path: str) -> Any:
- """加载JSON文件"""
- try:
- with open(file_path, 'r', encoding='utf-8') as f:
- return json.load(f)
- except Exception as e:
- logger.error(f"加载文件失败 {file_path}: {e}")
- raise
- def _save_json(self, data: Any, file_path: str):
- """保存JSON文件"""
- try:
- with open(file_path, 'w', encoding='utf-8') as f:
- json.dump(data, f, ensure_ascii=False, indent=2)
- logger.info(f"已保存: {file_path}")
- except Exception as e:
- logger.error(f"保存文件失败 {file_path}: {e}")
- raise
- # ========== 阶段1:筛选 0.5 <= 相似度 < 0.8 的特征 ==========
- def stage1_filter_features(self) -> List[Dict[str, Any]]:
- """
- 阶段1:筛选中等匹配度特征
- 筛选条件:0.5 <= 最高相似度 < 0.8
- Returns:
- 筛选后的特征列表
- """
- logger.info("=" * 60)
- logger.info("阶段1:筛选中等匹配度特征 (0.5 <= 相似度 < 0.8)")
- logger.info("=" * 60)
- results = []
- how_result = self.how_data.get('how解构结果', {})
- total_features = 0
- filtered_out_low = 0 # < 0.5
- filtered_out_high = 0 # >= 0.8
- selected_count = 0
- # 遍历三个维度
- for level_name, level_list in how_result.items():
- if not isinstance(level_list, list):
- continue
- logger.info(f"\n处理 {level_name}...")
- for item_idx, item in enumerate(level_list):
- item_name = item.get('名称', f'未命名-{item_idx}')
- how_steps = item.get('how步骤列表', [])
- for step in how_steps:
- features = step.get('特征列表', [])
- for feature in features:
- feature_name = feature.get('特征名称', '')
- match_results = feature.get('匹配结果', [])
- total_features += 1
- if not match_results:
- continue
- # 找到最高相似度
- max_similarity = max(
- (m.get('匹配结果', {}).get('相似度', 0) for m in match_results),
- default=0
- )
- # 筛选条件
- if max_similarity < 0.5:
- filtered_out_low += 1
- continue
- elif max_similarity >= 0.8:
- filtered_out_high += 1
- continue
- # 0.5 <= max_similarity < 0.8,保留
- # 按相似度降序排序,取前3个
- sorted_matches = sorted(
- match_results,
- key=lambda x: x.get('匹配结果', {}).get('相似度', 0),
- reverse=True
- )
- top3_matches = sorted_matches[:3] # 取前3个
- # 构建top3匹配信息列表
- top3_match_info = []
- for match in top3_matches:
- feature_classification = match.get('特征分类', [])
- classification_path = self._build_classification_path(feature_classification)
- # 如果路径为空且是分类类型,搜索补全路径
- if not classification_path and match.get('特征类型') == '分类':
- feature_name_to_search = match.get('人设特征名称', '')
- classification_path = self._search_classification_path(feature_name_to_search)
- is_classification = self._is_classification(match.get('人设特征名称', ''), classification_path)
- top3_match_info.append({
- '人设特征名称': match.get('人设特征名称'),
- '人设特征层级': match.get('人设特征层级'),
- '特征类型': match.get('特征类型'),
- '特征分类': feature_classification,
- '相似度': match.get('匹配结果', {}).get('相似度', 0),
- '匹配说明': match.get('匹配结果', {}).get('说明', ''),
- '是分类': is_classification,
- '所属分类路径': classification_path
- })
- result_item = {
- '原始特征名称': feature_name,
- '来源层级': level_name,
- '权重': feature.get('权重', 0),
- '所属点名称': item_name,
- '最高匹配信息': top3_match_info[0], # 保留第1个用于Stage2
- 'top3匹配信息': top3_match_info # 新增字段
- }
- results.append(result_item)
- selected_count += 1
- # 显示top3匹配信息
- top3_names = [m['人设特征名称'] for m in top3_match_info]
- logger.info(f" ✓ {feature_name} → Top{len(top3_match_info)}: {', '.join(top3_names)}")
- # 统计信息
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段1完成")
- logger.info(f" 总特征数: {total_features}")
- logger.info(f" 过滤掉(<0.5): {filtered_out_low}")
- logger.info(f" 过滤掉(>=0.8): {filtered_out_high}")
- logger.info(f" 保留(0.5-0.8): {selected_count}")
- logger.info("=" * 60)
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage1_filtered_features.json")
- self._save_json(results, output_path)
- return results
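- # A minimal sketch of the Stage 1 window on hypothetical match data: only a
- # feature whose *best* similarity falls in [0.5, 0.8) survives the filter.
- #
- #   matches = [0.92, 0.41]        -> max 0.92 -> filtered out (>= 0.8)
- #   matches = [0.73, 0.55, 0.30]  -> max 0.73 -> kept, top-3 = [0.73, 0.55, 0.30]
- #   matches = [0.42]              -> max 0.42 -> filtered out (< 0.5)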
- def _build_classification_path(self, feature_classification: List[str]) -> str:
- """
- 构建分类路径
- Args:
- feature_classification: 特征分类数组
- Returns:
- 分类路径
- """
- if not feature_classification:
- return ""
- # 步骤1: 去掉中间元素的"实质"后缀
- cleaned = []
- for i, item in enumerate(feature_classification):
- if i == len(feature_classification) - 1: # 最后一个保留
- cleaned.append(item)
- elif item.endswith("实质") and i != 0: # 中间的去掉"实质"
- cleaned.append(item[:-2])
- else:
- cleaned.append(item)
- # 步骤2: 反转数组
- reversed_list = list(reversed(cleaned))
- # 步骤3: 拼接路径
- path = "/".join(reversed_list)
- return path
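- # Worked example (hypothetical input) of the three-step rule above: the first
- # and last elements keep their suffix, middle elements drop "实质", then the
- # list is reversed and joined with "/".
- #
- #   ["猫咪宠物实质", "实体物品实质", "实质"]
- #   -> ["猫咪宠物实质", "实体物品", "实质"]    (strip middle suffix)
- #   -> ["实质", "实体物品", "猫咪宠物实质"]    (reverse)
- #   -> "实质/实体物品/猫咪宠物实质"            (join)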
- def _is_classification(self, persona_feature_name: str, classification_path: str) -> bool:
- """
- 判断是分类还是特征
- Args:
- persona_feature_name: 人设特征名称
- classification_path: 分类路径
- Returns:
- True: 是分类, False: 是特征
- """
- # 在optimized_clustered_data中查找
- # 如果在特征列表中找到,就是特征
- # 如果作为节点存在且有子节点,就是分类
- # 导航到节点
- node = self._navigate_to_node(classification_path)
- if not node:
- return False
- # 检查是否在特征列表中
- features = node.get('特征列表', [])
- for f in features:
- if f.get('特征名称') == persona_feature_name:
- return False # 在特征列表中,是特征
- # 检查是否作为子节点存在
- if persona_feature_name in node:
- sub_node = node[persona_feature_name]
- if isinstance(sub_node, dict):
- return True # 是子节点,是分类
- return False # 默认是特征
- def _navigate_to_node(self, path: str) -> Optional[Dict[str, Any]]:
- """
- 导航到指定路径的节点
- Args:
- path: 路径,如 "实质/猫咪宠物"
- Returns:
- 节点,未找到返回None
- """
- if not path:
- return None
- parts = path.split('/')
- first_part = parts[0]
- # 确定顶层key
- top_level_map = {
- '意图': '目的点',
- '要素': '目的点',
- '实质': None,
- '形式': None,
- '场景': None
- }
- top_keys = []
- if first_part in top_level_map:
- mapped = top_level_map[first_part]
- if mapped:
- top_keys.append(mapped)
- if not top_keys:
- top_keys = ['灵感点列表', '关键点列表', '目的点']
- # 尝试在每个顶层中查找
- for top_key in top_keys:
- current = self.optimized_clustered_data.get(top_key)
- if not current:
- continue
- # 逐层导航
- found = True
- for part in parts:
- if isinstance(current, dict) and part in current:
- current = current[part]
- else:
- found = False
- break
- if found and isinstance(current, dict):
- return current
- return None
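- # Sketch of the data shape _navigate_to_node assumes (reconstructed from the
- # lookups above, not from a schema): the library is nested dicts keyed by
- # classification names, so the path "实质/猫咪宠物" resolves as
- #   optimized_clustered_data['灵感点列表']['实质']['猫咪宠物']
- # trying 灵感点列表 / 关键点列表 / 目的点 in turn when the first path segment
- # has no entry in top_level_map.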
- def _recursive_search(
- self,
- obj: Dict[str, Any],
- target_name: str,
- current_path: str = ""
- ) -> Optional[str]:
- """
- 递归搜索分类节点
- Args:
- obj: 当前搜索的对象
- target_name: 目标分类名称
- current_path: 当前路径
- Returns:
- 找到的完整路径,未找到返回None
- """
- if not isinstance(obj, dict):
- return None
- # 遍历所有键
- for key in obj.keys():
- # 跳过元数据和特征列表
- if key in ['_meta', '特征列表']:
- continue
- # 检查是否匹配
- if target_name in key or key in target_name:
- # 找到匹配,返回路径
- if current_path:
- return f"{current_path}/{key}"
- else:
- return key
- # 递归搜索子节点
- if isinstance(obj[key], dict):
- next_path = f"{current_path}/{key}" if current_path else key
- result = self._recursive_search(obj[key], target_name, next_path)
- if result:
- return result
- return None
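- # Note: the match test above is a bidirectional substring check
- # (target_name in key or key in target_name), so a hypothetical target
- # "实体物品" matches a node key "实体物品实质" and vice versa; the first hit
- # in iteration order wins, which can matter when several keys overlap.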
- def _search_classification_path(self, classification_name: str) -> str:
- """
- 在optimized_clustered_data中搜索分类节点路径
- Args:
- classification_name: 分类名称,如"实体物品实质"
- Returns:
- 完整路径,如"实质/实体物品",未找到返回空字符串
- """
- if not classification_name:
- return ""
- # 清理名称:去掉常见后缀
- clean_name = classification_name
- for suffix in ['实质', '意图', '形式', '要素']:
- if clean_name.endswith(suffix) and len(clean_name) > len(suffix):
- clean_name = clean_name[:-len(suffix)]
- break
- logger.info(f" 搜索分类: {classification_name} → 清理为: {clean_name}")
- # 在三个顶级列表中搜索
- for top_key in ['灵感点列表', '关键点列表', '目的点']:
- top_data = self.optimized_clustered_data.get(top_key, {})
- if not top_data:
- continue
- # 递归搜索
- path = self._recursive_search(top_data, clean_name, "")
- if path:
- logger.info(f" ✓ 找到路径: {path}")
- return path
- logger.warning(f" ✗ 未找到分类路径: {classification_name}")
- return ""
- # ========== 阶段2:收集关联分类+标签+子分类 ==========
- def stage2_find_associations(self, filtered_features: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """
- 阶段2:查找关联分类,收集分类名称、标签、子分类
- 改进: 为top3的每个base_word都查找关联
- Args:
- filtered_features: 阶段1筛选的特征
- Returns:
- 带关联信息的特征列表
- """
- logger.info("=" * 60)
- logger.info("阶段2:查找关联分类(为每个base_word)")
- logger.info("=" * 60)
- for idx, feature in enumerate(filtered_features, 1):
- logger.info(f"\n[{idx}/{len(filtered_features)}] 处理: {feature['原始特征名称']}")
- # 获取top3 base_words
- top3_info = feature.get('top3匹配信息', [])
- if not top3_info:
- logger.warning(f" 无top3匹配信息,跳过")
- feature['找到的关联_按base_word'] = {}
- continue
- logger.info(f" 找到 {len(top3_info)} 个base_word")
- # 为每个base_word查找关联
- associations_by_base_word = {}
- for base_idx, base_info in enumerate(top3_info, 1):
- base_word = base_info.get('人设特征名称', '')
- is_classification = base_info['是分类']
- classification_path = base_info['所属分类路径']
- source_level = base_info['人设特征层级']
- logger.info(f" [{base_idx}/{len(top3_info)}] Base Word: {base_word}")
- # Both branches use the same lookup path; only the log message differs.
- search_path = classification_path
- if is_classification:
- logger.info(f" 匹配到分类: {search_path}")
- else:
- logger.info(f" 匹配到特征,使用所属分类: {search_path}")
- # 查找关联
- associations = self._find_associations(search_path, source_level)
- # 收集关联信息
- base_word_associations = []
- for assoc in associations:
- target_path = assoc['目标分类']
- # 收集分类信息
- classification_info = self._collect_classification_info(target_path)
- if classification_info:
- base_word_associations.append({
- '来源方向': assoc['来源方向'],
- '关联类型': assoc['关联类型'],
- '目标分类路径': target_path,
- '共同帖子数': assoc['共同帖子数'],
- 'Jaccard相似度': assoc['Jaccard相似度'],
- '分类名称': classification_info['classification_name'],
- '标签列表': classification_info['tags'],
- '子分类列表': classification_info['sub_classifications']
- })
- associations_by_base_word[base_word] = base_word_associations
- logger.info(f" 找到 {len(base_word_associations)} 个关联")
- # 保存结果
- feature['找到的关联_按base_word'] = associations_by_base_word
- # 向后兼容:保留基于最高匹配信息的关联(即第1个base_word的关联)
- first_base_word = top3_info[0].get('人设特征名称', '')
- feature['找到的关联'] = associations_by_base_word.get(first_base_word, [])
- total_associations = sum(len(v) for v in associations_by_base_word.values())
- logger.info(f" 总共找到 {total_associations} 个关联({len(associations_by_base_word)} 个base_word)")
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage2_associations.json")
- self._save_json(filtered_features, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段2完成")
- logger.info("=" * 60)
- return filtered_features
- def _find_associations(self, classification_path: str, source_level: str) -> List[Dict[str, Any]]:
- """
- 查找关联节点
- Args:
- classification_path: 分类路径
- source_level: 源层级
- Returns:
- 关联节点列表
- """
- associations = []
- # 确定维度名称
- if '灵感点' in source_level:
- dimension_key = '灵感点维度'
- elif '关键点' in source_level:
- dimension_key = '关键点维度'
- elif '目的点' in source_level:
- dimension_key = '目的点维度'
- else:
- return associations
- # 获取维度数据
- single_dim = self.dimension_associations.get('单维度关联分析', {})
- dimension_data = single_dim.get(dimension_key, {})
- if not dimension_data:
- return associations
- # 遍历所有方向
- for direction_key, direction_data in dimension_data.items():
- if direction_key == '说明':
- continue
- # 查找源分类
- if classification_path in direction_data:
- source_data = direction_data[classification_path]
- # 获取关联节点
- for assoc_key in source_data.keys():
- if assoc_key.startswith('与') and assoc_key.endswith('的关联'):
- assoc_list = source_data[assoc_key]
- for assoc_item in assoc_list:
- associations.append({
- '来源方向': direction_key,
- '关联类型': assoc_key,
- '目标分类': assoc_item.get('目标分类'),
- '目标层级': assoc_item.get('目标层级'),
- '共同帖子数': assoc_item.get('共同帖子数'),
- 'Jaccard相似度': assoc_item.get('Jaccard相似度'),
- '共同帖子ID': assoc_item.get('共同帖子ID', [])
- })
- return associations
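- # Hedged sketch of the dimension_associations layout this method assumes,
- # reconstructed from the lookups above (key names other than those accessed
- # in code are illustrative):
- #
- # {
- #   "单维度关联分析": {
- #     "灵感点维度": {
- #       "<direction>": {
- #         "<classification_path>": {
- #           "与XX的关联": [
- #             {"目标分类": "...", "目标层级": "...", "共同帖子数": 3,
- #              "Jaccard相似度": 0.12, "共同帖子ID": ["..."]}
- #           ]
- #         }
- #       }
- #     }
- #   }
- # }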
- def _collect_classification_info(self, classification_path: str) -> Optional[Dict[str, Any]]:
- """
- 收集分类信息:分类名 + 标签 + 子分类
- Args:
- classification_path: 分类路径
- Returns:
- 分类信息
- """
- node = self._navigate_to_node(classification_path)
- if not node:
- return None
- # 分类名称(路径最后一段)
- classification_name = classification_path.split('/')[-1]
- # 标签(特征列表)
- tags = [f.get('特征名称', '') for f in node.get('特征列表', [])]
- # 子分类(子节点,排除_meta和特征列表)
- sub_classifications = [
- key for key in node.keys()
- if isinstance(node[key], dict) and key not in ['_meta', '特征列表']
- ]
- return {
- 'classification_name': classification_name,
- 'tags': tags,
- 'sub_classifications': sub_classifications
- }
- # ========== 阶段3:筛选高相似度匹配(>0.8) ==========
- def stage3_filter_high_similarity_matches(self, associations_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """
- 阶段3:筛选高相似度匹配(>0.8)
- 改进:为每个base_word独立筛选候选词
- 基于该base_word的关联范围,在how解构中找出相似度>0.8的匹配
- Args:
- associations_data: 阶段2的关联数据
- Returns:
- 带高相似度候选的数据
- """
- logger.info("=" * 60)
- logger.info("阶段3:筛选高相似度匹配(>0.8,为每个base_word)")
- logger.info("=" * 60)
- for idx, feature_result in enumerate(associations_data, 1):
- original_feature_name = feature_result['原始特征名称']
- logger.info(f"\n[{idx}/{len(associations_data)}] 处理: {original_feature_name}")
- # 获取top3 base_words
- top3_info = feature_result.get('top3匹配信息', [])
- associations_by_base_word = feature_result.get('找到的关联_按base_word', {})
- if not top3_info or not associations_by_base_word:
- logger.warning(f" 无top3匹配信息或关联数据,跳过")
- feature_result['高相似度候选_按base_word'] = {}
- continue
- logger.info(f" 找到 {len(top3_info)} 个base_word")
- # 为每个base_word独立筛选候选词
- candidates_by_base_word = {}
- for base_idx, base_info in enumerate(top3_info, 1):
- base_word = base_info.get('人设特征名称', '')
- logger.info(f" [{base_idx}/{len(top3_info)}] Base Word: {base_word}")
- # 步骤1: 收集该base_word的关联范围
- base_word_associations = associations_by_base_word.get(base_word, [])
- base_word_scope = self._collect_scope_from_associations(base_word_associations)
- logger.info(f" 关联范围包含 {len(base_word_scope)} 个分类/标签")
- if not base_word_scope:
- logger.warning(f" 无关联范围,跳过")
- candidates_by_base_word[base_word] = []
- continue
- # 步骤2: 遍历how解构,找出高相似度匹配
- high_sim_candidates = []
- total_checked = 0
- high_sim_found = 0
- how_result = self.how_data.get('how解构结果', {})
- for level_name, level_list in how_result.items():
- if not isinstance(level_list, list):
- continue
- for item in level_list:
- for step in item.get('how步骤列表', []):
- for feature in step.get('特征列表', []):
- matches = feature.get('匹配结果', [])
- total_checked += len(matches)
- # 筛选相似度>0.8且在该base_word的范围内的匹配
- for match in matches:
- sim = match.get('匹配结果', {}).get('相似度', 0)
- persona_feature_name = match.get('人设特征名称', '')
- if sim > 0.8 and persona_feature_name in base_word_scope:
- high_sim_found += 1
- high_sim_candidates.append({
- '人设特征名称': persona_feature_name,
- '相似度': sim,
- '特征类型': match.get('特征类型', ''),
- '特征分类': match.get('特征分类', []),
- '人设特征层级': match.get('人设特征层级', ''),
- '来源路径': self._build_classification_path(match.get('特征分类', [])),
- '匹配说明': match.get('匹配结果', {}).get('说明', ''),
- '来源原始特征': feature.get('特征名称', '')
- })
- logger.info(f" 检查了 {total_checked} 个匹配")
- logger.info(f" 找到 {high_sim_found} 个相似度>0.8的匹配")
- # 按相似度降序排序并去重
- seen_names = set()
- unique_candidates = []
- high_sim_candidates.sort(key=lambda x: x['相似度'], reverse=True)
- for candidate in high_sim_candidates:
- name = candidate['人设特征名称']
- if name not in seen_names:
- seen_names.add(name)
- unique_candidates.append(candidate)
- candidates_by_base_word[base_word] = unique_candidates
- logger.info(f" 去重后筛选出 {len(unique_candidates)} 个候选")
- # 显示前5个
- if unique_candidates:
- logger.info(f" Top 5:")
- for c in unique_candidates[:5]:
- logger.info(f" • {c['人设特征名称']} ({c['相似度']:.3f}) ← 来自\"{c['来源原始特征']}\"")
- # 保存结果
- feature_result['高相似度候选_按base_word'] = candidates_by_base_word
- # 向后兼容:保留第1个base_word的候选
- first_base_word = top3_info[0].get('人设特征名称', '')
- feature_result['高相似度候选'] = candidates_by_base_word.get(first_base_word, [])
- total_candidates = sum(len(v) for v in candidates_by_base_word.values())
- logger.info(f" 总共筛选出 {total_candidates} 个候选({len(candidates_by_base_word)} 个base_word)")
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage3_high_similarity.json")
- self._save_json(associations_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段3完成")
- logger.info("=" * 60)
- return associations_data
- def _collect_scope_from_associations(self, associations: List[Dict[str, Any]]) -> Set[str]:
- """
- 从关联列表中收集所有分类名和标签,形成范围集合
- Args:
- associations: 关联列表
- Returns:
- 包含所有分类名和标签的集合
- """
- scope = set()
- for assoc in associations:
- # 添加分类名
- scope.add(assoc['分类名称'])
- # 添加所有标签
- tags = assoc.get('标签列表', [])
- scope.update(tags)
- return scope
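- # Tiny example (hypothetical data): two associations whose 分类名称 are
- # "猫咪宠物" and "实体物品", with 标签列表 ["猫粮", "猫砂"], yield the scope
- # {"猫咪宠物", "实体物品", "猫粮", "猫砂"}.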
- def _collect_stage2_scope(self, feature_result: Dict[str, Any]) -> Set[str]:
- """
- 收集Stage2找到的所有分类名和标签,形成范围集合(兼容旧方法)
- Args:
- feature_result: 特征结果数据
- Returns:
- 包含所有分类名和标签的集合
- """
- associations = feature_result.get('找到的关联', [])
- return self._collect_scope_from_associations(associations)
- def _find_features_by_path(self, target_classification: str) -> List[Dict[str, Any]]:
- """
- 根据路径查找特征列表
- Args:
- target_classification: 目标分类路径
- Returns:
- 特征列表
- """
- node = self._navigate_to_node(target_classification)
- if not node:
- return []
- features = node.get('特征列表', [])
- # 深拷贝
- return copy.deepcopy(features)
- # ========== 阶段4:多词组合 + LLM评估 ==========
- def stage4_generate_and_evaluate_search_words(
- self,
- features_data: List[Dict[str, Any]],
- max_workers: int = 4,
- max_candidates: int = 20,
- max_combo_length: int = 4
- ) -> List[Dict[str, Any]]:
- """
- 阶段4:多词组合 + LLM评估
- 基于Stage1的基础词和Stage3的高相似度候选,
- 生成所有2-N词组合,通过LLM评估选出Top10
- Args:
- features_data: 阶段3的数据(包含高相似度候选)
- max_workers: 并发评估的原始特征数(默认4)
- max_candidates: 参与组合的最大候选词数(默认20)
- max_combo_length: 最大组合词数(默认4,即基础词+3个候选)
- Returns:
- 带LLM评估的数据
- """
- logger.info("=" * 60)
- logger.info("阶段4:多词组合 + LLM评估")
- logger.info(f" 最大候选词数: {max_candidates}")
- logger.info(f" 最大组合长度: {max_combo_length} 词")
- logger.info(f" 并发数: {max_workers} 个原始特征")
- logger.info("=" * 60)
- total_features = len(features_data)
- # 使用ThreadPoolExecutor并行处理不同的原始特征
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- # 提交所有任务
- futures = []
- for idx, feature_result in enumerate(features_data, 1):
- future = executor.submit(
- self._process_single_feature_combinations,
- idx,
- total_features,
- feature_result,
- max_candidates,
- max_combo_length
- )
- futures.append((future, feature_result))
- # 等待所有任务完成并收集结果
- for future, feature_result in futures:
- try:
- _ = future.result() # 等待完成,结果已经写回到feature_result中
- except Exception as e:
- logger.error(f" 评估失败: {feature_result['原始特征名称']}, 错误: {e}")
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage4_combinations_evaluated.json")
- self._save_json(features_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段4完成")
- logger.info("=" * 60)
- return features_data
- def _process_single_feature_combinations(
- self,
- idx: int,
- total: int,
- feature_result: Dict[str, Any],
- max_candidates: int,
- max_combo_length: int
- ) -> None:
- """
- 处理单个原始特征的组合生成和评估
- 改进: 每个base_word使用自己的候选词(而不是共享)
- Steps:
- 1. Get top3 base_words from Stage1's top3匹配信息
- 2. For each base_word:
- a. Get candidates from Stage3's 高相似度候选_按base_word
- b. Generate combinations
- c. LLM evaluation
- d. Select Top 10
- 3. Save grouped results
- Args:
- idx: 特征索引
- total: 总特征数
- feature_result: 特征结果数据
- max_candidates: 参与组合的最大候选词数
- max_combo_length: 最大组合词数
- """
- original_feature = feature_result['原始特征名称']
- logger.info(f"\n[{idx}/{total}] 处理: {original_feature}")
- # 步骤1: 获取top3基础词
- top3_info = feature_result.get('top3匹配信息', [])
- if not top3_info:
- logger.info(f" 无top3匹配信息,跳过")
- feature_result['组合评估结果_分组'] = []
- return
- logger.info(f" 找到 {len(top3_info)} 个base_word")
- # 步骤2: 获取按base_word分组的候选词
- candidates_by_base_word = feature_result.get('高相似度候选_按base_word', {})
- if not candidates_by_base_word:
- logger.warning(f" 无按base_word分组的候选词,跳过")
- feature_result['组合评估结果_分组'] = []
- return
- # 步骤3: 为每个base_word独立处理
- grouped_results = []
- for base_idx, base_info in enumerate(top3_info, 1):
- base_word = base_info.get('人设特征名称', '')
- base_similarity = base_info.get('相似度', 0)
- if not base_word:
- continue
- logger.info(f" [{base_idx}/{len(top3_info)}] Base Word: {base_word} (相似度: {base_similarity:.3f})")
- # 获取该base_word的候选词
- base_candidates = candidates_by_base_word.get(base_word, [])
- candidates = base_candidates[:max_candidates]
- candidate_words = [c['人设特征名称'] for c in candidates]
- if not candidate_words:
- logger.warning(f" 该base_word无候选词,跳过")
- grouped_results.append({
- 'base_word': base_word,
- 'base_word_similarity': base_similarity,
- 'base_word_info': base_info,
- 'top10_searches': [],
- 'available_words': []
- })
- continue
- logger.info(f" 候选词数量: {len(candidate_words)} (限制: {max_candidates})")
- # 生成组合
- combinations_for_base = []
- for length in range(1, min(max_combo_length, len(candidate_words) + 1)):
- for combo in combinations(candidate_words, length):
- search_phrase = base_word + ' ' + ' '.join(combo)
- combinations_for_base.append({
- 'search_word': search_phrase,
- 'base_word': base_word,
- 'candidate_words': list(combo),
- 'combo_length': length + 1
- })
- logger.info(f" 生成 {len(combinations_for_base)} 个组合")
- # LLM评估
- logger.info(f" 开始LLM评估...")
- evaluated = self.llm_evaluator.evaluate_search_words_in_batches(
- original_feature=original_feature,
- search_words=[c['search_word'] for c in combinations_for_base],
- batch_size=50
- )
- # 选出Top 10
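- # (Assumption: evaluate_search_words_in_batches returns items sorted by
- # score in descending order; otherwise top_10[0] below is not the best
- # phrase.)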
- top_10 = evaluated[:10]
- max_score = top_10[0]['score'] if top_10 else 0.0
- logger.info(f" 评估完成,Top 10 最高分: {max_score:.3f}")
- # 保存分组结果 - 每个base_word有自己的available_words
- grouped_results.append({
- 'base_word': base_word,
- 'base_word_similarity': base_similarity,
- 'base_word_info': base_info,
- 'top10_searches': top_10,
- 'available_words': candidate_words # 该base_word自己的候选词
- })
- # 写回结果
- feature_result['组合评估结果_分组'] = grouped_results
- total_searches = sum(len(g['top10_searches']) for g in grouped_results)
- logger.info(f" 完成!共 {len(grouped_results)} 个base_word,{total_searches} 个搜索词")
- # ========== 阶段5:执行搜索 ==========
- def _execute_single_search(
- self,
- idx: int,
- total: int,
- search_word: str,
- feature_ref: Dict[str, Any]
- ) -> Dict[str, Any]:
- """
- 执行单个搜索任务(用于并发执行)
- Args:
- idx: 搜索索引
- total: 总搜索数
- search_word: 搜索词
- feature_ref: 特征引用(用于写入结果)
- Returns:
- 搜索结果信息
- """
- logger.info(f"[{idx}/{total}] 搜索: {search_word}")
- try:
- result = self.search_client.search(
- keyword=search_word,
- content_type='不限',
- sort_type='综合',
- max_retries=3,
- use_cache=True # 启用搜索缓存
- )
- note_count = len(result.get('data', {}).get('data', []))
- logger.info(f" ✓ 成功,获取 {note_count} 条帖子")
- # 写入结果
- feature_ref['search_result'] = result
- feature_ref['search_metadata'] = {
- 'searched_at': datetime.now().isoformat(),
- 'status': 'success',
- 'note_count': note_count,
- 'search_params': {
- 'keyword': search_word,
- 'content_type': '不限', # 与上面实际搜索调用的参数保持一致
- 'sort_type': '综合'
- }
- }
- return {'status': 'success', 'search_word': search_word, 'note_count': note_count}
- except Exception as e:
- logger.error(f" ✗ 失败: {e}")
- feature_ref['search_result'] = None
- feature_ref['search_metadata'] = {
- 'searched_at': datetime.now().isoformat(),
- 'status': 'failed',
- 'note_count': 0,
- 'error': str(e)
- }
- return {'status': 'failed', 'search_word': search_word, 'error': str(e)}
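- # Concurrency note (an observation, not from the source): each worker
- # mutates only its own feature_ref dict and no two search tasks share an
- # eval item, so the writes above need no locking under CPython's GIL.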
- def stage5_execute_searches(
- self,
- features_data: List[Dict[str, Any]],
- search_delay: float = 2.0,
- top_n: int = 10
- ) -> List[Dict[str, Any]]:
- """
- 阶段5:执行小红书搜索
- Args:
- features_data: 阶段4的数据
- search_delay: 搜索延迟(参数保留;当前并发实现不在搜索之间插入延迟)
- top_n: 每个原始特征取评分最高的N个搜索词(参数保留;分组结构下每个base_word的Top10全部执行,不再按此过滤)
- Returns:
- 带搜索结果的数据
- """
- logger.info("=" * 60)
- logger.info("阶段5:执行小红书搜索")
- logger.info("=" * 60)
- # 按原始特征分组收集搜索词(从Stage4的组合评估结果_分组读取)
- feature_search_groups = {}
- for feature_result in features_data:
- original_feature = feature_result['原始特征名称']
- if original_feature not in feature_search_groups:
- feature_search_groups[original_feature] = []
- # 从Stage4的组合评估结果_分组读取(新结构)
- grouped_results = feature_result.get('组合评估结果_分组', [])
- if grouped_results:
- # 使用分组结构:每个base_word的top10都执行
- for group in grouped_results:
- base_word = group.get('base_word', '')
- base_similarity = group.get('base_word_similarity', 0)
- for eval_item in group.get('top10_searches', []):
- sw = eval_item.get('search_word')
- if not sw:
- continue
- score = eval_item.get('score', 0.0)
- feature_search_groups[original_feature].append({
- 'search_word': sw,
- 'score': score,
- 'base_word': base_word,
- 'base_word_similarity': base_similarity,
- 'feature_ref': eval_item # 引用评估项,用于写入搜索结果
- })
- else:
- # 兼容旧结构(组合评估结果)
- for eval_item in feature_result.get('组合评估结果', []):
- sw = eval_item.get('search_word')
- if not sw:
- continue
- score = eval_item.get('score', 0.0)
- feature_search_groups[original_feature].append({
- 'search_word': sw,
- 'score': score,
- 'feature_ref': eval_item
- })
- # 收集所有搜索任务(分组结构下执行所有base_word的top10,不再过滤)
- all_searches = []
- total_count = 0
- for original_feature, search_list in feature_search_groups.items():
- total_count += len(search_list)
- all_searches.extend(search_list)
- logger.info(f" {original_feature}: {len(search_list)} 个搜索词")
- # 应用全局搜索次数限制
- if self.max_total_searches and len(all_searches) > self.max_total_searches:
- logger.info(f" 应用全局限制:从 {len(all_searches)} 个减少到 {self.max_total_searches} 个")
- all_searches = all_searches[:self.max_total_searches]
- logger.info(f"\n共 {len(all_searches)} 个搜索任务")
- logger.info(f" 并发执行搜索(并发数: {self.search_max_workers})")
- # 使用ThreadPoolExecutor并发执行搜索
- with ThreadPoolExecutor(max_workers=self.search_max_workers) as executor:
- # 提交所有搜索任务
- futures = []
- for idx, item in enumerate(all_searches, 1):
- future = executor.submit(
- self._execute_single_search,
- idx,
- len(all_searches),
- item['search_word'],
- item['feature_ref']
- )
- futures.append(future)
- # 等待所有搜索完成
- for future in as_completed(futures):
- try:
- result = future.result()
- # 结果已经写入feature_ref,无需额外处理
- except Exception as e:
- logger.error(f" 搜索任务失败: {e}")
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage5_with_search_results.json")
- self._save_json(features_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段5完成")
- logger.info("=" * 60)
- return features_data
- # ========== 阶段6:LLM评估搜索结果 ==========
- def stage6_evaluate_search_results(
- self,
- features_data: List[Dict[str, Any]]
- ) -> List[Dict[str, Any]]:
- """
- 阶段6:用LLM评估搜索结果(多模态)
- Args:
- features_data: 阶段5的数据
- Returns:
- 带结果评估的数据
- """
- logger.info("=" * 60)
- logger.info("阶段6:LLM评估搜索结果")
- logger.info("=" * 60)
- # 收集所有需要评估的特征节点
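- # Note: the traversal below follows the legacy layout (找到的关联 ->
- # 特征列表 -> search_result). The current Stage 5 writes results into
- # 组合评估结果_分组[*].top10_searches instead, so after a Stage 1-5 run this
- # collection stays empty; Stages 6-7 are disabled in run_full_pipeline.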
- features_to_evaluate = []
- for feature_result in features_data:
- original_feature = feature_result['原始特征名称']
- for assoc in feature_result.get('找到的关联', []):
- for feature in assoc.get('特征列表', []):
- if feature.get('search_result') and feature['search_metadata']['status'] == 'success':
- features_to_evaluate.append({
- 'original_feature': original_feature,
- 'feature_node': feature
- })
- logger.info(f"共 {len(features_to_evaluate)} 个搜索结果需要评估")
- # 并行评估(并发数较低)
- with ThreadPoolExecutor(max_workers=8) as executor:
- futures = []
- for item in features_to_evaluate:
- future = executor.submit(
- self._evaluate_single_search_result,
- item['original_feature'],
- item['feature_node']
- )
- futures.append((future, item))
- # 收集结果
- for idx, (future, item) in enumerate(futures, 1):
- try:
- evaluation = future.result()
- item['feature_node']['result_evaluation'] = evaluation
- logger.info(f" [{idx}/{len(futures)}] {item['feature_node']['search_word']}: "
- f"relevance={evaluation['overall_relevance']:.3f}")
- except Exception as e:
- logger.error(f" 评估失败: {item['feature_node']['search_word']}, 错误: {e}")
- item['feature_node']['result_evaluation'] = None
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage6_with_evaluations.json")
- self._save_json(features_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段6完成")
- logger.info("=" * 60)
- return features_data
- def _evaluate_single_search_result(
- self,
- original_feature: str,
- feature_node: Dict[str, Any]
- ) -> Dict[str, Any]:
- """
- 评估单个搜索结果(使用并行评估)
- Args:
- original_feature: 原始特征
- feature_node: 特征节点
- Returns:
- 评估结果
- """
- search_word = feature_node.get('search_word', '')
- notes = feature_node['search_result'].get('data', {}).get('data', [])
- return self.llm_evaluator.evaluate_search_results_parallel(
- original_feature=original_feature,
- search_word=search_word,
- notes=notes,
- max_notes=20,
- max_workers=20 # 最多20个并发,逐帖并行评估
- )
- # ========== 阶段7:扩展搜索 ==========
- def stage7_extended_searches(
- self,
- features_data: List[Dict[str, Any]],
- search_delay: float = 2.0
- ) -> List[Dict[str, Any]]:
- """
- 阶段7:基于评估结果扩展搜索(多个)
- Args:
- features_data: 阶段6的数据
- search_delay: 搜索延迟
- Returns:
- 带扩展搜索的数据
- """
- logger.info("=" * 60)
- logger.info("阶段7:扩展搜索")
- logger.info("=" * 60)
- # 收集需要扩展搜索的任务
- extension_tasks = []
- for feature_result in features_data:
- original_feature = feature_result['原始特征名称']
- for assoc in feature_result.get('找到的关联', []):
- for feature in assoc.get('特征列表', []):
- result_eval = feature.get('result_evaluation')
- if not result_eval:
- continue
- extracted_elements = result_eval.get('extracted_elements', [])
- if not extracted_elements:
- continue
- # 为每个提取的元素创建扩展搜索
- base_search_word = feature.get('search_word', '')
- for element in extracted_elements:
- extended_keyword = f"{base_search_word} {element}"
- extension_tasks.append({
- 'extended_keyword': extended_keyword,
- 'original_feature': original_feature,
- 'feature_node': feature,
- 'element': element
- })
- logger.info(f"共 {len(extension_tasks)} 个扩展搜索任务")
- # 执行扩展搜索
- for idx, task in enumerate(extension_tasks, 1):
- extended_kw = task['extended_keyword']
- logger.info(f"[{idx}/{len(extension_tasks)}] 扩展搜索: {extended_kw}")
- try:
- result = self.search_client.search(
- keyword=extended_kw,
- content_type='不限',
- sort_type='综合',
- max_retries=3,
- use_cache=True # 启用搜索缓存
- )
- note_count = len(result.get('data', {}).get('data', []))
- logger.info(f" ✓ 成功,获取 {note_count} 条帖子")
- # 评估扩展搜索结果
- logger.info(f" 评估扩展搜索结果...")
- evaluation = self.llm_evaluator.evaluate_search_results(
- original_feature=task['original_feature'],
- search_word=extended_kw,
- notes=result.get('data', {}).get('data', []),
- max_notes=20,
- max_images_per_note=2
- )
- # 存储扩展搜索结果
- feature_node = task['feature_node']
- if 'extended_searches' not in feature_node:
- feature_node['extended_searches'] = []
- feature_node['extended_searches'].append({
- 'extended_keyword': extended_kw,
- 'based_on_element': task['element'],
- 'search_result': result,
- 'search_metadata': {
- 'searched_at': datetime.now().isoformat(),
- 'status': 'success',
- 'note_count': note_count
- },
- 'result_evaluation': evaluation
- })
- logger.info(f" 评估完成,relevance={evaluation['overall_relevance']:.3f}")
- except Exception as e:
- logger.error(f" ✗ 失败: {e}")
- # 延迟
- if idx < len(extension_tasks):
- time.sleep(search_delay)
- # 保存结果
- output_path = os.path.join(self.output_dir, "stage7_final_results.json")
- self._save_json(features_data, output_path)
- logger.info(f"\n" + "=" * 60)
- logger.info(f"阶段7完成")
- logger.info("=" * 60)
- return features_data
- # ========== 主流程 ==========
- def run_full_pipeline(self):
- """执行完整流程"""
- logger.info("\n" + "=" * 60)
- logger.info("开始执行完整流程")
- logger.info("=" * 60)
- try:
- # 阶段1
- stage1_results = self.stage1_filter_features()
- # 阶段2
- stage2_results = self.stage2_find_associations(stage1_results)
- # 阶段3 - 使用新方法:筛选高相似度匹配
- stage3_results = self.stage3_filter_high_similarity_matches(stage2_results)
- # 阶段4
- stage4_results = self.stage4_generate_and_evaluate_search_words(
- stage3_results,
- max_workers=8, # 提高并发从4到8
- max_combo_length=3 # 降低组合长度从4到3
- )
- # 阶段5
- stage5_results = self.stage5_execute_searches(stage4_results, search_delay=2.0, top_n=self.top_n)
- # 阶段6 - 暂时切断执行(代码保留)
- # stage6_results = self.stage6_evaluate_search_results(stage5_results)
- # 阶段7 - 暂时切断执行(代码保留)
- # final_results = self.stage7_extended_searches(stage6_results, search_delay=2.0)
- logger.info("\n" + "=" * 60)
- logger.info("✓ 完整流程执行完成(Stage1-5)")
- logger.info("=" * 60)
- # 自动执行可视化
- logger.info("\n" + "=" * 60)
- logger.info("开始生成可视化...")
- logger.info("=" * 60)
- try:
- result = subprocess.run(
- ['python3', 'visualize_stage5_results.py'],
- capture_output=True,
- text=True,
- timeout=60
- )
- if result.returncode == 0:
- logger.info("✓ 可视化生成成功")
- logger.info(result.stdout)
- else:
- logger.error(f"可视化生成失败: {result.stderr}")
- except subprocess.TimeoutExpired:
- logger.error("可视化生成超时")
- except Exception as e:
- logger.error(f"可视化生成异常: {e}")
- return stage5_results
- except Exception as e:
- logger.error(f"流程执行失败: {e}")
- raise
- def main():
- """主函数"""
- parser = argparse.ArgumentParser(description='增强搜索系统V2')
- parser.add_argument(
- '--how-json',
- default='69114f150000000007001f30_how copy.json',
- help='How解构文件路径'
- )
- parser.add_argument(
- '--dimension-associations',
- default='dimension_associations_analysis.json',
- help='维度关联文件路径'
- )
- parser.add_argument(
- '--optimized-clustered',
- default='optimized_clustered_data_gemini-3-pro-preview.json',
- help='人设特征库路径'
- )
- parser.add_argument(
- '--api-key',
- default=None,
- help='OpenRouter API密钥(默认从环境变量读取)'
- )
- parser.add_argument(
- '--output-dir',
- default='output_v2',
- help='输出目录'
- )
- parser.add_argument(
- '--top-n',
- type=int,
- default=10,
- help='每个原始特征取评分最高的N个搜索词(默认10)'
- )
- parser.add_argument(
- '--max-total-searches',
- type=int,
- default=None,
- help='全局最大搜索次数限制(默认None不限制)'
- )
- parser.add_argument(
- '--search-workers',
- type=int,
- default=3,
- help='搜索并发数(默认3)'
- )
- args = parser.parse_args()
- # 创建系统实例
- system = EnhancedSearchV2(
- how_json_path=args.how_json,
- dimension_associations_path=args.dimension_associations,
- optimized_clustered_data_path=args.optimized_clustered,
- openrouter_api_key=args.api_key,
- output_dir=args.output_dir,
- top_n=args.top_n,
- max_total_searches=args.max_total_searches,
- search_max_workers=args.search_workers
- )
- # 执行完整流程
- system.run_full_pipeline()
- if __name__ == '__main__':
- main()
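- # Example invocation (a sketch; assumes this module is saved as
- # enhanced_search_v2.py, matching the log file name configured above):
- #   python3 enhanced_search_v2.py \
- #       --top-n 10 --max-total-searches 50 --search-workers 3
- # The OpenRouter API key is read from the environment unless --api-key is set.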