stage7_analyzer.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Stage 7 analyzer.

Performs deep deconstruction analysis on the posts that were fully matched
in Stage 6.
"""
import os
import json
import time
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Any, Optional

from stage7_api_client import DeconstructionAPIClient, map_note_to_api_format

# Module-level logger; defined before the tqdm import guard below so the
# ImportError branch can log a warning.
logger = logging.getLogger(__name__)

try:
    from tqdm import tqdm
    TQDM_AVAILABLE = True
except ImportError:
    TQDM_AVAILABLE = False
    logger.warning("tqdm is not installed; falling back to simple progress output. Install with: pip install tqdm")


class Stage7DeconstructionAnalyzer:
    """Stage 7: deep deconstruction analysis of fully matched posts."""

    def __init__(
        self,
        api_url: str = "http://192.168.245.150:7000/what/analysis/single",
        max_workers: int = 5,
        max_notes: Optional[int] = None,
        min_score: float = 8.0,
        skip_count: int = 0,
        sort_by: str = 'score',
        timeout: int = 30,
        max_retries: int = 3,
        output_dir: str = "output_v2",
        enable_image_download: bool = True,
        image_server_url: str = "http://localhost:8765",
        image_download_dir: str = "downloaded_images",
        target_features: Optional[List[str]] = None
    ):
        """
        Initialize the Stage 7 analyzer.

        Args:
            api_url: API endpoint.
            max_workers: Number of concurrent workers.
            max_notes: Maximum number of posts to process (None = unlimited).
            min_score: Minimum score threshold (only posts scoring >= this value are processed).
            skip_count: Number of posts to skip from the front.
            sort_by: Sort order ('score' | 'time' | 'engagement').
            timeout: API timeout in seconds.
            max_retries: Maximum number of API retries.
            output_dir: Output directory.
            enable_image_download: Whether to download Xiaohongshu images and
                rewrite them to local URLs (deprecated, see below).
            image_server_url: Image server URL (deprecated, unused).
            image_download_dir: Image download directory (deprecated, unused).
            target_features: Original features to process (None = process all features).
        """
        self.max_workers = max_workers
        self.max_notes = max_notes
        self.min_score = min_score
        self.skip_count = skip_count
        self.sort_by = sort_by
        self.output_dir = output_dir
        self.enable_image_download = enable_image_download
        self.target_features = target_features  # filter restricting which original features are processed

        # Initialize the API client.
        self.api_client = DeconstructionAPIClient(
            api_url=api_url,
            timeout=timeout,
            max_retries=max_retries
        )

        # Image downloading is deprecated; the original image URLs are used
        # directly. The parameters are kept only for backward compatibility.
        if self.enable_image_download:
            logger.warning("Note: the enable_image_download parameter is deprecated; original image URLs are used directly")

    def extract_matched_notes_from_stage6(
        self,
        stage6_results: List[Dict]
    ) -> List[Dict]:
        """
        Extract all fully matched posts from the Stage 6 results.

        Args:
            stage6_results: Stage 6 results (a list).
        Returns:
            List of fully matched posts.
        """
        matched_notes = []

        # Stage 6 results are a list; each element is one feature_group.
        for feature_group in stage6_results:
            original_feature = feature_group.get('原始特征名称', '')

            # If target_features is set, only process the listed features.
            if self.target_features and original_feature not in self.target_features:
                continue

            # Walk the grouped combo evaluations (this level holds top10_searches).
            for combo_group in feature_group.get('组合评估结果_分组', []):
                # top10_searches holds all search results.
                for search_item in combo_group.get('top10_searches', []):
                    search_word = search_item.get('search_word', '')
                    source_word = search_item.get('source_word', '')
                    evaluation = search_item.get('evaluation_with_filter', {})

                    # Skip items that have no search results.
                    if 'search_result' not in search_item:
                        continue
                    notes = search_item['search_result'].get('data', {}).get('data', [])

                    # Walk the evaluation results.
                    for note_eval in evaluation.get('notes_evaluation', []):
                        score = note_eval.get('综合得分', 0)
                        # Only keep full matches (score >= min_score).
                        if score >= self.min_score:
                            note_index = note_eval.get('note_index', -1)
                            if 0 <= note_index < len(notes):
                                note = notes[note_index]
                                matched_notes.append({
                                    'note': note,
                                    'note_card': note.get('note_card', {}),
                                    'evaluation': note_eval,
                                    'search_word': search_word,
                                    'source_word': source_word,
                                    'original_feature': original_feature
                                })
        return matched_notes

    def sort_matched_notes(
        self,
        matched_notes: List[Dict]
    ) -> List[Dict]:
        """
        Sort the fully matched posts.

        Args:
            matched_notes: List of matched posts.
        Returns:
            Sorted list of posts.
        """
        if self.sort_by == 'score':
            # Descending by score (process high-scoring posts first).
            return sorted(
                matched_notes,
                key=lambda x: x['evaluation'].get('综合得分', 0),
                reverse=True
            )
        elif self.sort_by == 'time':
            # Descending by time (process the newest posts first).
            return sorted(
                matched_notes,
                key=lambda x: x['note_card'].get('publish_timestamp', 0),
                reverse=True
            )
        elif self.sort_by == 'engagement':
            # Descending by engagement (likes + collects + comments).
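            # Example with made-up numbers: a note with 120 likes, 30 collects
            # and 10 comments has engagement 160 and ranks above a note with
            # 150 likes and no other interactions.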
            def calc_engagement(note_data):
                interact = note_data['note_card'].get('interact_info', {})
                return (
                    interact.get('liked_count', 0) +
                    interact.get('collected_count', 0) +
                    interact.get('comment_count', 0)
                )
            return sorted(
                matched_notes,
                key=calc_engagement,
                reverse=True
            )
        return matched_notes

    def _save_intermediate_results(
        self,
        results: List[Dict],
        output_path: str,
        processed_count: int,
        total_count: int,
        start_time: float
    ):
        """
        Save intermediate results.

        Args:
            results: Current list of results.
            output_path: Output path.
            processed_count: Number of posts processed so far.
            total_count: Total number of posts.
            start_time: Start time.
        """
        # Build the intermediate result file path.
        base_dir = os.path.dirname(output_path) or 'output_v2'
        base_name = os.path.basename(output_path)
        name_without_ext = os.path.splitext(base_name)[0]
        intermediate_path = os.path.join(
            base_dir,
            f"{name_without_ext}_partial_{processed_count}of{total_count}.json"
        )
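        # With the default output path this yields files such as
        # output_v2/stage7_with_deconstruction_partial_10of50.json
        # (the counts shown here are illustrative).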

        # Count successes and failures.
        success_count = sum(1 for r in results if r['api_response']['status'] == 'success')
        failed_count = len(results) - success_count

        # Build the intermediate result.
        intermediate_result = {
            'metadata': {
                'stage': 'stage7_partial',
                'description': f'Partial results ({processed_count}/{total_count})',
                'processed_notes': len(results),
                'success_count': success_count,
                'failed_count': failed_count,
                'saved_at': datetime.now().isoformat(),
                'processing_time_seconds': round(time.time() - start_time, 2)
            },
            'results': results
        }

        # Save.
        os.makedirs(base_dir, exist_ok=True)
        with open(intermediate_path, 'w', encoding='utf-8') as f:
            json.dump(intermediate_result, f, ensure_ascii=False, indent=2)
        logger.info(f"Saved intermediate results: {intermediate_path} ({processed_count}/{total_count})")

    def process_single_note(
        self,
        matched_note_data: Dict,
        index: int,
        total: int
    ) -> Dict:
        """
        Run the deconstruction analysis for a single post.

        Args:
            matched_note_data: Matched post data.
            index: Current index (for logging).
            total: Total count (for logging).
        Returns:
            Processing result.
        """
        note = matched_note_data['note']
        note_card = matched_note_data['note_card']
        evaluation = matched_note_data['evaluation']
        search_word = matched_note_data['search_word']
        original_feature = matched_note_data['original_feature']
        note_id = note.get('id', '')
        note_title = note_card.get('display_title', '')[:30]  # first 30 characters

        logger.info(f"[{index}/{total}] Deconstruction analysis: {note_id}")
        logger.info(f"  Title: {note_title}...")
        logger.info(f"  Search word: {search_word}")
        logger.info(f"  Original feature: {original_feature}")

        # Build start_points (combined scheme).
        key_points = evaluation.get('关键匹配点', [])
        start_points = [
            original_feature,                    # original feature
            search_word,                         # search word
            key_points[0] if key_points else ''  # first key matching point
        ]
        start_points = [p for p in start_points if p]  # drop empty values
        logger.info(f"  start_points: {start_points}")

        # Use the original image URLs as-is, without any processing.
        original_images = note_card.get('image_list', [])
        if original_images:
            logger.info(f"  Image count: {len(original_images)}")

        # Map the data into the API format (original image URLs are used).
        api_payload = map_note_to_api_format(
            note=note,
            note_card=note_card,
            evaluation=evaluation,
            search_word=search_word,
            original_feature=original_feature,
            start_points=start_points,
            processed_image_urls=None  # no processed URLs; use the originals
        )

        # Call the API.
        start_time = time.time()
        api_response = self.api_client.call_api(api_payload)
        processing_time = (time.time() - start_time) * 1000  # milliseconds
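        # call_api is assumed to return a dict with at least a 'status' key
        # ('success' on success) plus an 'error' message on failure; that is
        # the contract the result handling below relies on.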

        # Build the result record.
        result = {
            'note_id': note_id,
            'search_word': search_word,
            'original_feature': original_feature,
            'source_word': matched_note_data['source_word'],
            'evaluation_score': evaluation.get('综合得分', 0),
            'evaluation_type': evaluation.get('匹配类型', ''),
            'evaluation_confidence': evaluation.get('置信度', ''),
            'key_matching_points': key_points,
            'note_data': {
                'title': note_card.get('display_title', ''),
                'author': note_card.get('user', {}).get('nick_name', ''),
                'link': f"https://www.xiaohongshu.com/explore/{note_id}"
            },
            'api_request': api_payload,
            'api_response': api_response,
            'processed_at': datetime.now().isoformat(),
            'processing_time_ms': round(processing_time, 2)
        }
        if api_response['status'] == 'success':
            logger.info(f"  ✓ succeeded ({processing_time:.0f}ms)")
        else:
            logger.error(f"  ✗ failed: {api_response['error']}")
        return result

    def run(
        self,
        stage6_results: List[Dict],
        output_path: Optional[str] = None
    ) -> Dict:
        """
        Execute the Stage 7 deconstruction analysis.

        Args:
            stage6_results: Stage 6 results (a list; see
                extract_matched_notes_from_stage6 for the expected shape).
            output_path: Output path (optional).
        Returns:
            Stage 7 results.
        """
        logger.info("\n" + "=" * 60)
        logger.info("Stage 7: deep deconstruction analysis of fully matched posts")
        logger.info("=" * 60)

        # Log the configuration.
        logger.info("Configuration:")
        logger.info(f"  API endpoint: {self.api_client.api_url}")
        if self.target_features:
            logger.info(f"  Target features: {', '.join(self.target_features)}")
        else:
            logger.info("  Target features: all")
        logger.info(f"  Minimum score threshold: {self.min_score}")
        logger.info(f"  Concurrency: {self.max_workers}")
        logger.info(f"  Max posts to process: {self.max_notes if self.max_notes else 'unlimited'}")
        logger.info(f"  Skip first N: {self.skip_count}")
        logger.info(f"  Sort order: {self.sort_by}")
        logger.info(f"  API timeout: {self.api_client.timeout}s")
        logger.info(f"  Max retries: {self.api_client.max_retries}")

        # Default output path.
        if output_path is None:
            output_path = os.path.join(self.output_dir, "stage7_with_deconstruction.json")

        # 1. Extract the fully matched posts.
        matched_notes = self.extract_matched_notes_from_stage6(stage6_results)
        total_matched = len(matched_notes)
        logger.info(f"  Fully matched posts: {total_matched} (score >= {self.min_score})")
        if total_matched == 0:
            logger.warning("  No fully matched posts found")
            return {
                'metadata': {
                    'stage': 'stage7',
                    'total_matched_notes': 0,
                    'processed_notes': 0
                },
                'results': []
            }

        # 2. Sort.
        matched_notes = self.sort_matched_notes(matched_notes)
        logger.info(f"  Sort order: {self.sort_by}")

        # 3. Skip the first N posts.
        if self.skip_count > 0:
            logger.info(f"  Skipping the first {self.skip_count}")
            matched_notes = matched_notes[self.skip_count:]

        # 4. Apply the count limit.
        if self.max_notes is not None and len(matched_notes) > self.max_notes:
            logger.info(f"  Count limit: {self.max_notes}")
            matched_notes = matched_notes[:self.max_notes]
        to_process = len(matched_notes)
        logger.info(f"  Will process: {to_process}")
        logger.info(f"  Concurrency: {self.max_workers}")
        logger.info(f"  API: {self.api_client.api_url}")
        if to_process == 0:
            logger.warning("  Nothing to process")
            return {
                'metadata': {
                    'stage': 'stage7',
                    'total_matched_notes': total_matched,
                    'processed_notes': 0,
                    'skipped_notes': self.skip_count
                },
                'results': []
            }

        # 5. Process in parallel.
        results = []
        start_time = time.time()
        save_interval = 10  # save intermediate results every 10 posts
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for idx, note_data in enumerate(matched_notes, start=1):
                future = executor.submit(
                    self.process_single_note,
                    note_data,
                    idx,
                    to_process
                )
                futures.append(future)

            # Collect the results (with progress display).
            if TQDM_AVAILABLE:
                # tqdm progress bar.
                logger.info("  Using a progress bar...")
                iterator = tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc="  Progress",
                    unit="note",
                    ncols=100
                )
            else:
                # Simple progress output.
                iterator = as_completed(futures)

            processed_count = 0
            for future in iterator:
                try:
                    result = future.result()
                    results.append(result)
                    processed_count += 1
                    # Incremental save (every save_interval posts).
                    if processed_count % save_interval == 0:
                        self._save_intermediate_results(
                            results,
                            output_path,
                            processed_count,
                            to_process,
                            start_time
                        )
                    # Simple progress output (when tqdm is unavailable).
                    if not TQDM_AVAILABLE and processed_count % 5 == 0:
                        logger.info(f"  Progress: {processed_count}/{to_process}")
                except Exception as e:
                    logger.error(f"  Processing failed: {e}")
        processing_time = time.time() - start_time

        # 6. Statistics.
        success_count = sum(1 for r in results if r['api_response']['status'] == 'success')
        failed_count = len(results) - success_count
        logger.info(f"\n  Total time: {processing_time:.1f}s")
        logger.info(f"  Succeeded: {success_count}")
        logger.info(f"  Failed: {failed_count}")

        # 7. Build the final result.
        final_result = {
            'metadata': {
                'stage': 'stage7',
                'description': 'Deep deconstruction analysis of fully matched posts',
                'target_features': self.target_features if self.target_features else 'all',
                'total_matched_notes': total_matched,
                'processed_notes': len(results),
                'skipped_notes': self.skip_count,
                'max_notes_limit': self.max_notes,
                'sort_by': self.sort_by,
                'success_count': success_count,
                'failed_count': failed_count,
                'api_url': self.api_client.api_url,
                'min_score_threshold': self.min_score,
                'created_at': datetime.now().isoformat(),
                'processing_time_seconds': round(processing_time, 2)
            },
            'results': results
        }

        # 8. Save the result ('.' guards against a bare filename, whose
        # dirname is the empty string).
        os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(final_result, f, ensure_ascii=False, indent=2)
        logger.info(f"  Results saved: {output_path}")
        return final_result
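

# A minimal sketch of a feature-filtered run (the feature names below are
# hypothetical; real names come from the Stage 6 output):
#
#     analyzer = Stage7DeconstructionAnalyzer(
#         target_features=['feature_a', 'feature_b'],
#         min_score=8.5,
#         sort_by='engagement'
#     )
#     analyzer.run(stage6_results)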


def test_stage7_analyzer():
    """Test the Stage 7 analyzer."""
    # Load the Stage 6 results.
    stage6_path = "output_v2/stage6_with_evaluations.json"
    if not os.path.exists(stage6_path):
        print(f"Stage 6 results not found: {stage6_path}")
        return
    with open(stage6_path, 'r', encoding='utf-8') as f:
        stage6_results = json.load(f)

    # Create the analyzer.
    analyzer = Stage7DeconstructionAnalyzer(
        max_workers=3,
        max_notes=5,  # test with only 5 posts
        skip_count=0,
        sort_by='score'
    )

    # Run the analysis (use .get: the early-return metadata of an empty run
    # does not include the success/failure counters).
    stage7_results = analyzer.run(stage6_results)
    print(f"\nProcessed {stage7_results['metadata']['processed_notes']} posts")
    print(f"Succeeded: {stage7_results['metadata'].get('success_count', 0)}")
    print(f"Failed: {stage7_results['metadata'].get('failed_count', 0)}")


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    test_stage7_analyzer()