#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Xiaohongshu Blogger Historical Posts Fetcher

Features:
1. Input Xiaohongshu author ID
2. Call API to get blogger's homepage info and historical posts
3. Support pagination
4. Output to author directory's historical posts folder
5. Filename is post ID with .json extension
"""

import json
import os
import time
from datetime import datetime
from typing import Optional, Dict, List

import requests


class XHSBloggerFetcher:
    """Xiaohongshu Blogger Historical Posts Fetcher"""

    API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_blogger_historical_posts"

    def __init__(self, account_id: str, output_dir: Optional[str] = None):
        """
        Initialize fetcher

        Args:
            account_id: Xiaohongshu blogger's ID
            output_dir: Output directory path, defaults to current dir + account_name
        """
        self.account_id = account_id
        self.account_name = None  # Will be set from first post's channel_account_name
        self.output_dir = output_dir
        self.posts_dir = None
        self.first_post_saved = False  # Track if first post has been copied

    def fetch_posts(self, cursor: Optional[str] = None) -> Dict:
        """
        Fetch blogger's historical posts

        Args:
            cursor: Pagination cursor, not passed for first request

        Returns:
            API response data
        """
        payload = {
            "account_id": self.account_id
        }
        if cursor:
            payload["cursor"] = cursor

        try:
            response = requests.post(
                self.API_URL,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {e}")
            return {}

    def _convert_post_format(self, post: Dict) -> Dict:
        """
        Convert API post format to standard format

        Args:
            post: Original post data from API

        Returns:
            Converted post data in standard format
        """
        # Extract image URLs from image_url_list
        images = []
        for img in post.get("image_url_list", []):
            if isinstance(img, dict):
                images.append(img.get("image_url", ""))
            elif isinstance(img, str):
                images.append(img)

        # Extract video URL from video_url_list
        video_list = post.get("video_url_list", [])
        video = ""
        if video_list:
            if isinstance(video_list[0], dict):
                video = video_list[0].get("video_url", "")
            elif isinstance(video_list[0], str):
                video = video_list[0]

        # Convert millisecond timestamp to datetime string
        publish_timestamp = post.get("publish_timestamp", 0)
        publish_time = ""
        if publish_timestamp:
            try:
                dt = datetime.fromtimestamp(publish_timestamp / 1000)
                publish_time = dt.strftime("%Y-%m-%d %H:%M:%S")
            except Exception:
                publish_time = ""

        # Build standard format
        converted = {
            "channel_content_id": post.get("channel_content_id"),
            "link": post.get("content_link", ""),
            "comment_count": post.get("comment_count", 0),
            "images": images,
            "like_count": post.get("like_count", 0),
            "body_text": post.get("body_text", ""),
            "title": post.get("title", ""),
            "collect_count": post.get("collect_count", 0),
            "channel_account_id": post.get("channel_account_id", ""),
            "channel_account_name": post.get("channel_account_name", ""),
            "content_type": post.get("content_type", "note"),
            "video": video,
            "publish_timestamp": publish_timestamp,
            "publish_time": publish_time
        }
        return converted
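
    # Illustrative sketch of the mapping performed by _convert_post_format above.
    # Field values are hypothetical and the raw shape is inferred only from the
    # reads in this file, not from official API documentation:
    #
    #   raw:       {"channel_content_id": "65a1b2c3d4", "title": "...",
    #               "image_url_list": [{"image_url": "https://example.com/1.jpg"}],
    #               "video_url_list": [], "publish_timestamp": 1700000000000, ...}
    #   converted: {"channel_content_id": "65a1b2c3d4", "title": "...",
    #               "images": ["https://example.com/1.jpg"], "video": "",
    #               "publish_timestamp": 1700000000000,
    #               "publish_time": "2023-11-14 ..."}  # rendered in local time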

    def _initialize_directories(self, account_name: str):
        """
        Initialize output directories using account name

        Args:
            account_name: Account name from first post
        """
        if self.posts_dir is not None:
            return  # Already initialized

        self.account_name = account_name

        # Use provided output_dir or default to current dir + account_name
        if self.output_dir is None:
            self.output_dir = os.path.join(os.getcwd(), account_name)

        self.posts_dir = os.path.join(self.output_dir, "作者历史帖子")

        # Ensure output directory exists
        os.makedirs(self.posts_dir, exist_ok=True)

    def save_post(self, post: Dict, is_first_post: bool = False) -> bool:
        """
        Save single post to JSON file

        Args:
            post: Post data
            is_first_post: Whether this is the first post (will be copied to parent directory)

        Returns:
            Whether save was successful
        """
        # Initialize directories on first post using account name
        if self.posts_dir is None:
            account_name = post.get("channel_account_name")
            if not account_name:
                print("Warning: Post data missing channel_account_name field, using account_id instead")
                account_name = self.account_id
            self._initialize_directories(account_name)

        # Get post ID as filename
        post_id = post.get("channel_content_id")
        if not post_id:
            print("Warning: Post data missing channel_content_id field, skipping")
            return False

        file_path = os.path.join(self.posts_dir, f"{post_id}.json")

        try:
            # Convert to standard format before saving
            converted_post = self._convert_post_format(post)

            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(converted_post, f, ensure_ascii=False, indent=2)
            print(f"✓ Saved post: {post_id}.json")

            # If this is the first post, copy it to parent directory as "待解构帖子.json"
            if is_first_post and not self.first_post_saved:
                target_path = os.path.join(self.output_dir, "待解构帖子.json")
                with open(target_path, 'w', encoding='utf-8') as f:
                    json.dump(converted_post, f, ensure_ascii=False, indent=2)
                print("✓ Copied first post to: 待解构帖子.json")
                self.first_post_saved = True

            return True
        except Exception as e:
            print(f"✗ Failed to save post {post_id}: {e}")
            return False

    def fetch_all_posts(self, max_pages: Optional[int] = None, delay: float = 1.0) -> int:
        """
        Fetch all historical posts (with pagination support)

        Args:
            max_pages: Maximum pages to fetch, None means fetch all
            delay: Delay between requests in seconds

        Returns:
            Number of successfully saved posts
        """
        cursor = None
        page = 0
        total_saved = 0

        print(f"Starting to fetch historical posts for blogger: {self.account_id}")
        print("-" * 60)

        while True:
            page += 1
            print(f"\nPage {page}:")

            # Fetch data
            response_data = self.fetch_posts(cursor)
            if not response_data:
                print("Failed to fetch data, stopping")
                break

            # Extract posts list (adjust based on actual API response structure)
            posts = self._extract_posts(response_data)
            if not posts:
                print("No more posts, finished")
                break

            # Save posts
            print(f"Got {len(posts)} posts on this page")
            for idx, post in enumerate(posts):
                # Mark the first post overall (page 1, first post)
                is_first = (page == 1 and idx == 0)
                if self.save_post(post, is_first_post=is_first):
                    total_saved += 1
                # Print output directory info after first post is saved
                if is_first:
                    print(f"Output directory: {self.posts_dir}")

            # Check if there's a next page
            cursor = self._get_next_cursor(response_data)
            if not cursor:
                print("\nAll posts fetched")
                break

            # Check if max pages limit reached
            if max_pages and page >= max_pages:
                print(f"\nReached max pages limit ({max_pages} pages)")
                break

            # Delay to avoid too frequent requests
            if delay > 0:
                time.sleep(delay)

        print("-" * 60)
        print(f"✓ Done! Saved {total_saved} posts to: {self.posts_dir}")
        return total_saved
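
    # Assumed response envelope parsed by _extract_posts and _get_next_cursor
    # below. The shape is inferred from the parsing logic in this file (the
    # endpoint is not otherwise documented here); note that "result" arrives as
    # a JSON-encoded string, not a nested object:
    #
    #   {
    #     "success": true,
    #     "result": "{\"code\": 0, \"msg\": \"ok\", \"data\": {
    #                  \"data\": [ {...post...}, ... ],
    #                  \"has_more\": true, \"next_cursor\": \"...\" }}"
    #   }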

    def _extract_posts(self, response_data: Dict) -> List[Dict]:
        """
        Extract posts list from API response

        Args:
            response_data: API response data

        Returns:
            List of posts
        """
        try:
            # Check if API call was successful
            if not response_data.get('success'):
                print(f"API call failed: {response_data}")
                return []

            # Parse the result field (it's a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)

            # Check response code
            if result.get('code') != 0:
                print(f"API returned error code: {result.get('code')}, message: {result.get('msg')}")
                return []

            # Extract posts from result.data.data
            data = result.get('data', {})
            posts = data.get('data', [])

            return posts if isinstance(posts, list) else []
        except json.JSONDecodeError as e:
            print(f"Failed to parse result JSON: {e}")
            return []
        except Exception as e:
            print(f"Error extracting posts: {e}")
            return []

    def _get_next_cursor(self, response_data: Dict) -> Optional[str]:
        """
        Extract next page cursor from API response

        Args:
            response_data: API response data

        Returns:
            Next page cursor, or None if no more pages
        """
        try:
            # Check if API call was successful
            if not response_data.get('success'):
                return None

            # Parse the result field (it's a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)

            # Extract cursor from result.data.next_cursor and check has_more
            data = result.get('data', {})
            has_more = data.get('has_more', False)
            next_cursor = data.get('next_cursor')

            # Only return cursor if there are more pages
            if has_more and next_cursor:
                return str(next_cursor)
            return None
        except Exception as e:
            print(f"Error extracting next cursor: {e}")
            return None


def main():
    """Main function"""
    import argparse

    parser = argparse.ArgumentParser(description='Fetch Xiaohongshu blogger historical posts')
    parser.add_argument('account_id', help='Xiaohongshu blogger ID')
    parser.add_argument('-o', '--output', help='Output directory path (default: current dir + account name)')
    parser.add_argument('-m', '--max-pages', type=int, help='Maximum pages to fetch (default: fetch all)')
    parser.add_argument('-d', '--delay', type=float, default=1.0,
                        help='Request delay in seconds (default: 1.0)')

    args = parser.parse_args()

    # Create fetcher and execute
    fetcher = XHSBloggerFetcher(args.account_id, args.output)
    fetcher.fetch_all_posts(max_pages=args.max_pages, delay=args.delay)


if __name__ == '__main__':
    main()
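
# Example invocations (the script filename and account ID below are placeholders;
# the flags match the argparse definitions in main()):
#
#   python xhs_blogger_fetcher.py 5f0a1b2c3d4e5f6a7b8c9d0e
#   python xhs_blogger_fetcher.py 5f0a1b2c3d4e5f6a7b8c9d0e -o ./output -m 5
#   python xhs_blogger_fetcher.py 5f0a1b2c3d4e5f6a7b8c9d0e -d 2.0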