| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- #!/usr/bin/env python3
- """
- 小红书数据获取工具模块
- 包含共用的API调用、数据处理等函数
- """
- import json
- import time
- from typing import Dict, List
- from datetime import datetime
- import requests
# API configuration
# Base address of the backend service (hard-coded, plain HTTP).
BASE_URL = "http://47.84.182.56:8001"
# Endpoint used to fetch a single note's detail by note ID.
API_GET_DETAIL = f"{BASE_URL}/tools/call/get_xhs_detail_by_note_id"
# Endpoint used to fetch an account's historical note list by account ID.
API_GET_HISTORY = f"{BASE_URL}/tools/call/get_xhs_history_note_list_by_account_id"
def call_api(api_url: str, params: Dict, max_retries: int = 3) -> Dict:
    """
    POST ``params`` as JSON to ``api_url``, retrying on request failures.

    Args:
        api_url: Full URL of the endpoint to call.
        params: Request parameters, sent as the JSON body.
        max_retries: Total number of attempts (not extra retries); must be >= 1.

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        requests.exceptions.RequestException: If the last attempt fails; the
            original exception is re-raised after being printed.
    """
    if max_retries < 1:
        # Without this guard the loop body would never run and the function
        # would silently return None despite the declared Dict return type.
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    # Fixed pause between attempts; also interpolated into the progress message.
    retry_delay_seconds = 2

    for attempt in range(max_retries):
        try:
            print(f"调用API: {api_url},参数: {params} (尝试 {attempt + 1}/{max_retries})")
            response = requests.post(api_url, json=params, timeout=600)
            # Treat HTTP 4xx/5xx as failures so they go through the retry path.
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            if attempt < max_retries - 1:
                print(f" API调用失败,{retry_delay_seconds}秒后重试... (尝试 {attempt + 1}/{max_retries})")
                time.sleep(retry_delay_seconds)
            else:
                # Out of attempts: report and propagate the last error.
                print(f"API调用失败: {e}")
                raise
def get_note_detail(note_id: str) -> Dict:
    """
    Fetch the detail record of a single note.

    Args:
        note_id: ID of the note.

    Returns:
        The ``data`` dict of the first entry in the decoded payload, or an
        empty dict when the response is unsuccessful, empty, or not in the
        expected shape.

    Raises:
        Any exception raised by ``call_api``, and any exception raised while
        reading or decoding the response (the raw response is printed first).
    """
    params = {"note_id": note_id}
    result = call_api(API_GET_DETAIL, params)

    try:
        if not (result.get("success") and result.get("result")):
            return {}
        # The "result" field is a JSON-encoded string and must be decoded.
        result_data = json.loads(result["result"])
    except Exception:
        # Dump the raw response to help diagnose malformed payloads, then
        # let the error propagate unchanged.
        print(result)
        raise

    if not isinstance(result_data, list) or not result_data:
        return {}

    first_item = result_data[0]
    if not isinstance(first_item, dict):
        # Unexpected entry shape: report "no data" instead of failing on .get().
        return {}

    data = first_item.get("data", {})
    # Guard against a null / non-dict "data" so callers can always use .get().
    return data if isinstance(data, dict) else {}
def format_timestamp(timestamp_ms) -> str:
    """
    Convert a millisecond timestamp to a ``YYYY-MM-DD HH:MM:SS`` string.

    The conversion uses ``datetime.fromtimestamp`` without a timezone, so the
    result is expressed in the local timezone of the running process.

    Args:
        timestamp_ms: Millisecond timestamp; any value accepted by ``int()``.

    Returns:
        The formatted time string, or an empty string when the value is
        falsy (``None``, ``0``, ``""``), not convertible to an integer, or
        outside the range the platform can represent.
    """
    try:
        if timestamp_ms:
            # Milliseconds -> seconds.
            timestamp_s = int(timestamp_ms) / 1000
            dt = datetime.fromtimestamp(timestamp_s)
            return dt.strftime("%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError, OSError, OverflowError):
        # OverflowError covers infinite floats and values too large for the
        # platform's time representation; treat them like any other bad input.
        pass
    return ""
def get_author_history_notes(account_id: str) -> List[Dict]:
    """
    Fetch the list of historical notes published by an account.

    Args:
        account_id: ID of the account.

    Returns:
        The note list found under ``data`` in the first entry of the decoded
        payload, or an empty list when the response is unsuccessful or does
        not have that shape.
    """
    response = call_api(API_GET_HISTORY, {"account_id": account_id})

    if not response.get("success") or not response.get("result"):
        return []

    # The "result" field is a JSON-encoded string and must be decoded.
    payload = json.loads(response["result"])
    if not isinstance(payload, list) or len(payload) == 0:
        return []

    # Expected payload shape: [{'data': [note1, note2, ...]}]
    head = payload[0]
    if not isinstance(head, dict) or "data" not in head:
        return []

    notes = head.get("data")
    return notes if isinstance(notes, list) else []
def merge_note_data(history_data: Dict, detail_data: Dict) -> Dict:
    """
    Merge one note's history-API record with its detail-API record.

    For most fields the history record wins and the detail record only fills
    gaps. The exceptions are the link and the image list, where the detail
    record takes precedence when it provides a value.

    Args:
        history_data: Note record returned by the history API.
        detail_data: Note record returned by the detail API; may be empty or None.

    Returns:
        The merged note in the target output format.
    """
    def _pick_url(entry):
        # Prefer the CDN URL, fall back to the plain URL.
        return entry.get("cdn_url") or entry.get("url", "")

    # A falsy detail record behaves like an empty one in every lookup below.
    detail = detail_data if detail_data else {}

    note_id = history_data.get("note_id", "")

    # The detail API's full link (which includes a token) is preferred;
    # otherwise build a plain link from the note ID.
    link = detail.get("content_link")
    if not link:
        link = f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else ""

    # Author information lives under "user" in the history record.
    author = history_data.get("user", {})
    author_is_dict = isinstance(author, dict)
    user_id = author.get("user_id", "") if author_is_dict else ""
    nickname = author.get("nickname", "") if author_is_dict else ""

    # Images from the history record: the image list if present, else the cover.
    history_images = history_data.get("image_url_list")
    cover = history_data.get("cover")
    if isinstance(history_images, list):
        candidates = [_pick_url(entry) for entry in history_images]
    elif isinstance(cover, dict):
        cover_url = _pick_url(cover)
        candidates = [cover_url] if cover_url else []
    else:
        candidates = []

    # NOTE(review): a non-empty detail image list replaces the history images
    # outright, unlike the other fields where history wins — confirm intended.
    detail_images = detail.get("images")
    if isinstance(detail_images, list) and detail_images:
        candidates = [_pick_url(entry) for entry in detail_images]

    # Drop empty URLs and duplicates, keeping first-occurrence order.
    images = list(dict.fromkeys(url for url in candidates if url))

    # Publish time: history value first, detail value as fallback.
    publish_timestamp = history_data.get("publish_timestamp") or detail.get("publish_timestamp")
    publish_time = format_timestamp(publish_timestamp)

    return {
        "channel_content_id": note_id,
        "link": link,
        "comment_count": history_data.get("comment_count", detail.get("comment_count", 0)),
        "images": images,
        "like_count": history_data.get("like_count", detail.get("like_count", 0)),
        "body_text": (
            history_data.get("desc")
            or history_data.get("note_text")
            or detail.get("body_text", "")
        ),
        "title": history_data.get("title", detail.get("title", "")),
        # "collecte_count" (sic) is tried first, then the regular spelling.
        "collect_count": (
            history_data.get("collecte_count")
            or history_data.get("collect_count", detail.get("collect_count", 0))
        ),
        "channel_account_id": user_id or detail.get("channel_account_id", ""),
        "channel_account_name": nickname or detail.get("channel_account_name", ""),
        "content_type": history_data.get("type", detail.get("content_type", "")),
        "video": history_data.get("video", detail.get("video", {})),
        "publish_timestamp": publish_timestamp or 0,
        "publish_time": publish_time,
    }
def transform_note_data(note_data: Dict) -> Dict:
    """
    Convert a raw detail-API note record into the target output format.

    Args:
        note_data: Raw note record returned by the detail API.

    Returns:
        The converted note, with de-duplicated image URLs and a formatted
        publish time.
    """
    # Collect image URLs, preferring the CDN URL over the plain URL.
    raw_images = note_data.get("images")
    if isinstance(raw_images, list):
        candidates = [entry.get("cdn_url") or entry.get("url", "") for entry in raw_images]
    else:
        candidates = []

    # Drop empty URLs and duplicates, keeping first-occurrence order.
    images = list(dict.fromkeys(url for url in candidates if url))

    publish_timestamp = note_data.get("publish_timestamp")

    return {
        "channel_content_id": note_data.get("channel_content_id", ""),
        "link": note_data.get("content_link", ""),
        "comment_count": note_data.get("comment_count", 0),
        "images": images,
        "like_count": note_data.get("like_count", 0),
        "body_text": note_data.get("body_text", ""),
        "title": note_data.get("title", ""),
        "collect_count": note_data.get("collect_count", 0),
        "channel_account_id": note_data.get("channel_account_id", ""),
        "channel_account_name": note_data.get("channel_account_name", ""),
        "content_type": note_data.get("content_type", ""),
        "video": note_data.get("video", {}),
        # A missing/falsy timestamp is reported as 0.
        "publish_timestamp": publish_timestamp or 0,
        "publish_time": format_timestamp(publish_timestamp),
    }
|