#!/usr/bin/env python3 """ 批量处理账号数据脚本 功能:根据账号整理.json文件批量获取账号数据 输出目录结构:examples/[品类名称]/[tag名称]/[账号名称]/[帖子ID]/输入/ """ import json import time from pathlib import Path import sys import argparse import shutil # 导入共享工具模块 from xhs_utils import ( get_note_detail, get_author_history_notes, merge_note_data ) def extract_account_id_from_url(url: str) -> str: """ 从小红书账号URL中提取account_id Args: url: 小红书账号URL Returns: account_id: 账号ID """ import re # 尝试从URL路径中提取 pattern = r'/user/profile/([a-f0-9]+)' match = re.search(pattern, url) if match: return match.group(1) # 如果直接传入的是account_id,则直接返回 if re.match(r'^[a-f0-9]{24}$', url): return url raise ValueError(f"无法从URL中提取account_id: {url}") def save_note_to_file(note_data: dict, file_path: Path): """ 将帖子数据保存到JSON文件 Args: note_data: 帖子数据 file_path: 文件路径 """ # 确保目录存在 file_path.parent.mkdir(parents=True, exist_ok=True) # 保存JSON文件 with open(file_path, 'w', encoding='utf-8') as f: json.dump(note_data, f, ensure_ascii=False, indent=2) print(f" 已保存: {file_path}") def check_note_data_integrity(note_data: dict) -> bool: """ 检查帖子数据的完整性 Args: note_data: 帖子数据字典 Returns: bool: 如果 images 或 video 字段至少一个不为空,返回 True,否则返回 False """ images = note_data.get("images", []) video = note_data.get("video") # 检查 images 是否为非空列表 has_images = isinstance(images, list) and len(images) > 0 # 检查 video 是否存在且不为空(字符串或字典都可以) has_video = video is not None and video != "" and video != {} return has_images or has_video def check_account_data_exists(category_name: str, tag_name: str, account_name: str, note_id: str = None, output_dir: str = "examples") -> dict: """ 检查账号数据是否已经存在且完整 Args: category_name: 品类名称 tag_name: tag名称 account_name: 账号名称 note_id: 帖子ID(可选,如果提供则检查该特定帖子) output_dir: 输出根目录 Returns: dict: 包含检查结果的字典 { "exists": bool, # 数据是否存在 "complete": bool, # 数据是否完整 "target_note_path": Path or None, # 待解构帖子路径 "history_notes_path": Path or None, # 历史帖子目录路径 "incomplete_files": list, # 不完整的文件列表 "note_id": str or None # 如果已存在,返回帖子ID } """ result = { "exists": False, "complete": False, "target_note_path": None, "history_notes_path": None, "incomplete_files": [], "note_id": None } # 如果没有提供note_id,需要先查找账号目录下是否有数据 base_dir = Path(output_dir) / category_name / tag_name / account_name if not base_dir.exists(): return result # 如果没有提供note_id,尝试查找现有的note_id目录 if note_id is None: # 查找第一个存在的note_id目录 note_dirs = [d for d in base_dir.iterdir() if d.is_dir()] if not note_dirs: return result # 使用第一个找到的目录 note_id = note_dirs[0].name result["note_id"] = note_id # 构建路径 input_dir = base_dir / note_id / "输入" target_note_path = input_dir / "待解构帖子.json" history_notes_path = input_dir / "作者历史帖子" result["target_note_path"] = target_note_path result["history_notes_path"] = history_notes_path # 检查输入目录是否存在 if not input_dir.exists(): return result result["exists"] = True # 检查待解构帖子是否存在且完整 if not target_note_path.exists(): result["incomplete_files"].append(str(target_note_path)) return result try: with open(target_note_path, 'r', encoding='utf-8') as f: target_note_data = json.load(f) if not check_note_data_integrity(target_note_data): result["incomplete_files"].append(str(target_note_path)) except Exception as e: result["incomplete_files"].append(f"{target_note_path} (读取错误: {e})") # 检查历史帖子目录 if not history_notes_path.exists(): result["incomplete_files"].append(str(history_notes_path)) return result # 检查历史帖子文件的完整性 history_files = list(history_notes_path.glob("*.json")) if len(history_files) == 0: result["incomplete_files"].append(f"{history_notes_path} (没有历史帖子文件)") else: # 统计有效的历史帖子数量 valid_history_count = 0 for history_file in history_files: try: with open(history_file, 'r', encoding='utf-8') as f: history_note_data = json.load(f) if not check_note_data_integrity(history_note_data): result["incomplete_files"].append(str(history_file)) else: valid_history_count += 1 except Exception as e: result["incomplete_files"].append(f"{history_file} (读取错误: {e})") # 验证历史帖子数量必须大于4 if valid_history_count <= 4: result["incomplete_files"].append(f"{history_notes_path} (有效历史帖子数量 {valid_history_count} ≤ 4,不满足要求)") # 如果没有不完整的文件,则数据完整 result["complete"] = len(result["incomplete_files"]) == 0 return result def delete_incomplete_data(category_name: str, tag_name: str, account_name: str, note_id: str, output_dir: str = "examples") -> bool: """ 删除不完整的账号数据目录 Args: category_name: 品类名称 tag_name: tag名称 account_name: 账号名称 note_id: 帖子ID output_dir: 输出根目录 Returns: bool: 删除成功返回True,否则返回False """ try: # 构建要删除的目录路径:examples/[品类]/[tag]/[账号]/[帖子ID] target_dir = Path(output_dir) / category_name / tag_name / account_name / note_id if target_dir.exists(): shutil.rmtree(target_dir) print(f" ✓ 已删除不完整数据目录: {target_dir}") return True else: print(f" ⚠️ 目录不存在: {target_dir}") return False except Exception as e: print(f" ✗ 删除目录失败: {e}") return False def process_account(category_name: str, tag_name: str, account_info: dict, output_dir: str = "examples", check_only: bool = False, skip_if_exists: bool = True, clean_incomplete: bool = False): """ 处理单个账号的数据获取 Args: category_name: 品类名称 tag_name: tag名称 account_info: 账号信息字典,包含name和url output_dir: 输出根目录 check_only: 如果为True,只检查数据是否存在,不执行获取操作 skip_if_exists: 如果为True且数据已存在且完整,则跳过获取 clean_incomplete: 如果为True,检测到不完整数据时自动删除 """ account_name = account_info.get("name", "未知账号") account_url = account_info.get("url", "") if not account_url: print(f"⚠️ 账号 {account_name} 没有URL,跳过") return print(f"\n{'='*80}") print(f"{'[检查模式]' if check_only else '[处理模式]'} 账号: {account_name}") print(f" 品类: {category_name}") print(f" Tag: {tag_name}") print(f" URL: {account_url}") print(f"{'='*80}") # 先检查数据是否已存在 check_result = check_account_data_exists(category_name, tag_name, account_name, output_dir=output_dir) if check_result["exists"]: if check_result["complete"]: print(f"✓ 数据已存在且完整") print(f" 帖子ID: {check_result['note_id']}") print(f" 待解构帖子: {check_result['target_note_path']}") print(f" 历史帖子目录: {check_result['history_notes_path']}") if check_only or skip_if_exists: print(f"{' [检查模式] 跳过获取' if check_only else ' [跳过] 数据已完整'}") return else: print(f"⚠️ 数据存在但不完整") print(f" 帖子ID: {check_result['note_id']}") print(f" 不完整的文件:") for incomplete_file in check_result["incomplete_files"]: print(f" - {incomplete_file}") # 如果启用了清理不完整数据的功能 if clean_incomplete: print(f" [清理模式] 删除不完整数据...") delete_incomplete_data(category_name, tag_name, account_name, check_result['note_id'], output_dir) if check_only: print(f" [检查模式] 需要重新获取") return else: print(f" 将重新获取数据...") else: print(f"ℹ️ 数据不存在") if check_only: print(f" [检查模式] 需要获取") return # 如果是检查模式,到这里就结束了 if check_only: return try: # 1. 提取account_id account_id = extract_account_id_from_url(account_url) print(f"✓ 提取到account_id: {account_id}") # 2. 获取账号的所有历史帖子 print(f"正在获取历史帖子...") history_notes = get_author_history_notes(account_id) if not history_notes or len(history_notes) == 0: print(f"⚠️ 未找到历史帖子") return print(f"✓ 找到 {len(history_notes)} 个历史帖子") # 3. 找出点赞数最高的帖子 max_like_note = max(history_notes, key=lambda x: x.get("like_count", 0)) max_like_note_id = max_like_note.get("note_id", "") max_like_count = max_like_note.get("like_count", 0) print(f"✓ 点赞数最高的帖子:") print(f" - 帖子ID: {max_like_note_id}") print(f" - 标题: {max_like_note.get('title', '无标题')}") print(f" - 点赞数: {max_like_count}") # 4. 处理点赞数最高的帖子(待解构帖子) print(f"正在处理待解构帖子...") need_detail = not (max_like_note.get("desc") or max_like_note.get("note_text") or max_like_note.get("body_text")) target_note_detail = None if need_detail: target_note_detail = get_note_detail(max_like_note_id) # 合并历史API和详情API的数据 transformed_target = merge_note_data(max_like_note, target_note_detail) # 5. 创建新的目录结构:examples/[品类名称]/[tag名称]/[账号名称]/[帖子ID]/输入/ base_path = Path(output_dir) / category_name / tag_name / account_name / max_like_note_id / "输入" history_path = base_path / "作者历史帖子" # 6. 保存待解构帖子 target_note_path = base_path / "待解构帖子.json" save_note_to_file(transformed_target, target_note_path) # 7. 为每个历史帖子处理数据并保存 print(f"正在处理所有历史帖子...") success_count = 0 for idx, note in enumerate(history_notes, 1): history_note_id = note.get("note_id", "") if history_note_id: try: # 检查历史API数据是否缺少关键字段(主要是body_text) need_detail = not (note.get("desc") or note.get("note_text") or note.get("body_text")) detail_data = None if need_detail: detail_data = get_note_detail(history_note_id) # 添加请求间隔,避免频繁调用 if idx < len(history_notes): time.sleep(0.5) # 合并历史API和详情API的数据 merged_note = merge_note_data(note, detail_data) # 保存到文件 history_note_path = history_path / f"{history_note_id}.json" save_note_to_file(merged_note, history_note_path) success_count += 1 except Exception as e: print(f" ⚠️ 处理帖子 {history_note_id} 失败: {e}") continue print(f"\n✓ 账号 {account_name} 处理完成!") print(f"✓ 待解构帖子: {max_like_note_id}") print(f"✓ 共保存 {success_count} 个历史帖子") print(f"✓ 输出目录: {base_path}") except Exception as e: print(f"✗ 处理账号 {account_name} 失败: {e}") import traceback traceback.print_exc() def main(): """主函数""" # 解析命令行参数 parser = argparse.ArgumentParser( description='批量处理账号数据脚本', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # 默认模式:获取数据,如果已存在且完整则跳过 python batch_fetch_accounts.py # 只检查模式:只检查数据是否存在且完整,不获取数据 python batch_fetch_accounts.py --check-only # 检查并清理不完整数据 python batch_fetch_accounts.py --check-only --clean-incomplete # 强制获取模式:即使数据已存在也重新获取 python batch_fetch_accounts.py --no-skip-if-exists # 指定配置文件 python batch_fetch_accounts.py --config 账号整理.json """ ) parser.add_argument( '--config', type=str, default='账号整理.json', help='配置文件路径 (默认: 账号整理.json)' ) parser.add_argument( '--check-only', action='store_true', help='只检查数据是否存在且完整,不执行获取操作' ) parser.add_argument( '--no-skip-if-exists', action='store_true', help='即使数据已存在且完整也重新获取' ) parser.add_argument( '--clean-incomplete', action='store_true', help='自动删除检测到的不完整数据目录' ) parser.add_argument( '--output-dir', type=str, default='examples', help='输出根目录 (默认: examples)' ) args = parser.parse_args() config_file = args.config check_only = args.check_only skip_if_exists = not args.no_skip_if_exists clean_incomplete = args.clean_incomplete output_dir = args.output_dir print(f"{'='*80}") print(f"批量账号数据{'检查' if check_only else '获取'}脚本") print(f"{'='*80}") print(f"配置文件: {config_file}") print(f"模式: {'只检查' if check_only else '获取数据'}") print(f"跳过已存在: {'是' if skip_if_exists else '否'}") print(f"清理不完整数据: {'是' if clean_incomplete else '否'}") print(f"输出目录: {output_dir}") print(f"{'='*80}\n") try: with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) except FileNotFoundError: print(f"错误: 找不到文件 {config_file}") return 1 except json.JSONDecodeError as e: print(f"错误: JSON格式错误 - {e}") return 1 # 解析配置文件 categories = config.get("categories", []) if not categories: print("错误: 配置文件中没有找到 categories 数据") return 1 # 统计信息 total_accounts = 0 processed_accounts = 0 # 遍历所有品类 for category in categories: category_name = category.get("name", "未知品类") tags = category.get("tags", []) # 遍历所有tag for tag_info in tags: tag_name = tag_info.get("tag", "未知tag") accounts = tag_info.get("accounts", []) # 遍历所有账号 for account in accounts: total_accounts += 1 try: process_account( category_name, tag_name, account, output_dir=output_dir, check_only=check_only, skip_if_exists=skip_if_exists, clean_incomplete=clean_incomplete ) processed_accounts += 1 except Exception as e: print(f"处理账号失败: {e}") continue # 账号之间添加延迟(检查模式不需要延迟) if not check_only: time.sleep(1) print(f"\n{'='*80}") print(f"批处理完成!") print(f"总共: {total_accounts} 个账号") print(f"成功: {processed_accounts} 个账号") print(f"失败: {total_accounts - processed_accounts} 个账号") print(f"{'='*80}") return 0 if __name__ == "__main__": exit(main())