#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Xiaohongshu Blogger Historical Posts Fetcher

Features:
1. Fetch a blogger's historical posts (posts command)
   - Input: Xiaohongshu author ID
   - Calls the API to get the blogger's homepage info and historical posts
   - Supports pagination
   - Output goes to the author directory's historical-posts folder
   - Filename is the post ID with a .json extension
   - Automatically fetches the post detail when body_text > threshold (default: 90 chars)
   - Updates the original post file's body_text with the complete version from the detail API
2. Fetch a single post detail (detail command)
   - Input: Xiaohongshu note ID
   - Calls the API to get the post detail
   - Saves to the specified output file

Usage:
    # Fetch historical posts (with auto body_text update)
    python fetch.py posts <account_id> [-o output_dir] [-m max_pages] [-d delay]

    # Fetch historical posts without auto detail fetching
    python fetch.py posts <account_id> --no-auto-detail

    # Fetch historical posts with a custom threshold (e.g., 120 chars)
    python fetch.py posts <account_id> --detail-threshold 120

    # Fetch a single post detail
    python fetch.py detail <note_id> [-o output_file]
"""
import requests
import json
import os
import time
from datetime import datetime
from typing import Optional, Dict, List

class XHSBloggerFetcher:
    """Xiaohongshu Blogger Historical Posts Fetcher"""

    API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_blogger_historical_posts"
    DETAIL_API_URL = "http://47.84.182.56:8001/tools/call/get_xhs_detail_by_note_id"

    def __init__(self, account_id: str, output_dir: Optional[str] = None,
                 auto_fetch_detail: bool = True, detail_threshold: int = 90):
        """
        Initialize the fetcher

        Args:
            account_id: Xiaohongshu blogger's ID
            output_dir: Output directory path, defaults to current dir + account_name
            auto_fetch_detail: Whether to auto-fetch the detail for posts with body_text > threshold
            detail_threshold: Minimum character count in body_text to trigger a detail fetch (default: 90)
        """
        self.account_id = account_id
        self.account_name = None  # Will be set from the first post's channel_account_name
        self.output_dir = output_dir
        self.posts_dir = None
        self.first_post_saved = False  # Track whether the first post has been copied
        self.auto_fetch_detail = auto_fetch_detail
        self.detail_threshold = detail_threshold
        # Statistics
        self.detail_fetch_count = 0  # Successfully fetched and updated details
        self.detail_skip_count = 0   # Skipped due to low character count
        self.detail_error_count = 0  # Failed to fetch details

    def fetch_posts(self, cursor: Optional[str] = None) -> Dict:
        """
        Fetch the blogger's historical posts

        Args:
            cursor: Pagination cursor; omitted for the first request

        Returns:
            API response data (empty dict on request failure)
        """
        payload = {"account_id": self.account_id}
        if cursor:
            payload["cursor"] = cursor
        try:
            response = requests.post(
                self.API_URL,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {e}")
            return {}

    def fetch_post_detail(self, note_id: str) -> Dict:
        """
        Fetch a single post's detail by note ID

        Args:
            note_id: Xiaohongshu note ID

        Returns:
            API response data containing the post detail (empty dict on request failure)
        """
        payload = {"note_id": note_id}
        try:
            response = requests.post(
                self.DETAIL_API_URL,
                json=payload,
                headers={"Content-Type": "application/json"},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {e}")
            return {}
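
    # The detail endpoint's envelope, as assumed by the parsing code in
    # save_post() below (inferred from that code, not from API docs):
    #
    #   {
    #       "success": true,
    #       "result": "[{\"data\": {\"body_text\": \"...\", ...}}]"
    #   }
    #
    # i.e. "result" is a JSON *string* encoding a one-element list whose
    # item carries the post fields under "data".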

    def _convert_post_format(self, post: Dict) -> Dict:
        """
        Convert the API post format to the standard format

        Args:
            post: Original post data from the API

        Returns:
            Converted post data in the standard format
        """
        # Extract image URLs from image_url_list
        images = []
        for img in post.get("image_url_list", []):
            if isinstance(img, dict):
                images.append(img.get("image_url", ""))
            elif isinstance(img, str):
                images.append(img)

        # Extract the video URL from video_url_list
        video_list = post.get("video_url_list", [])
        video = ""
        if video_list:
            if isinstance(video_list[0], dict):
                video = video_list[0].get("video_url", "")
            elif isinstance(video_list[0], str):
                video = video_list[0]

        # Convert the millisecond timestamp to a datetime string
        publish_timestamp = post.get("publish_timestamp", 0)
        publish_time = ""
        if publish_timestamp:
            try:
                dt = datetime.fromtimestamp(publish_timestamp / 1000)
                publish_time = dt.strftime("%Y-%m-%d %H:%M:%S")
            except (ValueError, OSError, OverflowError):
                publish_time = ""

        # Build the standard format
        converted = {
            "channel_content_id": post.get("channel_content_id"),
            "link": post.get("content_link", ""),
            "comment_count": post.get("comment_count", 0),
            "images": images,
            "like_count": post.get("like_count", 0),
            "body_text": post.get("body_text", ""),
            "title": post.get("title", ""),
            "collect_count": post.get("collect_count", 0),
            "channel_account_id": post.get("channel_account_id", ""),
            "channel_account_name": post.get("channel_account_name", ""),
            "content_type": post.get("content_type", "note"),
            "video": video,
            "publish_timestamp": publish_timestamp,
            "publish_time": publish_time
        }
        return converted
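
    # Illustrative mapping for _convert_post_format (field names come from the
    # code above; the sample values are invented):
    #
    #   in:  {"channel_content_id": "abc123",
    #         "image_url_list": [{"image_url": "https://example.com/1.jpg"}],
    #         "publish_timestamp": 1700000000000}
    #
    #   out: {"channel_content_id": "abc123",
    #         "images": ["https://example.com/1.jpg"],
    #         "publish_timestamp": 1700000000000,
    #         "publish_time": "2023-11-14 22:13:20",  # exact string depends on the local timezone
    #         ...}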

    def _initialize_directories(self, account_name: str):
        """
        Initialize the output directories using the account name

        Args:
            account_name: Account name from the first post
        """
        if self.posts_dir is not None:
            return  # Already initialized
        self.account_name = account_name
        # Use the provided output_dir, or default to current dir + account_name
        if self.output_dir is None:
            self.output_dir = os.path.join(os.getcwd(), account_name)
        # "作者历史帖子" = "author's historical posts"; kept in Chinese because it is a runtime path
        self.posts_dir = os.path.join(self.output_dir, "作者历史帖子")
        # Ensure the output directory exists
        os.makedirs(self.posts_dir, exist_ok=True)

    def _count_body_text_chars(self, body_text: str) -> int:
        """
        Count characters in body_text

        Args:
            body_text: The post body text

        Returns:
            Number of Unicode code points (len of the string), counting all
            whitespace; note that some emoji sequences span multiple code points
        """
        if not body_text:
            return 0
        return len(body_text)

    def save_post(self, post: Dict, is_first_post: bool = False) -> bool:
        """
        Save a single post to a JSON file

        Args:
            post: Post data
            is_first_post: Whether this is the first post (it will also be copied to the parent directory)

        Returns:
            Whether the save was successful
        """
        # Initialize the directories on the first post using the account name
        if self.posts_dir is None:
            account_name = post.get("channel_account_name")
            if not account_name:
                print("Warning: Post data missing channel_account_name field, using account_id instead")
                account_name = self.account_id
            self._initialize_directories(account_name)

        # Use the post ID as the filename
        post_id = post.get("channel_content_id")
        if not post_id:
            print("Warning: Post data missing channel_content_id field, skipping")
            return False
        file_path = os.path.join(self.posts_dir, f"{post_id}.json")
        try:
            # Convert to the standard format before saving
            converted_post = self._convert_post_format(post)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(converted_post, f, ensure_ascii=False, indent=2)
            print(f"✓ Saved post: {post_id}.json")

            # If this is the first post, copy it to the parent directory as
            # "待解构帖子.json" ("post to be deconstructed")
            if is_first_post and not self.first_post_saved:
                target_path = os.path.join(self.output_dir, "待解构帖子.json")
                with open(target_path, 'w', encoding='utf-8') as f:
                    json.dump(converted_post, f, ensure_ascii=False, indent=2)
                print("✓ Copied first post to: 待解构帖子.json")
                self.first_post_saved = True

            # Auto-fetch the post detail if body_text exceeds the threshold
            if self.auto_fetch_detail:
                body_text = converted_post.get("body_text", "")
                char_count = self._count_body_text_chars(body_text)
                if char_count > self.detail_threshold:
                    print(f" → Body text has {char_count} chars (> {self.detail_threshold}), fetching detail...")
                    response_data = self.fetch_post_detail(post_id)
                    if response_data and response_data.get('success'):
                        try:
                            # Parse the result field (a JSON string containing a list)
                            result_str = response_data.get('result', '[]')
                            result_list = json.loads(result_str)
                            # The result is a one-item list whose item contains a 'data' field
                            if isinstance(result_list, list) and len(result_list) > 0:
                                detail_data = result_list[0].get('data', {})
                                if detail_data and 'body_text' in detail_data:
                                    # Update the original post file with the detailed body_text
                                    detail_body_text = detail_data.get('body_text', '')
                                    if detail_body_text and detail_body_text != body_text:
                                        converted_post['body_text'] = detail_body_text
                                        # Re-save the post file with the updated body_text
                                        with open(file_path, 'w', encoding='utf-8') as f:
                                            json.dump(converted_post, f, ensure_ascii=False, indent=2)
                                        print(" ✓ Updated body_text with the complete version from the detail API")
                                    else:
                                        print(" → Body text already complete, no update needed")
                                    self.detail_fetch_count += 1
                                else:
                                    print(" ✗ No valid data in detail response")
                                    self.detail_error_count += 1
                            else:
                                print(" ✗ Unexpected detail response format")
                                self.detail_error_count += 1
                        except Exception as e:
                            print(f" ✗ Failed to parse/update detail: {e}")
                            self.detail_error_count += 1
                    else:
                        print(" ✗ Failed to fetch detail")
                        self.detail_error_count += 1
                else:
                    print(f" → Body text has {char_count} chars (<= {self.detail_threshold}), skipping detail fetch")
                    self.detail_skip_count += 1
            return True
        except Exception as e:
            print(f"✗ Failed to save post {post_id}: {e}")
            return False

    def fetch_all_posts(self, max_pages: Optional[int] = None, delay: float = 1.0) -> int:
        """
        Fetch all historical posts (with pagination support)

        Args:
            max_pages: Maximum pages to fetch; None means fetch all
            delay: Delay between requests in seconds

        Returns:
            Number of successfully saved posts
        """
        cursor = None
        page = 0
        total_saved = 0
        print(f"Starting to fetch historical posts for blogger: {self.account_id}")
        print("-" * 60)
        while True:
            page += 1
            print(f"\nPage {page}:")
            # Fetch data
            response_data = self.fetch_posts(cursor)
            if not response_data:
                print("Failed to fetch data, stopping")
                break
            # Extract the posts list (adjust based on the actual API response structure)
            posts = self._extract_posts(response_data)
            if not posts:
                print("No more posts, finished")
                break
            # Save posts
            print(f"Got {len(posts)} posts on this page")
            for idx, post in enumerate(posts):
                # Mark the first post overall (page 1, first post)
                is_first = (page == 1 and idx == 0)
                if self.save_post(post, is_first_post=is_first):
                    total_saved += 1
                # Print the output directory info after the first post is saved
                if is_first:
                    print(f"Output directory: {self.posts_dir}")
            # Check whether there is a next page
            cursor = self._get_next_cursor(response_data)
            if not cursor:
                print("\nAll posts fetched")
                break
            # Check whether the max-pages limit has been reached
            if max_pages and page >= max_pages:
                print(f"\nReached max pages limit ({max_pages} pages)")
                break
            # Delay to avoid overly frequent requests
            if delay > 0:
                time.sleep(delay)
        print("-" * 60)
        print(f"✓ Done! Saved {total_saved} posts to: {self.posts_dir}")
        # Print detail-fetch statistics if auto_fetch_detail is enabled
        if self.auto_fetch_detail:
            print("\nBody Text Update Statistics:")
            print(f" ✓ Successfully updated: {self.detail_fetch_count}")
            print(f" → Skipped (text <= {self.detail_threshold} chars): {self.detail_skip_count}")
            print(f" ✗ Failed: {self.detail_error_count}")
        return total_saved
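
    # The listing endpoint's envelope, as assumed by the two parsers below
    # (inferred from their code, not from API docs):
    #
    #   {
    #       "success": true,
    #       "result": "{\"code\": 0, \"msg\": \"...\", \"data\": {
    #           \"data\": [ ...posts... ],
    #           \"has_more\": true,
    #           \"next_cursor\": \"...\"}}"
    #   }
    #
    # i.e. "result" is again a JSON string; posts live at result.data.data.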

    def _extract_posts(self, response_data: Dict) -> List[Dict]:
        """
        Extract the posts list from the API response

        Args:
            response_data: API response data

        Returns:
            List of posts (empty on any error)
        """
        try:
            # Check whether the API call was successful
            if not response_data.get('success'):
                print(f"API call failed: {response_data}")
                return []
            # Parse the result field (a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)
            # Check the response code
            if result.get('code') != 0:
                print(f"API returned error code: {result.get('code')}, message: {result.get('msg')}")
                return []
            # Extract posts from result.data.data
            data = result.get('data', {})
            posts = data.get('data', [])
            return posts if isinstance(posts, list) else []
        except json.JSONDecodeError as e:
            print(f"Failed to parse result JSON: {e}")
            return []
        except Exception as e:
            print(f"Error extracting posts: {e}")
            return []

    def _get_next_cursor(self, response_data: Dict) -> Optional[str]:
        """
        Extract the next-page cursor from the API response

        Args:
            response_data: API response data

        Returns:
            Next-page cursor, or None if there are no more pages
        """
        try:
            # Check whether the API call was successful
            if not response_data.get('success'):
                return None
            # Parse the result field (a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)
            # Read result.data.next_cursor and check has_more
            data = result.get('data', {})
            has_more = data.get('has_more', False)
            next_cursor = data.get('next_cursor')
            # Only return the cursor if there are more pages
            if has_more and next_cursor:
                return str(next_cursor)
            return None
        except Exception as e:
            print(f"Error extracting next cursor: {e}")
            return None

    def fetch_and_save_post_detail(self, note_id: str, output_path: Optional[str] = None) -> bool:
        """
        Fetch a post detail by note_id and save it to a file

        Args:
            note_id: Xiaohongshu note ID
            output_path: Optional output file path, defaults to {note_id}_detail.json

        Returns:
            Whether the fetch and save were successful
        """
        print(f"Fetching post detail for note_id: {note_id}")
        print("-" * 60)
        # Fetch the post detail
        response_data = self.fetch_post_detail(note_id)
        if not response_data:
            print("✗ Failed to fetch post detail")
            return False
        try:
            # Check whether the API call was successful
            if not response_data.get('success'):
                print(f"✗ API call failed: {response_data}")
                return False
            # Parse the result field (a JSON string)
            result_str = response_data.get('result', '{}')
            result = json.loads(result_str)
            # Check the response code
            if result.get('code') != 0:
                print(f"✗ API returned error code: {result.get('code')}, message: {result.get('msg')}")
                return False
            # Extract the post data from result.data
            post_data = result.get('data', {})
            if not post_data:
                print("✗ No post data in response")
                return False
            # Determine the output path
            if output_path is None:
                output_path = f"{note_id}_detail.json"
            # Save to file
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(post_data, f, ensure_ascii=False, indent=2)
            print(f"✓ Post detail saved to: {output_path}")
            print("-" * 60)
            return True
        except json.JSONDecodeError as e:
            print(f"✗ Failed to parse result JSON: {e}")
            return False
        except Exception as e:
            print(f"✗ Error saving post detail: {e}")
            return False
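
# Example of programmatic use (mirrors the CLI below; "some_blogger_id" is a
# placeholder account ID, not a real one):
#
#     fetcher = XHSBloggerFetcher("some_blogger_id", detail_threshold=120)
#     fetcher.fetch_all_posts(max_pages=2, delay=1.5)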


def main():
    """Main function"""
    import argparse
    parser = argparse.ArgumentParser(description='Fetch Xiaohongshu blogger historical posts or a single post detail')
    # Create subparsers for the different commands
    subparsers = parser.add_subparsers(dest='command', help='Command to execute')

    # Subparser for fetching historical posts
    posts_parser = subparsers.add_parser('posts', help='Fetch blogger historical posts')
    posts_parser.add_argument('account_id', help='Xiaohongshu blogger ID')
    posts_parser.add_argument('-o', '--output', help='Output directory path (default: current dir + account_name)')
    posts_parser.add_argument('-m', '--max-pages', type=int, help='Maximum pages to fetch (default: fetch all)')
    posts_parser.add_argument('-d', '--delay', type=float, default=1.0, help='Request delay in seconds (default: 1.0)')
    posts_parser.add_argument('--no-auto-detail', action='store_true', help='Disable auto fetching of post details')
    posts_parser.add_argument('--detail-threshold', type=int, default=90,
                              help='Minimum character count in body_text to trigger a detail fetch (default: 90)')

    # Subparser for fetching a single post detail
    detail_parser = subparsers.add_parser('detail', help='Fetch a single post detail by note_id')
    detail_parser.add_argument('note_id', help='Xiaohongshu note ID')
    detail_parser.add_argument('-o', '--output', help='Output file path (default: {note_id}_detail.json)')

    args = parser.parse_args()
    # If no command was specified, show help
    if not args.command:
        parser.print_help()
        return

    # Execute the corresponding command
    if args.command == 'posts':
        auto_fetch_detail = not args.no_auto_detail  # Invert the flag
        fetcher = XHSBloggerFetcher(
            args.account_id,
            args.output,
            auto_fetch_detail=auto_fetch_detail,
            detail_threshold=args.detail_threshold
        )
        fetcher.fetch_all_posts(max_pages=args.max_pages, delay=args.delay)
    elif args.command == 'detail':
        # A minimal fetcher instance (account_id is not needed for detail fetching)
        fetcher = XHSBloggerFetcher(account_id='')
        fetcher.fetch_and_save_post_detail(args.note_id, args.output)


if __name__ == '__main__':
    main()