#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Xiaohongshu Blogger Historical Posts Fetcher Features: 1. Fetch blogger's historical posts (posts command) - Input Xiaohongshu author ID - Call API to get blogger's homepage info and historical posts - Support pagination - Output to author directory's historical posts folder - Filename is post ID with .json extension - Auto fetch post details when body_text > threshold (default: 90 chars) - Update original post file's body_text with complete version from detail API 2. Fetch single post detail (detail command) - Input Xiaohongshu note ID - Call API to get post detail - Save to specified output file Usage: # Fetch historical posts (with auto body_text update) python fetch.py posts [-o output_dir] [-m max_pages] [-d delay] # Fetch historical posts without auto detail fetching python fetch.py posts --no-auto-detail # Fetch historical posts with custom threshold (e.g., 120 chars) python fetch.py posts --detail-threshold 120 # Fetch single post detail python fetch.py detail [-o output_file] """ import requests import json import os import time from typing import Optional, Dict, List class XHSBloggerFetcher: """Xiaohongshu Blogger Historical Posts Fetcher""" API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_blogger_historical_posts" DETAIL_API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_detail_by_note_id" def __init__(self, account_id: str, output_dir: Optional[str] = None, auto_fetch_detail: bool = True, detail_threshold: int = 90): """ Initialize fetcher Args: account_id: Xiaohongshu blogger's ID output_dir: Output directory path, defaults to current dir + account_name auto_fetch_detail: Whether to auto fetch detail for posts with body_text > threshold detail_threshold: Minimum character count in body_text to trigger detail fetch (default: 90) """ self.account_id = account_id self.account_name = None # Will be set from first post's channel_account_name self.output_dir = output_dir self.posts_dir = None self.first_post_saved = False # Track if first post has been copied self.auto_fetch_detail = auto_fetch_detail self.detail_threshold = detail_threshold # Statistics self.detail_fetch_count = 0 # Successfully fetched and updated details self.detail_skip_count = 0 # Skipped due to low character count self.detail_error_count = 0 # Failed to fetch details def fetch_posts(self, cursor: Optional[str] = None) -> Dict: """ Fetch blogger's historical posts Args: cursor: Pagination cursor, not passed for first request Returns: API response data """ payload = { "account_id": self.account_id } if cursor: payload["cursor"] = cursor try: response = requests.post( self.API_URL, json=payload, headers={"Content-Type": "application/json"}, timeout=30 ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"API request failed: {e}") return {} def fetch_post_detail(self, note_id: str) -> Dict: """ Fetch single post detail by note ID Args: note_id: Xiaohongshu note ID Returns: API response data containing post details """ payload = { "note_id": note_id } try: response = requests.post( self.DETAIL_API_URL, json=payload, headers={"Content-Type": "application/json"}, timeout=30 ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"API request failed: {e}") return {} def _convert_post_format(self, post: Dict) -> Dict: """ Convert API post format to standard format Args: post: Original post data from API Returns: Converted post data in standard format """ from datetime import datetime # Extract image URLs from image_url_list images = [] for img in post.get("image_url_list", []): if isinstance(img, dict): images.append(img.get("image_url", "")) elif isinstance(img, str): images.append(img) # Extract video URL from video_url_list video_list = post.get("video_url_list", []) video = "" if video_list: if isinstance(video_list[0], dict): video = video_list[0].get("video_url", "") elif isinstance(video_list[0], str): video = video_list[0] # Convert timestamp to datetime string publish_timestamp = post.get("publish_timestamp", 0) publish_time = "" if publish_timestamp: try: dt = datetime.fromtimestamp(publish_timestamp / 1000) publish_time = dt.strftime("%Y-%m-%d %H:%M:%S") except: publish_time = "" # Build standard format converted = { "channel_content_id": post.get("channel_content_id"), "link": post.get("content_link", ""), "comment_count": post.get("comment_count", 0), "images": images, "like_count": post.get("like_count", 0), "body_text": post.get("body_text", ""), "title": post.get("title", ""), "collect_count": post.get("collect_count", 0), "channel_account_id": post.get("channel_account_id", ""), "channel_account_name": post.get("channel_account_name", ""), "content_type": post.get("content_type", "note"), "video": video, "publish_timestamp": publish_timestamp, "publish_time": publish_time } return converted def _initialize_directories(self, account_name: str): """ Initialize output directories using account name Args: account_name: Account name from first post """ if self.posts_dir is not None: return # Already initialized self.account_name = account_name # Use provided output_dir or default to current dir + account_name if self.output_dir is None: self.output_dir = os.path.join(os.getcwd(), account_name) self.posts_dir = os.path.join(self.output_dir, "作者历史帖子") # Ensure output directory exists os.makedirs(self.posts_dir, exist_ok=True) def _count_body_text_chars(self, body_text: str) -> int: """ Count characters in body_text Args: body_text: The post body text Returns: Total number of characters (including all whitespace, emoji, etc.) """ if not body_text: return 0 # Return the total length of the string return len(body_text) def save_post(self, post: Dict, is_first_post: bool = False) -> bool: """ Save single post to JSON file Args: post: Post data is_first_post: Whether this is the first post (will be copied to parent directory) Returns: Whether save was successful """ # Initialize directories on first post using account name if self.posts_dir is None: account_name = post.get("channel_account_name") if not account_name: print(f"Warning: Post data missing channel_account_name field, using account_id instead") account_name = self.account_id self._initialize_directories(account_name) # Get post ID as filename post_id = post.get("channel_content_id") if not post_id: print(f"Warning: Post data missing channel_content_id field, skipping") return False file_path = os.path.join(self.posts_dir, f"{post_id}.json") try: # Convert to standard format before saving converted_post = self._convert_post_format(post) with open(file_path, 'w', encoding='utf-8') as f: json.dump(converted_post, f, ensure_ascii=False, indent=2) print(f"✓ Saved post: {post_id}.json") # If this is the first post, copy it to parent directory as "待解构帖子.json" if is_first_post and not self.first_post_saved: target_path = os.path.join(self.output_dir, "待解构帖子.json") with open(target_path, 'w', encoding='utf-8') as f: json.dump(converted_post, f, ensure_ascii=False, indent=2) print(f"✓ Copied first post to: 待解构帖子.json") self.first_post_saved = True # Auto fetch post detail if body_text exceeds threshold if self.auto_fetch_detail: body_text = converted_post.get("body_text", "") char_count = self._count_body_text_chars(body_text) if char_count > self.detail_threshold: print(f" → Body text has {char_count} chars (> {self.detail_threshold}), fetching detail...") # Fetch detail response_data = self.fetch_post_detail(post_id) if response_data and response_data.get('success'): try: # Parse the result field (it's a JSON string containing a list) result_str = response_data.get('result', '[]') result_list = json.loads(result_str) # The result is a list with one item, which contains 'data' field if isinstance(result_list, list) and len(result_list) > 0: detail_data = result_list[0].get('data', {}) if detail_data and 'body_text' in detail_data: # Update the original post file with detailed body_text detail_body_text = detail_data.get('body_text', '') if detail_body_text and detail_body_text != body_text: converted_post['body_text'] = detail_body_text # Re-save the post file with updated body_text with open(file_path, 'w', encoding='utf-8') as f: json.dump(converted_post, f, ensure_ascii=False, indent=2) print(f" ✓ Updated body_text with complete version from detail API") else: print(f" → Body text already complete, no update needed") self.detail_fetch_count += 1 else: print(f" ✗ No valid data in detail response") self.detail_error_count += 1 else: print(f" ✗ Unexpected detail response format") self.detail_error_count += 1 except Exception as e: print(f" ✗ Failed to parse/update detail: {e}") self.detail_error_count += 1 else: print(f" ✗ Failed to fetch detail") self.detail_error_count += 1 else: print(f" → Body text has {char_count} chars (<= {self.detail_threshold}), skipping detail fetch") self.detail_skip_count += 1 return True except Exception as e: print(f"✗ Failed to save post {post_id}: {e}") return False def fetch_all_posts(self, max_pages: Optional[int] = None, delay: float = 1.0) -> int: """ Fetch all historical posts (with pagination support) Args: max_pages: Maximum pages to fetch, None means fetch all delay: Delay between requests in seconds Returns: Number of successfully saved posts """ cursor = None page = 0 total_saved = 0 print(f"Starting to fetch historical posts for blogger: {self.account_id}") print("-" * 60) while True: page += 1 print(f"\nPage {page}:") # Fetch data response_data = self.fetch_posts(cursor) if not response_data: print("Failed to fetch data, stopping") break # Extract posts list (adjust based on actual API response structure) posts = self._extract_posts(response_data) if not posts: print("No more posts, finished") break # Save posts print(f"Got {len(posts)} posts on this page") for idx, post in enumerate(posts): # Mark the first post overall (page 1, first post) is_first = (page == 1 and idx == 0) if self.save_post(post, is_first_post=is_first): total_saved += 1 # Print output directory info after first post is saved if is_first: print(f"Output directory: {self.posts_dir}") # Check if there's a next page cursor = self._get_next_cursor(response_data) if not cursor: print("\nAll posts fetched") break # Check if max pages limit reached if max_pages and page >= max_pages: print(f"\nReached max pages limit ({max_pages} pages)") break # Delay to avoid too frequent requests if delay > 0: time.sleep(delay) print("-" * 60) print(f"✓ Done! Saved {total_saved} posts to: {self.posts_dir}") # Print detail fetch statistics if auto_fetch_detail is enabled if self.auto_fetch_detail: print(f"\nBody Text Update Statistics:") print(f" ✓ Successfully updated: {self.detail_fetch_count}") print(f" → Skipped (text <= {self.detail_threshold} chars): {self.detail_skip_count}") print(f" ✗ Failed: {self.detail_error_count}") return total_saved def _extract_posts(self, response_data: Dict) -> List[Dict]: """ Extract posts list from API response Args: response_data: API response data Returns: List of posts """ try: # Check if API call was successful if not response_data.get('success'): print(f"API call failed: {response_data}") return [] # Parse the result field (it's a JSON string) result_str = response_data.get('result', '{}') result = json.loads(result_str) # Check response code if result.get('code') != 0: print(f"API returned error code: {result.get('code')}, message: {result.get('msg')}") return [] # Extract posts from result.data.data data = result.get('data', {}) posts = data.get('data', []) return posts if isinstance(posts, list) else [] except json.JSONDecodeError as e: print(f"Failed to parse result JSON: {e}") return [] except Exception as e: print(f"Error extracting posts: {e}") return [] def _get_next_cursor(self, response_data: Dict) -> Optional[str]: """ Extract next page cursor from API response Args: response_data: API response data Returns: Next page cursor, or None if no more pages """ try: # Check if API call was successful if not response_data.get('success'): return None # Parse the result field (it's a JSON string) result_str = response_data.get('result', '{}') result = json.loads(result_str) # Extract cursor from result.data.next_cursor and check has_more data = result.get('data', {}) has_more = data.get('has_more', False) next_cursor = data.get('next_cursor') # Only return cursor if there are more pages if has_more and next_cursor: return str(next_cursor) return None except Exception as e: print(f"Error extracting next cursor: {e}") return None def fetch_and_save_post_detail(self, note_id: str, output_path: Optional[str] = None) -> bool: """ Fetch post detail by note_id and save to file Args: note_id: Xiaohongshu note ID output_path: Optional output file path, defaults to {note_id}_detail.json Returns: Whether fetch and save was successful """ print(f"Fetching post detail for note_id: {note_id}") print("-" * 60) # Fetch post detail response_data = self.fetch_post_detail(note_id) if not response_data: print("✗ Failed to fetch post detail") return False try: # Check if API call was successful if not response_data.get('success'): print(f"✗ API call failed: {response_data}") return False # Parse the result field (it's a JSON string) result_str = response_data.get('result', '{}') result = json.loads(result_str) # Check response code if result.get('code') != 0: print(f"✗ API returned error code: {result.get('code')}, message: {result.get('msg')}") return False # Extract post data from result.data post_data = result.get('data', {}) if not post_data: print("✗ No post data in response") return False # Determine output path if output_path is None: output_path = f"{note_id}_detail.json" # Save to file with open(output_path, 'w', encoding='utf-8') as f: json.dump(post_data, f, ensure_ascii=False, indent=2) print(f"✓ Post detail saved to: {output_path}") print("-" * 60) return True except json.JSONDecodeError as e: print(f"✗ Failed to parse result JSON: {e}") return False except Exception as e: print(f"✗ Error saving post detail: {e}") return False def main(): """Main function""" import argparse parser = argparse.ArgumentParser(description='Fetch Xiaohongshu blogger historical posts or single post detail') # Create subparsers for different commands subparsers = parser.add_subparsers(dest='command', help='Command to execute') # Subparser for fetching historical posts posts_parser = subparsers.add_parser('posts', help='Fetch blogger historical posts') posts_parser.add_argument('account_id', help='Xiaohongshu blogger ID') posts_parser.add_argument('-o', '--output', help='Output directory path (default: current dir + account_name)') posts_parser.add_argument('-m', '--max-pages', type=int, help='Maximum pages to fetch (default: fetch all)') posts_parser.add_argument('-d', '--delay', type=float, default=1.0, help='Request delay in seconds (default: 1.0)') posts_parser.add_argument('--no-auto-detail', action='store_true', help='Disable auto fetching post details') posts_parser.add_argument('--detail-threshold', type=int, default=90, help='Minimum character count in body_text to trigger detail fetch (default: 90)') # Subparser for fetching single post detail detail_parser = subparsers.add_parser('detail', help='Fetch single post detail by note_id') detail_parser.add_argument('note_id', help='Xiaohongshu note ID') detail_parser.add_argument('-o', '--output', help='Output file path (default: {note_id}_detail.json)') args = parser.parse_args() # If no command specified, show help if not args.command: parser.print_help() return # Execute corresponding command if args.command == 'posts': # Create fetcher and execute auto_fetch_detail = not args.no_auto_detail # Invert the flag fetcher = XHSBloggerFetcher( args.account_id, args.output, auto_fetch_detail=auto_fetch_detail, detail_threshold=args.detail_threshold ) fetcher.fetch_all_posts(max_pages=args.max_pages, delay=args.delay) elif args.command == 'detail': # Create a minimal fetcher instance (account_id not needed for detail fetching) fetcher = XHSBloggerFetcher(account_id='') fetcher.fetch_and_save_post_detail(args.note_id, args.output) if __name__ == '__main__': main()