# xhs_utils.py
  1. #!/usr/bin/env python3
  2. """
  3. 小红书数据获取工具模块
  4. 包含共用的API调用、数据处理等函数
  5. """
  6. import json
  7. import time
  8. from typing import Dict, List
  9. from datetime import datetime
  10. import requests
  11. # API配置
  12. BASE_URL = "http://47.84.182.56:8001"
  13. API_GET_DETAIL = f"{BASE_URL}/tools/call/get_xhs_detail_by_note_id"
  14. API_GET_HISTORY = f"{BASE_URL}/tools/call/get_xhs_history_note_list_by_account_id"
  15. def call_api(api_url: str, params: Dict, max_retries: int = 3) -> Dict:
  16. """
  17. 调用API(带重试机制)
  18. Args:
  19. api_url: API地址
  20. params: 请求参数
  21. max_retries: 最大重试次数
  22. Returns:
  23. 响应数据
  24. """
  25. for attempt in range(max_retries):
  26. try:
  27. print(f"调用API: {api_url},参数: {params} (尝试 {attempt + 1}/{max_retries})")
  28. print(params)
  29. response = requests.post(api_url, json=params, timeout=600)
  30. response.raise_for_status()
  31. return response.json()
  32. except requests.exceptions.RequestException as e:
  33. if attempt < max_retries - 1:
  34. print(f" API调用失败,{2}秒后重试... (尝试 {attempt + 1}/{max_retries})")
  35. time.sleep(2)
  36. else:
  37. print(f"API调用失败: {e}")
  38. raise
  39. def get_note_detail(note_id: str) -> Dict:
  40. """
  41. 获取帖子详情
  42. Args:
  43. note_id: 帖子ID
  44. Returns:
  45. 帖子详情数据
  46. """
  47. params = {"note_id": note_id}
  48. result = call_api(API_GET_DETAIL, params)
  49. # 解析API返回的数据结构
  50. try:
  51. if result.get("success") and result.get("result"):
  52. # result字段是一个JSON字符串,需要解析
  53. result_data = json.loads(result["result"])
  54. if isinstance(result_data, list) and len(result_data) > 0:
  55. # 返回第一个元素的data字段
  56. return result_data[0].get("data", {})
  57. except:
  58. print(result)
  59. raise
  60. return {}
  61. def format_timestamp(timestamp_ms) -> str:
  62. """
  63. 将毫秒时间戳转换为年月日时分秒格式
  64. Args:
  65. timestamp_ms: 毫秒级时间戳
  66. Returns:
  67. 格式化的时间字符串 (YYYY-MM-DD HH:MM:SS)
  68. """
  69. try:
  70. if timestamp_ms:
  71. # 将毫秒时间戳转换为秒
  72. timestamp_s = int(timestamp_ms) / 1000
  73. dt = datetime.fromtimestamp(timestamp_s)
  74. return dt.strftime("%Y-%m-%d %H:%M:%S")
  75. except (ValueError, TypeError, OSError):
  76. pass
  77. return ""
  78. def get_author_history_notes(account_id: str) -> List[Dict]:
  79. """
  80. 获取作者历史帖子列表
  81. Args:
  82. account_id: 账号ID
  83. Returns:
  84. 历史帖子列表
  85. """
  86. params = {"account_id": account_id}
  87. result = call_api(API_GET_HISTORY, params)
  88. # 解析API返回的数据结构
  89. if result.get("success") and result.get("result"):
  90. # result字段是一个JSON字符串,需要解析
  91. result_data = json.loads(result["result"])
  92. if isinstance(result_data, list) and len(result_data) > 0:
  93. # 历史帖子API返回格式: [{'data': [note1, note2, ...]}]
  94. # 提取第一个元素的data字段,它是一个帖子列表
  95. first_item = result_data[0]
  96. if isinstance(first_item, dict) and "data" in first_item:
  97. data = first_item.get("data")
  98. if isinstance(data, list):
  99. return data
  100. return []
  101. def merge_note_data(history_data: Dict, detail_data: Dict) -> Dict:
  102. """
  103. 合并历史API和详情API的数据,优先使用历史API数据
  104. Args:
  105. history_data: 历史API返回的数据
  106. detail_data: 详情API返回的数据
  107. Returns:
  108. 合并后的数据
  109. """
  110. # 从历史数据提取基本信息
  111. note_id = history_data.get("note_id", "")
  112. # 优先使用详情API的完整链接(包含token),否则用note_id拼接简单链接
  113. if detail_data and detail_data.get("content_link"):
  114. link = detail_data.get("content_link")
  115. else:
  116. link = f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else ""
  117. # 提取用户信息
  118. user_info = history_data.get("user", {})
  119. user_id = user_info.get("user_id", "") if isinstance(user_info, dict) else ""
  120. nickname = user_info.get("nickname", "") if isinstance(user_info, dict) else ""
  121. # 提取图片列表(优先使用历史API的图片)
  122. images = []
  123. if "image_url_list" in history_data and isinstance(history_data["image_url_list"], list):
  124. images = [img.get("cdn_url") or img.get("url", "") for img in history_data["image_url_list"]]
  125. elif "cover" in history_data and isinstance(history_data["cover"], dict):
  126. cover_url = history_data["cover"].get("cdn_url") or history_data["cover"].get("url", "")
  127. if cover_url:
  128. images.append(cover_url)
  129. # 如果历史API没有图片,尝试从详情API获取
  130. if detail_data:
  131. if "images" in detail_data and isinstance(detail_data["images"], list) and len(detail_data["images"]) > 0:
  132. images = [img.get("cdn_url") or img.get("url", "") for img in detail_data["images"]]
  133. # 去重:保留第一次出现的图片,过滤空字符串
  134. seen = set()
  135. unique_images = []
  136. for img_url in images:
  137. if img_url and img_url not in seen:
  138. seen.add(img_url)
  139. unique_images.append(img_url)
  140. images = unique_images
  141. # 提取发布时间戳(优先使用历史API数据)
  142. publish_timestamp = history_data.get("publish_timestamp") or (detail_data.get("publish_timestamp") if detail_data else None)
  143. publish_time = format_timestamp(publish_timestamp)
  144. # 优先使用历史API的数据,缺失时从详情API补充
  145. merged = {
  146. "channel_content_id": note_id,
  147. "link": link,
  148. "comment_count": history_data.get("comment_count", detail_data.get("comment_count", 0) if detail_data else 0),
  149. "images": images,
  150. "like_count": history_data.get("like_count", detail_data.get("like_count", 0) if detail_data else 0),
  151. "body_text": history_data.get("desc") or history_data.get("note_text") or (detail_data.get("body_text", "") if detail_data else ""),
  152. "title": history_data.get("title", detail_data.get("title", "") if detail_data else ""),
  153. "collect_count": history_data.get("collecte_count") or history_data.get("collect_count", detail_data.get("collect_count", 0) if detail_data else 0),
  154. "channel_account_id": user_id or (detail_data.get("channel_account_id", "") if detail_data else ""),
  155. "channel_account_name": nickname or (detail_data.get("channel_account_name", "") if detail_data else ""),
  156. "content_type": history_data.get("type", detail_data.get("content_type", "") if detail_data else ""),
  157. "video": history_data.get("video", detail_data.get("video", {}) if detail_data else {}),
  158. "publish_timestamp": publish_timestamp if publish_timestamp else 0,
  159. "publish_time": publish_time
  160. }
  161. return merged
  162. def transform_note_data(note_data: Dict) -> Dict:
  163. """
  164. 将详情API返回的数据转换为目标格式
  165. Args:
  166. note_data: 详情API返回的原始数据
  167. Returns:
  168. 转换后的数据
  169. """
  170. # 提取图片URL列表
  171. images = []
  172. if "images" in note_data and isinstance(note_data["images"], list):
  173. # 优先取cdn_url,否则取url
  174. images = [img.get("cdn_url") or img.get("url", "") for img in note_data["images"]]
  175. # 去重:保留第一次出现的图片,过滤空字符串
  176. seen = set()
  177. unique_images = []
  178. for img_url in images:
  179. if img_url and img_url not in seen:
  180. seen.add(img_url)
  181. unique_images.append(img_url)
  182. images = unique_images
  183. # 提取发布时间戳并格式化
  184. publish_timestamp = note_data.get("publish_timestamp")
  185. publish_time = format_timestamp(publish_timestamp)
  186. transformed = {
  187. "channel_content_id": note_data.get("channel_content_id", ""),
  188. "link": note_data.get("content_link", ""),
  189. "comment_count": note_data.get("comment_count", 0),
  190. "images": images,
  191. "like_count": note_data.get("like_count", 0),
  192. "body_text": note_data.get("body_text", ""),
  193. "title": note_data.get("title", ""),
  194. "collect_count": note_data.get("collect_count", 0),
  195. "channel_account_id": note_data.get("channel_account_id", ""),
  196. "channel_account_name": note_data.get("channel_account_name", ""),
  197. "content_type": note_data.get("content_type", ""),
  198. "video": note_data.get("video", {}),
  199. "publish_timestamp": publish_timestamp if publish_timestamp else 0,
  200. "publish_time": publish_time
  201. }
  202. return transformed