# execute_search_tasks.py
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 小红书搜索任务执行器
  5. 读取 associated_tags_results_with_search.json,
  6. 对所有非空的 search_word 执行小红书搜索,
  7. 并将结果写入到对应的特征节点下。
  8. """
  9. import json
  10. import logging
  11. import time
  12. import copy
  13. from pathlib import Path
  14. from typing import Dict, List, Any, Set, Optional
  15. from datetime import datetime
  16. import argparse
  17. from xiaohongshu_search import XiaohongshuSearch
  18. # 配置日志
  19. logging.basicConfig(
  20. level=logging.INFO,
  21. format='%(asctime)s - %(levelname)s - %(message)s',
  22. datefmt='%Y-%m-%d %H:%M:%S',
  23. handlers=[
  24. logging.FileHandler('search_execution.log', encoding='utf-8'),
  25. logging.StreamHandler()
  26. ]
  27. )
  28. logger = logging.getLogger(__name__)
  29. class SearchTaskExecutor:
  30. """搜索任务执行器"""
  31. def __init__(
  32. self,
  33. input_path: str,
  34. output_path: str = None,
  35. progress_path: str = 'search_progress.json',
  36. search_delay: float = 2.0,
  37. content_type: str = '图文',
  38. sort_type: str = '综合'
  39. ):
  40. """
  41. 初始化执行器
  42. Args:
  43. input_path: 输入JSON文件路径
  44. output_path: 输出JSON文件路径
  45. progress_path: 进度文件路径
  46. search_delay: 每次搜索间隔时间(秒)
  47. content_type: 内容类型
  48. sort_type: 排序方式
  49. """
  50. self.input_path = input_path
  51. self.output_path = output_path or input_path.replace(
  52. '.json', '_with_search_data.json'
  53. )
  54. self.progress_path = progress_path
  55. self.search_delay = search_delay
  56. self.content_type = content_type
  57. self.sort_type = sort_type
  58. # 初始化搜索客户端
  59. self.search_client = XiaohongshuSearch()
  60. # 统计信息
  61. self.stats = {
  62. '总特征数': 0,
  63. '有search_word的特征数': 0,
  64. '唯一search_word数': 0,
  65. '已完成搜索数': 0,
  66. '成功搜索数': 0,
  67. '失败搜索数': 0,
  68. '跳过搜索数': 0
  69. }
  70. def load_json(self, file_path: str) -> Any:
  71. """加载JSON文件"""
  72. try:
  73. with open(file_path, 'r', encoding='utf-8') as f:
  74. return json.load(f)
  75. except FileNotFoundError:
  76. logger.warning(f"文件不存在: {file_path}")
  77. return None
  78. except Exception as e:
  79. logger.error(f"加载文件失败 {file_path}: {e}")
  80. raise
  81. def save_json(self, data: Any, file_path: str):
  82. """保存JSON文件"""
  83. try:
  84. with open(file_path, 'w', encoding='utf-8') as f:
  85. json.dump(data, f, ensure_ascii=False, indent=2)
  86. logger.info(f"已保存: {file_path}")
  87. except Exception as e:
  88. logger.error(f"保存文件失败 {file_path}: {e}")
  89. raise
  90. def load_progress(self) -> Dict[str, Any]:
  91. """加载进度文件"""
  92. progress = self.load_json(self.progress_path)
  93. if progress is None:
  94. return {
  95. 'completed_searches': {}, # search_word -> result
  96. 'started_at': datetime.now().isoformat(),
  97. 'last_updated': None
  98. }
  99. return progress
  100. def save_progress(self, progress: Dict[str, Any]):
  101. """保存进度文件"""
  102. progress['last_updated'] = datetime.now().isoformat()
  103. self.save_json(progress, self.progress_path)
  104. def collect_search_words(self, data: List[Dict[str, Any]]) -> Dict[str, List[tuple]]:
  105. """
  106. 收集所有需要搜索的关键词
  107. Args:
  108. data: 输入数据列表
  109. Returns:
  110. 字典,key 为 search_word,value 为特征位置列表
  111. 位置格式: (result_idx, assoc_idx, feature_idx)
  112. """
  113. search_word_map = {} # search_word -> [(result_idx, assoc_idx, feature_idx), ...]
  114. for result_idx, result in enumerate(data):
  115. for assoc_idx, assoc in enumerate(result.get('找到的关联', [])):
  116. for feature_idx, feature in enumerate(assoc.get('特征列表', [])):
  117. self.stats['总特征数'] += 1
  118. search_word = feature.get('search_word')
  119. if search_word and search_word.strip():
  120. self.stats['有search_word的特征数'] += 1
  121. if search_word not in search_word_map:
  122. search_word_map[search_word] = []
  123. search_word_map[search_word].append(
  124. (result_idx, assoc_idx, feature_idx)
  125. )
  126. self.stats['唯一search_word数'] = len(search_word_map)
  127. return search_word_map
  128. def execute_search(
  129. self,
  130. search_word: str,
  131. max_retries: int = 3
  132. ) -> Optional[Dict[str, Any]]:
  133. """
  134. 执行单个搜索
  135. Args:
  136. search_word: 搜索关键词
  137. max_retries: 最大重试次数
  138. Returns:
  139. 搜索结果字典,失败返回 None
  140. """
  141. try:
  142. logger.info(f" 搜索: {search_word}")
  143. result = self.search_client.search(
  144. keyword=search_word,
  145. content_type=self.content_type,
  146. sort_type=self.sort_type,
  147. max_retries=max_retries
  148. )
  149. # 提取帖子数量
  150. note_count = len(result.get('data', {}).get('data', []))
  151. logger.info(f" ✓ 成功,获取 {note_count} 条帖子")
  152. return result
  153. except Exception as e:
  154. logger.error(f" ✗ 失败: {e}")
  155. return None
  156. def process_searches(
  157. self,
  158. data: List[Dict[str, Any]],
  159. search_word_map: Dict[str, List[tuple]],
  160. progress: Dict[str, Any]
  161. ):
  162. """
  163. 执行所有搜索任务
  164. Args:
  165. data: 输入数据(会被修改)
  166. search_word_map: 搜索词映射
  167. progress: 进度数据
  168. """
  169. completed_searches = progress['completed_searches']
  170. total_searches = len(search_word_map)
  171. logger.info("=" * 60)
  172. logger.info("开始执行搜索任务")
  173. logger.info("=" * 60)
  174. logger.info(f"唯一搜索词数: {total_searches}")
  175. logger.info(f"已完成: {len(completed_searches)}")
  176. logger.info(f"待执行: {total_searches - len(completed_searches)}")
  177. logger.info("")
  178. # 遍历所有唯一的搜索词
  179. for idx, (search_word, positions) in enumerate(search_word_map.items(), 1):
  180. logger.info(f"[{idx}/{total_searches}] 处理: {search_word}")
  181. logger.info(f" 影响 {len(positions)} 个特征节点")
  182. # 检查是否已完成
  183. if search_word in completed_searches:
  184. logger.info(f" ⊙ 已完成(使用缓存结果)")
  185. search_result = completed_searches[search_word]
  186. self.stats['跳过搜索数'] += 1
  187. else:
  188. # 执行搜索
  189. search_result = self.execute_search(search_word)
  190. # 记录结果到进度文件
  191. completed_searches[search_word] = search_result
  192. self.stats['已完成搜索数'] += 1
  193. if search_result:
  194. self.stats['成功搜索数'] += 1
  195. else:
  196. self.stats['失败搜索数'] += 1
  197. # 保存进度
  198. self.save_progress(progress)
  199. # 延迟,避免请求过快
  200. if idx < total_searches: # 最后一次不需要延迟
  201. time.sleep(self.search_delay)
  202. # 将搜索结果写入到所有相关的特征节点
  203. self._write_results_to_features(
  204. data, positions, search_word, search_result
  205. )
  206. logger.info("")
  207. logger.info("=" * 60)
  208. logger.info("搜索任务执行完成")
  209. logger.info("=" * 60)
  210. def _write_results_to_features(
  211. self,
  212. data: List[Dict[str, Any]],
  213. positions: List[tuple],
  214. search_word: str,
  215. search_result: Optional[Dict[str, Any]]
  216. ):
  217. """
  218. 将搜索结果写入到所有相关的特征节点
  219. Args:
  220. data: 数据列表(会被修改)
  221. positions: 特征位置列表
  222. search_word: 搜索关键词
  223. search_result: 搜索结果
  224. """
  225. for result_idx, assoc_idx, feature_idx in positions:
  226. feature = data[result_idx]['找到的关联'][assoc_idx]['特征列表'][feature_idx]
  227. # 添加搜索结果
  228. if search_result:
  229. # 深拷贝,确保每个特征有独立的数据
  230. feature['search_result'] = copy.deepcopy(search_result)
  231. # 添加元数据
  232. note_count = len(search_result.get('data', {}).get('data', []))
  233. feature['search_metadata'] = {
  234. 'searched_at': datetime.now().isoformat(),
  235. 'status': 'success',
  236. 'note_count': note_count,
  237. 'search_params': {
  238. 'keyword': search_word,
  239. 'content_type': self.content_type,
  240. 'sort_type': self.sort_type
  241. }
  242. }
  243. else:
  244. # 搜索失败
  245. feature['search_result'] = None
  246. feature['search_metadata'] = {
  247. 'searched_at': datetime.now().isoformat(),
  248. 'status': 'failed',
  249. 'note_count': 0,
  250. 'search_params': {
  251. 'keyword': search_word,
  252. 'content_type': self.content_type,
  253. 'sort_type': self.sort_type
  254. }
  255. }
  256. def execute(self):
  257. """执行完整流程"""
  258. logger.info("=" * 60)
  259. logger.info("搜索任务执行器启动")
  260. logger.info("=" * 60)
  261. logger.info(f"输入文件: {self.input_path}")
  262. logger.info(f"输出文件: {self.output_path}")
  263. logger.info(f"进度文件: {self.progress_path}")
  264. logger.info(f"搜索延迟: {self.search_delay} 秒")
  265. logger.info("")
  266. # 1. 加载输入数据
  267. logger.info("步骤1: 加载输入数据")
  268. data = self.load_json(self.input_path)
  269. if not data:
  270. logger.error("输入数据为空,退出")
  271. return
  272. # 2. 加载进度
  273. logger.info("步骤2: 加载进度文件")
  274. progress = self.load_progress()
  275. # 3. 收集搜索词
  276. logger.info("步骤3: 收集搜索关键词")
  277. search_word_map = self.collect_search_words(data)
  278. logger.info(f" 总特征数: {self.stats['总特征数']}")
  279. logger.info(f" 有search_word的特征数: {self.stats['有search_word的特征数']}")
  280. logger.info(f" 唯一search_word数: {self.stats['唯一search_word数']}")
  281. logger.info("")
  282. # 4. 执行搜索
  283. logger.info("步骤4: 执行搜索任务")
  284. self.process_searches(data, search_word_map, progress)
  285. # 5. 保存结果
  286. logger.info("步骤5: 保存结果")
  287. self.save_json(data, self.output_path)
  288. # 6. 输出统计
  289. logger.info("")
  290. logger.info("=" * 60)
  291. logger.info("执行统计")
  292. logger.info("=" * 60)
  293. for key, value in self.stats.items():
  294. logger.info(f" {key}: {value}")
  295. logger.info("")
  296. logger.info("✓ 执行完成")
  297. def main():
  298. """主函数"""
  299. parser = argparse.ArgumentParser(description='小红书搜索任务执行器')
  300. parser.add_argument(
  301. '--input',
  302. default='associated_tags_results_with_search.json',
  303. help='输入JSON文件路径(默认: associated_tags_results_with_search.json)'
  304. )
  305. parser.add_argument(
  306. '--output',
  307. default=None,
  308. help='输出JSON文件路径(默认: 输入文件名_with_search_data.json)'
  309. )
  310. parser.add_argument(
  311. '--progress',
  312. default='search_progress.json',
  313. help='进度文件路径(默认: search_progress.json)'
  314. )
  315. parser.add_argument(
  316. '--delay',
  317. type=float,
  318. default=2.0,
  319. help='每次搜索间隔时间(秒,默认: 2.0)'
  320. )
  321. parser.add_argument(
  322. '--content-type',
  323. default='图文',
  324. choices=['不限', '视频', '图文'],
  325. help='内容类型(默认: 图文)'
  326. )
  327. parser.add_argument(
  328. '--sort-type',
  329. default='综合',
  330. choices=['综合', '最新', '最多点赞', '最多评论'],
  331. help='排序方式(默认: 综合)'
  332. )
  333. args = parser.parse_args()
  334. # 创建执行器
  335. executor = SearchTaskExecutor(
  336. input_path=args.input,
  337. output_path=args.output,
  338. progress_path=args.progress,
  339. search_delay=args.delay,
  340. content_type=args.content_type,
  341. sort_type=args.sort_type
  342. )
  343. # 执行
  344. executor.execute()
  345. if __name__ == '__main__':
  346. main()