#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enhanced Search System V2

A complete pipeline with LLM evaluation and extended-search support.
"""

import json
import logging
import copy
import time
import os
import argparse
import subprocess
from typing import Dict, List, Any, Optional, Set, Tuple
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import combinations

from openrouter_client import OpenRouterClient
from llm_evaluator import LLMEvaluator
from xiaohongshu_search import XiaohongshuSearch

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.FileHandler('enhanced_search_v2.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class EnhancedSearchV2:
    """Enhanced Search System V2."""

    def __init__(
        self,
        how_json_path: str,
        dimension_associations_path: str,
        optimized_clustered_data_path: str,
        openrouter_api_key: Optional[str] = None,
        output_dir: str = "output_v2",
        top_n: int = 10,
        max_total_searches: Optional[int] = None,
        search_max_workers: int = 3
    ):
        """
        Initialize the system.

        Args:
            how_json_path: Path to the "How" deconstruction file.
            dimension_associations_path: Path to the dimension-association file.
            optimized_clustered_data_path: Path to the persona feature library.
            openrouter_api_key: OpenRouter API key.
            output_dir: Output directory.
            top_n: Keep the N highest-scored search terms per original feature (default 10).
            max_total_searches: Global cap on the number of searches (default None = unlimited).
            search_max_workers: Search concurrency (default 3).
        """
        self.how_json_path = how_json_path
        self.dimension_associations_path = dimension_associations_path
        self.optimized_clustered_data_path = optimized_clustered_data_path
        self.output_dir = output_dir
        self.top_n = top_n
        self.max_total_searches = max_total_searches
        self.search_max_workers = search_max_workers

        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)

        # Load data files
        logger.info("Loading data files...")
        self.how_data = self._load_json(how_json_path)
        self.dimension_associations = self._load_json(dimension_associations_path)
        self.optimized_clustered_data = self._load_json(optimized_clustered_data_path)

        # Initialize components
        logger.info("Initializing components...")
        self.openrouter_client = OpenRouterClient(
            api_key=openrouter_api_key,
            model="google/gemini-2.5-flash",
            retry_delay=5  # longer retry delay to avoid rate limiting
        )
        self.llm_evaluator = LLMEvaluator(self.openrouter_client)
        self.search_client = XiaohongshuSearch()

        logger.info("System initialized")

    def _load_json(self, file_path: str) -> Any:
        """Load a JSON file."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Failed to load {file_path}: {e}")
            raise

    def _save_json(self, data: Any, file_path: str):
        """Save data as a JSON file."""
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"Saved: {file_path}")
        except Exception as e:
            logger.error(f"Failed to save {file_path}: {e}")
            raise

    # ========== Stage 1: select features with 0.5 <= similarity < 0.8 ==========

    def stage1_filter_features(self) -> List[Dict[str, Any]]:
        """
        Stage 1: select medium-similarity features.

        Selection rule: 0.5 <= highest similarity < 0.8.

        Returns:
            The filtered feature list.
        """
        logger.info("=" * 60)
        logger.info("Stage 1: selecting medium-similarity features (0.5 <= similarity < 0.8)")
        logger.info("=" * 60)

        results = []
        how_result = self.how_data.get('how解构结果', {})

        total_features = 0
        filtered_out_low = 0   # similarity < 0.5
        filtered_out_high = 0  # similarity >= 0.8
        selected_count = 0

        # Walk the three dimensions
        for level_name, level_list in how_result.items():
            if not isinstance(level_list, list):
                continue

            logger.info(f"\nProcessing {level_name}...")

            for item_idx, item in enumerate(level_list):
                item_name = item.get('名称', f'未命名-{item_idx}')
                how_steps = item.get('how步骤列表', [])

                for step in how_steps:
                    features = step.get('特征列表', [])

                    for feature in features:
                        feature_name = feature.get('特征名称', '')
                        match_results = feature.get('匹配结果', [])
                        total_features += 1

                        if not match_results:
                            continue

                        # Highest similarity across all matches
                        max_similarity = max(
                            (m.get('匹配结果', {}).get('相似度', 0) for m in match_results),
                            default=0
                        )

                        # Apply the selection rule
                        if max_similarity < 0.5:
                            filtered_out_low += 1
                            continue
                        elif max_similarity >= 0.8:
                            filtered_out_high += 1
                            continue

                        # 0.5 <= max_similarity < 0.8: keep this feature.
                        # Sort matches by similarity (descending) and take the top 3.
                        sorted_matches = sorted(
                            match_results,
                            key=lambda x: x.get('匹配结果', {}).get('相似度', 0),
                            reverse=True
                        )
                        top3_matches = sorted_matches[:3]

                        # Build the top-3 match info list
                        top3_match_info = []
                        for match in top3_matches:
                            feature_classification = match.get('特征分类', [])
                            classification_path = self._build_classification_path(feature_classification)

                            # If the path is empty and the match is a classification, search for the full path
                            if not classification_path and match.get('特征类型') == '分类':
                                feature_name_to_search = match.get('人设特征名称', '')
                                classification_path = self._search_classification_path(feature_name_to_search)

                            is_classification = self._is_classification(match.get('人设特征名称', ''), classification_path)

                            top3_match_info.append({
                                '人设特征名称': match.get('人设特征名称'),
                                '人设特征层级': match.get('人设特征层级'),
                                '特征类型': match.get('特征类型'),
                                '特征分类': feature_classification,
                                '相似度': match.get('匹配结果', {}).get('相似度', 0),
                                '匹配说明': match.get('匹配结果', {}).get('说明', ''),
                                '是分类': is_classification,
                                '所属分类路径': classification_path
                            })

                        result_item = {
                            '原始特征名称': feature_name,
                            '来源层级': level_name,
                            '权重': feature.get('权重', 0),
                            '所属点名称': item_name,
                            '最高匹配信息': top3_match_info[0],  # keep the first match for Stage 2
                            'top3匹配信息': top3_match_info      # new field
                        }
                        results.append(result_item)
                        selected_count += 1

                        # Log the top-3 matches
                        top3_names = [m['人设特征名称'] for m in top3_match_info]
                        logger.info(f"  ✓ {feature_name} → Top{len(top3_match_info)}: {', '.join(top3_names)}")

        # Summary
        logger.info("\n" + "=" * 60)
        logger.info("Stage 1 finished")
        logger.info(f"  Total features: {total_features}")
        logger.info(f"  Filtered out (<0.5): {filtered_out_low}")
        logger.info(f"  Filtered out (>=0.8): {filtered_out_high}")
        logger.info(f"  Kept (0.5-0.8): {selected_count}")
        logger.info("=" * 60)

        # Save results
        output_path = os.path.join(self.output_dir, "stage1_filtered_features.json")
        self._save_json(results, output_path)

        return results

    def _build_classification_path(self, feature_classification: List[str]) -> str:
        """
        Build a classification path.

        Args:
            feature_classification: The feature-classification array.

        Returns:
            The classification path.
        """
        if not feature_classification:
            return ""

        # Step 1: strip the "实质" suffix from middle elements
        cleaned = []
        for i, item in enumerate(feature_classification):
            if i == len(feature_classification) - 1:
                # keep the last element as-is
                cleaned.append(item)
            elif item.endswith("实质") and i != 0:
                # middle elements: drop the "实质" suffix
                cleaned.append(item[:-2])
            else:
                cleaned.append(item)

        # Step 2: reverse the array
        reversed_list = list(reversed(cleaned))

        # Step 3: join into a path
        path = "/".join(reversed_list)
        return path

    def _is_classification(self, persona_feature_name: str, classification_path: str) -> bool:
        """
        Decide whether a persona feature name denotes a classification or a feature.

        Args:
            persona_feature_name: Persona feature name.
            classification_path: Classification path.

        Returns:
            True if it is a classification, False if it is a feature.
        """
        # Look it up in optimized_clustered_data:
        # - if it appears in a node's feature list, it is a feature;
        # - if it exists as a child node with sub-nodes, it is a classification.

        # Navigate to the node
        node = self._navigate_to_node(classification_path)
        if not node:
            return False

        # Check the node's feature list
        features = node.get('特征列表', [])
        for f in features:
            if f.get('特征名称') == persona_feature_name:
                return False  # found in the feature list: it is a feature

        # Check whether it exists as a child node
        if persona_feature_name in node:
            sub_node = node[persona_feature_name]
            if isinstance(sub_node, dict):
                return True  # it is a child node: a classification

        return False  # default: treat as a feature

    def _navigate_to_node(self, path: str) -> Optional[Dict[str, Any]]:
        """
        Navigate to the node at the given path.

        Args:
            path: Path such as "实质/猫咪宠物".

        Returns:
            The node, or None if not found.
        """
        if not path:
            return None

        parts = path.split('/')
        first_part = parts[0]

        # Determine the top-level key
        top_level_map = {
            '意图': '目的点',
            '要素': '目的点',
            '实质': None,
            '形式': None,
            '场景': None
        }

        top_keys = []
        if first_part in top_level_map:
            mapped = top_level_map[first_part]
            if mapped:
                top_keys.append(mapped)

        if not top_keys:
            top_keys = ['灵感点列表', '关键点列表', '目的点']

        # Try each candidate top-level key
        for top_key in top_keys:
            current = self.optimized_clustered_data.get(top_key)
            if not current:
                continue

            # Navigate level by level
            found = True
            for part in parts:
                if isinstance(current, dict) and part in current:
                    current = current[part]
                else:
                    found = False
                    break

            if found and isinstance(current, dict):
                return current

        return None

    def _recursive_search(
        self,
        obj: Dict[str, Any],
        target_name: str,
        current_path: str = ""
    ) -> Optional[str]:
        """
        Recursively search for a classification node.

        Args:
            obj: The object currently being searched.
            target_name: Target classification name.
            current_path: Current path.

        Returns:
            The full path if found, otherwise None.
        """
        if not isinstance(obj, dict):
            return None

        # Walk all keys
        for key in obj.keys():
            # Skip metadata and feature lists
            if key in ['_meta', '特征列表']:
                continue

            # Check for a match
            if target_name in key or key in target_name:
                # Match found: return the path
                if current_path:
                    return f"{current_path}/{key}"
                else:
                    return key

            # Recurse into child nodes
            if isinstance(obj[key], dict):
                next_path = f"{current_path}/{key}" if current_path else key
                result = self._recursive_search(obj[key], target_name, next_path)
                if result:
                    return result

        return None

    def _search_classification_path(self, classification_name: str) -> str:
        """
        Search optimized_clustered_data for a classification node's path.

        Args:
            classification_name: Classification name, e.g. "实体物品实质".

        Returns:
            The full path, e.g. "实质/实体物品"; an empty string if not found.
        """
        if not classification_name:
            return ""

        # Clean the name: strip common suffixes
        clean_name = classification_name
        for suffix in ['实质', '意图', '形式', '要素']:
            if clean_name.endswith(suffix) and len(clean_name) > len(suffix):
                clean_name = clean_name[:-len(suffix)]
                break

        logger.info(f"    Searching classification: {classification_name} → cleaned to: {clean_name}")

        # Search the three top-level lists
        for top_key in ['灵感点列表', '关键点列表', '目的点']:
            top_data = self.optimized_clustered_data.get(top_key, {})
            if not top_data:
                continue

            # Recursive search
            path = self._recursive_search(top_data, clean_name, "")
            if path:
                logger.info(f"    ✓ Found path: {path}")
                return path

        logger.warning(f"    ✗ Classification path not found: {classification_name}")
        return ""

    # ========== Stage 2: collect associated classifications + tags + sub-classifications ==========

    def stage2_find_associations(self, filtered_features: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Stage 2: find associated classifications and collect classification names, tags, and sub-classifications.

        Improvement: associations are looked up for each of the top-3 base words.

        Args:
            filtered_features: Features selected in Stage 1.

        Returns:
            The feature list with association information attached.
        """
        logger.info("=" * 60)
        logger.info("Stage 2: finding associated classifications (per base word)")
        logger.info("=" * 60)

        for idx, feature in enumerate(filtered_features, 1):
            logger.info(f"\n[{idx}/{len(filtered_features)}] Processing: {feature['原始特征名称']}")

            # Get the top-3 base words
            top3_info = feature.get('top3匹配信息', [])
            if not top3_info:
                logger.warning("  No top-3 match info, skipping")
                feature['找到的关联_按base_word'] = {}
                continue

            logger.info(f"  Found {len(top3_info)} base words")

            # Look up associations for each base word
            associations_by_base_word = {}

            for base_idx, base_info in enumerate(top3_info, 1):
                base_word = base_info.get('人设特征名称', '')
                is_classification = base_info['是分类']
                classification_path = base_info['所属分类路径']
                source_level = base_info['人设特征层级']

                logger.info(f"  [{base_idx}/{len(top3_info)}] Base Word: {base_word}")

                if is_classification:
                    search_path = classification_path
                    logger.info(f"    Matched a classification: {search_path}")
                else:
                    search_path = classification_path
                    logger.info(f"    Matched a feature, using its parent classification: {search_path}")

                # Find associations
                associations = self._find_associations(search_path, source_level)

                # Collect association info
                base_word_associations = []
                for assoc in associations:
                    target_path = assoc['目标分类']

                    # Collect classification info
                    classification_info = self._collect_classification_info(target_path)
                    if classification_info:
                        base_word_associations.append({
                            '来源方向': assoc['来源方向'],
                            '关联类型': assoc['关联类型'],
                            '目标分类路径': target_path,
                            '共同帖子数': assoc['共同帖子数'],
                            'Jaccard相似度': assoc['Jaccard相似度'],
                            '分类名称': classification_info['classification_name'],
                            '标签列表': classification_info['tags'],
                            '子分类列表': classification_info['sub_classifications']
                        })

                associations_by_base_word[base_word] = base_word_associations
                logger.info(f"    Found {len(base_word_associations)} associations")

            # Store the results
            feature['找到的关联_按base_word'] = associations_by_base_word

            # Backward compatibility: keep the associations of the highest match (the first base word)
            first_base_word = top3_info[0].get('人设特征名称', '')
            feature['找到的关联'] = associations_by_base_word.get(first_base_word, [])

            total_associations = sum(len(v) for v in associations_by_base_word.values())
            logger.info(f"  Found {total_associations} associations in total ({len(associations_by_base_word)} base words)")

        # Save results
        output_path = os.path.join(self.output_dir, "stage2_associations.json")
        self._save_json(filtered_features, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 2 finished")
        logger.info("=" * 60)

        return filtered_features

    def _find_associations(self, classification_path: str, source_level: str) -> List[Dict[str, Any]]:
        """
        Find associated nodes.

        Args:
            classification_path: Classification path.
            source_level: Source level.

        Returns:
            A list of associated nodes.
        """
        associations = []

        # Determine the dimension name
        if '灵感点' in source_level:
            dimension_key = '灵感点维度'
        elif '关键点' in source_level:
            dimension_key = '关键点维度'
        elif '目的点' in source_level:
            dimension_key = '目的点维度'
        else:
            return associations

        # Get the dimension data
        single_dim = self.dimension_associations.get('单维度关联分析', {})
        dimension_data = single_dim.get(dimension_key, {})
        if not dimension_data:
            return associations

        # Walk all directions
        for direction_key, direction_data in dimension_data.items():
            if direction_key == '说明':
                continue

            # Look up the source classification
            if classification_path in direction_data:
                source_data = direction_data[classification_path]

                # Collect associated nodes
                for assoc_key in source_data.keys():
                    if assoc_key.startswith('与') and assoc_key.endswith('的关联'):
                        assoc_list = source_data[assoc_key]

                        for assoc_item in assoc_list:
                            associations.append({
                                '来源方向': direction_key,
                                '关联类型': assoc_key,
                                '目标分类': assoc_item.get('目标分类'),
                                '目标层级': assoc_item.get('目标层级'),
                                '共同帖子数': assoc_item.get('共同帖子数'),
                                'Jaccard相似度': assoc_item.get('Jaccard相似度'),
                                '共同帖子ID': assoc_item.get('共同帖子ID', [])
                            })

        return associations

    def _collect_classification_info(self, classification_path: str) -> Optional[Dict[str, Any]]:
        """
        Collect classification info: classification name + tags + sub-classifications.

        Args:
            classification_path: Classification path.

        Returns:
            Classification info, or None if the node was not found.
        """
        node = self._navigate_to_node(classification_path)
        if not node:
            return None

        # Classification name (last path segment)
        classification_name = classification_path.split('/')[-1]

        # Tags (the node's feature list)
        tags = [f.get('特征名称', '') for f in node.get('特征列表', [])]

        # Sub-classifications (child nodes, excluding _meta and the feature list)
        sub_classifications = [
            key for key in node.keys()
            if isinstance(node[key], dict) and key not in ['_meta', '特征列表']
        ]

        return {
            'classification_name': classification_name,
            'tags': tags,
            'sub_classifications': sub_classifications
        }

    # ========== Stage 3: select high-similarity matches (>0.8) ==========

    def stage3_filter_high_similarity_matches(self, associations_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Stage 3: select high-similarity matches (>0.8).

        Improvement: candidate words are selected independently for each base word,
        by scanning the "How" deconstruction for matches with similarity > 0.8 that
        fall within that base word's association scope.

        Args:
            associations_data: Association data from Stage 2.

        Returns:
            The data with high-similarity candidates attached.
        """
        logger.info("=" * 60)
        logger.info("Stage 3: selecting high-similarity matches (>0.8, per base word)")
        logger.info("=" * 60)

        for idx, feature_result in enumerate(associations_data, 1):
            original_feature_name = feature_result['原始特征名称']
            logger.info(f"\n[{idx}/{len(associations_data)}] Processing: {original_feature_name}")

            # Get the top-3 base words
            top3_info = feature_result.get('top3匹配信息', [])
            associations_by_base_word = feature_result.get('找到的关联_按base_word', {})

            if not top3_info or not associations_by_base_word:
                logger.warning("  No top-3 match info or association data, skipping")
                feature_result['高相似度候选_按base_word'] = {}
                continue

            logger.info(f"  Found {len(top3_info)} base words")

            # Select candidate words independently for each base word
            candidates_by_base_word = {}

            for base_idx, base_info in enumerate(top3_info, 1):
                base_word = base_info.get('人设特征名称', '')
                logger.info(f"  [{base_idx}/{len(top3_info)}] Base Word: {base_word}")

                # Step 1: collect this base word's association scope
                base_word_associations = associations_by_base_word.get(base_word, [])
                base_word_scope = self._collect_scope_from_associations(base_word_associations)
                logger.info(f"    Association scope contains {len(base_word_scope)} classifications/tags")

                if not base_word_scope:
                    logger.warning("    Empty association scope, skipping")
                    candidates_by_base_word[base_word] = []
                    continue

                # Step 2: walk the "How" deconstruction and collect high-similarity matches
                high_sim_candidates = []
                total_checked = 0
                high_sim_found = 0

                how_result = self.how_data.get('how解构结果', {})
                for level_name, level_list in how_result.items():
                    if not isinstance(level_list, list):
                        continue

                    for item in level_list:
                        for step in item.get('how步骤列表', []):
                            for feature in step.get('特征列表', []):
                                matches = feature.get('匹配结果', [])
                                total_checked += len(matches)

                                # Keep matches with similarity > 0.8 that fall inside this base word's scope
                                for match in matches:
                                    sim = match.get('匹配结果', {}).get('相似度', 0)
                                    persona_feature_name = match.get('人设特征名称', '')

                                    if sim > 0.8 and persona_feature_name in base_word_scope:
                                        high_sim_found += 1
                                        high_sim_candidates.append({
                                            '人设特征名称': persona_feature_name,
                                            '相似度': sim,
                                            '特征类型': match.get('特征类型', ''),
                                            '特征分类': match.get('特征分类', []),
                                            '人设特征层级': match.get('人设特征层级', ''),
                                            '来源路径': self._build_classification_path(match.get('特征分类', [])),
                                            '匹配说明': match.get('匹配结果', {}).get('说明', ''),
                                            '来源原始特征': feature.get('特征名称', '')
                                        })

                logger.info(f"    Checked {total_checked} matches")
                logger.info(f"    Found {high_sim_found} matches with similarity > 0.8")

                # Sort by similarity (descending) and deduplicate
                seen_names = set()
                unique_candidates = []
                high_sim_candidates.sort(key=lambda x: x['相似度'], reverse=True)
                for candidate in high_sim_candidates:
                    name = candidate['人设特征名称']
                    if name not in seen_names:
                        seen_names.add(name)
                        unique_candidates.append(candidate)

                candidates_by_base_word[base_word] = unique_candidates
                logger.info(f"    {len(unique_candidates)} candidates after deduplication")

                # Log the top 5
                if unique_candidates:
                    logger.info("    Top 5:")
                    for c in unique_candidates[:5]:
                        logger.info(f"      • {c['人设特征名称']} ({c['相似度']:.3f}) ← from \"{c['来源原始特征']}\"")

            # Store the results
            feature_result['高相似度候选_按base_word'] = candidates_by_base_word

            # Backward compatibility: keep the first base word's candidates
            first_base_word = top3_info[0].get('人设特征名称', '')
            feature_result['高相似度候选'] = candidates_by_base_word.get(first_base_word, [])

            total_candidates = sum(len(v) for v in candidates_by_base_word.values())
            logger.info(f"  Selected {total_candidates} candidates in total ({len(candidates_by_base_word)} base words)")

        # Save results
        output_path = os.path.join(self.output_dir, "stage3_high_similarity.json")
        self._save_json(associations_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 3 finished")
        logger.info("=" * 60)

        return associations_data

    def _collect_scope_from_associations(self, associations: List[Dict[str, Any]]) -> Set[str]:
        """
        Collect all classification names and tags from an association list into a scope set.

        Args:
            associations: Association list.

        Returns:
            A set containing all classification names and tags.
        """
        scope = set()

        for assoc in associations:
            # Add the classification name
            scope.add(assoc['分类名称'])

            # Add all tags
            tags = assoc.get('标签列表', [])
            scope.update(tags)

        return scope

    def _collect_stage2_scope(self, feature_result: Dict[str, Any]) -> Set[str]:
        """
        Collect all classification names and tags found in Stage 2 into a scope set (legacy helper).

        Args:
            feature_result: Feature result data.

        Returns:
            A set containing all classification names and tags.
        """
        associations = feature_result.get('找到的关联', [])
        return self._collect_scope_from_associations(associations)

    def _find_features_by_path(self, target_classification: str) -> List[Dict[str, Any]]:
        """
        Look up a feature list by classification path.

        Args:
            target_classification: Target classification path.

        Returns:
            The feature list.
        """
        node = self._navigate_to_node(target_classification)
        if not node:
            return []

        features = node.get('特征列表', [])

        # Return a deep copy
        return copy.deepcopy(features)

    # ========== Stage 4: multi-word combinations + LLM evaluation ==========

    def stage4_generate_and_evaluate_search_words(
        self,
        features_data: List[Dict[str, Any]],
        max_workers: int = 4,
        max_candidates: int = 20,
        max_combo_length: int = 4
    ) -> List[Dict[str, Any]]:
        """
        Stage 4: multi-word combinations + LLM evaluation.

        Using the base words from Stage 1 and the high-similarity candidates from Stage 3,
        generate all 2-to-N word combinations and pick the Top 10 via LLM evaluation.

        Args:
            features_data: Data from Stage 3 (including high-similarity candidates).
            max_workers: Number of original features evaluated concurrently (default 4).
            max_candidates: Maximum number of candidate words used for combinations (default 20).
            max_combo_length: Maximum number of words per combination (default 4, i.e. base word + 3 candidates).

        Returns:
            The data with LLM evaluations attached.
        """
        logger.info("=" * 60)
        logger.info("Stage 4: multi-word combinations + LLM evaluation")
        logger.info(f"  Max candidate words: {max_candidates}")
        logger.info(f"  Max combination length: {max_combo_length} words")
        logger.info(f"  Concurrency: {max_workers} original features")
        logger.info("=" * 60)

        total_features = len(features_data)

        # Process different original features in parallel
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            futures = []
            for idx, feature_result in enumerate(features_data, 1):
                future = executor.submit(
                    self._process_single_feature_combinations,
                    idx,
                    total_features,
                    feature_result,
                    max_candidates,
                    max_combo_length
                )
                futures.append((future, feature_result))

            # Wait for completion and collect results
            for future, feature_result in futures:
                try:
                    _ = future.result()  # wait for completion; results are written back into feature_result
                except Exception as e:
                    logger.error(f"  Evaluation failed: {feature_result['原始特征名称']}, error: {e}")

        # Save results
        output_path = os.path.join(self.output_dir, "stage4_combinations_evaluated.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 4 finished")
        logger.info("=" * 60)

        return features_data

    def _process_single_feature_combinations(
        self,
        idx: int,
        total: int,
        feature_result: Dict[str, Any],
        max_candidates: int,
        max_combo_length: int
    ) -> None:
        """
        Generate and evaluate combinations for a single original feature.

        Improvement: each base word uses its own candidate words (instead of a shared pool).

        Steps:
            1. Get the top-3 base words from Stage 1's top3匹配信息.
            2. For each base word:
                a. Get candidates from Stage 3's 高相似度候选_按base_word.
                b. Generate combinations.
                c. Run LLM evaluation.
                d. Select the Top 10.
            3. Save the grouped results.

        Args:
            idx: Feature index.
            total: Total number of features.
            feature_result: Feature result data.
            max_candidates: Maximum number of candidate words used for combinations.
            max_combo_length: Maximum number of words per combination.
        """
        original_feature = feature_result['原始特征名称']
        logger.info(f"\n[{idx}/{total}] Processing: {original_feature}")

        # Step 1: get the top-3 base words
        top3_info = feature_result.get('top3匹配信息', [])
        if not top3_info:
            logger.info("  No top-3 match info, skipping")
            feature_result['组合评估结果_分组'] = []
            return

        logger.info(f"  Found {len(top3_info)} base words")

        # Step 2: get the candidate words grouped by base word
        candidates_by_base_word = feature_result.get('高相似度候选_按base_word', {})
        if not candidates_by_base_word:
            logger.warning("  No per-base-word candidates, skipping")
            feature_result['组合评估结果_分组'] = []
            return

        # Step 3: process each base word independently
        grouped_results = []

        for base_idx, base_info in enumerate(top3_info, 1):
            base_word = base_info.get('人设特征名称', '')
            base_similarity = base_info.get('相似度', 0)

            if not base_word:
                continue

            logger.info(f"  [{base_idx}/{len(top3_info)}] Base Word: {base_word} (similarity: {base_similarity:.3f})")

            # Get this base word's candidate words
            base_candidates = candidates_by_base_word.get(base_word, [])
            candidates = base_candidates[:max_candidates]
            candidate_words = [c['人设特征名称'] for c in candidates]

            if not candidate_words:
                logger.warning("    This base word has no candidates, skipping")
                grouped_results.append({
                    'base_word': base_word,
                    'base_word_similarity': base_similarity,
                    'base_word_info': base_info,
                    'top10_searches': [],
                    'available_words': []
                })
                continue

            logger.info(f"    Candidate words: {len(candidate_words)} (limit: {max_candidates})")

            # Generate combinations: base word + 1..(max_combo_length - 1) candidate words
            combinations_for_base = []
            for length in range(1, min(max_combo_length, len(candidate_words) + 1)):
                for combo in combinations(candidate_words, length):
                    search_phrase = base_word + ' ' + ' '.join(combo)
                    combinations_for_base.append({
                        'search_word': search_phrase,
                        'base_word': base_word,
                        'candidate_words': list(combo),
                        'combo_length': length + 1
                    })

            logger.info(f"    Generated {len(combinations_for_base)} combinations")

            # LLM evaluation
            logger.info("    Starting LLM evaluation...")
            evaluated = self.llm_evaluator.evaluate_search_words_in_batches(
                original_feature=original_feature,
                search_words=[c['search_word'] for c in combinations_for_base],
                batch_size=50
            )

            # Select the Top 10
            top_10 = evaluated[:10]
            max_score = top_10[0]['score'] if top_10 else 0.0
            logger.info(f"    Evaluation finished, best score in Top 10: {max_score:.3f}")

            # Save the grouped result: each base word keeps its own available_words
            grouped_results.append({
                'base_word': base_word,
                'base_word_similarity': base_similarity,
                'base_word_info': base_info,
                'top10_searches': top_10,
                'available_words': candidate_words  # this base word's own candidates
            })

        # Write back the results
        feature_result['组合评估结果_分组'] = grouped_results

        total_searches = sum(len(g['top10_searches']) for g in grouped_results)
        logger.info(f"  Done! {len(grouped_results)} base words, {total_searches} search terms")

    # ========== Stage 5: execute searches ==========

    def _execute_single_search(
        self,
        idx: int,
        total: int,
        search_word: str,
        feature_ref: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Execute a single search task (used for concurrent execution).

        Args:
            idx: Search index.
            total: Total number of searches.
            search_word: Search term.
            feature_ref: Feature reference (the search result is written into it).

        Returns:
            Search result info.
        """
        logger.info(f"[{idx}/{total}] Searching: {search_word}")

        try:
            result = self.search_client.search(
                keyword=search_word,
                content_type='不限',
                sort_type='综合',
                max_retries=3,
                use_cache=True  # enable the search cache
            )

            note_count = len(result.get('data', {}).get('data', []))
            logger.info(f"  ✓ Success, fetched {note_count} notes")

            # Write the result back
            feature_ref['search_result'] = result
            feature_ref['search_metadata'] = {
                'searched_at': datetime.now().isoformat(),
                'status': 'success',
                'note_count': note_count,
                'search_params': {
                    'keyword': search_word,
                    'content_type': '不限',  # record the parameters actually used for the request
                    'sort_type': '综合'
                }
            }

            return {'status': 'success', 'search_word': search_word, 'note_count': note_count}

        except Exception as e:
            logger.error(f"  ✗ Failed: {e}")
            feature_ref['search_result'] = None
            feature_ref['search_metadata'] = {
                'searched_at': datetime.now().isoformat(),
                'status': 'failed',
                'note_count': 0,
                'error': str(e)
            }
            return {'status': 'failed', 'search_word': search_word, 'error': str(e)}

    def stage5_execute_searches(
        self,
        features_data: List[Dict[str, Any]],
        search_delay: float = 2.0,
        top_n: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Stage 5: execute Xiaohongshu searches.

        Args:
            features_data: Data from Stage 4.
            search_delay: Delay between searches.
            top_n: Number of highest-scored search terms per original feature.

        Returns:
            The data with search results attached.
        """
        logger.info("=" * 60)
        logger.info("Stage 5: executing Xiaohongshu searches")
        logger.info("=" * 60)

        # Collect search terms grouped by original feature (read from Stage 4's 组合评估结果_分组)
        feature_search_groups = {}

        for feature_result in features_data:
            original_feature = feature_result['原始特征名称']
            if original_feature not in feature_search_groups:
                feature_search_groups[original_feature] = []

            # Read from Stage 4's 组合评估结果_分组 (new structure)
            grouped_results = feature_result.get('组合评估结果_分组', [])

            if grouped_results:
                # Grouped structure: execute every base word's Top 10
                for group in grouped_results:
                    base_word = group.get('base_word', '')
                    base_similarity = group.get('base_word_similarity', 0)

                    for eval_item in group.get('top10_searches', []):
                        sw = eval_item.get('search_word')
                        if not sw:
                            continue

                        score = eval_item.get('score', 0.0)
                        feature_search_groups[original_feature].append({
                            'search_word': sw,
                            'score': score,
                            'base_word': base_word,
                            'base_word_similarity': base_similarity,
                            'feature_ref': eval_item  # reference to the evaluation item; the search result is written into it
                        })
            else:
                # Backward compatibility with the old structure (组合评估结果)
                for eval_item in feature_result.get('组合评估结果', []):
                    sw = eval_item.get('search_word')
                    if not sw:
                        continue

                    score = eval_item.get('score', 0.0)
                    feature_search_groups[original_feature].append({
                        'search_word': sw,
                        'score': score,
                        'feature_ref': eval_item
                    })

        # Collect all search tasks (with the grouped structure, every base word's Top 10 is executed; no further filtering)
        all_searches = []
        total_count = 0

        for original_feature, search_list in feature_search_groups.items():
            total_count += len(search_list)
            all_searches.extend(search_list)
            logger.info(f"  {original_feature}: {len(search_list)} search terms")

        # Apply the global search cap
        if self.max_total_searches and len(all_searches) > self.max_total_searches:
            logger.info(f"  Applying global cap: reducing from {len(all_searches)} to {self.max_total_searches}")
            all_searches = all_searches[:self.max_total_searches]

        logger.info(f"\n{len(all_searches)} search tasks in total")
        logger.info(f"  Running searches concurrently (workers: {self.search_max_workers})")

        # Execute the searches concurrently
        with ThreadPoolExecutor(max_workers=self.search_max_workers) as executor:
            # Submit all search tasks
            futures = []
            for idx, item in enumerate(all_searches, 1):
                future = executor.submit(
                    self._execute_single_search,
                    idx,
                    len(all_searches),
                    item['search_word'],
                    item['feature_ref']
                )
                futures.append(future)

            # Wait for all searches to finish
            for future in as_completed(futures):
                try:
                    result = future.result()
                    # The result has already been written into feature_ref; nothing else to do
                except Exception as e:
                    logger.error(f"  Search task failed: {e}")

        # Save results
        output_path = os.path.join(self.output_dir, "stage5_with_search_results.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 5 finished")
        logger.info("=" * 60)

        return features_data

    # ========== Stage 6: LLM evaluation of search results ==========

    def stage6_evaluate_search_results(
        self,
        features_data: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Stage 6: evaluate search results with an LLM (multimodal).

        Args:
            features_data: Data from Stage 5.

        Returns:
            The data with result evaluations attached.
        """
        logger.info("=" * 60)
        logger.info("Stage 6: LLM evaluation of search results")
        logger.info("=" * 60)

        # Collect all feature nodes that need evaluation
        features_to_evaluate = []

        for feature_result in features_data:
            original_feature = feature_result['原始特征名称']

            for assoc in feature_result.get('找到的关联', []):
                for feature in assoc.get('特征列表', []):
                    if feature.get('search_result') and feature['search_metadata']['status'] == 'success':
                        features_to_evaluate.append({
                            'original_feature': original_feature,
                            'feature_node': feature
                        })

        logger.info(f"{len(features_to_evaluate)} search results to evaluate")

        # Evaluate in parallel (relatively low concurrency)
        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = []
            for item in features_to_evaluate:
                future = executor.submit(
                    self._evaluate_single_search_result,
                    item['original_feature'],
                    item['feature_node']
                )
                futures.append((future, item))

            # Collect results
            for idx, (future, item) in enumerate(futures, 1):
                try:
                    evaluation = future.result()
                    item['feature_node']['result_evaluation'] = evaluation
                    logger.info(f"  [{idx}/{len(futures)}] {item['feature_node']['search_word']}: "
                                f"relevance={evaluation['overall_relevance']:.3f}")
                except Exception as e:
                    logger.error(f"  Evaluation failed: {item['feature_node']['search_word']}, error: {e}")
                    item['feature_node']['result_evaluation'] = None

        # Save results
        output_path = os.path.join(self.output_dir, "stage6_with_evaluations.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 6 finished")
        logger.info("=" * 60)

        return features_data

    def _evaluate_single_search_result(
        self,
        original_feature: str,
        feature_node: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Evaluate a single search result (using parallel evaluation).

        Args:
            original_feature: Original feature.
            feature_node: Feature node.

        Returns:
            The evaluation result.
        """
        search_word = feature_node.get('search_word', '')
        notes = feature_node['search_result'].get('data', {}).get('data', [])

        return self.llm_evaluator.evaluate_search_results_parallel(
            original_feature=original_feature,
            search_word=search_word,
            notes=notes,
            max_notes=20,
            max_workers=20  # evaluate up to 20 notes concurrently
        )

    # ========== Stage 7: extended searches ==========

    def stage7_extended_searches(
        self,
        features_data: List[Dict[str, Any]],
        search_delay: float = 2.0
    ) -> List[Dict[str, Any]]:
        """
        Stage 7: run extended searches based on the evaluation results (multiple per feature node).

        Args:
            features_data: Data from Stage 6.
            search_delay: Delay between searches.

        Returns:
            The data with extended searches attached.
        """
        logger.info("=" * 60)
        logger.info("Stage 7: extended searches")
        logger.info("=" * 60)

        # Collect extended-search tasks
        extension_tasks = []

        for feature_result in features_data:
            original_feature = feature_result['原始特征名称']

            for assoc in feature_result.get('找到的关联', []):
                for feature in assoc.get('特征列表', []):
                    result_eval = feature.get('result_evaluation')
                    if not result_eval:
                        continue

                    extracted_elements = result_eval.get('extracted_elements', [])
                    if not extracted_elements:
                        continue

                    # Create one extended search per extracted element
                    base_search_word = feature.get('search_word', '')
                    for element in extracted_elements:
                        extended_keyword = f"{base_search_word} {element}"
                        extension_tasks.append({
                            'extended_keyword': extended_keyword,
                            'original_feature': original_feature,
                            'feature_node': feature,
                            'element': element
                        })

        logger.info(f"{len(extension_tasks)} extended-search tasks in total")

        # Execute the extended searches
        for idx, task in enumerate(extension_tasks, 1):
            extended_kw = task['extended_keyword']
            logger.info(f"[{idx}/{len(extension_tasks)}] Extended search: {extended_kw}")

            try:
                result = self.search_client.search(
                    keyword=extended_kw,
                    content_type='不限',
                    sort_type='综合',
                    max_retries=3,
                    use_cache=True  # enable the search cache
                )

                note_count = len(result.get('data', {}).get('data', []))
                logger.info(f"  ✓ Success, fetched {note_count} notes")

                # Evaluate the extended search results
                logger.info("  Evaluating extended search results...")
                evaluation = self.llm_evaluator.evaluate_search_results(
                    original_feature=task['original_feature'],
                    search_word=extended_kw,
                    notes=result.get('data', {}).get('data', []),
                    max_notes=20,
                    max_images_per_note=2
                )

                # Store the extended-search result
                feature_node = task['feature_node']
                if 'extended_searches' not in feature_node:
                    feature_node['extended_searches'] = []

                feature_node['extended_searches'].append({
                    'extended_keyword': extended_kw,
                    'based_on_element': task['element'],
                    'search_result': result,
                    'search_metadata': {
                        'searched_at': datetime.now().isoformat(),
                        'status': 'success',
                        'note_count': note_count
                    },
                    'result_evaluation': evaluation
                })

                logger.info(f"  Evaluation finished, relevance={evaluation['overall_relevance']:.3f}")

            except Exception as e:
                logger.error(f"  ✗ Failed: {e}")

            # Delay between searches
            if idx < len(extension_tasks):
                time.sleep(search_delay)

        # Save results
        output_path = os.path.join(self.output_dir, "stage7_final_results.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 7 finished")
        logger.info("=" * 60)

        return features_data

    # ========== Main pipeline ==========

    def run_full_pipeline(self):
        """Run the full pipeline."""
        logger.info("\n" + "=" * 60)
        logger.info("Starting the full pipeline")
        logger.info("=" * 60)

        try:
            # Stage 1
            stage1_results = self.stage1_filter_features()

            # Stage 2
            stage2_results = self.stage2_find_associations(stage1_results)

            # Stage 3 - new method: select high-similarity matches
            stage3_results = self.stage3_filter_high_similarity_matches(stage2_results)

            # Stage 4
            stage4_results = self.stage4_generate_and_evaluate_search_words(
                stage3_results,
                max_workers=8,       # raise concurrency from 4 to 8
                max_combo_length=3   # lower combination length from 4 to 3
            )

            # Stage 5
            stage5_results = self.stage5_execute_searches(stage4_results, search_delay=2.0, top_n=self.top_n)

            # Stage 6 - temporarily disabled (code kept)
            # stage6_results = self.stage6_evaluate_search_results(stage5_results)

            # Stage 7 - temporarily disabled (code kept)
            # final_results = self.stage7_extended_searches(stage6_results, search_delay=2.0)

            logger.info("\n" + "=" * 60)
            logger.info("✓ Full pipeline finished (Stages 1-5)")
            logger.info("=" * 60)

            # Automatically generate the visualization
            logger.info("\n" + "=" * 60)
            logger.info("Generating visualization...")
            logger.info("=" * 60)

            try:
                result = subprocess.run(
                    ['python3', 'visualize_stage5_results.py'],
                    capture_output=True,
                    text=True,
                    timeout=60
                )
                if result.returncode == 0:
                    logger.info("✓ Visualization generated")
                    logger.info(result.stdout)
                else:
                    logger.error(f"Visualization failed: {result.stderr}")
            except subprocess.TimeoutExpired:
                logger.error("Visualization generation timed out")
            except Exception as e:
                logger.error(f"Visualization generation error: {e}")

            return stage5_results

        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            raise


def main():
    """Entry point."""
    parser = argparse.ArgumentParser(description='Enhanced Search System V2')
    parser.add_argument(
        '--how-json',
        default='69114f150000000007001f30_how copy.json',
        help='Path to the "How" deconstruction file'
    )
    parser.add_argument(
        '--dimension-associations',
        default='dimension_associations_analysis.json',
        help='Path to the dimension-association file'
    )
    parser.add_argument(
        '--optimized-clustered',
        default='optimized_clustered_data_gemini-3-pro-preview.json',
        help='Path to the persona feature library'
    )
    parser.add_argument(
        '--api-key',
        default=None,
        help='OpenRouter API key (defaults to reading from the environment)'
    )
    parser.add_argument(
        '--output-dir',
        default='output_v2',
        help='Output directory'
    )
    parser.add_argument(
        '--top-n',
        type=int,
        default=10,
        help='Keep the N highest-scored search terms per original feature (default 10)'
    )
    parser.add_argument(
        '--max-total-searches',
        type=int,
        default=None,
        help='Global cap on the number of searches (default None = unlimited)'
    )
    parser.add_argument(
        '--search-workers',
        type=int,
        default=3,
        help='Search concurrency (default 3)'
    )

    args = parser.parse_args()

    # Create the system instance
    system = EnhancedSearchV2(
        how_json_path=args.how_json,
        dimension_associations_path=args.dimension_associations,
        optimized_clustered_data_path=args.optimized_clustered,
        openrouter_api_key=args.api_key,
        output_dir=args.output_dir,
        top_n=args.top_n,
        max_total_searches=args.max_total_searches,
        search_max_workers=args.search_workers
    )

    # Run the full pipeline
    system.run_full_pipeline()


if __name__ == '__main__':
    main()
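# ---------------------------------------------------------------------------
# Example invocation (a minimal sketch; it assumes this module is saved as
# enhanced_search_v2.py and that the default input files listed in main()
# exist in the working directory — the cap of 50 searches is illustrative):
#
#   python3 enhanced_search_v2.py \
#       --output-dir output_v2 \
#       --top-n 10 \
#       --max-total-searches 50 \
#       --search-workers 3
#
# The OpenRouter key can be passed via --api-key; otherwise OpenRouterClient
# is expected to pick it up from its own environment configuration.
# ---------------------------------------------------------------------------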