#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Stage 7 analyzer.

Runs deep deconstruction analysis on the posts that Stage 6 marked as full matches.
"""
import os
import json
import time
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Any, Optional

from stage7_api_client import DeconstructionAPIClient, map_note_to_api_format

logger = logging.getLogger(__name__)

# tqdm is optional; fall back to plain log-based progress if it is missing.
# (The logger must be created before this block, since the except branch uses it.)
try:
    from tqdm import tqdm
    TQDM_AVAILABLE = True
except ImportError:
    TQDM_AVAILABLE = False
    logger.warning("tqdm is not installed; using simple progress logging instead. "
                   "Install it with: pip install tqdm")


class Stage7DeconstructionAnalyzer:
    """Stage 7: deep deconstruction analysis of fully matched posts."""

    def __init__(
        self,
        api_url: str = "http://192.168.245.150:7000/what/analysis/single",
        max_workers: int = 5,
        max_notes: Optional[int] = None,
        min_score: float = 8.0,
        skip_count: int = 0,
        sort_by: str = 'score',
        timeout: int = 30,
        max_retries: int = 3,
        output_dir: str = "output_v2",
        enable_image_download: bool = True,
        image_server_url: str = "http://localhost:8765",
        image_download_dir: str = "downloaded_images",
        target_features: Optional[List[str]] = None
    ):
  40. """
  41. 初始化 Stage 7 分析器
  42. Args:
  43. api_url: API 地址
  44. max_workers: 并发数
  45. max_notes: 最多处理多少个帖子(None = 不限制)
  46. min_score: 最低分数阈值(只处理 >= 此分数的帖子)
  47. skip_count: 跳过前 N 个
  48. sort_by: 排序方式 ('score' | 'time' | 'engagement')
  49. timeout: API 超时时间
  50. max_retries: API 最大重试次数
  51. output_dir: 输出目录
  52. enable_image_download: 是否启用图片下载(下载小红书图片并转换为本地URL)
  53. image_server_url: 图片服务器URL
  54. image_download_dir: 图片下载目录
  55. target_features: 指定要处理的原始特征列表(None = 处理所有特征)
  56. """
        self.max_workers = max_workers
        self.max_notes = max_notes
        self.min_score = min_score
        self.skip_count = skip_count
        self.sort_by = sort_by
        self.output_dir = output_dir
        self.enable_image_download = enable_image_download
        self.target_features = target_features  # target-feature filter

        # Initialize the API client.
        self.api_client = DeconstructionAPIClient(
            api_url=api_url,
            timeout=timeout,
            max_retries=max_retries
        )

        # Image downloading is deprecated; original image URLs are used directly.
        # The parameters are kept for backward compatibility but are no longer used.
        if self.enable_image_download:
            logger.warning("Note: enable_image_download is deprecated; "
                           "original image URLs will be used directly")

    def extract_matched_notes_from_stage6(
        self,
        stage6_results: List[Dict]
    ) -> List[Dict]:
        """
        Extract all fully matched posts from the Stage 6 results.

        Args:
            stage6_results: Stage 6 results (a list).

        Returns:
            List of fully matched posts.
        """
        matched_notes = []

        # Stage 6 results form a list; each element is one feature_group.
        for feature_group in stage6_results:
            original_feature = feature_group.get('原始特征名称', '')

            # If target_features is set, only process the listed features.
            if self.target_features and original_feature not in self.target_features:
                continue

            # Walk the combination-evaluation groups (this level holds top10_searches).
            for combo_group in feature_group.get('组合评估结果_分组', []):
                # top10_searches holds all search results.
                for search_item in combo_group.get('top10_searches', []):
                    search_word = search_item.get('search_word', '')
                    source_word = search_item.get('source_word', '')
                    evaluation = search_item.get('evaluation_with_filter', {})

                    # Skip items without search results.
                    if 'search_result' not in search_item:
                        continue

                    notes = search_item['search_result'].get('data', {}).get('data', [])

                    # Walk the evaluation results.
                    for note_eval in evaluation.get('notes_evaluation', []):
                        score = note_eval.get('综合得分', 0)

                        # Only keep full matches (score >= min_score).
                        if score >= self.min_score:
                            note_index = note_eval.get('note_index', -1)
                            if 0 <= note_index < len(notes):
                                note = notes[note_index]
                                matched_notes.append({
                                    'note': note,
                                    'note_card': note.get('note_card', {}),
                                    'evaluation': note_eval,
                                    'search_word': search_word,
                                    'source_word': source_word,
                                    'original_feature': original_feature,
                                    'top3_persona_features': feature_group.get('top3匹配信息', [])
                                })

        return matched_notes

    def sort_matched_notes(
        self,
        matched_notes: List[Dict]
    ) -> List[Dict]:
        """
        Sort the fully matched posts.

        Args:
            matched_notes: List of matched posts.

        Returns:
            Sorted list of posts.
        """
        if self.sort_by == 'score':
            # Descending by score (process high-scoring posts first).
            return sorted(
                matched_notes,
                key=lambda x: x['evaluation'].get('综合得分', 0),
                reverse=True
            )
        elif self.sort_by == 'time':
            # Descending by time (process the newest posts first).
            return sorted(
                matched_notes,
                key=lambda x: x['note_card'].get('publish_timestamp', 0),
                reverse=True
            )
        elif self.sort_by == 'engagement':
            # Descending by engagement (likes + collects + comments).
            def calc_engagement(note_data):
                interact = note_data['note_card'].get('interact_info', {})
                return (
                    interact.get('liked_count', 0) +
                    interact.get('collected_count', 0) +
                    interact.get('comment_count', 0)
                )
            return sorted(
                matched_notes,
                key=calc_engagement,
                reverse=True
            )
        return matched_notes

    def _save_intermediate_results(
        self,
        results: List[Dict],
        output_path: str,
        processed_count: int,
        total_count: int,
        start_time: float
    ):
        """
        Save intermediate results.

        Args:
            results: Current list of results.
            output_path: Output path.
            processed_count: Number of posts processed so far.
            total_count: Total number of posts.
            start_time: Processing start time.
        """
        # Build the intermediate-results file path.
        base_dir = os.path.dirname(output_path) or 'output_v2'
        base_name = os.path.basename(output_path)
        name_without_ext = os.path.splitext(base_name)[0]
        intermediate_path = os.path.join(
            base_dir,
            f"{name_without_ext}_partial_{processed_count}of{total_count}.json"
        )
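        # Illustrative example: for output_v2/stage7_with_deconstruction.json
        # at processed_count=20 of total_count=135, this yields
        # output_v2/stage7_with_deconstruction_partial_20of135.json.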

        # Count successes and failures.
        success_count = sum(1 for r in results if r['api_response']['status'] == 'success')
        failed_count = len(results) - success_count

        # Build the intermediate result.
        intermediate_result = {
            'metadata': {
                'stage': 'stage7_partial',
                'description': f'Partial results ({processed_count}/{total_count})',
                'processed_notes': len(results),
                'success_count': success_count,
                'failed_count': failed_count,
                'saved_at': datetime.now().isoformat(),
                'processing_time_seconds': round(time.time() - start_time, 2)
            },
            'results': results
        }

        # Save.
        os.makedirs(base_dir, exist_ok=True)
        with open(intermediate_path, 'w', encoding='utf-8') as f:
            json.dump(intermediate_result, f, ensure_ascii=False, indent=2)
        logger.info(f"Saved intermediate results: {intermediate_path} ({processed_count}/{total_count})")

    def process_single_note(
        self,
        matched_note_data: Dict,
        index: int,
        total: int
    ) -> Dict:
        """
        Run deconstruction analysis for a single post.

        Args:
            matched_note_data: Matched post data.
            index: Current index (for logging).
            total: Total count (for logging).

        Returns:
            Processing result.
        """
        note = matched_note_data['note']
        note_card = matched_note_data['note_card']
        evaluation = matched_note_data['evaluation']
        search_word = matched_note_data['search_word']
        original_feature = matched_note_data['original_feature']

        note_id = note.get('id', '')
        note_title = note_card.get('display_title', '')[:30]  # first 30 characters

        logger.info(f"[{index}/{total}] Deconstruction analysis: {note_id}")
        logger.info(f"  Title: {note_title}...")
        logger.info(f"  Search word: {search_word}")
        logger.info(f"  Original feature: {original_feature}")

        # Key matching points (kept so they can be saved into the result).
        key_points = evaluation.get('关键匹配点', [])

        # Top-3 persona features.
        top3_features = matched_note_data.get('top3_persona_features', [])

        # Build start_points using only the name of the first top-3 persona feature.
        start_points = []
        if top3_features:
            first_feature = top3_features[0].get('人设特征名称', '')
            if first_feature:
                start_points = [first_feature]

        logger.info(f"  start_points: {start_points}")
        if top3_features:
            logger.info(f"  top3 persona features: {[f.get('人设特征名称', '') for f in top3_features[:3]]}")

        # Use the original image URLs directly, without any processing.
        original_images = note_card.get('image_list', [])
        if original_images:
            logger.info(f"  Image count: {len(original_images)}")

        # Map the data into the API format (original image URLs are used as-is).
        api_payload = map_note_to_api_format(
            note=note,
            note_card=note_card,
            evaluation=evaluation,
            search_word=search_word,
            original_feature=original_feature,
            start_points=start_points,
            processed_image_urls=None  # no processed URLs; use the originals
        )

        # Call the API.
        start_time = time.time()
        api_response = self.api_client.call_api(api_payload)
        processing_time = (time.time() - start_time) * 1000  # milliseconds

        # Build the result record.
        result = {
            'note_id': note_id,
            'search_word': search_word,
            'original_feature': original_feature,
            'source_word': matched_note_data['source_word'],
            'evaluation_score': evaluation.get('综合得分', 0),
            'evaluation_type': evaluation.get('匹配类型', ''),
            'evaluation_confidence': evaluation.get('置信度', ''),
            'key_matching_points': key_points,
            'note_data': {
                'title': note_card.get('display_title', ''),
                'author': note_card.get('user', {}).get('nick_name', ''),
                'link': f"https://www.xiaohongshu.com/explore/{note_id}"
            },
            'api_request': api_payload,
            'api_response': api_response,
            'processed_at': datetime.now().isoformat(),
            'processing_time_ms': round(processing_time, 2)
        }

        if api_response['status'] == 'success':
            logger.info(f"  ✓ success ({processing_time:.0f}ms)")
        else:
            logger.error(f"  ✗ failed: {api_response['error']}")

        return result

    def run(
        self,
        stage6_results: List[Dict],
        output_path: Optional[str] = None
    ) -> Dict:
        """
        Run the Stage 7 deconstruction analysis.

        Args:
            stage6_results: Stage 6 results (a list of feature groups).
            output_path: Output path (optional).

        Returns:
            Stage 7 results.
        """
        logger.info("\n" + "=" * 60)
        logger.info("Stage 7: deep deconstruction analysis of fully matched posts")
        logger.info("=" * 60)

        # Log the configuration.
        logger.info("Configuration:")
        logger.info(f"  API URL: {self.api_client.api_url}")
        if self.target_features:
            logger.info(f"  Target features: {', '.join(self.target_features)}")
        else:
            logger.info("  Target features: all")
        logger.info(f"  Minimum score threshold: {self.min_score}")
        logger.info(f"  Concurrency: {self.max_workers}")
        logger.info(f"  Max posts to process: {self.max_notes if self.max_notes else 'unlimited'}")
        logger.info(f"  Posts to skip: {self.skip_count}")
        logger.info(f"  Sort order: {self.sort_by}")
        logger.info(f"  API timeout: {self.api_client.timeout}s")
        logger.info(f"  Max retries: {self.api_client.max_retries}")

        # Default output path.
        if output_path is None:
            output_path = os.path.join(self.output_dir, "stage7_with_deconstruction.json")

        # 1. Extract the fully matched posts.
        matched_notes = self.extract_matched_notes_from_stage6(stage6_results)
        total_matched = len(matched_notes)
        logger.info(f"Fully matched posts: {total_matched} (score >= {self.min_score})")

        if total_matched == 0:
            logger.warning("No fully matched posts found")
            return {
                'metadata': {
                    'stage': 'stage7',
                    'total_matched_notes': 0,
                    'processed_notes': 0
                },
                'results': []
            }

        # 2. Sort.
        matched_notes = self.sort_matched_notes(matched_notes)
        logger.info(f"Sort order: {self.sort_by}")

        # 3. Skip the first N posts.
        if self.skip_count > 0:
            logger.info(f"Skipping the first {self.skip_count} posts")
            matched_notes = matched_notes[self.skip_count:]

        # 4. Cap the number of posts.
        if self.max_notes is not None and len(matched_notes) > self.max_notes:
            logger.info(f"Limiting to {self.max_notes} posts")
            matched_notes = matched_notes[:self.max_notes]

        to_process = len(matched_notes)
        logger.info(f"Posts to process: {to_process}")
        logger.info(f"Concurrency: {self.max_workers}")
        logger.info(f"API: {self.api_client.api_url}")

        if to_process == 0:
            logger.warning("Nothing to process")
            return {
                'metadata': {
                    'stage': 'stage7',
                    'total_matched_notes': total_matched,
                    'processed_notes': 0,
                    'skipped_notes': self.skip_count
                },
                'results': []
            }

        # 5. Process in parallel.
        results = []
        start_time = time.time()
        save_interval = 10  # save partial results every 10 processed posts

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for idx, note_data in enumerate(matched_notes, start=1):
                future = executor.submit(
                    self.process_single_note,
                    note_data,
                    idx,
                    to_process
                )
                futures.append(future)

            # Collect results (with progress display).
            if TQDM_AVAILABLE:
                # Use a tqdm progress bar.
                logger.info("Using tqdm progress bar...")
                iterator = tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc="Progress",
                    unit="post",
                    ncols=100
                )
            else:
                # Simple progress logging.
                iterator = as_completed(futures)

            processed_count = 0
            for future in iterator:
                try:
                    result = future.result()
                    results.append(result)
                    processed_count += 1

                    # Incremental save (every save_interval posts).
                    if processed_count % save_interval == 0:
                        self._save_intermediate_results(
                            results,
                            output_path,
                            processed_count,
                            to_process,
                            start_time
                        )

                    # Simple progress logging (when tqdm is unavailable).
                    if not TQDM_AVAILABLE and processed_count % 5 == 0:
                        logger.info(f"Progress: {processed_count}/{to_process}")
                except Exception as e:
                    logger.error(f"Processing failed: {e}")

        processing_time = time.time() - start_time

        # 6. Summary statistics.
        success_count = sum(1 for r in results if r['api_response']['status'] == 'success')
        failed_count = len(results) - success_count
        logger.info(f"\nTotal time: {processing_time:.1f}s")
        logger.info(f"Succeeded: {success_count}")
        logger.info(f"Failed: {failed_count}")

        # 7. Build the final result.
        final_result = {
            'metadata': {
                'stage': 'stage7',
                'description': 'Deep deconstruction analysis of fully matched posts',
                'target_features': self.target_features if self.target_features else 'all',
                'total_matched_notes': total_matched,
                'processed_notes': len(results),
                'skipped_notes': self.skip_count,
                'max_notes_limit': self.max_notes,
                'sort_by': self.sort_by,
                'success_count': success_count,
                'failed_count': failed_count,
                'api_url': self.api_client.api_url,
                'min_score_threshold': self.min_score,
                'created_at': datetime.now().isoformat(),
                'processing_time_seconds': round(processing_time, 2)
            },
            'results': results
        }

        # 8. Save the result ('or .' guards against a bare filename with no directory part).
        os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(final_result, f, ensure_ascii=False, indent=2)
        logger.info(f"Results saved: {output_path}")

        return final_result


def test_stage7_analyzer():
    """Smoke-test the Stage 7 analyzer."""
    # Load the Stage 6 results.
    stage6_path = "output_v2/stage6_with_evaluations.json"
    if not os.path.exists(stage6_path):
        print(f"Stage 6 results not found: {stage6_path}")
        return

    with open(stage6_path, 'r', encoding='utf-8') as f:
        stage6_results = json.load(f)

    # Create the analyzer.
    analyzer = Stage7DeconstructionAnalyzer(
        max_workers=3,
        max_notes=5,  # only test 5 posts
        skip_count=0,
        sort_by='score'
    )

    # Run the analysis.
    stage7_results = analyzer.run(stage6_results)

    print(f"\nProcessed {stage7_results['metadata']['processed_notes']} posts")
    print(f"Succeeded: {stage7_results['metadata']['success_count']}")
    print(f"Failed: {stage7_results['metadata']['failed_count']}")
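
# Running this module directly performs the smoke test above against an
# existing Stage 6 output (it expects output_v2/stage6_with_evaluations.json):
#   python stage7_analyzer.py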


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    test_stage7_analyzer()