#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Stage 7 analyzer.

Runs in-depth deconstruction analysis on the posts that Stage 6
identified as full matches.
"""
import os
import json
import time
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional

from stage7_api_client import DeconstructionAPIClient, map_note_to_api_format

# Module logger; defined before the optional tqdm import so the
# ImportError branch below can use it without raising a NameError.
logger = logging.getLogger(__name__)

try:
    from tqdm import tqdm
    TQDM_AVAILABLE = True
except ImportError:
    TQDM_AVAILABLE = False
    logger.warning("tqdm is not installed; falling back to simple progress logging. Install with: pip install tqdm")
class Stage7DeconstructionAnalyzer:
    """Stage 7: in-depth deconstruction analysis of fully matched posts."""
    def __init__(
        self,
        api_url: str = "http://192.168.245.150:7000/what/analysis/single",
        max_workers: int = 5,
        max_notes: Optional[int] = None,
        min_score: float = 8.0,
        skip_count: int = 0,
        sort_by: str = 'score',
        timeout: int = 30,
        max_retries: int = 3,
        output_dir: str = "output_v2",
        enable_image_download: bool = True,
        image_server_url: str = "http://localhost:8765",
        image_download_dir: str = "downloaded_images",
        target_features: Optional[List[str]] = None
    ):
        """
        Initialize the Stage 7 analyzer.

        Args:
            api_url: API endpoint URL.
            max_workers: Number of concurrent workers.
            max_notes: Maximum number of posts to process (None = unlimited).
            min_score: Minimum score threshold (only posts scoring >= this value are processed).
            skip_count: Number of leading posts to skip.
            sort_by: Sort order ('score' | 'time' | 'engagement').
            timeout: API timeout in seconds.
            max_retries: Maximum number of API retries.
            output_dir: Output directory.
            enable_image_download: Whether to download Xiaohongshu images and convert them to local URLs (deprecated, see below).
            image_server_url: Image server URL (deprecated).
            image_download_dir: Image download directory (deprecated).
            target_features: List of raw feature names to process (None = process all features).
        """
        self.max_workers = max_workers
        self.max_notes = max_notes
        self.min_score = min_score
        self.skip_count = skip_count
        self.sort_by = sort_by
        self.output_dir = output_dir
        self.enable_image_download = enable_image_download
        self.target_features = target_features  # target-feature filter

        # Initialize the API client.
        self.api_client = DeconstructionAPIClient(
            api_url=api_url,
            timeout=timeout,
            max_retries=max_retries
        )

        # Image downloading is deprecated; original image URLs are used directly.
        # The parameters are kept for backward compatibility but are no longer used.
        if self.enable_image_download:
            logger.warning(" Note: enable_image_download is deprecated; original image URLs will be used directly")
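
    # For reference, the Stage 6 structure that the extractor below walks is
    # assumed to look roughly like this (inferred from the key accesses in
    # extract_matched_notes_from_stage6; a sketch, not an authoritative
    # schema — the Chinese keys are the literal field names in the Stage 6 file):
    #
    #   [
    #     {
    #       "原始特征名称": "...",
    #       "top3匹配信息": [{"人设特征名称": "..."}],
    #       "组合评估结果_分组": [
    #         {
    #           "top10_searches": [
    #             {
    #               "search_word": "...",
    #               "source_word": "...",
    #               "search_result": {"data": {"data": ["<note>", "..."]}},
    #               "evaluation_with_filter": {
    #                 "notes_evaluation": [
    #                   {"note_index": 0, "综合得分": 9.0, "关键匹配点": []}
    #                 ]
    #               }
    #             }
    #           ]
    #         }
    #       ]
    #     }
    #   ]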
    def extract_matched_notes_from_stage6(
        self,
        stage6_results: List[Dict]
    ) -> List[Dict]:
        """
        Extract all fully matched posts from the Stage 6 results.

        Args:
            stage6_results: Stage 6 results (a list).

        Returns:
            List of fully matched posts.
        """
        matched_notes = []

        # The Stage 6 result is a list whose elements are feature groups.
        for feature_group in stage6_results:
            original_feature = feature_group.get('原始特征名称', '')

            # If target_features is set, only process the specified features.
            if self.target_features and original_feature not in self.target_features:
                continue

            # Walk 组合评估结果_分组 (this level contains top10_searches).
            for combo_group in feature_group.get('组合评估结果_分组', []):
                # top10_searches holds all search results.
                for search_item in combo_group.get('top10_searches', []):
                    search_word = search_item.get('search_word', '')
                    source_word = search_item.get('source_word', '')
                    evaluation = search_item.get('evaluation_with_filter', {})

                    # Skip items that carry no search results.
                    if 'search_result' not in search_item:
                        continue
                    notes = search_item['search_result'].get('data', {}).get('data', [])

                    # Walk the evaluation results.
                    for note_eval in evaluation.get('notes_evaluation', []):
                        score = note_eval.get('综合得分', 0)

                        # Keep full matches only (score >= min_score).
                        if score >= self.min_score:
                            note_index = note_eval.get('note_index', -1)
                            if 0 <= note_index < len(notes):
                                note = notes[note_index]
                                matched_notes.append({
                                    'note': note,
                                    'note_card': note.get('note_card', {}),
                                    'evaluation': note_eval,
                                    'search_word': search_word,
                                    'source_word': source_word,
                                    'original_feature': original_feature,
                                    'top3_persona_features': feature_group.get('top3匹配信息', [])
                                })
        return matched_notes
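
    # Each record appended above bundles the raw note with its evaluation
    # context. A sketch of one element (values illustrative, keys taken
    # verbatim from the code above):
    #
    #   {
    #     'note': {...},                      # raw search-result item
    #     'note_card': {...},                 # note['note_card']
    #     'evaluation': {'综合得分': 9.0, 'note_index': 0},
    #     'search_word': '...',
    #     'source_word': '...',
    #     'original_feature': '...',
    #     'top3_persona_features': [...]
    #   }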
    def sort_matched_notes(
        self,
        matched_notes: List[Dict]
    ) -> List[Dict]:
        """
        Sort the fully matched posts.

        Args:
            matched_notes: List of matched posts.

        Returns:
            Sorted list of posts.
        """
        if self.sort_by == 'score':
            # Score, descending (process high-scoring posts first).
            return sorted(
                matched_notes,
                key=lambda x: x['evaluation'].get('综合得分', 0),
                reverse=True
            )
        elif self.sort_by == 'time':
            # Time, descending (process the newest posts first).
            return sorted(
                matched_notes,
                key=lambda x: x['note_card'].get('publish_timestamp', 0),
                reverse=True
            )
        elif self.sort_by == 'engagement':
            # Engagement, descending (likes + collects + comments).
            # The interaction counts are assumed to be numeric here; if the
            # upstream data stores them as strings, convert before summing.
            def calc_engagement(note_data):
                interact = note_data['note_card'].get('interact_info', {})
                return (
                    interact.get('liked_count', 0) +
                    interact.get('collected_count', 0) +
                    interact.get('comment_count', 0)
                )
            return sorted(
                matched_notes,
                key=calc_engagement,
                reverse=True
            )
        return matched_notes
    def _save_intermediate_results(
        self,
        results: List[Dict],
        output_path: str,
        processed_count: int,
        total_count: int,
        start_time: float
    ):
        """
        Save intermediate results.

        Args:
            results: Current list of results.
            output_path: Output path.
            processed_count: Number of posts processed so far.
            total_count: Total number of posts.
            start_time: Start time.
        """
        # Build the intermediate-result file path.
        base_dir = os.path.dirname(output_path) or 'output_v2'
        base_name = os.path.basename(output_path)
        name_without_ext = os.path.splitext(base_name)[0]
        intermediate_path = os.path.join(
            base_dir,
            f"{name_without_ext}_partial_{processed_count}of{total_count}.json"
        )
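        # e.g. "output_v2/stage7_with_deconstruction_partial_10of57.json"
        # (illustrative name; the prefix follows whatever output_path is in use).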
        # Count successes and failures.
        success_count = sum(1 for r in results if r['api_response']['status'] == 'success')
        failed_count = len(results) - success_count

        # Build the intermediate result.
        intermediate_result = {
            'metadata': {
                'stage': 'stage7_partial',
                'description': f'Partial results ({processed_count}/{total_count})',
                'processed_notes': len(results),
                'success_count': success_count,
                'failed_count': failed_count,
                'saved_at': datetime.now().isoformat(),
                'processing_time_seconds': round(time.time() - start_time, 2)
            },
            'results': results
        }

        # Save.
        os.makedirs(base_dir, exist_ok=True)
        with open(intermediate_path, 'w', encoding='utf-8') as f:
            json.dump(intermediate_result, f, ensure_ascii=False, indent=2)
        logger.info(f" Intermediate results saved: {intermediate_path} ({processed_count}/{total_count})")
    def process_single_note(
        self,
        matched_note_data: Dict,
        index: int,
        total: int
    ) -> Dict:
        """
        Run deconstruction analysis on a single post.

        Args:
            matched_note_data: Matched post data.
            index: Current index (for logging).
            total: Total count (for logging).

        Returns:
            Processing result.
        """
        note = matched_note_data['note']
        note_card = matched_note_data['note_card']
        evaluation = matched_note_data['evaluation']
        search_word = matched_note_data['search_word']
        original_feature = matched_note_data['original_feature']

        note_id = note.get('id', '')
        note_title = note_card.get('display_title', '')[:30]  # first 30 characters

        logger.info(f"[{index}/{total}] Deconstruction analysis: {note_id}")
        logger.info(f" Title: {note_title}...")
        logger.info(f" Search word: {search_word}")
        logger.info(f" Original feature: {original_feature}")

        # Key matching points (saved into the result).
        key_points = evaluation.get('关键匹配点', [])

        # Top-3 persona features.
        top3_features = matched_note_data.get('top3_persona_features', [])

        # Build start_points: use only the name of the first top-3 persona feature.
        start_points = []
        if top3_features:
            first_feature = top3_features[0].get('人设特征名称', '')
            if first_feature:
                start_points = [first_feature]

        logger.info(f" start_points: {start_points}")
        if top3_features:
            logger.info(f" Top-3 persona features: {[f.get('人设特征名称', '') for f in top3_features[:3]]}")

        # Use the original image URLs directly, without any processing.
        original_images = note_card.get('image_list', [])
        if original_images:
            logger.info(f" Image count: {len(original_images)}")

        # Map the data into the API format (original image URLs are used as-is).
        api_payload = map_note_to_api_format(
            note=note,
            note_card=note_card,
            evaluation=evaluation,
            search_word=search_word,
            original_feature=original_feature,
            start_points=start_points,
            processed_image_urls=None  # no processed URLs; use the originals
        )

        # Call the API.
        start_time = time.time()
        api_response = self.api_client.call_api(api_payload)
        processing_time = (time.time() - start_time) * 1000  # milliseconds

        # Build the result.
        result = {
            'note_id': note_id,
            'search_word': search_word,
            'original_feature': original_feature,
            'source_word': matched_note_data['source_word'],
            'evaluation_score': evaluation.get('综合得分', 0),
            'evaluation_type': evaluation.get('匹配类型', ''),
            'evaluation_confidence': evaluation.get('置信度', ''),
            'key_matching_points': key_points,
            'note_data': {
                'title': note_card.get('display_title', ''),
                'author': note_card.get('user', {}).get('nick_name', ''),
                'link': f"https://www.xiaohongshu.com/explore/{note_id}"
            },
            'api_request': api_payload,
            'api_response': api_response,
            'processed_at': datetime.now().isoformat(),
            'processing_time_ms': round(processing_time, 2)
        }

        if api_response['status'] == 'success':
            logger.info(f" ✓ success ({processing_time:.0f}ms)")
        else:
            logger.error(f" ✗ failed: {api_response.get('error', 'unknown error')}")
        return result
    def run(
        self,
        stage6_results: List[Dict],
        output_path: Optional[str] = None
    ) -> Dict:
        """
        Run the Stage 7 deconstruction analysis.

        Args:
            stage6_results: Stage 6 results.
            output_path: Output path (optional).

        Returns:
            Stage 7 results.
        """
        logger.info("\n" + "=" * 60)
        logger.info("Stage 7: in-depth deconstruction analysis of fully matched posts")
        logger.info("=" * 60)

        # Log the configuration.
        logger.info("Configuration:")
        logger.info(f" API URL: {self.api_client.api_url}")
        if self.target_features:
            logger.info(f" Target features: {', '.join(self.target_features)}")
        else:
            logger.info(" Target features: all")
        logger.info(f" Minimum score threshold: {self.min_score}")
        logger.info(f" Workers: {self.max_workers}")
        logger.info(f" Max posts to process: {self.max_notes if self.max_notes else 'unlimited'}")
        logger.info(f" Posts to skip: {self.skip_count}")
        logger.info(f" Sort order: {self.sort_by}")
        logger.info(f" API timeout: {self.api_client.timeout}s")
        logger.info(f" Max retries: {self.api_client.max_retries}")

        # Default output path.
        if output_path is None:
            output_path = os.path.join(self.output_dir, "stage7_with_deconstruction.json")
        # 1. Extract the fully matched posts.
        matched_notes = self.extract_matched_notes_from_stage6(stage6_results)
        total_matched = len(matched_notes)
        logger.info(f" Fully matched posts: {total_matched} (score >= {self.min_score})")

        if total_matched == 0:
            logger.warning(" No fully matched posts found")
            return {
                'metadata': {
                    'stage': 'stage7',
                    'total_matched_notes': 0,
                    'processed_notes': 0
                },
                'results': []
            }

        # 2. Sort.
        matched_notes = self.sort_matched_notes(matched_notes)
        logger.info(f" Sort order: {self.sort_by}")

        # 3. Skip the first N posts.
        if self.skip_count > 0:
            logger.info(f" Skipping the first {self.skip_count}")
            matched_notes = matched_notes[self.skip_count:]

        # 4. Apply the count limit.
        if self.max_notes is not None and len(matched_notes) > self.max_notes:
            logger.info(f" Count limit: {self.max_notes}")
            matched_notes = matched_notes[:self.max_notes]

        to_process = len(matched_notes)
        logger.info(f" Actually processing: {to_process} posts")
        logger.info(f" Workers: {self.max_workers}")
        logger.info(f" API: {self.api_client.api_url}")

        if to_process == 0:
            logger.warning(" Nothing to process")
            return {
                'metadata': {
                    'stage': 'stage7',
                    'total_matched_notes': total_matched,
                    'processed_notes': 0,
                    'skipped_notes': self.skip_count
                },
                'results': []
            }
        # 5. Process in parallel.
        results = []
        start_time = time.time()
        save_interval = 10  # save once every 10 processed posts

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for idx, note_data in enumerate(matched_notes, start=1):
                future = executor.submit(
                    self.process_single_note,
                    note_data,
                    idx,
                    to_process
                )
                futures.append(future)

            # Collect results (with progress display).
            if TQDM_AVAILABLE:
                # tqdm progress bar.
                logger.info(" Using progress bar...")
                iterator = tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc=" Progress",
                    unit="post",
                    ncols=100
                )
            else:
                # Plain progress logging.
                iterator = as_completed(futures)

            processed_count = 0
            for future in iterator:
                try:
                    result = future.result()
                    results.append(result)
                    processed_count += 1

                    # Incremental save (every save_interval posts).
                    if processed_count % save_interval == 0:
                        self._save_intermediate_results(
                            results,
                            output_path,
                            processed_count,
                            to_process,
                            start_time
                        )

                    # Plain progress output (when tqdm is unavailable).
                    if not TQDM_AVAILABLE and processed_count % 5 == 0:
                        logger.info(f" Progress: {processed_count}/{to_process}")
                except Exception as e:
                    logger.error(f" Processing failed: {e}")

        processing_time = time.time() - start_time

        # 6. Statistics.
        success_count = sum(1 for r in results if r['api_response']['status'] == 'success')
        failed_count = len(results) - success_count
        logger.info(f"\n Total time: {processing_time:.1f}s")
        logger.info(f" Succeeded: {success_count}")
        logger.info(f" Failed: {failed_count}")
        # 7. Build the final result.
        final_result = {
            'metadata': {
                'stage': 'stage7',
                'description': 'In-depth deconstruction analysis of fully matched posts',
                'target_features': self.target_features if self.target_features else 'all',
                'total_matched_notes': total_matched,
                'processed_notes': len(results),
                'skipped_notes': self.skip_count,
                'max_notes_limit': self.max_notes,
                'sort_by': self.sort_by,
                'success_count': success_count,
                'failed_count': failed_count,
                'api_url': self.api_client.api_url,
                'min_score_threshold': self.min_score,
                'created_at': datetime.now().isoformat(),
                'processing_time_seconds': round(processing_time, 2)
            },
            'results': results
        }

        # 8. Save the result. Fall back to '.' so a bare filename (whose
        # dirname is '') does not make os.makedirs raise.
        os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(final_result, f, ensure_ascii=False, indent=2)
        logger.info(f" Results saved: {output_path}")
        return final_result
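
# A hedged usage sketch: restricting a run to specific raw features and
# capping volume. The feature name below is a placeholder — target_features
# entries must match the '原始特征名称' values actually present in the
# Stage 6 file.
#
#   analyzer = Stage7DeconstructionAnalyzer(
#       max_workers=5,
#       min_score=8.0,
#       sort_by='engagement',
#       max_notes=20,
#       target_features=['example_feature'],  # hypothetical name
#   )
#   result = analyzer.run(stage6_results, output_path='output_v2/stage7_custom.json')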
def test_stage7_analyzer():
    """Smoke test for the Stage 7 analyzer."""
    # Load the Stage 6 results.
    stage6_path = "output_v2/stage6_with_evaluations.json"
    if not os.path.exists(stage6_path):
        print(f"Stage 6 results not found: {stage6_path}")
        return

    with open(stage6_path, 'r', encoding='utf-8') as f:
        stage6_results = json.load(f)

    # Create the analyzer.
    analyzer = Stage7DeconstructionAnalyzer(
        max_workers=3,
        max_notes=5,  # test with 5 posts only
        skip_count=0,
        sort_by='score'
    )

    # Run the analysis. The early-return metadata from run() omits the
    # success/failure counters, so read them defensively with .get().
    stage7_results = analyzer.run(stage6_results)
    print(f"\nProcessed {stage7_results['metadata']['processed_notes']} posts")
    print(f"Succeeded: {stage7_results['metadata'].get('success_count', 0)}")
    print(f"Failed: {stage7_results['metadata'].get('failed_count', 0)}")
if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    test_stage7_analyzer()