#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Stage 7 analyzer.

Performs deep deconstruction analysis on posts that were fully matched in Stage 6.
"""
import os
import json
import time
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Any, Optional

from stage7_api_client import DeconstructionAPIClient, map_note_to_api_format

# Define the module logger before the tqdm import check below, which may log a warning.
logger = logging.getLogger(__name__)

try:
    from tqdm import tqdm
    TQDM_AVAILABLE = True
except ImportError:
    TQDM_AVAILABLE = False
    logger.warning("tqdm is not installed; falling back to simple progress output. Install with: pip install tqdm")


class Stage7DeconstructionAnalyzer:
    """Stage 7: deep deconstruction analysis of fully matched posts."""

    def __init__(
        self,
        api_url: str = "http://192.168.245.150:7000/what/analysis/single",
        max_workers: int = 5,
        max_notes: Optional[int] = None,
        min_score: float = 8.0,
        skip_count: int = 0,
        sort_by: str = 'score',
        timeout: int = 800,
        max_retries: int = 3,
        output_dir: str = "output_v2",
        enable_image_download: bool = True,
        image_server_url: str = "http://localhost:8765",
        image_download_dir: str = "downloaded_images",
        target_features: Optional[List[str]] = None
    ):
        """
        Initialize the Stage 7 analyzer.

        Args:
            api_url: API endpoint URL
            max_workers: number of concurrent workers
            max_notes: maximum number of posts to process (None = no limit)
            min_score: minimum score threshold (only posts scoring >= this value are processed)
            skip_count: number of leading posts to skip
            sort_by: sort order ('none' | 'score' | 'time' | 'engagement')
                - 'none': no sorting; keep the original order of the Stage 6 data
                - 'score': by evaluation score, descending
                - 'time': by publish time, descending
                - 'engagement': by engagement count, descending
            timeout: API timeout in seconds
            max_retries: maximum number of API retries
            output_dir: output directory
            enable_image_download: whether to download Xiaohongshu images and rewrite them to local URLs (deprecated)
            image_server_url: image server URL (deprecated)
            image_download_dir: image download directory (deprecated)
            target_features: list of original features to process (None = process all features)
        """
        self.max_workers = max_workers
        self.max_notes = max_notes
        self.min_score = min_score
        self.skip_count = skip_count
        self.sort_by = sort_by
        self.output_dir = output_dir
        self.enable_image_download = enable_image_download
        self.target_features = target_features  # target-feature filter

        # Initialize the API client.
        self.api_client = DeconstructionAPIClient(
            api_url=api_url,
            timeout=timeout,
            max_retries=max_retries
        )

        # Image downloading is deprecated; original image URLs are used directly.
        # The parameters are kept for backward compatibility but are no longer used.
        if self.enable_image_download:
            logger.warning("Note: the enable_image_download parameter is deprecated; original image URLs will be used directly")
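
    # Example construction (a sketch with hypothetical values; the feature name
    # below is illustrative, not taken from real data):
    #
    #   analyzer = Stage7DeconstructionAnalyzer(
    #       max_workers=3,
    #       min_score=8.5,
    #       sort_by='engagement',
    #       target_features=["feature_a"],
    #   )
    #   stage7_results = analyzer.run(stage6_results)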

    def extract_matched_notes_from_stage6(
        self,
        stage6_results: List[Dict]
    ) -> List[Dict]:
        """
        Extract all fully matched posts from the Stage 6 results.

        Args:
            stage6_results: Stage 6 results (a list of feature groups)

        Returns:
            List of fully matched posts
        """
        matched_notes = []

        # The Stage 6 result is a list; each element is a feature group.
        for feature_group in stage6_results:
            original_feature = feature_group.get('原始特征名称', '')

            # If target_features is set, only process the specified features.
            if self.target_features and original_feature not in self.target_features:
                continue

            # Walk the grouped combination-evaluation results (this level holds top10_searches).
            for combo_group in feature_group.get('组合评估结果_分组', []):
                # top10_searches contains all search results.
                for search_item in combo_group.get('top10_searches', []):
                    search_word = search_item.get('search_word', '')
                    source_word = search_item.get('source_word', '')
                    evaluation = search_item.get('evaluation_with_filter', {})

                    # Skip items that carry no search results.
                    if 'search_result' not in search_item:
                        continue
                    notes = search_item['search_result'].get('data', {}).get('data', [])

                    # Walk the evaluation results.
                    for note_eval in evaluation.get('notes_evaluation', []):
                        score = note_eval.get('综合得分', 0)

                        # Only keep full matches (score >= min_score).
                        if score >= self.min_score:
                            note_index = note_eval.get('note_index', -1)
                            if 0 <= note_index < len(notes):
                                note = notes[note_index]
                                matched_notes.append({
                                    'note': note,
                                    'note_card': note.get('note_card', {}),
                                    'evaluation': note_eval,
                                    'search_word': search_word,
                                    'source_word': source_word,
                                    'original_feature': original_feature,
                                    'top3_persona_features': feature_group.get('top3匹配信息', [])
                                })
        return matched_notes

    def sort_matched_notes(
        self,
        matched_notes: List[Dict]
    ) -> List[Dict]:
        """
        Sort the fully matched posts.

        Args:
            matched_notes: list of matched posts

        Returns:
            Sorted list of posts
        """
        if self.sort_by == 'none':
            # No sorting; keep the original data order.
            return matched_notes
        elif self.sort_by == 'score':
            # By evaluation score, descending (process high-scoring posts first).
            return sorted(
                matched_notes,
                key=lambda x: x['evaluation'].get('综合得分', 0),
                reverse=True
            )
        elif self.sort_by == 'time':
            # By publish time, descending (process the newest posts first).
            return sorted(
                matched_notes,
                key=lambda x: x['note_card'].get('publish_timestamp', 0),
                reverse=True
            )
        elif self.sort_by == 'engagement':
            # By engagement, descending (likes + collects + comments).
            def calc_engagement(note_data):
                interact = note_data['note_card'].get('interact_info', {})
                return (
                    interact.get('liked_count', 0) +
                    interact.get('collected_count', 0) +
                    interact.get('comment_count', 0)
                )
            return sorted(
                matched_notes,
                key=calc_engagement,
                reverse=True
            )
        return matched_notes
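
    # NOTE: calc_engagement assumes the interact_info counts are numeric. If the
    # upstream search API returns them as strings (e.g. "1024"), a defensive cast
    # would be needed; a minimal hypothetical helper:
    #
    #   def _to_int(value) -> int:
    #       try:
    #           return int(value)
    #       except (TypeError, ValueError):
    #           return 0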

    def _save_intermediate_results(
        self,
        results: List[Dict],
        output_path: str,
        processed_count: int,
        total_count: int,
        start_time: float
    ):
        """
        Save intermediate results.

        Args:
            results: current list of results
            output_path: output path
            processed_count: number of posts processed so far
            total_count: total number of posts
            start_time: start time (time.time() timestamp)
        """
        # Build the intermediate result file path.
        base_dir = os.path.dirname(output_path) or 'output_v2'
        base_name = os.path.basename(output_path)
        name_without_ext = os.path.splitext(base_name)[0]
        intermediate_path = os.path.join(
            base_dir,
            f"{name_without_ext}_partial_{processed_count}of{total_count}.json"
        )

        # Count successes and failures.
        success_count = sum(1 for r in results if r['api_response']['status'] == 'success')
        failed_count = len(results) - success_count

        # Build the intermediate result.
        intermediate_result = {
            'metadata': {
                'stage': 'stage7_partial',
                'description': f'Partial results ({processed_count}/{total_count})',
                'processed_notes': len(results),
                'success_count': success_count,
                'failed_count': failed_count,
                'saved_at': datetime.now().isoformat(),
                'processing_time_seconds': round(time.time() - start_time, 2)
            },
            'results': results
        }

        # Save to disk.
        os.makedirs(base_dir, exist_ok=True)
        with open(intermediate_path, 'w', encoding='utf-8') as f:
            json.dump(intermediate_result, f, ensure_ascii=False, indent=2)
        logger.info(f"Saved intermediate results: {intermediate_path} ({processed_count}/{total_count})")
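
    # With the default output path, a checkpoint written after 10 of 50 posts
    # would land at, e.g.:
    #   output_v2/stage7_with_deconstruction_partial_10of50.json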

    def process_single_note(
        self,
        matched_note_data: Dict,
        index: int,
        total: int
    ) -> Dict:
        """
        Run the deconstruction analysis for a single post.

        Args:
            matched_note_data: matched post data
            index: current index (for logging)
            total: total count (for logging)

        Returns:
            Processing result
        """
        note = matched_note_data['note']
        note_card = matched_note_data['note_card']
        evaluation = matched_note_data['evaluation']
        search_word = matched_note_data['search_word']
        original_feature = matched_note_data['original_feature']

        note_id = note.get('id', '')
        note_title = note_card.get('display_title', '')[:30]  # first 30 characters

        logger.info(f"[{index}/{total}] Deconstruction analysis: {note_id}")
        logger.info(f"  Title: {note_title}...")
        logger.info(f"  Search word: {search_word}")
        logger.info(f"  Original feature: {original_feature}")

        # Key matching points (stored in the result).
        key_points = evaluation.get('关键匹配点', [])

        # Top-3 persona features.
        top3_features = matched_note_data.get('top3_persona_features', [])

        # Build start_points from the name of the first top-3 persona feature only.
        start_points = []
        if top3_features:
            first_feature = top3_features[0].get('人设特征名称', '')
            if first_feature:
                start_points = [first_feature]

        logger.info(f"  start_points: {start_points}")
        if top3_features:
            logger.info(f"  Top-3 persona features: {[f.get('人设特征名称', '') for f in top3_features[:3]]}")

        # Use the original image URLs directly, without any processing.
        original_images = note_card.get('image_list', [])
        if original_images:
            logger.info(f"  Image count: {len(original_images)}")

        # Map the data into the API format (using the original image URLs).
        api_payload = map_note_to_api_format(
            note=note,
            note_card=note_card,
            evaluation=evaluation,
            search_word=search_word,
            original_feature=original_feature,
            start_points=start_points,
            processed_image_urls=None  # no processed URLs; use the originals
        )

        # Call the API.
        start_time = time.time()
        api_response = self.api_client.call_api(api_payload)
        processing_time = (time.time() - start_time) * 1000  # milliseconds

        # Build the result.
        result = {
            'note_id': note_id,
            'search_word': search_word,
            'original_feature': original_feature,
            'source_word': matched_note_data['source_word'],
            'evaluation_score': evaluation.get('综合得分', 0),
            'evaluation_type': evaluation.get('匹配类型', ''),
            'evaluation_confidence': evaluation.get('置信度', ''),
            'key_matching_points': key_points,
            'note_data': {
                'title': note_card.get('display_title', ''),
                'author': note_card.get('user', {}).get('nick_name', ''),
                'link': f"https://www.xiaohongshu.com/explore/{note_id}"
            },
            'api_request': api_payload,
            'api_response': api_response,
            'processed_at': datetime.now().isoformat(),
            'processing_time_ms': round(processing_time, 2)
        }

        if api_response['status'] == 'success':
            logger.info(f"  ✓ success ({processing_time:.0f}ms)")
        else:
            logger.error(f"  ✗ failed: {api_response['error']}")
        return result

    def run(
        self,
        stage6_results: List[Dict],
        output_path: Optional[str] = None
    ) -> Dict:
        """
        Run the Stage 7 deconstruction analysis.

        Args:
            stage6_results: Stage 6 results (the list consumed by extract_matched_notes_from_stage6)
            output_path: output path (optional)

        Returns:
            Stage 7 results
        """
        logger.info("\n" + "=" * 60)
        logger.info("Stage 7: deep deconstruction analysis of fully matched posts")
        logger.info("=" * 60)

        # Print the configuration.
        logger.info("Configuration:")
        logger.info(f"  API URL: {self.api_client.api_url}")
        if self.target_features:
            logger.info(f"  Target features: {', '.join(self.target_features)}")
        else:
            logger.info("  Target features: all")
        logger.info(f"  Minimum score threshold: {self.min_score}")
        logger.info(f"  Concurrency: {self.max_workers}")
        logger.info(f"  Max posts to process: {self.max_notes if self.max_notes else 'unlimited'}")
        logger.info(f"  Posts to skip: {self.skip_count}")
        logger.info(f"  Sort order: {self.sort_by}")
        logger.info(f"  API timeout: {self.api_client.timeout}s")
        logger.info(f"  Max retries: {self.api_client.max_retries}")

        # Default output path.
        if output_path is None:
            output_path = os.path.join(self.output_dir, "stage7_with_deconstruction.json")

        # 1. Extract the fully matched posts.
        matched_notes = self.extract_matched_notes_from_stage6(stage6_results)
        total_matched = len(matched_notes)
        logger.info(f"  Fully matched posts: {total_matched} (score >= {self.min_score})")
        if total_matched == 0:
            logger.warning("  No fully matched posts found")
            return {
                'metadata': {
                    'stage': 'stage7',
                    'total_matched_notes': 0,
                    'processed_notes': 0
                },
                'results': []
            }

        # 2. Sort.
        matched_notes = self.sort_matched_notes(matched_notes)
        logger.info(f"  Sort order: {self.sort_by}")

        # 3. Skip the first N posts.
        if self.skip_count > 0:
            logger.info(f"  Skipping the first {self.skip_count} posts")
            matched_notes = matched_notes[self.skip_count:]

        # 4. Apply the count limit.
        if self.max_notes is not None and len(matched_notes) > self.max_notes:
            logger.info(f"  Count limit: {self.max_notes}")
            matched_notes = matched_notes[:self.max_notes]

        to_process = len(matched_notes)
        logger.info(f"  Processing: {to_process} posts")
        logger.info(f"  Concurrency: {self.max_workers}")
        logger.info(f"  API: {self.api_client.api_url}")
        if to_process == 0:
            logger.warning("  No posts to process")
            return {
                'metadata': {
                    'stage': 'stage7',
                    'total_matched_notes': total_matched,
                    'processed_notes': 0,
                    'skipped_notes': self.skip_count
                },
                'results': []
            }

        # 5. Process in parallel.
        results = []
        start_time = time.time()
        save_interval = 10  # save a checkpoint every 10 processed posts

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for idx, note_data in enumerate(matched_notes, start=1):
                future = executor.submit(
                    self.process_single_note,
                    note_data,
                    idx,
                    to_process
                )
                futures.append(future)

            # Collect results (with progress display).
            if TQDM_AVAILABLE:
                # Use a tqdm progress bar.
                logger.info("  Using progress bar...")
                iterator = tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc="  Progress",
                    unit="post",
                    ncols=100
                )
            else:
                # Simple progress output.
                iterator = as_completed(futures)

            processed_count = 0
            for future in iterator:
                try:
                    result = future.result()
                    results.append(result)
                    processed_count += 1

                    # Incremental save (checkpoint every save_interval posts).
                    if processed_count % save_interval == 0:
                        self._save_intermediate_results(
                            results,
                            output_path,
                            processed_count,
                            to_process,
                            start_time
                        )

                    # Simple progress output (when tqdm is unavailable).
                    if not TQDM_AVAILABLE and processed_count % 5 == 0:
                        logger.info(f"  Progress: {processed_count}/{to_process}")
                except Exception as e:
                    logger.error(f"  Processing failed: {e}")

        processing_time = time.time() - start_time

        # 6. Statistics.
        success_count = sum(1 for r in results if r['api_response']['status'] == 'success')
        failed_count = len(results) - success_count
        logger.info(f"\n  Total time: {processing_time:.1f}s")
        logger.info(f"  Succeeded: {success_count}")
        logger.info(f"  Failed: {failed_count}")

        # 6.5. Load existing results (if any) for merging.
        existing_results = []
        if os.path.exists(output_path):
            logger.info("\n  Existing result file detected; preparing to merge...")
            try:
                with open(output_path, 'r', encoding='utf-8') as f:
                    existing_data = json.load(f)
                existing_results = existing_data.get('results', [])
                logger.info(f"  Existing results: {len(existing_results)}")
            except Exception as e:
                logger.warning(f"  Failed to load existing results: {e}; the file will be overwritten")
                existing_results = []

        # 6.6. Merge old and new results (deduplicated by note_id).
        if existing_results:
            # Index the existing results by note_id.
            existing_note_ids = {r['note_id']: r for r in existing_results}

            # Count how many entries get updated.
            updated_count = 0
            for new_result in results:
                if new_result['note_id'] in existing_note_ids:
                    updated_count += 1
                # Overwrite with the new result (new results take precedence).
                existing_note_ids[new_result['note_id']] = new_result

            # The merged, complete result set.
            merged_results = list(existing_note_ids.values())
            logger.info(f"  Merged result count: {len(merged_results)}")
            logger.info(f"  Newly added: {len(results) - updated_count}")
            logger.info(f"  Updated: {updated_count}")
        else:
            merged_results = results
            logger.info("  No existing results; saving directly")
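
        # Because the merge keys on note_id, re-running the stage is idempotent
        # per post: a re-processed note replaces its old entry instead of
        # duplicating it, which lets skip_count/max_notes resume an interrupted run.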

        # 7. Build the final result.
        final_result = {
            'metadata': {
                'stage': 'stage7',
                'description': 'Deep deconstruction analysis of fully matched posts',
                'target_features': self.target_features if self.target_features else 'all',
                'total_matched_notes': total_matched,
                'processed_notes': len(results),
                'total_results_count': len(merged_results),
                'new_results_count': len(results),
                'skipped_notes': self.skip_count,
                'max_notes_limit': self.max_notes,
                'sort_by': self.sort_by,
                'success_count': success_count,
                'failed_count': failed_count,
                'api_url': self.api_client.api_url,
                'min_score_threshold': self.min_score,
                'created_at': datetime.now().isoformat(),
                'processing_time_seconds': round(processing_time, 2)
            },
            'results': merged_results
        }

        # 8. Save the result (guard against an empty dirname when a bare filename is passed).
        os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(final_result, f, ensure_ascii=False, indent=2)
        logger.info(f"  Results saved: {output_path}")
        return final_result


def test_stage7_analyzer():
    """Smoke-test the Stage 7 analyzer."""
    # Load the Stage 6 results.
    stage6_path = "output_v2/stage6_with_evaluations.json"
    if not os.path.exists(stage6_path):
        print(f"Stage 6 results not found: {stage6_path}")
        return
    with open(stage6_path, 'r', encoding='utf-8') as f:
        stage6_results = json.load(f)

    # Create the analyzer.
    analyzer = Stage7DeconstructionAnalyzer(
        max_workers=3,
        max_notes=5,  # test with only 5 posts
        skip_count=0,
        sort_by='score'
    )

    # Run the analysis. Use .get() for the counters, since the early-return
    # metadata (no matches / nothing to process) omits them.
    stage7_results = analyzer.run(stage6_results)
    print(f"\nProcessed {stage7_results['metadata']['processed_notes']} posts")
    print(f"Succeeded: {stage7_results['metadata'].get('success_count', 0)}")
    print(f"Failed: {stage7_results['metadata'].get('failed_count', 0)}")


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    test_stage7_analyzer()
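
# Running this module directly executes the smoke test above against the
# default Stage 6 output:
#
#   python stage7_analyzer.py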