#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Xiaohongshu Blogger Historical Posts Fetcher

Features:
1. Takes a Xiaohongshu author ID as input
2. Calls the API to get the blogger's homepage info and historical posts
3. Supports pagination
4. Writes output to the "作者历史帖子" folder under the author's directory
5. Names each file after the post ID, with a .json extension
"""

import argparse
import json
import os
import time
from datetime import datetime
from typing import Dict, List, Optional

import requests


class XHSBloggerFetcher:
    """Xiaohongshu Blogger Historical Posts Fetcher"""

    API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_blogger_historical_posts"

    def __init__(self, account_id: str, output_dir: Optional[str] = None):
        """
        Initialize the fetcher.

        Args:
            account_id: Xiaohongshu blogger's ID
            output_dir: Output directory path; defaults to current dir + account name
        """
        self.account_id = account_id
        self.account_name = None  # Set from the first post's channel_account_name
        self.output_dir = output_dir
        self.posts_dir = None
        self.first_post_saved = False  # Tracks whether the first post has been copied

    def fetch_posts(self, cursor: Optional[str] = None) -> Dict:
        """
        Fetch one page of the blogger's historical posts.

        Args:
            cursor: Pagination cursor; omitted for the first request

        Returns:
            API response data, or an empty dict on request failure
        """
        payload = {
            "account_id": self.account_id
        }
        if cursor:
            payload["cursor"] = cursor

        try:
            response = requests.post(
                self.API_URL,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {e}")
            return {}
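
    # What fetch_posts() returns: based on how _extract_posts() and _get_next_cursor()
    # parse it below, the envelope is assumed to look roughly like this (values are
    # illustrative, not real API output):
    #   {
    #     "success": true,
    #     "result": "{\"code\": 0, \"msg\": \"ok\", \"data\": {\"data\": [...posts...], \"has_more\": true, \"next_cursor\": \"123\"}}"
    #   }
    # Note that "result" is itself a JSON string and gets decoded a second time.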

    def _convert_post_format(self, post: Dict) -> Dict:
        """
        Convert the API post format to the standard format.

        Args:
            post: Original post data from the API

        Returns:
            Converted post data in the standard format
        """
        # Extract image URLs from image_url_list
        images = []
        for img in post.get("image_url_list", []):
            if isinstance(img, dict):
                images.append(img.get("image_url", ""))
            elif isinstance(img, str):
                images.append(img)

        # Extract the video URL from video_url_list
        video_list = post.get("video_url_list", [])
        video = ""
        if video_list:
            if isinstance(video_list[0], dict):
                video = video_list[0].get("video_url", "")
            elif isinstance(video_list[0], str):
                video = video_list[0]

        # Convert the millisecond timestamp to a datetime string
        publish_timestamp = post.get("publish_timestamp", 0)
        publish_time = ""
        if publish_timestamp:
            try:
                dt = datetime.fromtimestamp(publish_timestamp / 1000)
                publish_time = dt.strftime("%Y-%m-%d %H:%M:%S")
            except (TypeError, ValueError, OverflowError, OSError):
                publish_time = ""

        # Build the standard format
        converted = {
            "channel_content_id": post.get("channel_content_id"),
            "link": post.get("content_link", ""),
            "comment_count": post.get("comment_count", 0),
            "images": images,
            "like_count": post.get("like_count", 0),
            "body_text": post.get("body_text", ""),
            "title": post.get("title", ""),
            "collect_count": post.get("collect_count", 0),
            "channel_account_id": post.get("channel_account_id", ""),
            "channel_account_name": post.get("channel_account_name", ""),
            "content_type": post.get("content_type", "note"),
            "video": video,
            "publish_timestamp": publish_timestamp,
            "publish_time": publish_time
        }
        return converted
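
    # A converted record contains exactly the keys built above; for example
    # (all values below are made up for illustration, and publish_time depends
    # on the local timezone):
    #   {"channel_content_id": "65ab...", "link": "https://www.xiaohongshu.com/explore/65ab...",
    #    "comment_count": 12, "images": ["https://.../1.jpg"], "like_count": 340,
    #    "body_text": "...", "title": "...", "collect_count": 56,
    #    "channel_account_id": "5f12...", "channel_account_name": "某博主",
    #    "content_type": "note", "video": "", "publish_timestamp": 1710000000000,
    #    "publish_time": "2024-03-10 00:00:00"}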

    def _initialize_directories(self, account_name: str):
        """
        Initialize output directories using the account name.

        Args:
            account_name: Account name taken from the first post
        """
        if self.posts_dir is not None:
            return  # Already initialized

        self.account_name = account_name
        # Use the provided output_dir, or default to current dir + account name
        if self.output_dir is None:
            self.output_dir = os.path.join(os.getcwd(), account_name)
        self.posts_dir = os.path.join(self.output_dir, "作者历史帖子")
        # Ensure the output directory exists
        os.makedirs(self.posts_dir, exist_ok=True)
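
    # Resulting on-disk layout (directory and file names come from the code in this
    # class; the account name is whatever the API reports in channel_account_name):
    #   <output_dir>/
    #       待解构帖子.json               <- copy of the first post returned (written by save_post)
    #       作者历史帖子/
    #           <channel_content_id>.json   <- one file per post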

    def save_post(self, post: Dict, is_first_post: bool = False) -> bool:
        """
        Save a single post to a JSON file.

        Args:
            post: Post data
            is_first_post: Whether this is the first post (it will also be copied
                to the parent directory)

        Returns:
            Whether the save succeeded
        """
        # Initialize directories on the first post, using the account name
        if self.posts_dir is None:
            account_name = post.get("channel_account_name")
            if not account_name:
                print("Warning: Post data missing channel_account_name field, using account_id instead")
                account_name = self.account_id
            self._initialize_directories(account_name)

        # Use the post ID as the filename
        post_id = post.get("channel_content_id")
        if not post_id:
            print("Warning: Post data missing channel_content_id field, skipping")
            return False

        file_path = os.path.join(self.posts_dir, f"{post_id}.json")
        try:
            # Convert to the standard format before saving
            converted_post = self._convert_post_format(post)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(converted_post, f, ensure_ascii=False, indent=2)
            print(f"✓ Saved post: {post_id}.json")

            # If this is the first post, also copy it to the parent directory as "待解构帖子.json"
            if is_first_post and not self.first_post_saved:
                target_path = os.path.join(self.output_dir, "待解构帖子.json")
                with open(target_path, 'w', encoding='utf-8') as f:
                    json.dump(converted_post, f, ensure_ascii=False, indent=2)
                print("✓ Copied first post to: 待解构帖子.json")
                self.first_post_saved = True

            return True
        except Exception as e:
            print(f"✗ Failed to save post {post_id}: {e}")
            return False

    def fetch_all_posts(self, max_pages: Optional[int] = None, delay: float = 1.0) -> int:
        """
        Fetch all historical posts (with pagination support).

        Args:
            max_pages: Maximum number of pages to fetch; None means fetch all
            delay: Delay between requests, in seconds

        Returns:
            Number of successfully saved posts
        """
        cursor = None
        page = 0
        total_saved = 0

        print(f"Starting to fetch historical posts for blogger: {self.account_id}")
        print("-" * 60)

        while True:
            page += 1
            print(f"\nPage {page}:")

            # Fetch one page of data
            response_data = self.fetch_posts(cursor)
            if not response_data:
                print("Failed to fetch data, stopping")
                break

            # Extract the posts list (adjust based on the actual API response structure)
            posts = self._extract_posts(response_data)
            if not posts:
                print("No more posts, finished")
                break

            # Save posts
            print(f"Got {len(posts)} posts on this page")
            for idx, post in enumerate(posts):
                # Mark the first post overall (page 1, first post)
                is_first = (page == 1 and idx == 0)
                if self.save_post(post, is_first_post=is_first):
                    total_saved += 1
                    # Print the output directory after the first post is saved
                    if is_first:
                        print(f"Output directory: {self.posts_dir}")

            # Check whether there is a next page
            cursor = self._get_next_cursor(response_data)
            if not cursor:
                print("\nAll posts fetched")
                break

            # Check whether the max-pages limit has been reached
            if max_pages and page >= max_pages:
                print(f"\nReached max pages limit ({max_pages} pages)")
                break

            # Delay to avoid overly frequent requests
            if delay > 0:
                time.sleep(delay)

        print("-" * 60)
        print(f"✓ Done! Saved {total_saved} posts to: {self.posts_dir}")
        return total_saved

    def _extract_posts(self, response_data: Dict) -> List[Dict]:
        """
        Extract the posts list from the API response.

        Args:
            response_data: API response data

        Returns:
            List of posts
        """
        try:
            # Check whether the API call succeeded
            if not response_data.get('success'):
                print(f"API call failed: {response_data}")
                return []

            # Parse the result field (it is a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)

            # Check the response code
            if result.get('code') != 0:
                print(f"API returned error code: {result.get('code')}, message: {result.get('msg')}")
                return []

            # Extract posts from result.data.data
            data = result.get('data', {})
            posts = data.get('data', [])
            return posts if isinstance(posts, list) else []
        except json.JSONDecodeError as e:
            print(f"Failed to parse result JSON: {e}")
            return []
        except Exception as e:
            print(f"Error extracting posts: {e}")
            return []

    def _get_next_cursor(self, response_data: Dict) -> Optional[str]:
        """
        Extract the next-page cursor from the API response.

        Args:
            response_data: API response data

        Returns:
            Next page cursor, or None if there are no more pages
        """
        try:
            # Check whether the API call succeeded
            if not response_data.get('success'):
                return None

            # Parse the result field (it is a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)

            # Extract the cursor from result.data.next_cursor and check has_more
            data = result.get('data', {})
            has_more = data.get('has_more', False)
            next_cursor = data.get('next_cursor')

            # Only return the cursor if there are more pages
            if has_more and next_cursor:
                return str(next_cursor)
            return None
        except Exception as e:
            print(f"Error extracting next cursor: {e}")
            return None
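
# Programmatic usage (a minimal sketch; the account ID and output path below are
# made up for illustration):
#   fetcher = XHSBloggerFetcher("5f1234567890abcdef", output_dir="./某博主")
#   fetcher.fetch_all_posts(max_pages=3, delay=1.0)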


def main():
    """Main function: parse command-line arguments and run the fetcher."""
    parser = argparse.ArgumentParser(description='Fetch Xiaohongshu blogger historical posts')
    parser.add_argument('account_id', help='Xiaohongshu blogger ID')
    parser.add_argument('-o', '--output', help='Output directory path (default: current dir + account name)')
    parser.add_argument('-m', '--max-pages', type=int, help='Maximum pages to fetch (default: fetch all)')
    parser.add_argument('-d', '--delay', type=float, default=1.0, help='Request delay in seconds (default: 1.0)')

    args = parser.parse_args()

    # Create the fetcher and run it
    fetcher = XHSBloggerFetcher(args.account_id, args.output)
    fetcher.fetch_all_posts(max_pages=args.max_pages, delay=args.delay)


if __name__ == '__main__':
    main()