batch_fetch_accounts.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. #!/usr/bin/env python3
  2. """
  3. 批量处理账号数据脚本
  4. 功能:根据账号整理.json文件批量获取账号数据
  5. 输出目录结构:examples/[品类名称]/[tag名称]/[账号名称]/[帖子ID]/输入/
  6. """
  7. import json
  8. import time
  9. from pathlib import Path
  10. import sys
  11. import argparse
  12. import shutil
  13. # 导入共享工具模块
  14. from xhs_utils import (
  15. get_note_detail,
  16. get_author_history_notes,
  17. merge_note_data
  18. )
  19. def extract_account_id_from_url(url: str) -> str:
  20. """
  21. 从小红书账号URL中提取account_id
  22. Args:
  23. url: 小红书账号URL
  24. Returns:
  25. account_id: 账号ID
  26. """
  27. import re
  28. # 尝试从URL路径中提取
  29. pattern = r'/user/profile/([a-f0-9]+)'
  30. match = re.search(pattern, url)
  31. if match:
  32. return match.group(1)
  33. # 如果直接传入的是account_id,则直接返回
  34. if re.match(r'^[a-f0-9]{24}$', url):
  35. return url
  36. raise ValueError(f"无法从URL中提取account_id: {url}")
  37. def save_note_to_file(note_data: dict, file_path: Path):
  38. """
  39. 将帖子数据保存到JSON文件
  40. Args:
  41. note_data: 帖子数据
  42. file_path: 文件路径
  43. """
  44. # 确保目录存在
  45. file_path.parent.mkdir(parents=True, exist_ok=True)
  46. # 保存JSON文件
  47. with open(file_path, 'w', encoding='utf-8') as f:
  48. json.dump(note_data, f, ensure_ascii=False, indent=2)
  49. print(f" 已保存: {file_path}")
  50. def check_note_data_integrity(note_data: dict) -> bool:
  51. """
  52. 检查帖子数据的完整性
  53. Args:
  54. note_data: 帖子数据字典
  55. Returns:
  56. bool: 如果 images 或 video 字段至少一个不为空,返回 True,否则返回 False
  57. """
  58. images = note_data.get("images", [])
  59. video = note_data.get("video")
  60. # 检查 images 是否为非空列表
  61. has_images = isinstance(images, list) and len(images) > 0
  62. # 检查 video 是否存在且不为空(字符串或字典都可以)
  63. has_video = video is not None and video != "" and video != {}
  64. return has_images or has_video
  65. def check_account_data_exists(category_name: str, tag_name: str, account_name: str,
  66. note_id: str = None, output_dir: str = "examples") -> dict:
  67. """
  68. 检查账号数据是否已经存在且完整
  69. Args:
  70. category_name: 品类名称
  71. tag_name: tag名称
  72. account_name: 账号名称
  73. note_id: 帖子ID(可选,如果提供则检查该特定帖子)
  74. output_dir: 输出根目录
  75. Returns:
  76. dict: 包含检查结果的字典
  77. {
  78. "exists": bool, # 数据是否存在
  79. "complete": bool, # 数据是否完整
  80. "target_note_path": Path or None, # 待解构帖子路径
  81. "history_notes_path": Path or None, # 历史帖子目录路径
  82. "incomplete_files": list, # 不完整的文件列表
  83. "note_id": str or None # 如果已存在,返回帖子ID
  84. }
  85. """
  86. result = {
  87. "exists": False,
  88. "complete": False,
  89. "target_note_path": None,
  90. "history_notes_path": None,
  91. "incomplete_files": [],
  92. "note_id": None
  93. }
  94. # 如果没有提供note_id,需要先查找账号目录下是否有数据
  95. base_dir = Path(output_dir) / category_name / tag_name / account_name
  96. if not base_dir.exists():
  97. return result
  98. # 如果没有提供note_id,尝试查找现有的note_id目录
  99. if note_id is None:
  100. # 查找第一个存在的note_id目录
  101. note_dirs = [d for d in base_dir.iterdir() if d.is_dir()]
  102. if not note_dirs:
  103. return result
  104. # 使用第一个找到的目录
  105. note_id = note_dirs[0].name
  106. result["note_id"] = note_id
  107. # 构建路径
  108. input_dir = base_dir / note_id / "输入"
  109. target_note_path = input_dir / "待解构帖子.json"
  110. history_notes_path = input_dir / "作者历史帖子"
  111. result["target_note_path"] = target_note_path
  112. result["history_notes_path"] = history_notes_path
  113. # 检查输入目录是否存在
  114. if not input_dir.exists():
  115. return result
  116. result["exists"] = True
  117. # 检查待解构帖子是否存在且完整
  118. if not target_note_path.exists():
  119. result["incomplete_files"].append(str(target_note_path))
  120. return result
  121. try:
  122. with open(target_note_path, 'r', encoding='utf-8') as f:
  123. target_note_data = json.load(f)
  124. if not check_note_data_integrity(target_note_data):
  125. result["incomplete_files"].append(str(target_note_path))
  126. except Exception as e:
  127. result["incomplete_files"].append(f"{target_note_path} (读取错误: {e})")
  128. # 检查历史帖子目录
  129. if not history_notes_path.exists():
  130. result["incomplete_files"].append(str(history_notes_path))
  131. return result
  132. # 检查历史帖子文件的完整性
  133. history_files = list(history_notes_path.glob("*.json"))
  134. if len(history_files) == 0:
  135. result["incomplete_files"].append(f"{history_notes_path} (没有历史帖子文件)")
  136. else:
  137. # 统计有效的历史帖子数量
  138. valid_history_count = 0
  139. for history_file in history_files:
  140. try:
  141. with open(history_file, 'r', encoding='utf-8') as f:
  142. history_note_data = json.load(f)
  143. if not check_note_data_integrity(history_note_data):
  144. result["incomplete_files"].append(str(history_file))
  145. else:
  146. valid_history_count += 1
  147. except Exception as e:
  148. result["incomplete_files"].append(f"{history_file} (读取错误: {e})")
  149. # 验证历史帖子数量必须大于4
  150. if valid_history_count <= 4:
  151. result["incomplete_files"].append(f"{history_notes_path} (有效历史帖子数量 {valid_history_count} ≤ 4,不满足要求)")
  152. # 如果没有不完整的文件,则数据完整
  153. result["complete"] = len(result["incomplete_files"]) == 0
  154. return result
  155. def delete_incomplete_data(category_name: str, tag_name: str, account_name: str,
  156. note_id: str, output_dir: str = "examples") -> bool:
  157. """
  158. 删除不完整的账号数据目录
  159. Args:
  160. category_name: 品类名称
  161. tag_name: tag名称
  162. account_name: 账号名称
  163. note_id: 帖子ID
  164. output_dir: 输出根目录
  165. Returns:
  166. bool: 删除成功返回True,否则返回False
  167. """
  168. try:
  169. # 构建要删除的目录路径:examples/[品类]/[tag]/[账号]/[帖子ID]
  170. target_dir = Path(output_dir) / category_name / tag_name / account_name / note_id
  171. if target_dir.exists():
  172. shutil.rmtree(target_dir)
  173. print(f" ✓ 已删除不完整数据目录: {target_dir}")
  174. return True
  175. else:
  176. print(f" ⚠️ 目录不存在: {target_dir}")
  177. return False
  178. except Exception as e:
  179. print(f" ✗ 删除目录失败: {e}")
  180. return False
  181. def process_account(category_name: str, tag_name: str, account_info: dict,
  182. output_dir: str = "examples", check_only: bool = False,
  183. skip_if_exists: bool = True, clean_incomplete: bool = False):
  184. """
  185. 处理单个账号的数据获取
  186. Args:
  187. category_name: 品类名称
  188. tag_name: tag名称
  189. account_info: 账号信息字典,包含name和url
  190. output_dir: 输出根目录
  191. check_only: 如果为True,只检查数据是否存在,不执行获取操作
  192. skip_if_exists: 如果为True且数据已存在且完整,则跳过获取
  193. clean_incomplete: 如果为True,检测到不完整数据时自动删除
  194. """
  195. account_name = account_info.get("name", "未知账号")
  196. account_url = account_info.get("url", "")
  197. if not account_url:
  198. print(f"⚠️ 账号 {account_name} 没有URL,跳过")
  199. return
  200. print(f"\n{'='*80}")
  201. print(f"{'[检查模式]' if check_only else '[处理模式]'} 账号: {account_name}")
  202. print(f" 品类: {category_name}")
  203. print(f" Tag: {tag_name}")
  204. print(f" URL: {account_url}")
  205. print(f"{'='*80}")
  206. # 先检查数据是否已存在
  207. check_result = check_account_data_exists(category_name, tag_name, account_name, output_dir=output_dir)
  208. if check_result["exists"]:
  209. if check_result["complete"]:
  210. print(f"✓ 数据已存在且完整")
  211. print(f" 帖子ID: {check_result['note_id']}")
  212. print(f" 待解构帖子: {check_result['target_note_path']}")
  213. print(f" 历史帖子目录: {check_result['history_notes_path']}")
  214. if check_only or skip_if_exists:
  215. print(f"{' [检查模式] 跳过获取' if check_only else ' [跳过] 数据已完整'}")
  216. return
  217. else:
  218. print(f"⚠️ 数据存在但不完整")
  219. print(f" 帖子ID: {check_result['note_id']}")
  220. print(f" 不完整的文件:")
  221. for incomplete_file in check_result["incomplete_files"]:
  222. print(f" - {incomplete_file}")
  223. # 如果启用了清理不完整数据的功能
  224. if clean_incomplete:
  225. print(f" [清理模式] 删除不完整数据...")
  226. delete_incomplete_data(category_name, tag_name, account_name,
  227. check_result['note_id'], output_dir)
  228. if check_only:
  229. print(f" [检查模式] 需要重新获取")
  230. return
  231. else:
  232. print(f" 将重新获取数据...")
  233. else:
  234. print(f"ℹ️ 数据不存在")
  235. if check_only:
  236. print(f" [检查模式] 需要获取")
  237. return
  238. # 如果是检查模式,到这里就结束了
  239. if check_only:
  240. return
  241. try:
  242. # 1. 提取account_id
  243. account_id = extract_account_id_from_url(account_url)
  244. print(f"✓ 提取到account_id: {account_id}")
  245. # 2. 获取账号的所有历史帖子
  246. print(f"正在获取历史帖子...")
  247. history_notes = get_author_history_notes(account_id)
  248. if not history_notes or len(history_notes) == 0:
  249. print(f"⚠️ 未找到历史帖子")
  250. return
  251. print(f"✓ 找到 {len(history_notes)} 个历史帖子")
  252. # 3. 找出点赞数最高的帖子
  253. max_like_note = max(history_notes, key=lambda x: x.get("like_count", 0))
  254. max_like_note_id = max_like_note.get("note_id", "")
  255. max_like_count = max_like_note.get("like_count", 0)
  256. print(f"✓ 点赞数最高的帖子:")
  257. print(f" - 帖子ID: {max_like_note_id}")
  258. print(f" - 标题: {max_like_note.get('title', '无标题')}")
  259. print(f" - 点赞数: {max_like_count}")
  260. # 4. 处理点赞数最高的帖子(待解构帖子)
  261. print(f"正在处理待解构帖子...")
  262. need_detail = not (max_like_note.get("desc") or max_like_note.get("note_text") or max_like_note.get("body_text"))
  263. target_note_detail = None
  264. if need_detail:
  265. target_note_detail = get_note_detail(max_like_note_id)
  266. # 合并历史API和详情API的数据
  267. transformed_target = merge_note_data(max_like_note, target_note_detail)
  268. # 5. 创建新的目录结构:examples/[品类名称]/[tag名称]/[账号名称]/[帖子ID]/输入/
  269. base_path = Path(output_dir) / category_name / tag_name / account_name / max_like_note_id / "输入"
  270. history_path = base_path / "作者历史帖子"
  271. # 6. 保存待解构帖子
  272. target_note_path = base_path / "待解构帖子.json"
  273. save_note_to_file(transformed_target, target_note_path)
  274. # 7. 为每个历史帖子处理数据并保存
  275. print(f"正在处理所有历史帖子...")
  276. success_count = 0
  277. for idx, note in enumerate(history_notes, 1):
  278. history_note_id = note.get("note_id", "")
  279. if history_note_id:
  280. try:
  281. # 检查历史API数据是否缺少关键字段(主要是body_text)
  282. need_detail = not (note.get("desc") or note.get("note_text") or note.get("body_text"))
  283. detail_data = None
  284. if need_detail:
  285. detail_data = get_note_detail(history_note_id)
  286. # 添加请求间隔,避免频繁调用
  287. if idx < len(history_notes):
  288. time.sleep(0.5)
  289. # 合并历史API和详情API的数据
  290. merged_note = merge_note_data(note, detail_data)
  291. # 保存到文件
  292. history_note_path = history_path / f"{history_note_id}.json"
  293. save_note_to_file(merged_note, history_note_path)
  294. success_count += 1
  295. except Exception as e:
  296. print(f" ⚠️ 处理帖子 {history_note_id} 失败: {e}")
  297. continue
  298. print(f"\n✓ 账号 {account_name} 处理完成!")
  299. print(f"✓ 待解构帖子: {max_like_note_id}")
  300. print(f"✓ 共保存 {success_count} 个历史帖子")
  301. print(f"✓ 输出目录: {base_path}")
  302. except Exception as e:
  303. print(f"✗ 处理账号 {account_name} 失败: {e}")
  304. import traceback
  305. traceback.print_exc()
  306. def main():
  307. """主函数"""
  308. # 解析命令行参数
  309. parser = argparse.ArgumentParser(
  310. description='批量处理账号数据脚本',
  311. formatter_class=argparse.RawDescriptionHelpFormatter,
  312. epilog="""
  313. 使用示例:
  314. # 默认模式:获取数据,如果已存在且完整则跳过
  315. python batch_fetch_accounts.py
  316. # 只检查模式:只检查数据是否存在且完整,不获取数据
  317. python batch_fetch_accounts.py --check-only
  318. # 检查并清理不完整数据
  319. python batch_fetch_accounts.py --check-only --clean-incomplete
  320. # 强制获取模式:即使数据已存在也重新获取
  321. python batch_fetch_accounts.py --no-skip-if-exists
  322. # 指定配置文件
  323. python batch_fetch_accounts.py --config 账号整理.json
  324. """
  325. )
  326. parser.add_argument(
  327. '--config',
  328. type=str,
  329. default='账号整理.json',
  330. help='配置文件路径 (默认: 账号整理.json)'
  331. )
  332. parser.add_argument(
  333. '--check-only',
  334. action='store_true',
  335. help='只检查数据是否存在且完整,不执行获取操作'
  336. )
  337. parser.add_argument(
  338. '--no-skip-if-exists',
  339. action='store_true',
  340. help='即使数据已存在且完整也重新获取'
  341. )
  342. parser.add_argument(
  343. '--clean-incomplete',
  344. action='store_true',
  345. help='自动删除检测到的不完整数据目录'
  346. )
  347. parser.add_argument(
  348. '--output-dir',
  349. type=str,
  350. default='examples',
  351. help='输出根目录 (默认: examples)'
  352. )
  353. args = parser.parse_args()
  354. config_file = args.config
  355. check_only = args.check_only
  356. skip_if_exists = not args.no_skip_if_exists
  357. clean_incomplete = args.clean_incomplete
  358. output_dir = args.output_dir
  359. print(f"{'='*80}")
  360. print(f"批量账号数据{'检查' if check_only else '获取'}脚本")
  361. print(f"{'='*80}")
  362. print(f"配置文件: {config_file}")
  363. print(f"模式: {'只检查' if check_only else '获取数据'}")
  364. print(f"跳过已存在: {'是' if skip_if_exists else '否'}")
  365. print(f"清理不完整数据: {'是' if clean_incomplete else '否'}")
  366. print(f"输出目录: {output_dir}")
  367. print(f"{'='*80}\n")
  368. try:
  369. with open(config_file, 'r', encoding='utf-8') as f:
  370. config = json.load(f)
  371. except FileNotFoundError:
  372. print(f"错误: 找不到文件 {config_file}")
  373. return 1
  374. except json.JSONDecodeError as e:
  375. print(f"错误: JSON格式错误 - {e}")
  376. return 1
  377. # 解析配置文件
  378. categories = config.get("categories", [])
  379. if not categories:
  380. print("错误: 配置文件中没有找到 categories 数据")
  381. return 1
  382. # 统计信息
  383. total_accounts = 0
  384. processed_accounts = 0
  385. # 遍历所有品类
  386. for category in categories:
  387. category_name = category.get("name", "未知品类")
  388. tags = category.get("tags", [])
  389. # 遍历所有tag
  390. for tag_info in tags:
  391. tag_name = tag_info.get("tag", "未知tag")
  392. accounts = tag_info.get("accounts", [])
  393. # 遍历所有账号
  394. for account in accounts:
  395. total_accounts += 1
  396. try:
  397. process_account(
  398. category_name,
  399. tag_name,
  400. account,
  401. output_dir=output_dir,
  402. check_only=check_only,
  403. skip_if_exists=skip_if_exists,
  404. clean_incomplete=clean_incomplete
  405. )
  406. processed_accounts += 1
  407. except Exception as e:
  408. print(f"处理账号失败: {e}")
  409. continue
  410. # 账号之间添加延迟(检查模式不需要延迟)
  411. if not check_only:
  412. time.sleep(1)
  413. print(f"\n{'='*80}")
  414. print(f"批处理完成!")
  415. print(f"总共: {total_accounts} 个账号")
  416. print(f"成功: {processed_accounts} 个账号")
  417. print(f"失败: {total_accounts - processed_accounts} 个账号")
  418. print(f"{'='*80}")
  419. return 0
  420. if __name__ == "__main__":
  421. exit(main())