#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Stage 8 similarity analyzer.

Scores the similarity between the deconstructed features produced by
Stage 7 and the original feature.
"""

import os
import json
import time
import logging
import asyncio
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple

from lib.hybrid_similarity import compare_phrases_cartesian
from lib.config import get_cache_dir

try:
    from tqdm import tqdm
    TQDM_AVAILABLE = True
except ImportError:
    TQDM_AVAILABLE = False

logger = logging.getLogger(__name__)


def extract_deconstructed_features(api_response: Dict) -> List[Dict]:
    """
    Extract every feature from the three-point deconstruction.

    Args:
        api_response: the api_response object from Stage 7

    Returns:
        A list of features, each containing:
        - feature_name: feature name
        - dimension: dimension (灵感点-全新内容 / 灵感点-共性差异 / 灵感点-共性内容 / 目的点 / 关键点)
        - dimension_detail: dimension subdivision (substance / form / intent, ...)
        - weight: weight
        - source_index: index within that dimension
        - source_*: provenance info (candidate number, purpose-point text, key-point text, ...)
    """
    features = []

    # Check the API response status
    if api_response.get('status') != 'success':
        logger.warning("  API response status is not 'success'; cannot extract features")
        return features

    result = api_response.get('result', {})

    # Check that the data field is present
    if 'data' not in result:
        logger.warning("  API response has no 'data' field")
        return features

    data = result['data']
    three_point = data.get('三点解构', {})

    if not three_point:
        logger.warning("  Three-point deconstruction data is empty")
        return features

    # 1. Extract inspiration points (3 sub-categories)
    inspiration = three_point.get('灵感点', {})
    for category in ['全新内容', '共性差异', '共性内容']:
        items = inspiration.get(category, [])
        for idx, item in enumerate(items):
            extracted_features = item.get('提取的特征', [])
            for feat in extracted_features:
                feature_name = feat.get('特征名称', '')
                if not feature_name:
                    continue
                features.append({
                    'feature_name': feature_name,
                    'dimension': f'灵感点-{category}',
                    'dimension_detail': feat.get('维度分类', ''),  # field name differs per dimension
                    'weight': feat.get('权重', 0),
                    'source_index': idx,
                    'source_candidate_number': item.get('候选编号', 0),
                    'source_inspiration': item.get('灵感点', '')
                })

    # 2. Extract purpose points
    purpose = three_point.get('目的点', {})
    purposes_list = purpose.get('purposes', [])
    for idx, item in enumerate(purposes_list):
        extracted_features = item.get('提取的特征', [])
        for feat in extracted_features:
            feature_name = feat.get('特征名称', '')
            if not feature_name:
                continue
            features.append({
                'feature_name': feature_name,
                'dimension': '目的点',
                'dimension_detail': feat.get('特征分类', ''),  # field name differs per dimension
                'weight': feat.get('权重', 0),
                'source_index': idx,
                'source_purpose': item.get('目的点', ''),
                'source_purpose_dimension': item.get('维度', {})
            })

    # 3. Extract key points
    key_points_data = three_point.get('关键点', {})
    key_points_list = key_points_data.get('key_points', [])
    for idx, item in enumerate(key_points_list):
        extracted_features = item.get('提取的特征', [])
        for feat in extracted_features:
            feature_name = feat.get('特征名称', '')
            if not feature_name:
                continue
            features.append({
                'feature_name': feature_name,
                'dimension': '关键点',
                'dimension_detail': feat.get('维度', ''),  # field name differs per dimension
                'weight': feat.get('权重', 0),
                'source_index': idx,
                'source_candidate_number': item.get('候选编号', 0),
                'source_key_point': item.get('关键点', ''),
                'source_key_point_dimension': item.get('维度', '')
            })

    logger.info(f"  Extracted {len(features)} features")
    if features:
        # Tally features per dimension
        dimension_counts = {}
        for feat in features:
            dim = feat['dimension']
            dimension_counts[dim] = dimension_counts.get(dim, 0) + 1
        logger.info(f"  Dimension distribution: {dimension_counts}")

    return features
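
# A minimal sketch of the Stage 7 payload that extract_deconstructed_features()
# walks. Only the keys are taken from the accessors above; the values are
# illustrative placeholders:
#
#   {
#     "status": "success",
#     "result": {
#       "data": {
#         "三点解构": {
#           "灵感点": {
#             "全新内容": [{"候选编号": 1, "灵感点": "...",
#                           "提取的特征": [{"特征名称": "...", "维度分类": "...", "权重": 0.8}]}],
#             "共性差异": [],
#             "共性内容": []
#           },
#           "目的点": {"purposes": [...]},
#           "关键点": {"key_points": [...]}
#         }
#       }
#     }
#   }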


async def calculate_similarity_for_note(
    note_result: Dict,
    original_feature: str,
    weight_embedding: float = 0.5,
    weight_semantic: float = 0.5,
    min_similarity: float = 0.0
) -> Dict:
    """
    Score every feature of a single note against the original feature.

    Args:
        note_result: a single result object from Stage 7
        original_feature: name of the original feature
        weight_embedding: weight of the embedding model
        weight_semantic: weight of the LLM model
        min_similarity: minimum similarity threshold; features below it are dropped

    Returns:
        A result object enriched with similarity information
    """
    note_id = note_result.get('note_id', '')
    logger.info(f"  [{note_id}] Computing similarity...")

    # 1. Extract the deconstructed features
    deconstructed_features = extract_deconstructed_features(
        note_result['api_response']
    )

    if not deconstructed_features:
        logger.warning(f"  [{note_id}] No features extracted")
        return {
            'note_id': note_id,
            'original_feature': original_feature,
            'evaluation_score': note_result.get('evaluation_score', 0),
            'search_word': note_result.get('search_word', ''),
            'note_data': note_result.get('note_data', {}),
            'deconstructed_features': [],
            'similarity_statistics': {
                'total_features': 0,
                'max_similarity': 0,
                'min_similarity': 0,
                'avg_similarity': 0,
                'high_similarity_count': 0,
                'medium_similarity_count': 0,
                'low_similarity_count': 0
            }
        }

    # 2. Build the list of feature names
    feature_names = [f['feature_name'] for f in deconstructed_features]
    logger.info(f"  [{note_id}] Calling the similarity API (1×{len(feature_names)} Cartesian product)...")

    # 3. Batch-compute similarities (1×N Cartesian product)
    try:
        start_time = time.time()
        similarity_results = await compare_phrases_cartesian(
            phrases_a=[original_feature],
            phrases_b=feature_names,
            max_concurrent=50
        )
        elapsed = time.time() - start_time
        logger.info(f"  [{note_id}] Similarity computed ({elapsed:.1f}s)")

        # 4. Map the results back onto the feature objects
        for i, feat in enumerate(deconstructed_features):
            feat['similarity_score'] = similarity_results[0][i]['相似度']
            feat['similarity_explanation'] = similarity_results[0][i]['说明']

        # 5. Filter out low-similarity features
        if min_similarity > 0:
            original_count = len(deconstructed_features)
            deconstructed_features = [
                f for f in deconstructed_features
                if f['similarity_score'] >= min_similarity
            ]
            filtered_count = original_count - len(deconstructed_features)
            if filtered_count > 0:
                logger.info(f"  [{note_id}] Dropped {filtered_count} low-similarity features (< {min_similarity})")

        # 6. Compute statistics
        if deconstructed_features:
            scores = [f['similarity_score'] for f in deconstructed_features]
            statistics = {
                'total_features': len(scores),
                'max_similarity': round(max(scores), 3),
                'min_similarity': round(min(scores), 3),
                'avg_similarity': round(sum(scores) / len(scores), 3),
                'high_similarity_count': sum(1 for s in scores if s >= 0.7),
                'medium_similarity_count': sum(1 for s in scores if 0.5 <= s < 0.7),
                'low_similarity_count': sum(1 for s in scores if s < 0.5)
            }

            # 7. Sort by similarity, descending
            deconstructed_features.sort(key=lambda x: x['similarity_score'], reverse=True)

            logger.info(f"  [{note_id}] Stats: max={statistics['max_similarity']}, "
                        f"avg={statistics['avg_similarity']}, "
                        f"high-similarity={statistics['high_similarity_count']}")
        else:
            statistics = {
                'total_features': 0,
                'max_similarity': 0,
                'min_similarity': 0,
                'avg_similarity': 0,
                'high_similarity_count': 0,
                'medium_similarity_count': 0,
                'low_similarity_count': 0
            }

        return {
            'note_id': note_id,
            'original_feature': original_feature,
            'evaluation_score': note_result.get('evaluation_score', 0),
            'search_word': note_result.get('search_word', ''),
            'note_data': note_result.get('note_data', {}),
            'deconstructed_features': deconstructed_features,
            'similarity_statistics': statistics,
            'processing_time_seconds': round(elapsed, 2)
        }

    except Exception as e:
        logger.error(f"  [{note_id}] Similarity computation failed: {e}")
        return {
            'note_id': note_id,
            'original_feature': original_feature,
            'evaluation_score': note_result.get('evaluation_score', 0),
            'search_word': note_result.get('search_word', ''),
            'note_data': note_result.get('note_data', {}),
            'deconstructed_features': [],
            'similarity_statistics': {
                'total_features': 0,
                'error': str(e)
            }
        }
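
# compare_phrases_cartesian() is assumed, from the indexing above, to return a
# grid shaped [len(phrases_a)][len(phrases_b)] whose cells are dicts with the
# keys '相似度' (float score) and '说明' (explanation). Since phrases_a holds
# only the original feature, row 0 lines up one-to-one with feature_names,
# which is why similarity_results[0][i] maps back onto
# deconstructed_features[i].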


class Stage8SimilarityAnalyzer:
    """Stage 8: similarity analysis between deconstructed and original features."""

    def __init__(
        self,
        weight_embedding: float = 0.5,
        weight_semantic: float = 0.5,
        max_workers: int = 5,
        min_similarity: float = 0.0,
        output_dir: str = "output_v2",
        target_features: Optional[List[str]] = None,
        stage6_path: str = 'output_v2/stage6_with_evaluations.json',
        update_stage6: bool = True
    ):
        """
        Initialize the Stage 8 analyzer.

        Args:
            weight_embedding: weight of the embedding model (default 0.5)
            weight_semantic: weight of the LLM model (default 0.5)
            max_workers: maximum concurrency (default 5)
            min_similarity: minimum similarity threshold (default 0.0, keep all features)
            output_dir: output directory
            target_features: original features to process (None = process all)
            stage6_path: path to the Stage 6 data file (used for the comprehensive score)
            update_stage6: whether to compute and write the comprehensive score back into Stage 6 (default True)
        """
        self.weight_embedding = weight_embedding
        self.weight_semantic = weight_semantic
        self.max_workers = max_workers
        self.min_similarity = min_similarity
        self.output_dir = output_dir
        self.target_features = target_features
        self.stage6_path = stage6_path
        self.update_stage6 = update_stage6

        # Validate the weights
        total_weight = weight_embedding + weight_semantic
        if abs(total_weight - 1.0) > 0.001:
            raise ValueError(f"Weights must sum to 1.0, got: {total_weight}")

    def _save_intermediate_results(
        self,
        results: List[Dict],
        output_path: str,
        processed_count: int,
        total_count: int,
        start_time: float
    ):
        """Save an intermediate checkpoint."""
        base_dir = os.path.dirname(output_path) or self.output_dir
        base_name = os.path.basename(output_path)
        name_without_ext = os.path.splitext(base_name)[0]
        intermediate_path = os.path.join(
            base_dir,
            f"{name_without_ext}_partial_{processed_count}of{total_count}.json"
        )

        # Aggregate statistics (use .get so error results, which lack a
        # 'max_similarity' key, cannot break the checkpoint)
        total_features = sum(r['similarity_statistics']['total_features'] for r in results)
        avg_max_sim = sum(r['similarity_statistics'].get('max_similarity', 0) for r in results) / len(results)

        intermediate_result = {
            'metadata': {
                'stage': 'stage8_partial',
                'description': f'Partial results ({processed_count}/{total_count})',
                'processed_notes': len(results),
                'total_features_extracted': total_features,
                'avg_max_similarity': round(avg_max_sim, 3),
                'saved_at': datetime.now().isoformat(),
                'processing_time_seconds': round(time.time() - start_time, 2)
            },
            'results': results
        }

        os.makedirs(base_dir, exist_ok=True)
        with open(intermediate_path, 'w', encoding='utf-8') as f:
            json.dump(intermediate_result, f, ensure_ascii=False, indent=2)

        logger.info(f"  Saved intermediate results: {intermediate_path}")
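
    # Illustrative checkpoint naming: with output_path
    # "output_v2/stage8_similarity_scores.json" and 30 of 100 notes processed,
    # the checkpoint above is written to
    # "output_v2/stage8_similarity_scores_partial_30of100.json".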

    async def run_async(
        self,
        stage7_results: Dict,
        output_path: Optional[str] = None
    ) -> Dict:
        """
        Run the Stage 8 similarity analysis (async version).

        Args:
            stage7_results: Stage 7 results
            output_path: output path (optional)

        Returns:
            Stage 8 results
        """
        logger.info("\n" + "=" * 60)
        logger.info("Stage 8: similarity between deconstructed and original features")
        logger.info("=" * 60)

        # Print the configuration
        logger.info("Configuration:")
        logger.info(f"  Embedding model weight: {self.weight_embedding}")
        logger.info(f"  LLM model weight: {self.weight_semantic}")
        logger.info(f"  Max concurrency: {self.max_workers}")
        logger.info(f"  Min similarity threshold: {self.min_similarity}")
        if self.target_features:
            logger.info(f"  Target features: {', '.join(self.target_features)}")
        else:
            logger.info("  Target features: all")

        # Default output path
        if output_path is None:
            output_path = os.path.join(self.output_dir, "stage8_similarity_scores.json")

        # Pull the Stage 7 results
        results_list = stage7_results.get('results', [])

        # Filter down to the target features
        if self.target_features:
            results_list = [
                r for r in results_list
                if r.get('original_feature') in self.target_features
            ]

        total_notes = len(results_list)
        logger.info(f"  Notes to process: {total_notes}")

        if total_notes == 0:
            logger.warning("  Nothing to process")
            return {
                'metadata': {
                    'stage': 'stage8',
                    'processed_notes': 0
                },
                'results': []
            }

        # Build the task list
        start_time = time.time()
        results = []

        # Bound the concurrency with a semaphore
        semaphore = asyncio.Semaphore(self.max_workers)

        async def bounded_task(result):
            async with semaphore:
                return await calculate_similarity_for_note(
                    result,
                    result.get('original_feature', ''),
                    self.weight_embedding,
                    self.weight_semantic,
                    self.min_similarity
                )

        tasks = [bounded_task(result) for result in results_list]

        # Run with a progress bar if available
        if TQDM_AVAILABLE:
            logger.info("  Showing progress bar...")
            processed_count = 0
            save_interval = 10

            for coro in tqdm(
                asyncio.as_completed(tasks),
                total=len(tasks),
                desc="  Similarity progress",
                unit="note",
                ncols=100
            ):
                result = await coro
                results.append(result)
                processed_count += 1

                # Incremental checkpoint
                if processed_count % save_interval == 0:
                    self._save_intermediate_results(
                        results, output_path, processed_count, total_notes, start_time
                    )
        else:
            # Plain execution
            results = await asyncio.gather(*tasks)
            logger.info(f"  Done: {len(results)}/{total_notes}")
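
        # Note: asyncio.as_completed yields in completion order, not input
        # order, so `results` (and the checkpoints above) are not sorted by
        # the original note order. Illustrative run with save_interval=10 and
        # 23 notes: checkpoints land after notes 10 and 20, and the full file
        # written below contains all 23.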

        processing_time = time.time() - start_time

        # Overall statistics
        total_features = sum(r['similarity_statistics']['total_features'] for r in results)
        all_max_similarities = [r['similarity_statistics']['max_similarity']
                                for r in results
                                if r['similarity_statistics']['total_features'] > 0]

        overall_stats = {
            'total_notes': total_notes,
            'total_features_extracted': total_features,
            'avg_features_per_note': round(total_features / total_notes, 1) if total_notes > 0 else 0,
            'avg_max_similarity': round(sum(all_max_similarities) / len(all_max_similarities), 3) if all_max_similarities else 0,
            'notes_with_high_similarity': sum(1 for r in results
                                              if r['similarity_statistics'].get('high_similarity_count', 0) > 0)
        }

        logger.info(f"\n  Total time: {processing_time:.1f}s")
        logger.info(f"  Total features: {total_features}")
        logger.info(f"  Avg features per note: {overall_stats['avg_features_per_note']}")
        logger.info(f"  Avg max similarity: {overall_stats['avg_max_similarity']}")
        logger.info(f"  Notes with high-similarity features: {overall_stats['notes_with_high_similarity']}")

        # Assemble the final result
        final_result = {
            'metadata': {
                'stage': 'stage8',
                'description': 'Similarity scores between deconstructed and original features',
                'source_file': stage7_results.get('metadata', {}).get('created_at', ''),
                'target_features': self.target_features if self.target_features else 'all',
                'similarity_config': {
                    'algorithm': 'hybrid_similarity',
                    'weight_embedding': self.weight_embedding,
                    'weight_semantic': self.weight_semantic,
                    'min_similarity_threshold': self.min_similarity
                },
                'overall_statistics': overall_stats,
                'created_at': datetime.now().isoformat(),
                'processing_time_seconds': round(processing_time, 2)
            },
            'results': results
        }

        # Save the results
        os.makedirs(os.path.dirname(output_path) or self.output_dir, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(final_result, f, ensure_ascii=False, indent=2)

        logger.info(f"  Results saved: {output_path}")

        # Compute and write back the comprehensive score P
        if self.update_stage6:
            logger.info("\n" + "=" * 60)
            logger.info("Computing comprehensive score P and updating Stage 6 data...")
            logger.info("=" * 60)
            self._calculate_and_update_comprehensive_scores(results)

        return final_result

    def _calculate_and_update_comprehensive_scores(self, stage8_results: List[Dict]):
        """
        Compute the comprehensive score P and write it back into the Stage 6 data.

        Args:
            stage8_results: the Stage 8 result list
        """
        try:
            # 1. Load the Stage 6 data
            logger.info(f"  Loading Stage 6 data: {self.stage6_path}")
            if not os.path.exists(self.stage6_path):
                logger.error(f"  Stage 6 file does not exist: {self.stage6_path}")
                return

            with open(self.stage6_path, 'r', encoding='utf-8') as f:
                stage6_data = json.load(f)

            # 2. Build the Stage 8 mapping (note_id -> max_similarity); use
            # .get so error results without a 'max_similarity' key default to 0
            logger.info("  Building the similarity map...")
            similarity_map = {}
            for result in stage8_results:
                note_id = result['note_id']
                max_similarity = result['similarity_statistics'].get('max_similarity', 0)
                similarity_map[note_id] = max_similarity

            logger.info(f"  Similarity map entries: {len(similarity_map)}")

            # 3. Walk every original feature and search word in Stage 6 and
            # compute P. The Stage 6 data is a list with one element per
            # original feature.
            updated_count = 0
            total_searches = 0

            logger.info(f"  Walking {len(stage6_data)} original features...")

            for feature_item in stage6_data:
                original_feature = feature_item.get('原始特征名称', '')
                logger.info(f"\n  Processing original feature: {original_feature}")

                # Walk each group
                for group in feature_item.get('组合评估结果_分组', []):
                    source_word = group.get('source_word', '')

                    # Walk every search word in the group
                    for search_item in group.get('top10_searches', []):
                        search_word = search_item.get('search_word', '')
                        total_searches += 1

                        logger.info(f"    Processing search word: {search_word} (source: {source_word})")

                        # Comprehensive score for this search word
                        p_score, p_detail = self._calculate_single_query_score(
                            search_item, similarity_map
                        )

                        # Update the search-word record
                        if p_score is not None:
                            search_item['comprehensive_score'] = round(p_score, 3)
                            search_item['comprehensive_score_detail'] = p_detail
                            updated_count += 1
                            logger.info(f"      Comprehensive score P = {p_score:.3f} (M={p_detail['M']}, N={p_detail['N']})")
                        else:
                            logger.warning("      Could not compute the comprehensive score (data may be missing)")

            # 4. Save the updated Stage 6 data
            logger.info("\n  Saving the updated Stage 6 data...")
            logger.info(f"  Updated {updated_count}/{total_searches} search words")

            with open(self.stage6_path, 'w', encoding='utf-8') as f:
                json.dump(stage6_data, f, ensure_ascii=False, indent=2)

            logger.info(f"  Update complete: {self.stage6_path}")

        except Exception as e:
            logger.error(f"  Failed to compute comprehensive scores: {e}", exc_info=True)
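
    # A sketch of the Stage 6 layout walked above. The keys come from the
    # accessors in this file; the values are illustrative:
    #
    #   [
    #     {"原始特征名称": "...",
    #      "组合评估结果_分组": [
    #        {"source_word": "...",
    #         "top10_searches": [
    #           {"search_word": "...",
    #            "evaluation_with_filter": {"total_notes": 10,
    #                                       "notes_evaluation": [...]},
    #            "search_result": {"data": {"data": [...]}}}]}]}
    #   ]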

    def _calculate_single_query_score(
        self,
        query: Dict,
        similarity_map: Dict[str, float]
    ) -> Tuple[Optional[float], Optional[Dict]]:
        """
        Compute the comprehensive score P for a single query.

        P = sum(a*b) / N, where N is the total number of retrieved notes, a is
        the evaluation score of a completely matching note, and b is that
        note's max similarity from Stage 8.

        Args:
            query: a single query object from Stage 6
            similarity_map: note_id -> max_similarity mapping

        Returns:
            (P, detail dict) or (None, None)
        """
        # Total number of notes, N
        evaluation_with_filter = query.get('evaluation_with_filter', {})
        N = evaluation_with_filter.get('total_notes', 0)

        if N == 0:
            logger.warning("      Query has zero notes; cannot compute P")
            return None, None

        # Note evaluations and raw note data
        notes_evaluation = evaluation_with_filter.get('notes_evaluation', [])
        search_result = query.get('search_result', {})
        notes_data = search_result.get('data', {}).get('data', [])

        if not notes_evaluation or not notes_data:
            logger.warning("      Missing evaluation data or note data")
            return 0.0, {
                'N': N,
                'M': 0,
                'total_contribution': 0.0,
                'complete_matches': []
            }

        # Collect the completely matching notes (综合得分 >= 0.8)
        complete_matches_data = []
        for note_eval in notes_evaluation:
            score = note_eval.get('综合得分', 0)
            if score >= 0.8:
                note_index = note_eval.get('note_index', -1)
                if 0 <= note_index < len(notes_data):
                    # Pull the note_id from the raw data
                    note_id = notes_data[note_index].get('id', '')
                    note_card = notes_data[note_index].get('note_card', {})
                    note_title = note_card.get('display_title', '')
                    complete_matches_data.append({
                        'note_id': note_id,
                        'note_title': note_title,
                        'evaluation_score': score,
                        'note_index': note_index
                    })

        M = len(complete_matches_data)
        logger.info(f"      Complete matches: M = {M}/{N}")

        if M == 0:
            # No complete matches, so P = 0
            return 0.0, {
                'N': N,
                'M': 0,
                'total_contribution': 0.0,
                'complete_matches': []
            }

        # Contribution a*b of each complete match
        contributions = []
        total_contribution = 0.0

        for match in complete_matches_data:
            note_id = match['note_id']
            evaluation_score = match['evaluation_score']  # a

            # b comes from the similarity map
            max_similarity = similarity_map.get(note_id, 0)  # b

            # Contribution
            contribution = evaluation_score * max_similarity
            total_contribution += contribution

            # Keep the per-note detail
            contributions.append({
                'note_id': note_id,
                'note_title': match['note_title'],
                'evaluation_score': round(evaluation_score, 3),
                'max_similarity': round(max_similarity, 3),
                'contribution': round(contribution, 3)
            })

        # Comprehensive score P = sum(a*b) / N
        P = total_contribution / N

        # Sort contributions in descending order
        contributions.sort(key=lambda x: x['contribution'], reverse=True)

        # Assemble the detail
        detail = {
            'N': N,
            'M': M,
            'total_contribution': round(total_contribution, 3),
            'complete_matches': contributions
        }

        return P, detail

    def run(
        self,
        stage7_results: Dict,
        output_path: Optional[str] = None
    ) -> Dict:
        """
        Run the Stage 8 similarity analysis (synchronous version).

        Args:
            stage7_results: Stage 7 results
            output_path: output path (optional)

        Returns:
            Stage 8 results
        """
        return asyncio.run(self.run_async(stage7_results, output_path))
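
# A worked example of the comprehensive score with illustrative numbers: a
# query returns N = 10 notes, of which M = 2 are complete matches
# (综合得分 >= 0.8):
#
#   note A: a = 0.90, b = 0.80 -> contribution 0.720
#   note B: a = 0.85, b = 0.50 -> contribution 0.425
#
#   P = (0.720 + 0.425) / 10 = 0.1145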


def test_stage8_analyzer():
    """Test the Stage 8 analyzer."""
    # Read the Stage 7 results
    stage7_path = "output_v2/stage7_with_deconstruction.json"
    if not os.path.exists(stage7_path):
        print(f"Stage 7 results not found: {stage7_path}")
        return

    with open(stage7_path, 'r', encoding='utf-8') as f:
        stage7_results = json.load(f)

    # Create the analyzer
    analyzer = Stage8SimilarityAnalyzer(
        weight_embedding=0.5,
        weight_semantic=0.5,
        max_workers=3,
        min_similarity=0.3,
        target_features=["墨镜"]
    )

    # Run the analysis
    stage8_results = analyzer.run(stage7_results)

    stats = stage8_results['metadata']['overall_statistics']
    print(f"\nProcessed {stats['total_notes']} notes")
    print(f"Extracted {stats['total_features_extracted']} features")
    print(f"Avg max similarity: {stats['avg_max_similarity']}")


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    test_stage8_analyzer()