#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enhanced Search System V2

Full pipeline with LLM evaluation and extended search.
"""
import json
import logging
import time
import os
import argparse
import subprocess
from typing import Dict, List, Any, Optional
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

from openrouter_client import OpenRouterClient
from llm_evaluator import LLMEvaluator
from xiaohongshu_search import XiaohongshuSearch
from stage7_analyzer import Stage7DeconstructionAnalyzer

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.FileHandler('enhanced_search_v2.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
class EnhancedSearchV2:
    """Enhanced search system V2."""

    def __init__(
        self,
        how_json_path: str,
        openrouter_api_key: Optional[str] = None,
        output_dir: str = "output_v2",
        top_n: int = 10,
        max_total_searches: Optional[int] = None,
        search_max_workers: int = 3,
        max_searches_per_feature: Optional[int] = None,
        max_searches_per_base_word: Optional[int] = None,
        enable_stage5: bool = True,
        stage5_max_workers: int = 10,
        stage5_max_notes: int = 20,
        enable_stage6: bool = False,
        stage6_only: bool = False,
        stage6_max_workers: int = 5,
        stage6_max_notes: Optional[int] = None,
        stage6_skip: int = 0,
        stage6_sort_by: str = 'score',
        stage6_api_url: str = "http://192.168.245.150:7000/what/analysis/single",
        stage6_min_score: float = 0.8
    ):
        """
        Initialize the system.

        Args:
            how_json_path: Path to the "how" deconstruction file
            openrouter_api_key: OpenRouter API key
            output_dir: Output directory
            top_n: Keep the N highest-scoring search words per original feature (default 10)
            max_total_searches: Global cap on the number of searches (default None, unlimited)
            search_max_workers: Search concurrency (default 3)
            max_searches_per_feature: Max searches per original feature (default None, unlimited)
            max_searches_per_base_word: Max searches per base_word (default None, unlimited)
            enable_stage5: Whether to run Stage 5 evaluation (default True)
            stage5_max_workers: Stage 5 evaluation concurrency (default 10)
            stage5_max_notes: Max number of notes evaluated per search result (default 20)
            enable_stage6: Whether to run Stage 6 deep deconstruction (default False)
            stage6_only: Run Stage 6 only, starting from Stage 5 results (default False)
            stage6_max_workers: Stage 6 concurrency (default 5)
            stage6_max_notes: Max number of notes Stage 6 processes (default None, unlimited)
            stage6_skip: Skip the first N notes in Stage 6 (default 0)
            stage6_sort_by: Stage 6 sort order: score/time/engagement (default score)
            stage6_api_url: Stage 6 deconstruction API endpoint
            stage6_min_score: Minimum score threshold for Stage 6 (default 0.8 on a 0-1 scale)
        """
        self.how_json_path = how_json_path
        self.output_dir = output_dir
        self.top_n = top_n
        self.max_total_searches = max_total_searches
        self.search_max_workers = search_max_workers
        self.max_searches_per_feature = max_searches_per_feature
        self.max_searches_per_base_word = max_searches_per_base_word
        self.enable_stage5 = enable_stage5
        self.stage5_max_workers = stage5_max_workers
        self.stage5_max_notes = stage5_max_notes
        self.enable_stage6 = enable_stage6
        self.stage6_only = stage6_only

        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)

        # Load data
        logger.info("Loading data files...")
        self.how_data = self._load_json(how_json_path)
        logger.info("  ✓ Loaded how.json")

        # Initialize components
        logger.info("Initializing components...")
        self.openrouter_client = OpenRouterClient(
            api_key=openrouter_api_key,
            model="google/gemini-2.5-flash",
            retry_delay=5  # longer retry delay to avoid rate limiting
        )
        self.llm_evaluator = LLMEvaluator(self.openrouter_client)
        self.search_client = XiaohongshuSearch()

        # Initialize the Stage 6 analyzer (deep deconstruction); the class name
        # keeps its legacy "Stage 7" numbering.
        self.stage6_analyzer = Stage7DeconstructionAnalyzer(
            api_url=stage6_api_url,
            max_workers=stage6_max_workers,
            max_notes=stage6_max_notes,
            min_score=stage6_min_score,
            skip_count=stage6_skip,
            sort_by=stage6_sort_by,
            output_dir=output_dir,
            enable_image_download=False,  # use original image URLs directly, no proxy
            image_server_url="http://localhost:8765",  # image server URL (deprecated)
            image_download_dir="downloaded_images"  # image download directory (deprecated)
        )

        logger.info("System initialization complete")
    def _load_json(self, file_path: str) -> Any:
        """Load a JSON file."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Failed to load file {file_path}: {e}")
            raise

    def _save_json(self, data: Any, file_path: str):
        """Save data as a JSON file."""
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            logger.info(f"Saved: {file_path}")
        except Exception as e:
            logger.error(f"Failed to save file {file_path}: {e}")
            raise
    # ========== Stage 1: select features with 0.5 <= similarity < 0.8 ==========

    def stage1_filter_features(self) -> List[Dict[str, Any]]:
        """
        Stage 1: select features with medium match scores.

        Selection criterion: 0.5 <= highest similarity < 0.8

        Returns:
            The filtered feature list
        """
        logger.info("=" * 60)
        logger.info("Stage 1: selecting medium-match features (0.5 <= similarity < 0.8)")
        logger.info("=" * 60)

        results = []
        how_result = self.how_data.get('解构结果', {})

        total_features = 0
        filtered_out_low = 0   # similarity < 0.5
        filtered_out_high = 0  # similarity >= 0.8
        selected_count = 0

        # Iterate over the three dimensions
        for level_name, level_list in how_result.items():
            if not isinstance(level_list, list):
                continue
            logger.info(f"\nProcessing {level_name}...")
            for item_idx, item in enumerate(level_list):
                item_name = item.get('名称', f'未命名-{item_idx}')
                # New format: persona match results live directly on the point level
                match_results = item.get('匹配人设结果', [])
                total_features += 1
                if not match_results:
                    logger.info(f"  ✗ {item_name}: no match results")
                    continue

                # Find the highest similarity (new format: similarity is a direct field)
                max_similarity = max(
                    (m.get('相似度', 0) for m in match_results),
                    default=0
                )

                # Apply the selection criterion
                if max_similarity < 0.5:
                    filtered_out_low += 1
                    logger.info(f"  ✗ {item_name}: highest similarity {max_similarity:.3f} < 0.5 (filtered out)")
                    continue
                elif max_similarity >= 0.8:
                    filtered_out_high += 1
                    logger.info(f"  ✗ {item_name}: highest similarity {max_similarity:.3f} >= 0.8 (filtered out)")
                    continue

                # 0.5 <= max_similarity < 0.8: keep this feature.
                # Sort matches by similarity, descending, and take the top 3.
                sorted_matches = sorted(
                    match_results,
                    key=lambda x: x.get('相似度', 0),
                    reverse=True
                )
                top3_matches = sorted_matches[:3]

                # Build the top-3 match info list
                top3_match_info = []
                for match in top3_matches:
                    feature_classification = match.get('特征分类', [])
                    classification_path = self._build_classification_path(feature_classification)
                    # Read the feature type directly from the match result
                    is_classification = (match.get('特征类型') == '分类')
                    top3_match_info.append({
                        '人设特征名称': match.get('人设特征名称'),
                        '人设特征层级': match.get('人设特征层级'),
                        '特征类型': match.get('特征类型'),
                        '特征分类': feature_classification,
                        '相似度': match.get('相似度', 0),  # direct field
                        '匹配说明': match.get('说明', ''),  # direct field
                        '是分类': is_classification,
                        '所属分类路径': classification_path
                    })

                result_item = {
                    '原始特征名称': item_name,  # use the point name as the feature name
                    '来源层级': level_name,
                    '权重': 1.0,  # the new format has no weight field; default to 1.0
                    '所属点名称': item_name,
                    '最高匹配信息': top3_match_info[0],  # keep the first match for Stage 2
                    'top3匹配信息': top3_match_info  # new field
                }
                results.append(result_item)
                selected_count += 1

                # Log the top-3 match info
                top3_names = [m['人设特征名称'] for m in top3_match_info]
                logger.info(f"  ✓ {item_name} → Top{len(top3_match_info)}: {', '.join(top3_names)}")

        # Summary statistics
        logger.info("\n" + "=" * 60)
        logger.info("Stage 1 complete")
        logger.info(f"  Total features: {total_features}")
        logger.info(f"  Filtered out (<0.5): {filtered_out_low}")
        logger.info(f"  Filtered out (>=0.8): {filtered_out_high}")
        logger.info(f"  Kept (0.5-0.8): {selected_count}")
        logger.info("=" * 60)

        # Save results
        output_path = os.path.join(self.output_dir, "stage1_filtered_features.json")
        self._save_json(results, output_path)
        return results
    def _build_classification_path(self, feature_classification: List[str]) -> str:
        """
        Build a classification path.

        Args:
            feature_classification: The feature classification array

        Returns:
            The classification path
        """
        if not feature_classification:
            return ""
        # Step 1: strip the "实质" suffix from middle elements
        cleaned = []
        for i, item in enumerate(feature_classification):
            if i == len(feature_classification) - 1:  # keep the last element as-is
                cleaned.append(item)
            elif item.endswith("实质") and i != 0:  # strip "实质" from middle elements
                cleaned.append(item[:-2])
            else:
                cleaned.append(item)
        # Step 2: reverse the array
        reversed_list = list(reversed(cleaned))
        # Step 3: join into a path
        path = "/".join(reversed_list)
        return path
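    # Worked example of the transformation above (hypothetical input):
    #   _build_classification_path(['A实质', 'B实质', 'C'])
    #   step 1 keeps index 0 and the last element, strips the suffix from the middle:
    #     ['A实质', 'B', 'C']
    #   step 2 reverses: ['C', 'B', 'A实质']
    #   step 3 joins:    'C/B/A实质'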
    # ========== Stage 2: extract high-similarity candidate words from the how file ==========

    def stage2_extract_candidates(self, filtered_features: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Stage 2: extract candidate words with similarity >= 0.8 from the how file.

        Processing flow:
        1. Walk the match results of every feature in how_data['解构结果']
        2. Keep persona feature names with similarity >= 0.8
        3. Deduplicate (keeping the highest similarity per name)
        4. Sort by similarity, descending
        5. Assign the candidate list to each base word
        6. Build the '高相似度候选_按base_word' structure

        Args:
            filtered_features: The feature list selected in Stage 1

        Returns:
            The feature list with high-similarity candidates attached
        """
        logger.info("=" * 60)
        logger.info("Stage 2: extracting high-similarity candidate words from the how file")
        logger.info("=" * 60)

        # Step 1: extract candidate words from the whole how file
        candidates_dict = {}  # {persona feature name: candidate info}
        how_result = self.how_data.get('解构结果', {})

        # Iterate over the three dimensions
        for dimension in ['灵感点列表', '关键点列表', '目的点列表']:
            items_list = how_result.get(dimension, [])
            for item in items_list:
                item_name = item.get('名称', '')
                # New format: persona match results live directly on the point level
                matches = item.get('匹配人设结果', [])
                for match in matches:
                    # New format: similarity is a direct field
                    similarity = match.get('相似度', 0)
                    persona_feature_name = match.get('人设特征名称', '')
                    # Keep only similarity >= 0.8
                    if similarity >= 0.8 and persona_feature_name:
                        # Deduplicate, keeping the highest similarity
                        if persona_feature_name not in candidates_dict or \
                                similarity > candidates_dict[persona_feature_name]['相似度']:
                            candidates_dict[persona_feature_name] = {
                                '人设特征名称': persona_feature_name,
                                '相似度': similarity,
                                '特征类型': match.get('特征类型', ''),
                                '特征分类': match.get('特征分类', []),
                                '人设特征层级': match.get('人设特征层级', ''),
                                '来源路径': self._build_classification_path(match.get('特征分类', [])),
                                '匹配说明': match.get('说明', ''),  # direct field
                                '来源原始特征': item_name  # use the point name
                            }

        # Step 2: convert to a list sorted by similarity, descending
        global_candidates = sorted(
            candidates_dict.values(),
            key=lambda x: x['相似度'],
            reverse=True
        )
        logger.info(f"Extracted {len(global_candidates)} unique high-similarity candidate words from the how file")

        # Log the top 10 candidates
        if global_candidates:
            logger.info("Top 10 candidate words:")
            for i, candidate in enumerate(global_candidates[:10], 1):
                logger.info(f"  {i}. {candidate['人设特征名称']} (similarity: {candidate['相似度']:.3f})")

        # Step 3: build the output structure for each feature
        results = []
        for idx, feature_data in enumerate(filtered_features, 1):
            original_feature_name = feature_data.get('原始特征名称', '')
            logger.info(f"\n[{idx}/{len(filtered_features)}] Processing: {original_feature_name}")

            top3_matches = feature_data.get('top3匹配信息', [])
            # Extract up to 3 base words
            base_words = [match.get('人设特征名称', '') for match in top3_matches[:3]]
            logger.info(f"  Base words: {', '.join(base_words)}")

            # All base words share the same candidate list
            high_similarity_by_base = {}
            for base_word in base_words:
                if base_word:
                    high_similarity_by_base[base_word] = global_candidates.copy()
            logger.info(f"  Assigned {len(global_candidates)} candidate words to each base word")

            result = {
                '原始特征名称': original_feature_name,
                '来源层级': feature_data.get('来源层级', ''),  # keep metadata
                '权重': feature_data.get('权重', 0),  # keep metadata
                'top3匹配信息': top3_matches,
                '找到的关联_按base_word': {},  # the new approach needs no association analysis
                '高相似度候选_按base_word': high_similarity_by_base
            }
            results.append(result)

        # Save results
        output_path = os.path.join(self.output_dir, 'stage2_candidates.json')
        self._save_json(results, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 2 complete")
        logger.info(f"  Candidate words extracted: {len(global_candidates)}")
        logger.info(f"  Features processed: {len(results)}")
        logger.info("=" * 60)
        return results
    # ========== Stage 4: multi-word combinations + LLM evaluation ==========

    def stage4_generate_and_evaluate_search_words(
        self,
        features_data: List[Dict[str, Any]],
        max_workers: int = 4,
        max_candidates: int = 20,
        max_combo_length: int = 4
    ) -> List[Dict[str, Any]]:
        """
        Stage 4: multi-word combinations + LLM evaluation.

        Based on the Stage 1 base words and the Stage 2 high-similarity
        candidates, generate all 2-to-N-word combinations and pick the top 10
        via LLM evaluation.

        Args:
            features_data: Stage 2 output (with high-similarity candidates)
            max_workers: Number of original features evaluated concurrently (default 4)
            max_candidates: Max number of candidate words used in combinations (default 20)
            max_combo_length: Max combination length (default 4: base word + 3 candidates)

        Returns:
            The data with LLM evaluations attached
        """
        logger.info("=" * 60)
        logger.info("Stage 4: multi-word combinations + LLM evaluation")
        logger.info(f"  Max candidate words: {max_candidates}")
        logger.info(f"  Max combination length: {max_combo_length} words")
        logger.info(f"  Concurrency: {max_workers} original features")
        logger.info("=" * 60)

        total_features = len(features_data)

        # Process different original features in parallel
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            futures = []
            for idx, feature_result in enumerate(features_data, 1):
                future = executor.submit(
                    self._process_single_feature_combinations,
                    idx,
                    total_features,
                    feature_result,
                    max_candidates,
                    max_combo_length
                )
                futures.append((future, feature_result))

            # Wait for completion and collect results
            for future, feature_result in futures:
                try:
                    _ = future.result()  # results are written back into feature_result
                except Exception as e:
                    logger.error(f"  Evaluation failed: {feature_result['原始特征名称']}, error: {e}")

        # Save results
        output_path = os.path.join(self.output_dir, "stage4_combinations_evaluated.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 4 complete")
        logger.info("=" * 60)
        return features_data
    def _process_single_feature_combinations(
        self,
        idx: int,
        total: int,
        feature_result: Dict[str, Any],
        max_candidates: int,
        max_combo_length: int
    ) -> None:
        """
        Generate and evaluate combinations for a single original feature.

        Improvement: each base_word uses its own candidate list (instead of a
        shared one).

        Steps:
        1. Get the top-3 base words from Stage 1's top3匹配信息
        2. For each base_word:
           a. Get candidates from Stage 2's 高相似度候选_按base_word
           b. Generate combinations
           c. Run LLM evaluation
           d. Select the top 10
        3. Save the grouped results

        Args:
            idx: Feature index
            total: Total number of features
            feature_result: The feature's result data
            max_candidates: Max number of candidate words used in combinations
            max_combo_length: Max combination length
        """
        original_feature = feature_result['原始特征名称']
        logger.info(f"\n[{idx}/{total}] Processing: {original_feature}")

        # Step 1: get the top-3 base words
        top3_info = feature_result.get('top3匹配信息', [])
        if not top3_info:
            logger.info("  No top-3 match info; skipping")
            feature_result['组合评估结果_分组'] = []
            return
        logger.info(f"  Found {len(top3_info)} base words")

        # Step 2: get the candidate words grouped by base_word
        candidates_by_base_word = feature_result.get('高相似度候选_按base_word', {})
        if not candidates_by_base_word:
            logger.warning("  No candidates grouped by base_word; skipping")
            feature_result['组合评估结果_分组'] = []
            return

        # Step 3: process each base_word independently
        grouped_results = []
        for base_idx, base_info in enumerate(top3_info, 1):
            base_word = base_info.get('人设特征名称', '')
            base_similarity = base_info.get('相似度', 0)
            if not base_word:
                continue
            logger.info(f"  [{base_idx}/{len(top3_info)}] Base word: {base_word} (similarity: {base_similarity:.3f})")

            # Fetch this base_word's candidates
            base_candidates = candidates_by_base_word.get(base_word, [])
            candidates = base_candidates[:max_candidates]
            candidate_words = [c['人设特征名称'] for c in candidates]
            if not candidate_words:
                logger.warning("    No candidates for this base_word; skipping")
                grouped_results.append({
                    'base_word': base_word,
                    'base_word_similarity': base_similarity,
                    'base_word_info': base_info,
                    'top10_searches': [],
                    'available_words': []
                })
                continue
            logger.info(f"    Candidate count: {len(candidate_words)} (limit: {max_candidates})")

            # Generate queries with the LLM (new approach: let the LLM build
            # queries directly from the candidate words)
            logger.info(f"    Generating queries with the LLM (base word: {base_word})...")
            evaluated = self.llm_evaluator.generate_queries_from_candidates(
                original_feature=original_feature,
                base_word=base_word,
                candidate_words=candidate_words,
                max_queries=10
            )

            # Take the top 10 (the LLM generation method already caps the count)
            top_10 = evaluated[:10]
            logger.info(f"    Generation complete: {len(top_10)} queries")

            # Save the grouped result; each base_word keeps its own available_words
            grouped_results.append({
                'base_word': base_word,
                'base_word_similarity': base_similarity,
                'base_word_info': base_info,
                'top10_searches': top_10,
                'available_words': candidate_words  # this base_word's own candidates
            })

        # Write the results back
        feature_result['组合评估结果_分组'] = grouped_results
        total_searches = sum(len(g['top10_searches']) for g in grouped_results)
        logger.info(f"  Done: {len(grouped_results)} base words, {total_searches} search words")
    # ========== Stage 4: execute Xiaohongshu searches ==========

    def _execute_single_search(
        self,
        idx: int,
        total: int,
        search_word: str,
        feature_ref: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Execute a single search task (used for concurrent execution).

        Args:
            idx: Search index
            total: Total number of searches
            search_word: The search word
            feature_ref: Feature reference (results are written into it)

        Returns:
            Search result info
        """
        logger.info(f"[{idx}/{total}] Searching: {search_word}")
        try:
            result = self.search_client.search(
                keyword=search_word,
                content_type='不限',
                sort_type='综合',
                max_retries=3,
                use_cache=True  # enable the search cache
            )
            note_count = len(result.get('data', {}).get('data', []))
            logger.info(f"  ✓ Success: fetched {note_count} notes")

            # Write the result back
            feature_ref['search_result'] = result
            feature_ref['search_metadata'] = {
                'searched_at': datetime.now().isoformat(),
                'status': 'success',
                'note_count': note_count,
                'search_params': {
                    'keyword': search_word,
                    'content_type': '不限',
                    'sort_type': '综合'
                }
            }
            return {'status': 'success', 'search_word': search_word, 'note_count': note_count}
        except Exception as e:
            logger.error(f"  ✗ Failed: {e}")
            feature_ref['search_result'] = None
            feature_ref['search_metadata'] = {
                'searched_at': datetime.now().isoformat(),
                'status': 'failed',
                'note_count': 0,
                'error': str(e)
            }
            return {'status': 'failed', 'search_word': search_word, 'error': str(e)}
    def stage4_execute_searches(
        self,
        features_data: List[Dict[str, Any]],
        search_delay: float = 2.0,
        top_n: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Stage 4: execute Xiaohongshu searches.

        Note: search_delay and top_n are currently unused here; searches run
        concurrently and every base_word's top-10 words are executed.

        Args:
            features_data: Output of stage4_generate_and_evaluate_search_words
            search_delay: Delay between searches (unused)
            top_n: Keep the N highest-scoring search words per feature (unused)

        Returns:
            The data with search results attached
        """
        logger.info("=" * 60)
        logger.info("Stage 4: executing Xiaohongshu searches")
        logger.info("=" * 60)

        # Collect search words grouped by original feature (read from the
        # Stage 4 '组合评估结果_分组' structure)
        feature_search_groups = {}
        for feature_result in features_data:
            original_feature = feature_result['原始特征名称']
            if original_feature not in feature_search_groups:
                feature_search_groups[original_feature] = []

            # Read from '组合评估结果_分组' (new structure)
            grouped_results = feature_result.get('组合评估结果_分组', [])
            if grouped_results:
                # Grouped structure: execute every base_word's top 10
                for group in grouped_results:
                    base_word = group.get('base_word', '')
                    base_similarity = group.get('base_word_similarity', 0)
                    base_word_searches = []
                    for eval_item in group.get('top10_searches', []):
                        sw = eval_item.get('search_word')
                        if not sw:
                            continue
                        score = eval_item.get('score', 0.0)
                        base_word_searches.append({
                            'search_word': sw,
                            'score': score,
                            'base_word': base_word,
                            'base_word_similarity': base_similarity,
                            'feature_ref': eval_item  # reference used to write search results back
                        })
                    # Apply the per-base_word search limit
                    if self.max_searches_per_base_word and len(base_word_searches) > self.max_searches_per_base_word:
                        logger.info(f"  Applying base_word limit: {base_word} reduced from {len(base_word_searches)} to {self.max_searches_per_base_word}")
                        base_word_searches = base_word_searches[:self.max_searches_per_base_word]
                    feature_search_groups[original_feature].extend(base_word_searches)
            else:
                # Backward compatibility with the old structure (组合评估结果)
                for eval_item in feature_result.get('组合评估结果', []):
                    sw = eval_item.get('search_word')
                    if not sw:
                        continue
                    score = eval_item.get('score', 0.0)
                    feature_search_groups[original_feature].append({
                        'search_word': sw,
                        'score': score,
                        'feature_ref': eval_item
                    })

            # Apply the per-feature search limit
            if self.max_searches_per_feature and len(feature_search_groups[original_feature]) > self.max_searches_per_feature:
                logger.info(f"  Applying feature limit: {original_feature} reduced from {len(feature_search_groups[original_feature])} to {self.max_searches_per_feature}")
                feature_search_groups[original_feature] = feature_search_groups[original_feature][:self.max_searches_per_feature]

        # Collect all search tasks (under the grouped structure every
        # base_word's top 10 is executed; no further filtering)
        all_searches = []
        total_count = 0
        for original_feature, search_list in feature_search_groups.items():
            total_count += len(search_list)
            all_searches.extend(search_list)
            logger.info(f"  {original_feature}: {len(search_list)} search words")

        # Apply the global search limit
        if self.max_total_searches and len(all_searches) > self.max_total_searches:
            logger.info(f"  Applying global limit: reduced from {len(all_searches)} to {self.max_total_searches}")
            all_searches = all_searches[:self.max_total_searches]

        logger.info(f"\n{len(all_searches)} search tasks in total")
        logger.info(f"  Running searches concurrently (workers: {self.search_max_workers})")

        # Execute searches concurrently
        with ThreadPoolExecutor(max_workers=self.search_max_workers) as executor:
            # Submit all search tasks
            futures = []
            for idx, item in enumerate(all_searches, 1):
                future = executor.submit(
                    self._execute_single_search,
                    idx,
                    len(all_searches),
                    item['search_word'],
                    item['feature_ref']
                )
                futures.append(future)

            # Wait for all searches to finish
            for future in as_completed(futures):
                try:
                    future.result()
                    # Results were already written into feature_ref; nothing else to do
                except Exception as e:
                    logger.error(f"  Search task failed: {e}")

        # Save results
        output_path = os.path.join(self.output_dir, "stage4_with_search_results.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 4 complete")
        logger.info("=" * 60)
        return features_data
    # ========== Stage 5: LLM evaluation of search results ==========

    def stage5_evaluate_search_results(
        self,
        features_data: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Stage 5: evaluate search results with the LLM (multimodal).

        Note: this variant walks the older '找到的关联' structure; the main
        pipeline uses stage5_evaluate_search_results_with_filter instead.

        Args:
            features_data: Stage 4 output

        Returns:
            The data with result evaluations attached
        """
        logger.info("=" * 60)
        logger.info("Stage 5: LLM evaluation of search results")
        logger.info("=" * 60)

        # Collect all feature nodes that need evaluation
        features_to_evaluate = []
        for feature_result in features_data:
            original_feature = feature_result['原始特征名称']
            for assoc in feature_result.get('找到的关联', []):
                for feature in assoc.get('特征列表', []):
                    if feature.get('search_result') and feature['search_metadata']['status'] == 'success':
                        features_to_evaluate.append({
                            'original_feature': original_feature,
                            'feature_node': feature
                        })
        logger.info(f"{len(features_to_evaluate)} search results to evaluate")

        # Evaluate in parallel (modest concurrency)
        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = []
            for item in features_to_evaluate:
                future = executor.submit(
                    self._evaluate_single_search_result,
                    item['original_feature'],
                    item['feature_node']
                )
                futures.append((future, item))

            # Collect results
            for idx, (future, item) in enumerate(futures, 1):
                try:
                    evaluation = future.result()
                    item['feature_node']['result_evaluation'] = evaluation
                    logger.info(f"  [{idx}/{len(futures)}] {item['feature_node']['search_word']}: "
                                f"relevance={evaluation['overall_relevance']:.3f}")
                except Exception as e:
                    logger.error(f"  Evaluation failed: {item['feature_node']['search_word']}, error: {e}")
                    item['feature_node']['result_evaluation'] = None

        # Save results
        output_path = os.path.join(self.output_dir, "stage5_with_evaluations.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 5 complete")
        logger.info("=" * 60)
        return features_data

    def _evaluate_single_search_result(
        self,
        original_feature: str,
        feature_node: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Evaluate a single search result (using parallel evaluation).

        Args:
            original_feature: The original feature
            feature_node: The feature node

        Returns:
            The evaluation result
        """
        search_word = feature_node.get('search_word', '')
        notes = feature_node['search_result'].get('data', {}).get('data', [])
        return self.llm_evaluator.evaluate_search_results_parallel(
            original_feature=original_feature,
            search_word=search_word,
            notes=notes,
            max_notes=20,
            max_workers=20  # 20 concurrent evaluations, one per note
        )
    def stage5_evaluate_search_results_with_filter(
        self,
        features_data: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Stage 5: evaluate search results with the LLM (two-layer filtered evaluation).

        Walks all search results and applies a two-layer evaluation:
        1. Layer 1: filter out results irrelevant to the search query
        2. Layer 2: score the match against the target feature
           (0.8-1.0 / 0.6-0.79 / 0.5-0.59 / <=0.4)

        Args:
            features_data: Stage 4 output

        Returns:
            The data with evaluation results attached
        """
        logger.info("=" * 60)
        logger.info("Stage 5: LLM evaluation of search results (two-layer filtered)")
        logger.info(f"  Concurrency: {self.stage5_max_workers}")
        logger.info(f"  Max notes evaluated per search: {self.stage5_max_notes}")
        logger.info("=" * 60)

        # Collect all search items that need evaluation
        search_items_to_evaluate = []
        for feature_result in features_data:
            original_feature = feature_result['原始特征名称']
            # Read search results from '组合评估结果_分组'
            grouped_results = feature_result.get('组合评估结果_分组', [])
            if grouped_results:
                for group in grouped_results:
                    for eval_item in group.get('top10_searches', []):
                        # Check whether a successful search result exists
                        if eval_item.get('search_result') and eval_item.get('search_metadata', {}).get('status') == 'success':
                            search_items_to_evaluate.append({
                                'original_feature': original_feature,
                                'search_item': eval_item,
                                'base_word': group.get('base_word', '')
                            })
            else:
                # Backward compatibility with the old structure
                for eval_item in feature_result.get('组合评估结果', []):
                    if eval_item.get('search_result') and eval_item.get('search_metadata', {}).get('status') == 'success':
                        search_items_to_evaluate.append({
                            'original_feature': original_feature,
                            'search_item': eval_item,
                            'base_word': ''
                        })
        logger.info(f"{len(search_items_to_evaluate)} search results to evaluate")

        # Evaluate all search results in parallel
        with ThreadPoolExecutor(max_workers=self.stage5_max_workers) as executor:
            futures = []
            for idx, item in enumerate(search_items_to_evaluate, 1):
                future = executor.submit(
                    self._evaluate_single_search_with_filter,
                    idx,
                    len(search_items_to_evaluate),
                    item['original_feature'],
                    item['search_item'],
                    item['base_word']
                )
                futures.append((future, item))

            # Collect results
            success_count = 0
            failed_count = 0
            for future, item in futures:
                try:
                    evaluation = future.result()
                    item['search_item']['evaluation_with_filter'] = evaluation
                    success_count += 1
                except Exception as e:
                    logger.error(f"  Evaluation failed: {item['search_item'].get('search_word', 'unknown')}, error: {e}")
                    item['search_item']['evaluation_with_filter'] = None
                    failed_count += 1

        logger.info(f"\nEvaluation finished: {success_count} succeeded, {failed_count} failed")

        # Save results
        output_path = os.path.join(self.output_dir, "stage5_with_evaluations.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 5 complete")
        logger.info("=" * 60)
        return features_data
    def _evaluate_single_search_with_filter(
        self,
        idx: int,
        total: int,
        original_feature: str,
        search_item: Dict[str, Any],
        base_word: str
    ) -> Dict[str, Any]:
        """
        Evaluate a single search result (two-layer filtering).

        Args:
            idx: Index
            total: Total count
            original_feature: The original feature
            search_item: The search item (contains search_word and search_result)
            base_word: The base word

        Returns:
            The evaluation result
        """
        search_word = search_item.get('search_word', '')
        notes = search_item['search_result'].get('data', {}).get('data', [])
        logger.info(f"[{idx}/{total}] Evaluating: {search_word} (notes: {len(notes)})")

        # Call the LLM evaluator's batch evaluation method
        evaluation = self.llm_evaluator.batch_evaluate_notes_with_filter(
            search_query=search_word,
            target_feature=original_feature,
            notes=notes,
            max_notes=self.stage5_max_notes,
            max_workers=self.stage5_max_workers
        )

        # Summary statistics
        filtered_count = evaluation.get('filtered_count', 0)
        evaluated_count = evaluation.get('evaluated_count', 0)
        match_dist = evaluation.get('match_distribution', {})
        logger.info(f"  ✓ Done: filtered {filtered_count}, evaluated {evaluated_count}, "
                    f"full matches {match_dist.get('完全匹配(0.8-1.0)', 0)}, "
                    f"similar matches {match_dist.get('相似匹配(0.6-0.79)', 0)}")
        return evaluation
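    # For reference, the code above and in Stage 7 assumes the evaluator returns
    # at least the fields below (inferred from usage in this file; the full
    # schema lives in llm_evaluator and is not shown here):
    #   batch_evaluate_notes_with_filter -> {
    #       'filtered_count': int,
    #       'evaluated_count': int,
    #       'match_distribution': {'完全匹配(0.8-1.0)': int, '相似匹配(0.6-0.79)': int, ...}
    #   }
    #   evaluate_search_results(_parallel) -> {
    #       'overall_relevance': float,
    #       'extracted_elements': [str, ...]
    #   }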
    # ========== Stage 7: extended searches ==========

    def stage7_extended_searches(
        self,
        features_data: List[Dict[str, Any]],
        search_delay: float = 2.0
    ) -> List[Dict[str, Any]]:
        """
        Stage 7: run extended searches based on the evaluation results.

        Args:
            features_data: Stage 6 output
            search_delay: Delay between searches

        Returns:
            The data with extended searches attached
        """
        logger.info("=" * 60)
        logger.info("Stage 7: extended searches")
        logger.info("=" * 60)

        # Collect the extended search tasks
        extension_tasks = []
        for feature_result in features_data:
            original_feature = feature_result['原始特征名称']
            for assoc in feature_result.get('找到的关联', []):
                for feature in assoc.get('特征列表', []):
                    result_eval = feature.get('result_evaluation')
                    if not result_eval:
                        continue
                    extracted_elements = result_eval.get('extracted_elements', [])
                    if not extracted_elements:
                        continue
                    # Create one extended search per extracted element
                    base_search_word = feature.get('search_word', '')
                    for element in extracted_elements:
                        extended_keyword = f"{base_search_word} {element}"
                        extension_tasks.append({
                            'extended_keyword': extended_keyword,
                            'original_feature': original_feature,
                            'feature_node': feature,
                            'element': element
                        })
        logger.info(f"{len(extension_tasks)} extended search tasks in total")

        # Run the extended searches
        for idx, task in enumerate(extension_tasks, 1):
            extended_kw = task['extended_keyword']
            logger.info(f"[{idx}/{len(extension_tasks)}] Extended search: {extended_kw}")
            try:
                result = self.search_client.search(
                    keyword=extended_kw,
                    content_type='不限',
                    sort_type='综合',
                    max_retries=3,
                    use_cache=True  # enable the search cache
                )
                note_count = len(result.get('data', {}).get('data', []))
                logger.info(f"  ✓ Success: fetched {note_count} notes")

                # Evaluate the extended search result
                logger.info("  Evaluating the extended search result...")
                evaluation = self.llm_evaluator.evaluate_search_results(
                    original_feature=task['original_feature'],
                    search_word=extended_kw,
                    notes=result.get('data', {}).get('data', []),
                    max_notes=20,
                    max_images_per_note=2
                )

                # Store the extended search result
                feature_node = task['feature_node']
                if 'extended_searches' not in feature_node:
                    feature_node['extended_searches'] = []
                feature_node['extended_searches'].append({
                    'extended_keyword': extended_kw,
                    'based_on_element': task['element'],
                    'search_result': result,
                    'search_metadata': {
                        'searched_at': datetime.now().isoformat(),
                        'status': 'success',
                        'note_count': note_count
                    },
                    'result_evaluation': evaluation
                })
                logger.info(f"  Evaluation complete, relevance={evaluation['overall_relevance']:.3f}")
            except Exception as e:
                logger.error(f"  ✗ Failed: {e}")

            # Delay between searches
            if idx < len(extension_tasks):
                time.sleep(search_delay)

        # Save results
        output_path = os.path.join(self.output_dir, "stage7_final_results.json")
        self._save_json(features_data, output_path)

        logger.info("\n" + "=" * 60)
        logger.info("Stage 7 complete")
        logger.info("=" * 60)
        return features_data
    # ========== Main pipeline ==========

    def run_full_pipeline(self):
        """Run the full pipeline."""
        logger.info("\n" + "=" * 60)
        logger.info("Starting the full pipeline")
        logger.info("=" * 60)
        try:
            # Stage 6 Only mode: run only the deep deconstruction analysis,
            # starting from the Stage 5 results
            if self.stage6_only:
                logger.info("Run mode: Stage 6 only (starting from Stage 5 results)")
                stage5_path = os.path.join(self.output_dir, "stage5_with_evaluations.json")
                if not os.path.exists(stage5_path):
                    raise FileNotFoundError(f"Stage 5 results not found: {stage5_path}")
                with open(stage5_path, 'r', encoding='utf-8') as f:
                    stage5_results = json.load(f)
                stage6_results = self.stage6_analyzer.run(stage5_results)
                return stage6_results

            # Normal flow: start from Stage 1

            # Stage 1: filter features
            stage1_results = self.stage1_filter_features()

            # Stage 2: extract candidate words from the how file
            stage2_results = self.stage2_extract_candidates(stage1_results)

            # Stage 3: multi-word combinations + LLM evaluation
            # (implemented by stage4_generate_and_evaluate_search_words)
            stage3_results = self.stage4_generate_and_evaluate_search_words(
                stage2_results,
                max_workers=8,  # concurrency raised from 4 to 8
                max_combo_length=3  # combination length lowered from 4 to 3
            )

            # Stage 4: execute searches
            stage4_results = self.stage4_execute_searches(stage3_results, search_delay=2.0, top_n=self.top_n)

            # Stage 5: LLM evaluation of search results (conditional)
            if self.enable_stage5:
                stage5_results = self.stage5_evaluate_search_results_with_filter(stage4_results)
            else:
                stage5_results = stage4_results
                logger.info("\n" + "=" * 60)
                logger.info("Stage 5: skipped (not enabled)")
                logger.info("=" * 60)

            # Stage 6: deep deconstruction analysis (conditional)
            if self.enable_stage6:
                stage6_results = self.stage6_analyzer.run(stage5_results)
                final_results = stage6_results
            else:
                final_results = stage5_results

            logger.info("\n" + "=" * 60)
            if self.enable_stage6:
                logger.info("✓ Full pipeline finished (Stages 1-6)")
            elif self.enable_stage5:
                logger.info("✓ Full pipeline finished (Stages 1-5)")
            else:
                logger.info("✓ Full pipeline finished (Stages 1-4)")
            logger.info("=" * 60)

            # Generate the visualization automatically
            logger.info("\n" + "=" * 60)
            logger.info("Generating the visualization...")
            logger.info("=" * 60)
            try:
                # Use the unified visualization script
                viz_script = 'visualize_stage6_results.py'
                logger.info(f"  Visualization script: {viz_script}")
                result = subprocess.run(
                    ['python3', viz_script],
                    capture_output=True,
                    text=True,
                    timeout=60
                )
                if result.returncode == 0:
                    logger.info("✓ Visualization generated")
                    logger.info(result.stdout)
                else:
                    logger.error(f"Visualization failed: {result.stderr}")
            except subprocess.TimeoutExpired:
                logger.error("Visualization generation timed out")
            except Exception as e:
                logger.error(f"Visualization generation error: {e}")

            return final_results
        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            raise
def main():
    """CLI entry point."""
    parser = argparse.ArgumentParser(description='Enhanced Search System V2')
    parser.add_argument(
        '--how-json',
        default='input/690d977d0000000007036331_how.json',
        help='Path to the "how" deconstruction file'
    )
    parser.add_argument(
        '--api-key',
        default=None,
        help='OpenRouter API key (defaults to the environment variable)'
    )
    parser.add_argument(
        '--output-dir',
        default='output_v2',
        help='Output directory'
    )
    parser.add_argument(
        '--top-n',
        type=int,
        default=10,
        help='Keep the N highest-scoring search words per original feature (default 10)'
    )
    parser.add_argument(
        '--max-total-searches',
        type=int,
        default=None,
        help='Global cap on the number of searches (default: unlimited)'
    )
    parser.add_argument(
        '--search-workers',
        type=int,
        default=3,
        help='Search concurrency (default 3)'
    )
    parser.add_argument(
        '--max-searches-per-feature',
        type=int,
        default=None,
        help='Max searches per original feature (default: unlimited)'
    )
    parser.add_argument(
        '--max-searches-per-base-word',
        type=int,
        default=None,
        help='Max searches per base_word (default: unlimited)'
    )
    parser.add_argument(
        '--enable-stage5',
        action='store_true',
        help='Enable Stage 5 evaluation (default: off)'
    )
    parser.add_argument(
        '--stage5-max-workers',
        type=int,
        default=10,
        help='Stage 5 evaluation concurrency (default 10)'
    )
    parser.add_argument(
        '--stage5-max-notes',
        type=int,
        default=20,
        help='Max notes evaluated per search result (default 20)'
    )
    parser.add_argument(
        '--enable-stage6',
        action='store_true',
        help='Enable Stage 6 deep deconstruction analysis'
    )
    parser.add_argument(
        '--stage6-only',
        action='store_true',
        help='Run Stage 6 only (starting from Stage 5 results)'
    )
    parser.add_argument(
        '--stage6-max-workers',
        type=int,
        default=5,
        help='Stage 6 concurrency (default 5)'
    )
    parser.add_argument(
        '--stage6-max-notes',
        type=int,
        default=None,
        help='Max number of fully matched notes Stage 6 processes (default: unlimited)'
    )
    parser.add_argument(
        '--stage6-skip',
        type=int,
        default=0,
        help='Skip the first N fully matched notes in Stage 6 (default 0)'
    )
    parser.add_argument(
        '--stage6-sort-by',
        type=str,
        choices=['score', 'time', 'engagement'],
        default='score',
        help='Stage 6 sort order: score, time, or engagement'
    )
    parser.add_argument(
        '--stage6-api-url',
        type=str,
        default='http://192.168.245.150:7000/what/analysis/single',
        help='Stage 6 deconstruction API endpoint'
    )
    parser.add_argument(
        '--stage6-min-score',
        type=float,
        default=0.8,
        help='Minimum score threshold for Stage 6 (default 0.8 on a 0-1 scale)'
    )
    args = parser.parse_args()

    # Create the system instance
    system = EnhancedSearchV2(
        how_json_path=args.how_json,
        openrouter_api_key=args.api_key,
        output_dir=args.output_dir,
        top_n=args.top_n,
        max_total_searches=args.max_total_searches,
        search_max_workers=args.search_workers,
        max_searches_per_feature=args.max_searches_per_feature,
        max_searches_per_base_word=args.max_searches_per_base_word,
        enable_stage5=args.enable_stage5,
        stage5_max_workers=args.stage5_max_workers,
        stage5_max_notes=args.stage5_max_notes,
        enable_stage6=args.enable_stage6,
        stage6_only=args.stage6_only,
        stage6_max_workers=args.stage6_max_workers,
        stage6_max_notes=args.stage6_max_notes,
        stage6_skip=args.stage6_skip,
        stage6_sort_by=args.stage6_sort_by,
        stage6_api_url=args.stage6_api_url,
        stage6_min_score=args.stage6_min_score
    )

    # Run the full pipeline
    system.run_full_pipeline()


if __name__ == '__main__':
    main()
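# Example invocations (assuming this module is saved as enhanced_search_v2.py;
# paths and limits below are illustrative, not canonical):
#
#   # Full pipeline with Stage 5 evaluation, capped at 20 searches:
#   python3 enhanced_search_v2.py --how-json input/example_how.json \
#       --enable-stage5 --max-total-searches 20
#
#   # Re-run only the Stage 6 deep deconstruction over existing Stage 5 output:
#   python3 enhanced_search_v2.py --stage6-only --output-dir output_v2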