#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Xiaohongshu Blogger Historical Posts Fetcher

Features:
1. Fetch blogger's historical posts (posts command)
   - Input Xiaohongshu author ID
   - Call API to get blogger's homepage info and historical posts
   - Support pagination
   - Output to author directory's historical posts folder
   - Filename is post ID with .json extension
   - Auto fetch post details when body_text > threshold (default: 90 chars)
   - Update original post file's body_text with complete version from detail API
2. Fetch single post detail (detail command)
   - Input Xiaohongshu note ID
   - Call API to get post detail
   - Save to specified output file

Usage:
    # Fetch historical posts (with auto body_text update)
    python fetch.py posts <account_id> [-o output_dir] [-m max_pages] [-d delay]

    # Fetch historical posts without auto detail fetching
    python fetch.py posts <account_id> --no-auto-detail

    # Fetch historical posts with custom threshold (e.g., 120 chars)
    python fetch.py posts <account_id> --detail-threshold 120

    # Fetch single post detail
    python fetch.py detail <note_id> [-o output_file]
"""

import requests
import json
import os
import time
from typing import Optional, Dict, List
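
# Programmatic usage (illustrative sketch; the account and note IDs below are
# placeholders, and the API endpoints configured on XHSBloggerFetcher must be
# reachable for real requests):
#
#     fetcher = XHSBloggerFetcher("<account_id>", auto_fetch_detail=True)
#     fetcher.fetch_all_posts(max_pages=2, delay=1.0)
#
#     XHSBloggerFetcher(account_id='').fetch_and_save_post_detail("<note_id>")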


class XHSBloggerFetcher:
    """Xiaohongshu Blogger Historical Posts Fetcher"""

    API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_blogger_historical_posts"
    DETAIL_API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_detail_by_note_id"
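
    # Both endpoints appear to respond with an envelope of the form
    #   {"success": true, "result": "<JSON-encoded string>"}
    # and the "result" string is decoded a second time by the parsing helpers
    # below. This shape is inferred from the parsing logic in this class, not
    # from API documentation.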

    def __init__(self, account_id: str, output_dir: Optional[str] = None,
                 auto_fetch_detail: bool = True, detail_threshold: int = 90):
        """
        Initialize fetcher

        Args:
            account_id: Xiaohongshu blogger's ID
            output_dir: Output directory path, defaults to current dir + account_name
            auto_fetch_detail: Whether to auto fetch detail for posts with body_text > threshold
            detail_threshold: Minimum character count in body_text to trigger detail fetch (default: 90)
        """
        self.account_id = account_id
        self.account_name = None  # Will be set from first post's channel_account_name
        self.output_dir = output_dir
        self.posts_dir = None
        self.first_post_saved = False  # Track if first post has been copied
        self.auto_fetch_detail = auto_fetch_detail
        self.detail_threshold = detail_threshold
        # Statistics
        self.detail_fetch_count = 0  # Successfully fetched and updated details
        self.detail_skip_count = 0   # Skipped due to low character count
        self.detail_error_count = 0  # Failed to fetch details

    def fetch_posts(self, cursor: Optional[str] = None) -> Dict:
        """
        Fetch blogger's historical posts

        Args:
            cursor: Pagination cursor, not passed for the first request

        Returns:
            API response data
        """
        payload = {
            "account_id": self.account_id
        }
        if cursor:
            payload["cursor"] = cursor
        try:
            response = requests.post(
                self.API_URL,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {e}")
            return {}

    def fetch_post_detail(self, note_id: str) -> Dict:
        """
        Fetch single post detail by note ID

        Args:
            note_id: Xiaohongshu note ID

        Returns:
            API response data containing post details
        """
        payload = {
            "note_id": note_id
        }
        try:
            response = requests.post(
                self.DETAIL_API_URL,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {e}")
            return {}
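
    # The decoded "result" payload from the detail endpoint is handled in two
    # shapes by the callers below (shapes inferred from their parsing code, not
    # from API documentation):
    #   save_post expects:                  [{"data": {"body_text": "...", ...}}, ...]
    #   fetch_and_save_post_detail expects: {"code": 0, "msg": "...", "data": {...}}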

    def _convert_post_format(self, post: Dict) -> Dict:
        """
        Convert API post format to standard format

        Args:
            post: Original post data from API

        Returns:
            Converted post data in standard format
        """
        from datetime import datetime

        # Extract image URLs from image_url_list
        images = []
        for img in post.get("image_url_list", []):
            if isinstance(img, dict):
                images.append(img.get("image_url", ""))
            elif isinstance(img, str):
                images.append(img)

        # Extract video URL from video_url_list
        video_list = post.get("video_url_list", [])
        video = ""
        if video_list:
            if isinstance(video_list[0], dict):
                video = video_list[0].get("video_url", "")
            elif isinstance(video_list[0], str):
                video = video_list[0]

        # Convert timestamp to datetime string
        publish_timestamp = post.get("publish_timestamp", 0)
        publish_time = ""
        if publish_timestamp:
            try:
                dt = datetime.fromtimestamp(publish_timestamp / 1000)
                publish_time = dt.strftime("%Y-%m-%d %H:%M:%S")
            except Exception:
                publish_time = ""

        # Build standard format
        converted = {
            "channel_content_id": post.get("channel_content_id"),
            "link": post.get("content_link", ""),
            "comment_count": post.get("comment_count", 0),
            "images": images,
            "like_count": post.get("like_count", 0),
            "body_text": post.get("body_text", ""),
            "title": post.get("title", ""),
            "collect_count": post.get("collect_count", 0),
            "channel_account_id": post.get("channel_account_id", ""),
            "channel_account_name": post.get("channel_account_name", ""),
            "content_type": post.get("content_type", "note"),
            "video": video,
            "publish_timestamp": publish_timestamp,
            "publish_time": publish_time
        }
        return converted

    def _initialize_directories(self, account_name: str):
        """
        Initialize output directories using the account name

        Args:
            account_name: Account name from the first post
        """
        if self.posts_dir is not None:
            return  # Already initialized

        self.account_name = account_name
        # Use provided output_dir or default to current dir + account_name
        if self.output_dir is None:
            self.output_dir = os.path.join(os.getcwd(), account_name)
        # "作者历史帖子" means "author's historical posts"
        self.posts_dir = os.path.join(self.output_dir, "作者历史帖子")
        # Ensure output directory exists
        os.makedirs(self.posts_dir, exist_ok=True)

    def _count_body_text_chars(self, body_text: str) -> int:
        """
        Count characters in body_text

        Args:
            body_text: The post body text

        Returns:
            Total number of characters (including all whitespace, emoji, etc.)
        """
        if not body_text:
            return 0
        # Return the total length of the string
        return len(body_text)

    def save_post(self, post: Dict, is_first_post: bool = False) -> bool:
        """
        Save single post to JSON file

        Args:
            post: Post data
            is_first_post: Whether this is the first post (will be copied to the parent directory)

        Returns:
            Whether save was successful
        """
        # Initialize directories on first post using account name
        if self.posts_dir is None:
            account_name = post.get("channel_account_name")
            if not account_name:
                print("Warning: Post data missing channel_account_name field, using account_id instead")
                account_name = self.account_id
            self._initialize_directories(account_name)

        # Get post ID as filename
        post_id = post.get("channel_content_id")
        if not post_id:
            print("Warning: Post data missing channel_content_id field, skipping")
            return False

        file_path = os.path.join(self.posts_dir, f"{post_id}.json")
        try:
            # Convert to standard format before saving
            converted_post = self._convert_post_format(post)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(converted_post, f, ensure_ascii=False, indent=2)
            print(f"✓ Saved post: {post_id}.json")

            # If this is the first post, copy it to the parent directory as
            # "待解构帖子.json" ("post to be deconstructed")
            if is_first_post and not self.first_post_saved:
                target_path = os.path.join(self.output_dir, "待解构帖子.json")
                with open(target_path, 'w', encoding='utf-8') as f:
                    json.dump(converted_post, f, ensure_ascii=False, indent=2)
                print("✓ Copied first post to: 待解构帖子.json")
                self.first_post_saved = True

            # Auto fetch post detail if body_text exceeds threshold
            if self.auto_fetch_detail:
                body_text = converted_post.get("body_text", "")
                char_count = self._count_body_text_chars(body_text)
                if char_count > self.detail_threshold:
                    print(f" → Body text has {char_count} chars (> {self.detail_threshold}), fetching detail...")
                    # Fetch detail
                    response_data = self.fetch_post_detail(post_id)
                    if response_data and response_data.get('success'):
                        try:
                            # Parse the result field (it's a JSON string containing a list)
                            result_str = response_data.get('result', '[]')
                            result_list = json.loads(result_str)
                            # The result is a list with one item, which contains a 'data' field
                            if isinstance(result_list, list) and len(result_list) > 0:
                                detail_data = result_list[0].get('data', {})
                                if detail_data and 'body_text' in detail_data:
                                    # Update the original post file with the detailed body_text
                                    detail_body_text = detail_data.get('body_text', '')
                                    if detail_body_text and detail_body_text != body_text:
                                        converted_post['body_text'] = detail_body_text
                                        # Re-save the post file with updated body_text
                                        with open(file_path, 'w', encoding='utf-8') as f:
                                            json.dump(converted_post, f, ensure_ascii=False, indent=2)
                                        print(" ✓ Updated body_text with complete version from detail API")
                                    else:
                                        print(" → Body text already complete, no update needed")
                                    self.detail_fetch_count += 1
                                else:
                                    print(" ✗ No valid data in detail response")
                                    self.detail_error_count += 1
                            else:
                                print(" ✗ Unexpected detail response format")
                                self.detail_error_count += 1
                        except Exception as e:
                            print(f" ✗ Failed to parse/update detail: {e}")
                            self.detail_error_count += 1
                    else:
                        print(" ✗ Failed to fetch detail")
                        self.detail_error_count += 1
                else:
                    print(f" → Body text has {char_count} chars (<= {self.detail_threshold}), skipping detail fetch")
                    self.detail_skip_count += 1
            return True
        except Exception as e:
            print(f"✗ Failed to save post {post_id}: {e}")
            return False

    def fetch_all_posts(self, max_pages: Optional[int] = None, delay: float = 1.0) -> int:
        """
        Fetch all historical posts (with pagination support)

        Args:
            max_pages: Maximum pages to fetch, None means fetch all
            delay: Delay between requests in seconds

        Returns:
            Number of successfully saved posts
        """
        cursor = None
        page = 0
        total_saved = 0

        print(f"Starting to fetch historical posts for blogger: {self.account_id}")
        print("-" * 60)

        while True:
            page += 1
            print(f"\nPage {page}:")

            # Fetch data
            response_data = self.fetch_posts(cursor)
            if not response_data:
                print("Failed to fetch data, stopping")
                break

            # Extract posts list (adjust based on actual API response structure)
            posts = self._extract_posts(response_data)
            if not posts:
                print("No more posts, finished")
                break

            # Save posts
            print(f"Got {len(posts)} posts on this page")
            for idx, post in enumerate(posts):
                # Mark the first post overall (page 1, first post)
                is_first = (page == 1 and idx == 0)
                if self.save_post(post, is_first_post=is_first):
                    total_saved += 1
                # Print output directory info after the first post is saved
                if is_first:
                    print(f"Output directory: {self.posts_dir}")

            # Check if there's a next page
            cursor = self._get_next_cursor(response_data)
            if not cursor:
                print("\nAll posts fetched")
                break

            # Check if max pages limit reached
            if max_pages and page >= max_pages:
                print(f"\nReached max pages limit ({max_pages} pages)")
                break

            # Delay to avoid too frequent requests
            if delay > 0:
                time.sleep(delay)

        print("-" * 60)
        print(f"✓ Done! Saved {total_saved} posts to: {self.posts_dir}")

        # Print detail fetch statistics if auto_fetch_detail is enabled
        if self.auto_fetch_detail:
            print("\nBody Text Update Statistics:")
            print(f" ✓ Successfully updated: {self.detail_fetch_count}")
            print(f" → Skipped (text <= {self.detail_threshold} chars): {self.detail_skip_count}")
            print(f" ✗ Failed: {self.detail_error_count}")

        return total_saved

    def _extract_posts(self, response_data: Dict) -> List[Dict]:
        """
        Extract posts list from API response

        Args:
            response_data: API response data

        Returns:
            List of posts
        """
        try:
            # Check if API call was successful
            if not response_data.get('success'):
                print(f"API call failed: {response_data}")
                return []

            # Parse the result field (it's a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)

            # Check response code
            if result.get('code') != 0:
                print(f"API returned error code: {result.get('code')}, message: {result.get('msg')}")
                return []

            # Extract posts from result.data.data
            data = result.get('data', {})
            posts = data.get('data', [])
            return posts if isinstance(posts, list) else []
        except json.JSONDecodeError as e:
            print(f"Failed to parse result JSON: {e}")
            return []
        except Exception as e:
            print(f"Error extracting posts: {e}")
            return []

    def _get_next_cursor(self, response_data: Dict) -> Optional[str]:
        """
        Extract next page cursor from API response

        Args:
            response_data: API response data

        Returns:
            Next page cursor, or None if no more pages
        """
        try:
            # Check if API call was successful
            if not response_data.get('success'):
                return None

            # Parse the result field (it's a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)

            # Extract cursor from result.data.next_cursor and check has_more
            data = result.get('data', {})
            has_more = data.get('has_more', False)
            next_cursor = data.get('next_cursor')

            # Only return cursor if there are more pages
            if has_more and next_cursor:
                return str(next_cursor)
            return None
        except Exception as e:
            print(f"Error extracting next cursor: {e}")
            return None
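
    # Decoded "result" shape assumed by _extract_posts and _get_next_cursor above
    # (inferred from their parsing code; values are illustrative placeholders):
    #   {
    #       "code": 0,
    #       "msg": "...",
    #       "data": {
    #           "data": [<post>, ...],
    #           "has_more": true,
    #           "next_cursor": "<cursor>"
    #       }
    #   }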

    def fetch_and_save_post_detail(self, note_id: str, output_path: Optional[str] = None) -> bool:
        """
        Fetch post detail by note_id and save it to a file

        Args:
            note_id: Xiaohongshu note ID
            output_path: Optional output file path, defaults to {note_id}_detail.json

        Returns:
            Whether fetch and save was successful
        """
        print(f"Fetching post detail for note_id: {note_id}")
        print("-" * 60)

        # Fetch post detail
        response_data = self.fetch_post_detail(note_id)
        if not response_data:
            print("✗ Failed to fetch post detail")
            return False

        try:
            # Check if API call was successful
            if not response_data.get('success'):
                print(f"✗ API call failed: {response_data}")
                return False

            # Parse the result field (it's a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)

            # Check response code
            if result.get('code') != 0:
                print(f"✗ API returned error code: {result.get('code')}, message: {result.get('msg')}")
                return False

            # Extract post data from result.data
            post_data = result.get('data', {})
            if not post_data:
                print("✗ No post data in response")
                return False

            # Determine output path
            if output_path is None:
                output_path = f"{note_id}_detail.json"

            # Save to file
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(post_data, f, ensure_ascii=False, indent=2)

            print(f"✓ Post detail saved to: {output_path}")
            print("-" * 60)
            return True
        except json.JSONDecodeError as e:
            print(f"✗ Failed to parse result JSON: {e}")
            return False
        except Exception as e:
            print(f"✗ Error saving post detail: {e}")
            return False


def main():
    """Main function"""
    import argparse

    parser = argparse.ArgumentParser(description='Fetch Xiaohongshu blogger historical posts or single post detail')

    # Create subparsers for different commands
    subparsers = parser.add_subparsers(dest='command', help='Command to execute')

    # Subparser for fetching historical posts
    posts_parser = subparsers.add_parser('posts', help='Fetch blogger historical posts')
    posts_parser.add_argument('account_id', help='Xiaohongshu blogger ID')
    posts_parser.add_argument('-o', '--output', help='Output directory path (default: current dir + account_name)')
    posts_parser.add_argument('-m', '--max-pages', type=int, help='Maximum pages to fetch (default: fetch all)')
    posts_parser.add_argument('-d', '--delay', type=float, default=1.0, help='Request delay in seconds (default: 1.0)')
    posts_parser.add_argument('--no-auto-detail', action='store_true', help='Disable auto fetching post details')
    posts_parser.add_argument('--detail-threshold', type=int, default=90,
                              help='Minimum character count in body_text to trigger detail fetch (default: 90)')

    # Subparser for fetching single post detail
    detail_parser = subparsers.add_parser('detail', help='Fetch single post detail by note_id')
    detail_parser.add_argument('note_id', help='Xiaohongshu note ID')
    detail_parser.add_argument('-o', '--output', help='Output file path (default: {note_id}_detail.json)')

    args = parser.parse_args()

    # If no command specified, show help
    if not args.command:
        parser.print_help()
        return

    # Execute corresponding command
    if args.command == 'posts':
        # Create fetcher and execute
        auto_fetch_detail = not args.no_auto_detail  # Invert the flag
        fetcher = XHSBloggerFetcher(
            args.account_id,
            args.output,
            auto_fetch_detail=auto_fetch_detail,
            detail_threshold=args.detail_threshold
        )
        fetcher.fetch_all_posts(max_pages=args.max_pages, delay=args.delay)
    elif args.command == 'detail':
        # Create a minimal fetcher instance (account_id not needed for detail fetching)
        fetcher = XHSBloggerFetcher(account_id='')
        fetcher.fetch_and_save_post_detail(args.note_id, args.output)


if __name__ == '__main__':
    main()