| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 小红书搜索任务执行器
- 读取 associated_tags_results_with_search.json,
- 对所有非空的 search_word 执行小红书搜索,
- 并将结果写入到对应的特征节点下。
- """
- import sys
- import os
- # 将项目根目录添加到Python路径
- project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.insert(0, project_root)
- import json
- import logging
- import time
- import copy
- from pathlib import Path
- from typing import Dict, List, Any, Set, Optional
- from datetime import datetime
- import argparse
- from src.clients.xiaohongshu_search import XiaohongshuSearch
# Configure root logging once, at import time: records of level INFO and above
# go both to 'search_execution.log' (UTF-8, relative to the current working
# directory) and to the console through a default StreamHandler.
_log_handlers = [
    logging.FileHandler('search_execution.log', encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    handlers=_log_handlers,
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# The handler list is only needed for basicConfig; don't leave it as a
# module-level name.
del _log_handlers

logger = logging.getLogger(__name__)
class SearchTaskExecutor:
    """Run Xiaohongshu searches for every non-empty ``search_word`` in a JSON file.

    ``collect_search_words`` walks ``data[i]['找到的关联'][j]['特征列表'][k]``
    and groups feature positions by their ``search_word``. Each unique word is
    searched once, and the result is deep-copied into every feature that uses
    it. Successful results are cached in a progress file, so an interrupted
    run can resume without repeating them. Failed searches are not cached and
    are therefore retried on the next run.
    """

    def __init__(
        self,
        input_path: str,
        output_path: Optional[str] = None,
        progress_path: str = 'search_progress.json',
        search_delay: float = 2.0,
        content_type: str = '图文',
        sort_type: str = '综合'
    ):
        """Initialise the executor.

        Args:
            input_path: Path of the input JSON file.
            output_path: Path of the output JSON file. When falsy, it is
                derived from ``input_path`` by ``_default_output_path``.
            progress_path: Path of the progress (resume cache) file.
            search_delay: Seconds to sleep after each live (non-cached) search
                except the last item.
            content_type: Content type passed to the search client.
            sort_type: Sort order passed to the search client.
        """
        self.input_path = input_path
        self.output_path = output_path or self._default_output_path(input_path)
        self.progress_path = progress_path
        self.search_delay = search_delay
        self.content_type = content_type
        self.sort_type = sort_type
        # Search client (project-local class imported at the top of the file).
        self.search_client = XiaohongshuSearch()
        # Run statistics; printed at the end of execute().
        self.stats = {
            '总特征数': 0,
            '有search_word的特征数': 0,
            '唯一search_word数': 0,
            '已完成搜索数': 0,
            '成功搜索数': 0,
            '失败搜索数': 0,
            '跳过搜索数': 0
        }

    @staticmethod
    def _default_output_path(input_path: str) -> str:
        """Derive the default output path from *input_path*.

        ``foo.json`` (any case of the extension) becomes
        ``foo_with_search_data.json``. Only the trailing extension is replaced,
        so directory names containing ``.json`` are left untouched. A path
        without a ``.json`` extension gets the suffix appended. This means the
        default output can never equal the input path and overwrite it.
        """
        root, ext = os.path.splitext(input_path)
        base = root if ext.lower() == '.json' else input_path
        return base + '_with_search_data.json'

    def load_json(self, file_path: str) -> Any:
        """Load and return the JSON content of *file_path*.

        Returns ``None`` (after logging a warning) if the file does not exist.
        Any other error is logged and re-raised.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            logger.warning(f"文件不存在: {file_path}")
            return None
        except Exception as e:
            logger.error(f"加载文件失败 {file_path}: {e}")
            raise

    def save_json(self, data: Any, file_path: str):
        """Write *data* to *file_path* as indented UTF-8 JSON.

        The data is written to ``<file_path>.tmp`` first and then moved over
        the target with ``os.replace``. An interruption mid-write (this runs
        after every search for the progress file) therefore leaves the
        previous file intact instead of a truncated, unparseable one. Errors
        are logged and re-raised.
        """
        tmp_path = f"{file_path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, file_path)
            logger.info(f"已保存: {file_path}")
        except Exception as e:
            logger.error(f"保存文件失败 {file_path}: {e}")
            # Best-effort cleanup of the partial temp file; the original
            # error is what matters to the caller.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    def load_progress(self) -> Dict[str, Any]:
        """Load the progress file, or return a fresh progress dict if missing.

        The returned dict is guaranteed to contain ``'completed_searches'``
        (search_word -> cached search result).
        """
        progress = self.load_json(self.progress_path)
        if progress is None:
            return {
                'completed_searches': {},  # search_word -> result
                'started_at': datetime.now().isoformat(),
                'last_updated': None
            }
        # Tolerate progress files written without the cache key.
        progress.setdefault('completed_searches', {})
        return progress

    def save_progress(self, progress: Dict[str, Any]):
        """Stamp ``progress['last_updated']`` with the current time and save it."""
        progress['last_updated'] = datetime.now().isoformat()
        self.save_json(progress, self.progress_path)

    def collect_search_words(self, data: List[Dict[str, Any]]) -> Dict[str, List[tuple]]:
        """Group feature positions by their non-empty ``search_word``.

        Also updates the ``总特征数``, ``有search_word的特征数`` and
        ``唯一search_word数`` counters in ``self.stats``.

        Args:
            data: Input records. Each may hold a ``'找到的关联'`` list whose
                entries may hold a ``'特征列表'`` list of feature dicts.

        Returns:
            Mapping of search_word -> list of ``(result_idx, assoc_idx,
            feature_idx)`` positions, in first-seen order. Words that are not
            strings, or are blank after stripping, are skipped. The key itself
            is kept unstripped.
        """
        search_word_map: Dict[str, List[tuple]] = {}
        for result_idx, result in enumerate(data):
            for assoc_idx, assoc in enumerate(result.get('找到的关联', [])):
                for feature_idx, feature in enumerate(assoc.get('特征列表', [])):
                    self.stats['总特征数'] += 1
                    search_word = feature.get('search_word')
                    # isinstance guard: a non-string value would make .strip() raise.
                    if isinstance(search_word, str) and search_word.strip():
                        self.stats['有search_word的特征数'] += 1
                        search_word_map.setdefault(search_word, []).append(
                            (result_idx, assoc_idx, feature_idx)
                        )
        self.stats['唯一search_word数'] = len(search_word_map)
        return search_word_map

    def execute_search(
        self,
        search_word: str,
        max_retries: int = 3
    ) -> Optional[Dict[str, Any]]:
        """Run one search through ``self.search_client``.

        Args:
            search_word: Keyword to search for.
            max_retries: Forwarded to the client's ``search``.

        Returns:
            The client's response, or ``None`` on failure. The response is
            expected to be a dict, and ``len(result['data']['data'])`` is
            logged as the note count. Any exception is logged and reported as
            ``None``. This covers errors from the client and from reading that
            nested count (e.g. when ``result['data']`` is not a dict).
        """
        try:
            logger.info(f" 搜索: {search_word}")
            result = self.search_client.search(
                keyword=search_word,
                content_type=self.content_type,
                sort_type=self.sort_type,
                max_retries=max_retries
            )
            # Reading the nested list also acts as a shape check (see docstring).
            note_count = len(result.get('data', {}).get('data', []))
            logger.info(f" ✓ 成功,获取 {note_count} 条帖子")
            return result
        except Exception as e:
            logger.error(f" ✗ 失败: {e}")
            return None

    def process_searches(
        self,
        data: List[Dict[str, Any]],
        search_word_map: Dict[str, List[tuple]],
        progress: Dict[str, Any]
    ):
        """Run (or reuse cached results for) every search and write them back.

        A word whose cached result is truthy is reused without a request.
        Otherwise the search runs live. Only successful (truthy) results are
        stored in the cache and persisted, so failures are retried on the next
        run.

        NOTE(review): the cache is keyed by search_word only. Results cached
        under a different content_type/sort_type are reused as-is.

        Args:
            data: Input records; modified in place.
            search_word_map: Output of ``collect_search_words``.
            progress: Progress dict; ``'completed_searches'`` is updated in place.
        """
        completed_searches = progress.setdefault('completed_searches', {})
        total_searches = len(search_word_map)
        # Count only words from *this* input that have a usable cached result.
        # The progress file may hold other inputs' words or older failures.
        already_done = sum(
            1 for word in search_word_map if completed_searches.get(word)
        )
        logger.info("=" * 60)
        logger.info("开始执行搜索任务")
        logger.info("=" * 60)
        logger.info(f"唯一搜索词数: {total_searches}")
        logger.info(f"已完成: {already_done}")
        logger.info(f"待执行: {total_searches - already_done}")
        logger.info("")

        for idx, (search_word, positions) in enumerate(search_word_map.items(), 1):
            logger.info(f"[{idx}/{total_searches}] 处理: {search_word}")
            logger.info(f" 影响 {len(positions)} 个特征节点")

            cached_result = completed_searches.get(search_word)
            if cached_result:
                logger.info(" ⊙ 已完成(使用缓存结果)")
                search_result = cached_result
                self.stats['跳过搜索数'] += 1
            else:
                search_result = self.execute_search(search_word)
                self.stats['已完成搜索数'] += 1
                if search_result:
                    self.stats['成功搜索数'] += 1
                    # Persist successes only; a failure must not poison the cache.
                    completed_searches[search_word] = search_result
                    self.save_progress(progress)
                else:
                    self.stats['失败搜索数'] += 1
                # Throttle live requests; no need to wait after the last item.
                if idx < total_searches:
                    time.sleep(self.search_delay)

            self._write_results_to_features(
                data, positions, search_word, search_result
            )

        logger.info("")
        logger.info("=" * 60)
        logger.info("搜索任务执行完成")
        logger.info("=" * 60)

    def _write_results_to_features(
        self,
        data: List[Dict[str, Any]],
        positions: List[tuple],
        search_word: str,
        search_result: Optional[Dict[str, Any]]
    ):
        """Attach a search result and its metadata to every listed feature.

        Each feature gets its own deep copy of a truthy *search_result*, or
        ``None`` for a failed search. It also gets a ``search_metadata`` dict
        with the timestamp, the status (``'success'``/``'failed'``), the note
        count and the search parameters.

        Args:
            data: Input records; modified in place.
            positions: ``(result_idx, assoc_idx, feature_idx)`` tuples.
            search_word: Keyword the result belongs to.
            search_result: Search response, or ``None``/falsy on failure.
        """
        succeeded = bool(search_result)
        note_count = (
            len(search_result.get('data', {}).get('data', [])) if succeeded else 0
        )
        for result_idx, assoc_idx, feature_idx in positions:
            feature = data[result_idx]['找到的关联'][assoc_idx]['特征列表'][feature_idx]
            # Deep copy so features never share (and co-mutate) one result object.
            feature['search_result'] = (
                copy.deepcopy(search_result) if succeeded else None
            )
            feature['search_metadata'] = {
                'searched_at': datetime.now().isoformat(),
                'status': 'success' if succeeded else 'failed',
                'note_count': note_count,
                'search_params': {
                    'keyword': search_word,
                    'content_type': self.content_type,
                    'sort_type': self.sort_type
                }
            }

    def execute(self):
        """Run the full pipeline.

        The steps are: load input, load progress, collect words, search,
        save output, print stats. Returns early (after logging an error) when
        the input file is missing or empty.
        """
        logger.info("=" * 60)
        logger.info("搜索任务执行器启动")
        logger.info("=" * 60)
        logger.info(f"输入文件: {self.input_path}")
        logger.info(f"输出文件: {self.output_path}")
        logger.info(f"进度文件: {self.progress_path}")
        logger.info(f"搜索延迟: {self.search_delay} 秒")
        logger.info("")

        # 1. Load input data (load_json returns None for a missing file).
        logger.info("步骤1: 加载输入数据")
        data = self.load_json(self.input_path)
        if not data:
            logger.error("输入数据为空,退出")
            return

        # 2. Load (or initialise) the progress cache.
        logger.info("步骤2: 加载进度文件")
        progress = self.load_progress()

        # 3. Collect unique search words.
        logger.info("步骤3: 收集搜索关键词")
        search_word_map = self.collect_search_words(data)
        logger.info(f" 总特征数: {self.stats['总特征数']}")
        logger.info(f" 有search_word的特征数: {self.stats['有search_word的特征数']}")
        logger.info(f" 唯一search_word数: {self.stats['唯一search_word数']}")
        logger.info("")

        # 4. Run the searches and write results into `data`.
        logger.info("步骤4: 执行搜索任务")
        self.process_searches(data, search_word_map, progress)

        # 5. Save the enriched data.
        logger.info("步骤5: 保存结果")
        self.save_json(data, self.output_path)

        # 6. Print statistics.
        logger.info("")
        logger.info("=" * 60)
        logger.info("执行统计")
        logger.info("=" * 60)
        for key, value in self.stats.items():
            logger.info(f" {key}: {value}")
        logger.info("")
        logger.info("✓ 执行完成")
def main(argv: Optional[List[str]] = None):
    """Command-line entry point: parse arguments and run the executor.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the normal command-line behaviour.
    """
    import math  # local: only needed for the --delay validation below

    parser = argparse.ArgumentParser(description='小红书搜索任务执行器')
    parser.add_argument(
        '--input',
        default='associated_tags_results_with_search.json',
        help='输入JSON文件路径(默认: associated_tags_results_with_search.json)'
    )
    parser.add_argument(
        '--output',
        default=None,
        help='输出JSON文件路径(默认: 输入文件名_with_search_data.json)'
    )
    parser.add_argument(
        '--progress',
        default='search_progress.json',
        help='进度文件路径(默认: search_progress.json)'
    )
    parser.add_argument(
        '--delay',
        type=float,
        default=2.0,
        help='每次搜索间隔时间(秒,默认: 2.0)'
    )
    parser.add_argument(
        '--content-type',
        default='图文',
        choices=['不限', '视频', '图文'],
        help='内容类型(默认: 图文)'
    )
    parser.add_argument(
        '--sort-type',
        default='综合',
        choices=['综合', '最新', '最多点赞', '最多评论'],
        help='排序方式(默认: 综合)'
    )
    args = parser.parse_args(argv)

    # time.sleep() raises on negative or NaN values and cannot take inf.
    # Reject them here instead of crashing after the first search has run.
    if not math.isfinite(args.delay) or args.delay < 0:
        parser.error('--delay 必须是非负的有限数值')

    executor = SearchTaskExecutor(
        input_path=args.input,
        output_path=args.output,
        progress_path=args.progress,
        search_delay=args.delay,
        content_type=args.content_type,
        sort_type=args.sort_type
    )
    executor.execute()


if __name__ == '__main__':
    main()
|