#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 小红书搜索任务执行器 读取 associated_tags_results_with_search.json, 对所有非空的 search_word 执行小红书搜索, 并将结果写入到对应的特征节点下。 """ import sys import os # 将项目根目录添加到Python路径 project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, project_root) import json import logging import time import copy from pathlib import Path from typing import Dict, List, Any, Set, Optional from datetime import datetime import argparse from src.clients.xiaohongshu_search import XiaohongshuSearch # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ logging.FileHandler('search_execution.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class SearchTaskExecutor: """搜索任务执行器""" def __init__( self, input_path: str, output_path: str = None, progress_path: str = 'search_progress.json', search_delay: float = 2.0, content_type: str = '图文', sort_type: str = '综合' ): """ 初始化执行器 Args: input_path: 输入JSON文件路径 output_path: 输出JSON文件路径 progress_path: 进度文件路径 search_delay: 每次搜索间隔时间(秒) content_type: 内容类型 sort_type: 排序方式 """ self.input_path = input_path self.output_path = output_path or input_path.replace( '.json', '_with_search_data.json' ) self.progress_path = progress_path self.search_delay = search_delay self.content_type = content_type self.sort_type = sort_type # 初始化搜索客户端 self.search_client = XiaohongshuSearch() # 统计信息 self.stats = { '总特征数': 0, '有search_word的特征数': 0, '唯一search_word数': 0, '已完成搜索数': 0, '成功搜索数': 0, '失败搜索数': 0, '跳过搜索数': 0 } def load_json(self, file_path: str) -> Any: """加载JSON文件""" try: with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) except FileNotFoundError: logger.warning(f"文件不存在: {file_path}") return None except Exception as e: logger.error(f"加载文件失败 {file_path}: {e}") raise def save_json(self, data: Any, file_path: str): """保存JSON文件""" try: with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.info(f"已保存: {file_path}") except Exception as e: logger.error(f"保存文件失败 {file_path}: {e}") raise def load_progress(self) -> Dict[str, Any]: """加载进度文件""" progress = self.load_json(self.progress_path) if progress is None: return { 'completed_searches': {}, # search_word -> result 'started_at': datetime.now().isoformat(), 'last_updated': None } return progress def save_progress(self, progress: Dict[str, Any]): """保存进度文件""" progress['last_updated'] = datetime.now().isoformat() self.save_json(progress, self.progress_path) def collect_search_words(self, data: List[Dict[str, Any]]) -> Dict[str, List[tuple]]: """ 收集所有需要搜索的关键词 Args: data: 输入数据列表 Returns: 字典,key 为 search_word,value 为特征位置列表 位置格式: (result_idx, assoc_idx, feature_idx) """ search_word_map = {} # search_word -> [(result_idx, assoc_idx, feature_idx), ...] for result_idx, result in enumerate(data): for assoc_idx, assoc in enumerate(result.get('找到的关联', [])): for feature_idx, feature in enumerate(assoc.get('特征列表', [])): self.stats['总特征数'] += 1 search_word = feature.get('search_word') if search_word and search_word.strip(): self.stats['有search_word的特征数'] += 1 if search_word not in search_word_map: search_word_map[search_word] = [] search_word_map[search_word].append( (result_idx, assoc_idx, feature_idx) ) self.stats['唯一search_word数'] = len(search_word_map) return search_word_map def execute_search( self, search_word: str, max_retries: int = 3 ) -> Optional[Dict[str, Any]]: """ 执行单个搜索 Args: search_word: 搜索关键词 max_retries: 最大重试次数 Returns: 搜索结果字典,失败返回 None """ try: logger.info(f" 搜索: {search_word}") result = self.search_client.search( keyword=search_word, content_type=self.content_type, sort_type=self.sort_type, max_retries=max_retries ) # 提取帖子数量 note_count = len(result.get('data', {}).get('data', [])) logger.info(f" ✓ 成功,获取 {note_count} 条帖子") return result except Exception as e: logger.error(f" ✗ 失败: {e}") return None def process_searches( self, data: List[Dict[str, Any]], search_word_map: Dict[str, List[tuple]], progress: Dict[str, Any] ): """ 执行所有搜索任务 Args: data: 输入数据(会被修改) search_word_map: 搜索词映射 progress: 进度数据 """ completed_searches = progress['completed_searches'] total_searches = len(search_word_map) logger.info("=" * 60) logger.info("开始执行搜索任务") logger.info("=" * 60) logger.info(f"唯一搜索词数: {total_searches}") logger.info(f"已完成: {len(completed_searches)}") logger.info(f"待执行: {total_searches - len(completed_searches)}") logger.info("") # 遍历所有唯一的搜索词 for idx, (search_word, positions) in enumerate(search_word_map.items(), 1): logger.info(f"[{idx}/{total_searches}] 处理: {search_word}") logger.info(f" 影响 {len(positions)} 个特征节点") # 检查是否已完成 if search_word in completed_searches: logger.info(f" ⊙ 已完成(使用缓存结果)") search_result = completed_searches[search_word] self.stats['跳过搜索数'] += 1 else: # 执行搜索 search_result = self.execute_search(search_word) # 记录结果到进度文件 completed_searches[search_word] = search_result self.stats['已完成搜索数'] += 1 if search_result: self.stats['成功搜索数'] += 1 else: self.stats['失败搜索数'] += 1 # 保存进度 self.save_progress(progress) # 延迟,避免请求过快 if idx < total_searches: # 最后一次不需要延迟 time.sleep(self.search_delay) # 将搜索结果写入到所有相关的特征节点 self._write_results_to_features( data, positions, search_word, search_result ) logger.info("") logger.info("=" * 60) logger.info("搜索任务执行完成") logger.info("=" * 60) def _write_results_to_features( self, data: List[Dict[str, Any]], positions: List[tuple], search_word: str, search_result: Optional[Dict[str, Any]] ): """ 将搜索结果写入到所有相关的特征节点 Args: data: 数据列表(会被修改) positions: 特征位置列表 search_word: 搜索关键词 search_result: 搜索结果 """ for result_idx, assoc_idx, feature_idx in positions: feature = data[result_idx]['找到的关联'][assoc_idx]['特征列表'][feature_idx] # 添加搜索结果 if search_result: # 深拷贝,确保每个特征有独立的数据 feature['search_result'] = copy.deepcopy(search_result) # 添加元数据 note_count = len(search_result.get('data', {}).get('data', [])) feature['search_metadata'] = { 'searched_at': datetime.now().isoformat(), 'status': 'success', 'note_count': note_count, 'search_params': { 'keyword': search_word, 'content_type': self.content_type, 'sort_type': self.sort_type } } else: # 搜索失败 feature['search_result'] = None feature['search_metadata'] = { 'searched_at': datetime.now().isoformat(), 'status': 'failed', 'note_count': 0, 'search_params': { 'keyword': search_word, 'content_type': self.content_type, 'sort_type': self.sort_type } } def execute(self): """执行完整流程""" logger.info("=" * 60) logger.info("搜索任务执行器启动") logger.info("=" * 60) logger.info(f"输入文件: {self.input_path}") logger.info(f"输出文件: {self.output_path}") logger.info(f"进度文件: {self.progress_path}") logger.info(f"搜索延迟: {self.search_delay} 秒") logger.info("") # 1. 加载输入数据 logger.info("步骤1: 加载输入数据") data = self.load_json(self.input_path) if not data: logger.error("输入数据为空,退出") return # 2. 加载进度 logger.info("步骤2: 加载进度文件") progress = self.load_progress() # 3. 收集搜索词 logger.info("步骤3: 收集搜索关键词") search_word_map = self.collect_search_words(data) logger.info(f" 总特征数: {self.stats['总特征数']}") logger.info(f" 有search_word的特征数: {self.stats['有search_word的特征数']}") logger.info(f" 唯一search_word数: {self.stats['唯一search_word数']}") logger.info("") # 4. 执行搜索 logger.info("步骤4: 执行搜索任务") self.process_searches(data, search_word_map, progress) # 5. 保存结果 logger.info("步骤5: 保存结果") self.save_json(data, self.output_path) # 6. 输出统计 logger.info("") logger.info("=" * 60) logger.info("执行统计") logger.info("=" * 60) for key, value in self.stats.items(): logger.info(f" {key}: {value}") logger.info("") logger.info("✓ 执行完成") def main(): """主函数""" parser = argparse.ArgumentParser(description='小红书搜索任务执行器') parser.add_argument( '--input', default='associated_tags_results_with_search.json', help='输入JSON文件路径(默认: associated_tags_results_with_search.json)' ) parser.add_argument( '--output', default=None, help='输出JSON文件路径(默认: 输入文件名_with_search_data.json)' ) parser.add_argument( '--progress', default='search_progress.json', help='进度文件路径(默认: search_progress.json)' ) parser.add_argument( '--delay', type=float, default=2.0, help='每次搜索间隔时间(秒,默认: 2.0)' ) parser.add_argument( '--content-type', default='图文', choices=['不限', '视频', '图文'], help='内容类型(默认: 图文)' ) parser.add_argument( '--sort-type', default='综合', choices=['综合', '最新', '最多点赞', '最多评论'], help='排序方式(默认: 综合)' ) args = parser.parse_args() # 创建执行器 executor = SearchTaskExecutor( input_path=args.input, output_path=args.output, progress_path=args.progress, search_delay=args.delay, content_type=args.content_type, sort_type=args.sort_type ) # 执行 executor.execute() if __name__ == '__main__': main()