| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527 |
- #!/usr/bin/env python3
- """
- 批量处理账号数据脚本
- 功能:根据账号整理.json文件批量获取账号数据
- 输出目录结构:examples/[品类名称]/[tag名称]/[账号名称]/[帖子ID]/输入/
- """
- import json
- import time
- from pathlib import Path
- import sys
- import argparse
- import shutil
- # 导入共享工具模块
- from xhs_utils import (
- get_note_detail,
- get_author_history_notes,
- merge_note_data
- )
def extract_account_id_from_url(url: str) -> str:
    """Extract the account_id from a Xiaohongshu profile URL.

    Args:
        url: Profile URL containing ``/user/profile/<id>``, or a bare
            24-character lowercase-hex account id. Surrounding whitespace
            (common in hand-edited config files) is tolerated.

    Returns:
        The account id (lowercase hex string).

    Raises:
        ValueError: If no account id can be extracted from the input.
    """
    import re

    # Tolerate stray whitespace copied in from hand-edited config files.
    candidate = url.strip()
    # Preferred form: the id embedded in the profile URL path.
    match = re.search(r'/user/profile/([a-f0-9]+)', candidate)
    if match:
        return match.group(1)
    # Fallback: the caller passed a bare 24-char hex account id directly.
    if re.fullmatch(r'[a-f0-9]{24}', candidate):
        return candidate
    raise ValueError(f"无法从URL中提取account_id: {url}")
def save_note_to_file(note_data: dict, file_path: Path):
    """Persist a note dict to *file_path* as pretty-printed UTF-8 JSON.

    Parent directories are created on demand, so callers may pass paths
    into not-yet-existing note directories.

    Args:
        note_data: Note payload to serialize.
        file_path: Destination JSON file.
    """
    file_path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(note_data, ensure_ascii=False, indent=2)
    file_path.write_text(payload, encoding='utf-8')
    print(f" 已保存: {file_path}")
def check_note_data_integrity(note_data: dict) -> bool:
    """Check that a note carries at least some media payload.

    Args:
        note_data: Note dict as saved by this script.

    Returns:
        True if ``images`` is a non-empty list or ``video`` holds a
        non-empty value; False otherwise.
    """
    images = note_data.get("images", [])
    has_images = isinstance(images, list) and len(images) > 0
    # Any truthy video value counts (string URL or dict). Truthiness also
    # rejects 0 / False / [] which the previous explicit comparisons
    # (None / "" / {}) wrongly accepted as valid video data.
    has_video = bool(note_data.get("video"))
    return has_images or has_video


def check_account_data_exists(category_name: str, tag_name: str, account_name: str,
                              note_id: str = None, output_dir: str = "examples") -> dict:
    """Check whether an account's fetched data already exists and is complete.

    Data is considered complete when the target note JSON exists with media,
    the history directory exists, and it holds more than 4 media-complete
    history notes.

    Args:
        category_name: Category name (first directory level).
        tag_name: Tag name (second directory level).
        account_name: Account name (third directory level).
        note_id: Note id to check; if None, the first note directory found
            under the account is used.
        output_dir: Root output directory.

    Returns:
        dict with keys:
            exists (bool): input directory exists.
            complete (bool): all integrity checks passed.
            target_note_path (Path | None): path of the target note JSON.
            history_notes_path (Path | None): path of the history directory.
            incomplete_files (list[str]): descriptions of failing items.
            note_id (str | None): the resolved note id, if any.
    """
    result = {
        "exists": False,
        "complete": False,
        "target_note_path": None,
        "history_notes_path": None,
        "incomplete_files": [],
        "note_id": None
    }
    base_dir = Path(output_dir) / category_name / tag_name / account_name
    if not base_dir.exists():
        return result
    # No note_id supplied: look for an existing note directory to audit.
    if note_id is None:
        note_dirs = [d for d in base_dir.iterdir() if d.is_dir()]
        if not note_dirs:
            return result
        note_id = note_dirs[0].name
    # Record the resolved note id whether it was discovered or passed in
    # (previously it was only recorded in the discovery branch, leaving
    # callers with None when they supplied note_id explicitly).
    result["note_id"] = note_id
    # Expected layout: <base>/<note_id>/输入/{待解构帖子.json, 作者历史帖子/}
    input_dir = base_dir / note_id / "输入"
    target_note_path = input_dir / "待解构帖子.json"
    history_notes_path = input_dir / "作者历史帖子"
    result["target_note_path"] = target_note_path
    result["history_notes_path"] = history_notes_path
    if not input_dir.exists():
        return result
    result["exists"] = True
    # Target note must exist, parse, and carry media.
    if not target_note_path.exists():
        result["incomplete_files"].append(str(target_note_path))
        return result
    try:
        with open(target_note_path, 'r', encoding='utf-8') as f:
            target_note_data = json.load(f)
        if not check_note_data_integrity(target_note_data):
            result["incomplete_files"].append(str(target_note_path))
    except Exception as e:
        result["incomplete_files"].append(f"{target_note_path} (读取错误: {e})")
    # History directory must exist.
    if not history_notes_path.exists():
        result["incomplete_files"].append(str(history_notes_path))
        return result
    # Every history JSON is checked; the count of media-complete ones
    # must exceed 4 for the account to count as complete.
    history_files = list(history_notes_path.glob("*.json"))
    if len(history_files) == 0:
        result["incomplete_files"].append(f"{history_notes_path} (没有历史帖子文件)")
    else:
        valid_history_count = 0
        for history_file in history_files:
            try:
                with open(history_file, 'r', encoding='utf-8') as f:
                    history_note_data = json.load(f)
                if not check_note_data_integrity(history_note_data):
                    result["incomplete_files"].append(str(history_file))
                else:
                    valid_history_count += 1
            except Exception as e:
                result["incomplete_files"].append(f"{history_file} (读取错误: {e})")
        if valid_history_count <= 4:
            result["incomplete_files"].append(f"{history_notes_path} (有效历史帖子数量 {valid_history_count} ≤ 4,不满足要求)")
    # Complete iff nothing was flagged above.
    result["complete"] = len(result["incomplete_files"]) == 0
    return result
def delete_incomplete_data(category_name: str, tag_name: str, account_name: str,
                           note_id: str, output_dir: str = "examples") -> bool:
    """Delete one note's data directory for an account.

    Removes ``<output_dir>/<category>/<tag>/<account>/<note_id>`` and
    everything under it.

    Args:
        category_name: Category name.
        tag_name: Tag name.
        account_name: Account name.
        note_id: Note id whose directory should be removed.
        output_dir: Root output directory.

    Returns:
        True if the directory was removed, False if it was missing or
        removal failed.
    """
    target_dir = Path(output_dir) / category_name / tag_name / account_name / note_id
    try:
        # Guard clause: nothing to do when the directory is already gone.
        if not target_dir.exists():
            print(f" ⚠️ 目录不存在: {target_dir}")
            return False
        shutil.rmtree(target_dir)
        print(f" ✓ 已删除不完整数据目录: {target_dir}")
        return True
    except Exception as e:
        print(f" ✗ 删除目录失败: {e}")
        return False
def process_account(category_name: str, tag_name: str, account_info: dict,
                    output_dir: str = "examples", check_only: bool = False,
                    skip_if_exists: bool = True, clean_incomplete: bool = False):
    """Fetch (or just audit) the data for a single account.

    Workflow: audit any existing on-disk data; unless skipping, resolve the
    account id from the URL, fetch the author's history notes, pick the note
    with the highest like count as the target ("to deconstruct") note, then
    save the target note and every history note under
    ``<output_dir>/<category>/<tag>/<account>/<note_id>/输入/``.

    Args:
        category_name: Category name (first directory level).
        tag_name: Tag name (second directory level).
        account_info: Account dict with "name" and "url" keys.
        output_dir: Root output directory.
        check_only: If True, only report data status — never fetch.
        skip_if_exists: If True, skip fetching when data exists and is complete.
        clean_incomplete: If True, delete a note directory found incomplete
            before refetching.
    """
    account_name = account_info.get("name", "未知账号")
    account_url = account_info.get("url", "")
    if not account_url:
        # Nothing can be fetched without a profile URL.
        print(f"⚠️ 账号 {account_name} 没有URL,跳过")
        return
    print(f"\n{'='*80}")
    print(f"{'[检查模式]' if check_only else '[处理模式]'} 账号: {account_name}")
    print(f" 品类: {category_name}")
    print(f" Tag: {tag_name}")
    print(f" URL: {account_url}")
    print(f"{'='*80}")
    # Audit previously fetched data before doing any network work.
    check_result = check_account_data_exists(category_name, tag_name, account_name, output_dir=output_dir)
    if check_result["exists"]:
        if check_result["complete"]:
            print(f"✓ 数据已存在且完整")
            print(f" 帖子ID: {check_result['note_id']}")
            print(f" 待解构帖子: {check_result['target_note_path']}")
            print(f" 历史帖子目录: {check_result['history_notes_path']}")
            if check_only or skip_if_exists:
                print(f"{' [检查模式] 跳过获取' if check_only else ' [跳过] 数据已完整'}")
                return
        else:
            print(f"⚠️ 数据存在但不完整")
            print(f" 帖子ID: {check_result['note_id']}")
            print(f" 不完整的文件:")
            for incomplete_file in check_result["incomplete_files"]:
                print(f" - {incomplete_file}")
            # Optionally wipe the incomplete note directory so the refetch
            # starts from a clean slate.
            if clean_incomplete:
                print(f" [清理模式] 删除不完整数据...")
                delete_incomplete_data(category_name, tag_name, account_name,
                                       check_result['note_id'], output_dir)
            if check_only:
                print(f" [检查模式] 需要重新获取")
                return
            else:
                print(f" 将重新获取数据...")
    else:
        print(f"ℹ️ 数据不存在")
        if check_only:
            print(f" [检查模式] 需要获取")
            return
    # Check-only mode never fetches, regardless of the branch taken above.
    if check_only:
        return
    try:
        # 1. Resolve the account id from the profile URL.
        account_id = extract_account_id_from_url(account_url)
        print(f"✓ 提取到account_id: {account_id}")
        # 2. Fetch all of the author's history notes.
        print(f"正在获取历史帖子...")
        history_notes = get_author_history_notes(account_id)
        if not history_notes or len(history_notes) == 0:
            print(f"⚠️ 未找到历史帖子")
            return
        print(f"✓ 找到 {len(history_notes)} 个历史帖子")
        # 3. Pick the most-liked note as the one to deconstruct.
        max_like_note = max(history_notes, key=lambda x: x.get("like_count", 0))
        max_like_note_id = max_like_note.get("note_id", "")
        max_like_count = max_like_note.get("like_count", 0)
        print(f"✓ 点赞数最高的帖子:")
        print(f" - 帖子ID: {max_like_note_id}")
        print(f" - 标题: {max_like_note.get('title', '无标题')}")
        print(f" - 点赞数: {max_like_count}")
        # 4. Fetch the target note's detail only when the history-API payload
        #    is missing every body-text field.
        print(f"正在处理待解构帖子...")
        need_detail = not (max_like_note.get("desc") or max_like_note.get("note_text") or max_like_note.get("body_text"))
        target_note_detail = None
        if need_detail:
            target_note_detail = get_note_detail(max_like_note_id)
        # Merge history-API and detail-API payloads into one record.
        transformed_target = merge_note_data(max_like_note, target_note_detail)
        # 5. Output layout: examples/[category]/[tag]/[account]/[note_id]/输入/
        base_path = Path(output_dir) / category_name / tag_name / account_name / max_like_note_id / "输入"
        history_path = base_path / "作者历史帖子"
        # 6. Save the target note.
        target_note_path = base_path / "待解构帖子.json"
        save_note_to_file(transformed_target, target_note_path)
        # 7. Merge and save every history note.
        print(f"正在处理所有历史帖子...")
        success_count = 0
        for idx, note in enumerate(history_notes, 1):
            history_note_id = note.get("note_id", "")
            if history_note_id:
                try:
                    # Call the detail API only when body text is missing
                    # from the history-API record.
                    need_detail = not (note.get("desc") or note.get("note_text") or note.get("body_text"))
                    detail_data = None
                    if need_detail:
                        detail_data = get_note_detail(history_note_id)
                        # Throttle detail requests to avoid hammering the
                        # API; no sleep after the final note.
                        if idx < len(history_notes):
                            time.sleep(0.5)
                    merged_note = merge_note_data(note, detail_data)
                    history_note_path = history_path / f"{history_note_id}.json"
                    save_note_to_file(merged_note, history_note_path)
                    success_count += 1
                except Exception as e:
                    # Best-effort: one bad note must not abort the account.
                    print(f" ⚠️ 处理帖子 {history_note_id} 失败: {e}")
                    continue
        print(f"\n✓ 账号 {account_name} 处理完成!")
        print(f"✓ 待解构帖子: {max_like_note_id}")
        print(f"✓ 共保存 {success_count} 个历史帖子")
        print(f"✓ 输出目录: {base_path}")
    except Exception as e:
        # Top-level guard: report the failure with traceback and let the
        # caller continue with the next account.
        print(f"✗ 处理账号 {account_name} 失败: {e}")
        import traceback
        traceback.print_exc()
def main():
    """CLI entry point: parse args, load the config, process every account.

    Returns:
        Process exit code: 0 on success, 1 on config-file errors.
    """
    # Command-line interface.
    parser = argparse.ArgumentParser(
        description='批量处理账号数据脚本',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
# 默认模式:获取数据,如果已存在且完整则跳过
python batch_fetch_accounts.py
# 只检查模式:只检查数据是否存在且完整,不获取数据
python batch_fetch_accounts.py --check-only
# 检查并清理不完整数据
python batch_fetch_accounts.py --check-only --clean-incomplete
# 强制获取模式:即使数据已存在也重新获取
python batch_fetch_accounts.py --no-skip-if-exists
# 指定配置文件
python batch_fetch_accounts.py --config 账号整理.json
"""
    )
    parser.add_argument(
        '--config',
        type=str,
        default='账号整理.json',
        help='配置文件路径 (默认: 账号整理.json)'
    )
    parser.add_argument(
        '--check-only',
        action='store_true',
        help='只检查数据是否存在且完整,不执行获取操作'
    )
    parser.add_argument(
        '--no-skip-if-exists',
        action='store_true',
        help='即使数据已存在且完整也重新获取'
    )
    parser.add_argument(
        '--clean-incomplete',
        action='store_true',
        help='自动删除检测到的不完整数据目录'
    )
    parser.add_argument(
        '--output-dir',
        type=str,
        default='examples',
        help='输出根目录 (默认: examples)'
    )
    args = parser.parse_args()
    config_file = args.config
    check_only = args.check_only
    # --no-skip-if-exists inverts the default "skip complete data" behavior.
    skip_if_exists = not args.no_skip_if_exists
    clean_incomplete = args.clean_incomplete
    output_dir = args.output_dir
    # Banner summarizing the effective run configuration.
    print(f"{'='*80}")
    print(f"批量账号数据{'检查' if check_only else '获取'}脚本")
    print(f"{'='*80}")
    print(f"配置文件: {config_file}")
    print(f"模式: {'只检查' if check_only else '获取数据'}")
    print(f"跳过已存在: {'是' if skip_if_exists else '否'}")
    print(f"清理不完整数据: {'是' if clean_incomplete else '否'}")
    print(f"输出目录: {output_dir}")
    print(f"{'='*80}\n")
    # Load the account-organization config file (JSON, UTF-8).
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except FileNotFoundError:
        print(f"错误: 找不到文件 {config_file}")
        return 1
    except json.JSONDecodeError as e:
        print(f"错误: JSON格式错误 - {e}")
        return 1
    # Expected shape: {"categories": [{"name", "tags": [{"tag", "accounts": [...]}]}]}
    categories = config.get("categories", [])
    if not categories:
        print("错误: 配置文件中没有找到 categories 数据")
        return 1
    # Run statistics.
    total_accounts = 0
    processed_accounts = 0
    # Walk category -> tag -> account.
    for category in categories:
        category_name = category.get("name", "未知品类")
        tags = category.get("tags", [])
        for tag_info in tags:
            tag_name = tag_info.get("tag", "未知tag")
            accounts = tag_info.get("accounts", [])
            for account in accounts:
                total_accounts += 1
                try:
                    process_account(
                        category_name,
                        tag_name,
                        account,
                        output_dir=output_dir,
                        check_only=check_only,
                        skip_if_exists=skip_if_exists,
                        clean_incomplete=clean_incomplete
                    )
                    processed_accounts += 1
                except Exception as e:
                    print(f"处理账号失败: {e}")
                    continue
                # Pause between accounts (no delay needed in check mode).
                if not check_only:
                    time.sleep(1)
    print(f"\n{'='*80}")
    print(f"批处理完成!")
    print(f"总共: {total_accounts} 个账号")
    print(f"成功: {processed_accounts} 个账号")
    print(f"失败: {total_accounts - processed_accounts} 个账号")
    print(f"{'='*80}")
    return 0
if __name__ == "__main__":
    # Use sys.exit (sys is imported at module top): the bare exit() builtin
    # is injected by the site module and may be absent under `python -S`.
    sys.exit(main())
|