fetch.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Xiaohongshu Blogger Historical Posts Fetcher

Features:
1. Input a Xiaohongshu author ID
2. Call the API to get the blogger's homepage info and historical posts
3. Support pagination
4. Output to the author directory's historical-posts folder
5. Each file is named after the post ID with a .json extension
"""
import requests
import json
import os
import time
from typing import Optional, Dict, List


class XHSBloggerFetcher:
    """Xiaohongshu Blogger Historical Posts Fetcher"""

    API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_blogger_historical_posts"

    def __init__(self, account_id: str, output_dir: Optional[str] = None):
        """
        Initialize the fetcher.

        Args:
            account_id: Xiaohongshu blogger's ID
            output_dir: Output directory path, defaults to current dir + account_name
        """
        self.account_id = account_id
        self.account_name = None  # Will be set from the first post's channel_account_name
        self.output_dir = output_dir
        self.posts_dir = None
        self.first_post_saved = False  # Track whether the first post has been copied

    def fetch_posts(self, cursor: Optional[str] = None) -> Dict:
        """
        Fetch the blogger's historical posts.

        Args:
            cursor: Pagination cursor; omitted for the first request

        Returns:
            API response data, or an empty dict on request failure
        """
        payload = {
            "account_id": self.account_id
        }
        if cursor:
            payload["cursor"] = cursor
        try:
            response = requests.post(
                self.API_URL,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {e}")
            return {}
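
    # For reference, the success path above is assumed to return an envelope like the
    # sketch below. This is inferred from how _extract_posts and _get_next_cursor parse
    # the response; the actual service may include additional fields.
    #
    #   {
    #       "success": true,
    #       "result": "{\"code\": 0, \"msg\": \"ok\", \"data\": {...}}"   # JSON encoded as a string
    #   }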

    def _convert_post_format(self, post: Dict) -> Dict:
        """
        Convert a post from the API format to the standard format.

        Args:
            post: Original post data from the API

        Returns:
            Converted post data in the standard format
        """
        from datetime import datetime

        # Extract image URLs from image_url_list
        images = []
        for img in post.get("image_url_list", []):
            if isinstance(img, dict):
                images.append(img.get("image_url", ""))
            elif isinstance(img, str):
                images.append(img)

        # Extract the video URL from video_url_list
        video_list = post.get("video_url_list", [])
        video = ""
        if video_list:
            if isinstance(video_list[0], dict):
                video = video_list[0].get("video_url", "")
            elif isinstance(video_list[0], str):
                video = video_list[0]

        # Convert the millisecond timestamp to a datetime string
        publish_timestamp = post.get("publish_timestamp", 0)
        publish_time = ""
        if publish_timestamp:
            try:
                dt = datetime.fromtimestamp(publish_timestamp / 1000)
                publish_time = dt.strftime("%Y-%m-%d %H:%M:%S")
            except (ValueError, OSError, OverflowError):
                publish_time = ""

        # Build the standard format
        converted = {
            "channel_content_id": post.get("channel_content_id"),
            "link": post.get("content_link", ""),
            "comment_count": post.get("comment_count", 0),
            "images": images,
            "like_count": post.get("like_count", 0),
            "body_text": post.get("body_text", ""),
            "title": post.get("title", ""),
            "collect_count": post.get("collect_count", 0),
            "channel_account_id": post.get("channel_account_id", ""),
            "channel_account_name": post.get("channel_account_name", ""),
            "content_type": post.get("content_type", "note"),
            "video": video,
            "publish_timestamp": publish_timestamp,
            "publish_time": publish_time
        }
        return converted
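
    # Illustrative only: a minimal before/after sketch of the field mapping above,
    # using made-up sample values (IDs and URLs here are hypothetical).
    #
    #   API post (subset):                          Converted post (subset):
    #     "channel_content_id": "abc123"        ->    "channel_content_id": "abc123"
    #     "content_link": "https://example"     ->    "link": "https://example"
    #     "image_url_list": [{"image_url": u}]  ->    "images": [u]
    #     "video_url_list": [{"video_url": v}]  ->    "video": v
    #     "publish_timestamp": 1700000000000    ->    "publish_time": "YYYY-MM-DD HH:MM:SS" (local time)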

    def _initialize_directories(self, account_name: str):
        """
        Initialize the output directories using the account name.

        Args:
            account_name: Account name taken from the first post
        """
        if self.posts_dir is not None:
            return  # Already initialized
        self.account_name = account_name
        # Use the provided output_dir, or default to current dir + account_name
        if self.output_dir is None:
            self.output_dir = os.path.join(os.getcwd(), account_name)
        # "作者历史帖子" = "author historical posts"
        self.posts_dir = os.path.join(self.output_dir, "作者历史帖子")
        # Ensure the output directory exists
        os.makedirs(self.posts_dir, exist_ok=True)
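
    # Resulting layout (illustrative; "博主昵称" stands in for whatever account name the
    # first post reports, and 待解构帖子.json is written later by save_post):
    #
    #   ./博主昵称/
    #   ├── 待解构帖子.json        # copy of the first post ("post to deconstruct")
    #   └── 作者历史帖子/          # "author historical posts"
    #       ├── <post_id>.json
    #       └── ...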

    def save_post(self, post: Dict, is_first_post: bool = False) -> bool:
        """
        Save a single post to a JSON file.

        Args:
            post: Post data
            is_first_post: Whether this is the first post (it will also be copied to the parent directory)

        Returns:
            Whether the save was successful
        """
        # Initialize directories on the first post using the account name
        if self.posts_dir is None:
            account_name = post.get("channel_account_name")
            if not account_name:
                print("Warning: Post data missing channel_account_name field, using account_id instead")
                account_name = self.account_id
            self._initialize_directories(account_name)

        # Use the post ID as the filename
        post_id = post.get("channel_content_id")
        if not post_id:
            print("Warning: Post data missing channel_content_id field, skipping")
            return False

        file_path = os.path.join(self.posts_dir, f"{post_id}.json")
        try:
            # Convert to the standard format before saving
            converted_post = self._convert_post_format(post)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(converted_post, f, ensure_ascii=False, indent=2)
            print(f"✓ Saved post: {post_id}.json")

            # If this is the first post, also copy it to the parent directory
            # as "待解构帖子.json" ("post to deconstruct")
            if is_first_post and not self.first_post_saved:
                target_path = os.path.join(self.output_dir, "待解构帖子.json")
                with open(target_path, 'w', encoding='utf-8') as f:
                    json.dump(converted_post, f, ensure_ascii=False, indent=2)
                print("✓ Copied first post to: 待解构帖子.json")
                self.first_post_saved = True
            return True
        except Exception as e:
            print(f"✗ Failed to save post {post_id}: {e}")
            return False

    def fetch_all_posts(self, max_pages: Optional[int] = None, delay: float = 1.0) -> int:
        """
        Fetch all historical posts (with pagination support).

        Args:
            max_pages: Maximum number of pages to fetch; None means fetch all
            delay: Delay between requests in seconds

        Returns:
            Number of successfully saved posts
        """
        cursor = None
        page = 0
        total_saved = 0
        print(f"Starting to fetch historical posts for blogger: {self.account_id}")
        print("-" * 60)
        while True:
            page += 1
            print(f"\nPage {page}:")

            # Fetch data
            response_data = self.fetch_posts(cursor)
            if not response_data:
                print("Failed to fetch data, stopping")
                break

            # Extract the posts list (adjust based on the actual API response structure)
            posts = self._extract_posts(response_data)
            if not posts:
                print("No more posts, finished")
                break

            # Save posts
            print(f"Got {len(posts)} posts on this page")
            for idx, post in enumerate(posts):
                # Mark the first post overall (page 1, first post)
                is_first = (page == 1 and idx == 0)
                if self.save_post(post, is_first_post=is_first):
                    total_saved += 1
                # Print the output directory info after the first post is saved
                if is_first:
                    print(f"Output directory: {self.posts_dir}")

            # Check whether there is a next page
            cursor = self._get_next_cursor(response_data)
            if not cursor:
                print("\nAll posts fetched")
                break

            # Check whether the max pages limit has been reached
            if max_pages and page >= max_pages:
                print(f"\nReached max pages limit ({max_pages} pages)")
                break

            # Delay to avoid overly frequent requests
            if delay > 0:
                time.sleep(delay)

        print("-" * 60)
        print(f"✓ Done! Saved {total_saved} posts to: {self.posts_dir}")
        return total_saved

    def _extract_posts(self, response_data: Dict) -> List[Dict]:
        """
        Extract the posts list from the API response.

        Args:
            response_data: API response data

        Returns:
            List of posts
        """
        try:
            # Check whether the API call was successful
            if not response_data.get('success'):
                print(f"API call failed: {response_data}")
                return []
            # Parse the result field (it's a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)
            # Check the response code
            if result.get('code') != 0:
                print(f"API returned error code: {result.get('code')}, message: {result.get('msg')}")
                return []
            # Extract posts from result.data.data
            data = result.get('data', {})
            posts = data.get('data', [])
            return posts if isinstance(posts, list) else []
        except json.JSONDecodeError as e:
            print(f"Failed to parse result JSON: {e}")
            return []
        except Exception as e:
            print(f"Error extracting posts: {e}")
            return []

    def _get_next_cursor(self, response_data: Dict) -> Optional[str]:
        """
        Extract the next-page cursor from the API response.

        Args:
            response_data: API response data

        Returns:
            Next-page cursor, or None if there are no more pages
        """
        try:
            # Check whether the API call was successful
            if not response_data.get('success'):
                return None
            # Parse the result field (it's a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)
            # Extract the cursor from result.data.next_cursor and check has_more
            data = result.get('data', {})
            has_more = data.get('has_more', False)
            next_cursor = data.get('next_cursor')
            # Only return the cursor if there are more pages
            if has_more and next_cursor:
                return str(next_cursor)
            return None
        except Exception as e:
            print(f"Error extracting next cursor: {e}")
            return None
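
    # Both parsers above expect the decoded "result" string to look roughly like the
    # sketch below. This is inferred from the access paths in the code (result.data.data,
    # result.data.has_more, result.data.next_cursor); the real payload may carry
    # additional fields.
    #
    #   {
    #       "code": 0,
    #       "msg": "ok",
    #       "data": {
    #           "data": [ { ...post... }, ... ],
    #           "has_more": true,
    #           "next_cursor": "..."
    #       }
    #   }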


def main():
    """Main function"""
    import argparse
    parser = argparse.ArgumentParser(description='Fetch Xiaohongshu blogger historical posts')
    parser.add_argument('account_id', help='Xiaohongshu blogger ID')
    parser.add_argument('-o', '--output', help='Output directory path (default: current dir + account name)')
    parser.add_argument('-m', '--max-pages', type=int, help='Maximum pages to fetch (default: fetch all)')
    parser.add_argument('-d', '--delay', type=float, default=1.0, help='Request delay in seconds (default: 1.0)')
    args = parser.parse_args()
    # Create the fetcher and run it
    fetcher = XHSBloggerFetcher(args.account_id, args.output)
    fetcher.fetch_all_posts(max_pages=args.max_pages, delay=args.delay)


if __name__ == '__main__':
    main()
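
# Example usage (the account ID below is hypothetical):
#
#   CLI:
#     python fetch.py 5ff0e6410000000001008400
#     python fetch.py 5ff0e6410000000001008400 -o ./output -m 3 -d 2.0
#
#   Programmatic:
#     fetcher = XHSBloggerFetcher("5ff0e6410000000001008400")
#     fetcher.fetch_all_posts(max_pages=3, delay=1.5)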