#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Stage 7 analyzer

Runs in-depth deconstruction analysis on the notes that were fully
matched in Stage 6.
"""

import os
import json
import time
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional

from stage7_api_client import DeconstructionAPIClient, map_note_to_api_format

# Define the logger before the tqdm import guard below, which may log a warning.
logger = logging.getLogger(__name__)

try:
    from tqdm import tqdm
    TQDM_AVAILABLE = True
except ImportError:
    TQDM_AVAILABLE = False
    logger.warning("tqdm is not installed; falling back to simple progress logging. Install with: pip install tqdm")


class Stage7DeconstructionAnalyzer:
    """Stage 7: in-depth deconstruction analysis of fully matched notes."""

    def __init__(
        self,
        api_url: str = "http://192.168.245.150:7000/what/analysis/single",
        max_workers: int = 5,
        max_notes: Optional[int] = None,
        min_score: float = 8.0,
        skip_count: int = 0,
        sort_by: str = 'score',
        timeout: int = 30,
        max_retries: int = 3,
        output_dir: str = "output_v2",
        enable_image_download: bool = True,
        image_server_url: str = "http://localhost:8765",
        image_download_dir: str = "downloaded_images",
        target_features: Optional[List[str]] = None
    ):
        """
        Initialize the Stage 7 analyzer.

        Args:
            api_url: API endpoint
            max_workers: number of concurrent workers
            max_notes: maximum number of notes to process (None = no limit)
            min_score: minimum score threshold (only notes scoring >= this value are processed)
            skip_count: skip the first N notes
            sort_by: sort order ('score' | 'time' | 'engagement')
            timeout: API timeout in seconds
            max_retries: maximum number of API retries
            output_dir: output directory
            enable_image_download: whether to download images (fetch Xiaohongshu images and rewrite to local URLs)
            image_server_url: image server URL
            image_download_dir: image download directory
            target_features: list of original features to process (None = process all features)
        """
        self.max_workers = max_workers
        self.max_notes = max_notes
        self.min_score = min_score
        self.skip_count = skip_count
        self.sort_by = sort_by
        self.output_dir = output_dir
        self.enable_image_download = enable_image_download
        self.target_features = target_features  # New: filter by target features

        # Initialize the API client.
        self.api_client = DeconstructionAPIClient(
            api_url=api_url,
            timeout=timeout,
            max_retries=max_retries
        )

        # Image downloading is deprecated; original image URLs are used directly.
        # The image_* parameters are kept for backward compatibility but have no effect.
        if self.enable_image_download:
            logger.warning("Note: the enable_image_download parameter is deprecated; original image URLs will be used directly")

    def extract_matched_notes_from_stage6(
        self,
        stage6_results: List[Dict]
    ) -> List[Dict]:
        """
        Extract all fully matched notes from the Stage 6 results.

        Args:
            stage6_results: Stage 6 results (a list)

        Returns:
            List of fully matched notes
        """
        matched_notes = []

        # The Stage 6 result is a list; each element is a feature_group.
        for feature_group in stage6_results:
            original_feature = feature_group.get('原始特征名称', '')

            # If target_features is set, only process the specified features.
            if self.target_features and original_feature not in self.target_features:
                continue

            # Walk 组合评估结果_分组 (this level contains top10_searches).
            for combo_group in feature_group.get('组合评估结果_分组', []):
                # top10_searches holds all search results.
                for search_item in combo_group.get('top10_searches', []):
                    search_word = search_item.get('search_word', '')
                    source_word = search_item.get('source_word', '')
                    evaluation = search_item.get('evaluation_with_filter', {})

                    # Skip items that carry no search results.
                    if 'search_result' not in search_item:
                        continue

                    notes = search_item['search_result'].get('data', {}).get('data', [])

                    # Walk the evaluation results.
                    for note_eval in evaluation.get('notes_evaluation', []):
                        score = note_eval.get('综合得分', 0)

                        # Only keep full matches (score >= min_score).
                        if score >= self.min_score:
                            note_index = note_eval.get('note_index', -1)
                            if 0 <= note_index < len(notes):
                                note = notes[note_index]
                                matched_notes.append({
                                    'note': note,
                                    'note_card': note.get('note_card', {}),
                                    'evaluation': note_eval,
                                    'search_word': search_word,
                                    'source_word': source_word,
                                    'original_feature': original_feature,
                                    'top3_persona_features': feature_group.get('top3匹配信息', [])
                                })

        return matched_notes
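    # For reference, each entry appended to matched_notes above has the shape
    # sketched below. This is read off the extraction code itself; the nesting
    # of the Stage 6 JSON is assumed from the keys walked above, not from a
    # published schema:
    #
    #   {
    #       'note': {...},                   # raw search-result note
    #       'note_card': {...},              # note['note_card']
    #       'evaluation': {...},             # one entry of notes_evaluation
    #       'search_word': '...',
    #       'source_word': '...',
    #       'original_feature': '...',
    #       'top3_persona_features': [...]   # feature_group['top3匹配信息']
    #   }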
    def sort_matched_notes(
        self,
        matched_notes: List[Dict]
    ) -> List[Dict]:
        """
        Sort the fully matched notes.

        Args:
            matched_notes: list of matched notes

        Returns:
            Sorted list of notes
        """
        if self.sort_by == 'score':
            # Descending by score (process high-scoring notes first).
            return sorted(
                matched_notes,
                key=lambda x: x['evaluation'].get('综合得分', 0),
                reverse=True
            )
        elif self.sort_by == 'time':
            # Descending by time (process the newest notes first).
            return sorted(
                matched_notes,
                key=lambda x: x['note_card'].get('publish_timestamp', 0),
                reverse=True
            )
        elif self.sort_by == 'engagement':
            # Descending by engagement (likes + collects + comments).
            def calc_engagement(note_data):
                interact = note_data['note_card'].get('interact_info', {})
                return (
                    interact.get('liked_count', 0) +
                    interact.get('collected_count', 0) +
                    interact.get('comment_count', 0)
                )
            return sorted(
                matched_notes,
                key=calc_engagement,
                reverse=True
            )
        return matched_notes

    def _save_intermediate_results(
        self,
        results: List[Dict],
        output_path: str,
        processed_count: int,
        total_count: int,
        start_time: float
    ):
        """
        Save intermediate results.

        Args:
            results: current list of results
            output_path: output path
            processed_count: number of notes processed so far
            total_count: total number of notes
            start_time: start time
        """
        # Build the intermediate result file path.
        base_dir = os.path.dirname(output_path) or 'output_v2'
        base_name = os.path.basename(output_path)
        name_without_ext = os.path.splitext(base_name)[0]
        intermediate_path = os.path.join(
            base_dir,
            f"{name_without_ext}_partial_{processed_count}of{total_count}.json"
        )

        # Count successes and failures.
        success_count = sum(1 for r in results if r['api_response']['status'] == 'success')
        failed_count = len(results) - success_count

        # Build the intermediate result.
        intermediate_result = {
            'metadata': {
                'stage': 'stage7_partial',
                'description': f'Partial results ({processed_count}/{total_count})',
                'processed_notes': len(results),
                'success_count': success_count,
                'failed_count': failed_count,
                'saved_at': datetime.now().isoformat(),
                'processing_time_seconds': round(time.time() - start_time, 2)
            },
            'results': results
        }

        # Save.
        os.makedirs(base_dir, exist_ok=True)
        with open(intermediate_path, 'w', encoding='utf-8') as f:
            json.dump(intermediate_result, f, ensure_ascii=False, indent=2)

        logger.info(f"  Saved intermediate results: {intermediate_path} ({processed_count}/{total_count})")

    def process_single_note(
        self,
        matched_note_data: Dict,
        index: int,
        total: int
    ) -> Dict:
        """
        Run deconstruction analysis on a single note.

        Args:
            matched_note_data: matched note data
            index: current index (for logging)
            total: total count (for logging)

        Returns:
            Processing result
        """
        note = matched_note_data['note']
        note_card = matched_note_data['note_card']
        evaluation = matched_note_data['evaluation']
        search_word = matched_note_data['search_word']
        original_feature = matched_note_data['original_feature']

        note_id = note.get('id', '')
        note_title = note_card.get('display_title', '')[:30]  # first 30 characters

        logger.info(f"[{index}/{total}] Deconstruction analysis: {note_id}")
        logger.info(f"  Title: {note_title}...")
        logger.info(f"  Search word: {search_word}")
        logger.info(f"  Original feature: {original_feature}")

        # Key matching points (saved into the result).
        key_points = evaluation.get('关键匹配点', [])

        # Top-3 persona features.
        top3_features = matched_note_data.get('top3_persona_features', [])

        # Build start_points: use only the name of the first top-3 persona feature.
        start_points = []
        if top3_features:
            first_feature = top3_features[0].get('人设特征名称', '')
            if first_feature:
                start_points = [first_feature]

        logger.info(f"  start_points: {start_points}")
        if top3_features:
            logger.info(f"  top3 persona features: {[f.get('人设特征名称', '') for f in top3_features[:3]]}")

        # Use the original image URLs directly, without any processing.
        original_images = note_card.get('image_list', [])
        if original_images:
            logger.info(f"  Image count: {len(original_images)}")

        # Map the data into the API format (original image URLs are used).
        api_payload = map_note_to_api_format(
            note=note,
            note_card=note_card,
            evaluation=evaluation,
            search_word=search_word,
            original_feature=original_feature,
            start_points=start_points,
            processed_image_urls=None  # no processed URLs; fall back to the originals
        )

        # Call the API.
        start_time = time.time()
        api_response = self.api_client.call_api(api_payload)
        processing_time = (time.time() - start_time) * 1000  # milliseconds

        # Build the result.
        result = {
            'note_id': note_id,
            'search_word': search_word,
            'original_feature': original_feature,
            'source_word': matched_note_data['source_word'],
            'evaluation_score': evaluation.get('综合得分', 0),
            'evaluation_type': evaluation.get('匹配类型', ''),
            'evaluation_confidence': evaluation.get('置信度', ''),
            'key_matching_points': key_points,
            'note_data': {
                'title': note_card.get('display_title', ''),
                'author': note_card.get('user', {}).get('nick_name', ''),
                'link': f"https://www.xiaohongshu.com/explore/{note_id}"
            },
            'api_request': api_payload,
            'api_response': api_response,
            'processed_at': datetime.now().isoformat(),
            'processing_time_ms': round(processing_time, 2)
        }

        if api_response['status'] == 'success':
            logger.info(f"  ✓ success ({processing_time:.0f}ms)")
        else:
            logger.error(f"  ✗ failed: {api_response['error']}")

        return result
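    # This module assumes DeconstructionAPIClient.call_api returns a dict
    # envelope roughly like the sketch below. The shape is inferred only from
    # how 'status' and 'error' are read here; see stage7_api_client for the
    # authoritative contract:
    #
    #   {'status': 'success', ...}                     # on success
    #   {'status': '<non-success>', 'error': '<msg>'}  # on failure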
    def run(
        self,
        stage6_results: List[Dict],
        output_path: Optional[str] = None
    ) -> Dict:
        """
        Execute the Stage 7 deconstruction analysis.

        Args:
            stage6_results: Stage 6 results
            output_path: output path (optional)

        Returns:
            Stage 7 results
        """
        logger.info("\n" + "=" * 60)
        logger.info("Stage 7: in-depth deconstruction analysis of fully matched notes")
        logger.info("=" * 60)

        # Log the configuration.
        logger.info("Configuration:")
        logger.info(f"  API endpoint: {self.api_client.api_url}")
        if self.target_features:
            logger.info(f"  Target features: {', '.join(self.target_features)}")
        else:
            logger.info("  Target features: all")
        logger.info(f"  Minimum score threshold: {self.min_score}")
        logger.info(f"  Workers: {self.max_workers}")
        logger.info(f"  Max notes: {self.max_notes if self.max_notes else 'unlimited'}")
        logger.info(f"  Skip first N: {self.skip_count}")
        logger.info(f"  Sort by: {self.sort_by}")
        logger.info(f"  API timeout: {self.api_client.timeout}s")
        logger.info(f"  Max retries: {self.api_client.max_retries}")

        # Default output path.
        if output_path is None:
            output_path = os.path.join(self.output_dir, "stage7_with_deconstruction.json")

        # 1. Extract the fully matched notes.
        matched_notes = self.extract_matched_notes_from_stage6(stage6_results)
        total_matched = len(matched_notes)

        logger.info(f"  Total fully matched notes: {total_matched} (score >= {self.min_score})")

        if total_matched == 0:
            logger.warning("  No fully matched notes found")
            return {
                'metadata': {
                    'stage': 'stage7',
                    'total_matched_notes': 0,
                    'processed_notes': 0
                },
                'results': []
            }

        # 2. Sort.
        matched_notes = self.sort_matched_notes(matched_notes)
        logger.info(f"  Sort by: {self.sort_by}")

        # 3. Skip the first N.
        if self.skip_count > 0:
            logger.info(f"  Skipping first {self.skip_count}")
            matched_notes = matched_notes[self.skip_count:]

        # 4. Apply the count limit.
        if self.max_notes is not None and len(matched_notes) > self.max_notes:
            logger.info(f"  Limit: {self.max_notes}")
            matched_notes = matched_notes[:self.max_notes]

        to_process = len(matched_notes)
        logger.info(f"  Actually processing: {to_process} notes")
        logger.info(f"  Workers: {self.max_workers}")
        logger.info(f"  API: {self.api_client.api_url}")

        if to_process == 0:
            logger.warning("  No notes to process")
            return {
                'metadata': {
                    'stage': 'stage7',
                    'total_matched_notes': total_matched,
                    'processed_notes': 0,
                    'skipped_notes': self.skip_count
                },
                'results': []
            }
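        # Behavioral note on step 5 below: results are gathered via
        # as_completed, so 'results' is filled in completion order, not
        # submission order, and the saved ordering can vary across runs.
        # Sort by 'note_id' or 'evaluation_score' downstream if a stable
        # order matters.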
        # 5. Process in parallel.
        results = []
        start_time = time.time()
        save_interval = 10  # save intermediate results every 10 notes

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for idx, note_data in enumerate(matched_notes, start=1):
                future = executor.submit(
                    self.process_single_note,
                    note_data,
                    idx,
                    to_process
                )
                futures.append(future)

            # Collect results (with progress display).
            if TQDM_AVAILABLE:
                # tqdm progress bar.
                logger.info("  Using progress bar...")
                iterator = tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc="  Progress",
                    unit="note",
                    ncols=100
                )
            else:
                # Simple progress logging.
                iterator = as_completed(futures)

            processed_count = 0
            for future in iterator:
                try:
                    result = future.result()
                    results.append(result)
                    processed_count += 1

                    # Incremental save (every save_interval notes).
                    if processed_count % save_interval == 0:
                        self._save_intermediate_results(
                            results, output_path, processed_count, to_process, start_time
                        )

                    # Simple progress logging (when tqdm is unavailable).
                    if not TQDM_AVAILABLE and processed_count % 5 == 0:
                        logger.info(f"  Progress: {processed_count}/{to_process}")
                except Exception as e:
                    logger.error(f"  Processing failed: {e}")

        processing_time = time.time() - start_time

        # 6. Statistics.
        success_count = sum(1 for r in results if r['api_response']['status'] == 'success')
        failed_count = len(results) - success_count

        logger.info(f"\n  Total time: {processing_time:.1f}s")
        logger.info(f"  Succeeded: {success_count}")
        logger.info(f"  Failed: {failed_count}")

        # 7. Build the final result.
        final_result = {
            'metadata': {
                'stage': 'stage7',
                'description': 'In-depth deconstruction analysis of fully matched notes',
                'target_features': self.target_features if self.target_features else 'all',
                'total_matched_notes': total_matched,
                'processed_notes': len(results),
                'skipped_notes': self.skip_count,
                'max_notes_limit': self.max_notes,
                'sort_by': self.sort_by,
                'success_count': success_count,
                'failed_count': failed_count,
                'api_url': self.api_client.api_url,
                'min_score_threshold': self.min_score,
                'created_at': datetime.now().isoformat(),
                'processing_time_seconds': round(processing_time, 2)
            },
            'results': results
        }

        # 8. Save the results ('or .' guards against a bare filename with no directory part).
        os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(final_result, f, ensure_ascii=False, indent=2)

        logger.info(f"  Results saved: {output_path}")

        return final_result


def test_stage7_analyzer():
    """Test the Stage 7 analyzer."""
    # Load the Stage 6 results.
    stage6_path = "output_v2/stage6_with_evaluations.json"

    if not os.path.exists(stage6_path):
        print(f"Stage 6 results not found: {stage6_path}")
        return

    with open(stage6_path, 'r', encoding='utf-8') as f:
        stage6_results = json.load(f)

    # Create the analyzer.
    analyzer = Stage7DeconstructionAnalyzer(
        max_workers=3,
        max_notes=5,  # test with 5 notes only
        skip_count=0,
        sort_by='score'
    )

    # Run the analysis.
    stage7_results = analyzer.run(stage6_results)

    print(f"\nProcessed {stage7_results['metadata']['processed_notes']} notes")
    print(f"Succeeded: {stage7_results['metadata']['success_count']}")
    print(f"Failed: {stage7_results['metadata']['failed_count']}")


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    test_stage7_analyzer()
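
# Illustrative usage beyond the test above (a sketch; the feature name is a
# placeholder, and the Stage 6 path assumes the default output_v2 layout used
# elsewhere in this file):
#
#   analyzer = Stage7DeconstructionAnalyzer(
#       max_workers=5,
#       min_score=8.0,
#       sort_by='engagement',
#       target_features=['<原始特征名称>'],  # restrict to specific Stage 6 features
#   )
#   with open('output_v2/stage6_with_evaluations.json', encoding='utf-8') as f:
#       analyzer.run(json.load(f))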