stage7_api_client.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Stage 7 API 客户端
  5. 用于调用深度解构分析 API
  6. """
  7. import time
  8. import logging
  9. import requests
  10. from datetime import datetime
  11. from typing import Dict, List, Any, Optional
  12. logger = logging.getLogger(__name__)
  13. def map_note_to_api_format(
  14. note: Dict,
  15. note_card: Dict,
  16. evaluation: Dict,
  17. search_word: str,
  18. original_feature: str,
  19. start_points: List[str],
  20. processed_image_urls: Optional[List[str]] = None
  21. ) -> Dict:
  22. """
  23. 将小红书笔记数据映射为 API 所需格式
  24. Args:
  25. note: 笔记原始数据
  26. note_card: 笔记卡片信息
  27. evaluation: 评估结果
  28. search_word: 搜索词
  29. original_feature: 原始特征
  30. start_points: 起点列表
  31. processed_image_urls: 处理后的图片URL列表(如果提供,将替代原始URL)
  32. Returns:
  33. API 请求格式的数据
  34. """
  35. # 构建小红书链接
  36. note_id = note.get('id', '')
  37. link = f"https://www.xiaohongshu.com/explore/{note_id}"
  38. # 获取用户信息
  39. user = note_card.get('user', {})
  40. interact_info = note_card.get('interact_info', {})
  41. # 获取发布时间(需要转换为毫秒时间戳)
  42. publish_ts = note_card.get('publish_timestamp', 0)
  43. publish_ts_ms = publish_ts * 1000 if publish_ts else 0
  44. # 格式化发布日期
  45. publish_date = ''
  46. if publish_ts:
  47. try:
  48. publish_date = datetime.fromtimestamp(publish_ts).strftime('%Y-%m-%d %H:%M:%S')
  49. except:
  50. publish_date = ''
  51. # 使用处理后的图片URL,如果没有则使用原始URL
  52. image_urls = processed_image_urls if processed_image_urls else note_card.get('image_list', [])
  53. return {
  54. "post_data": {
  55. "channel_content_id": note_id,
  56. "link": link,
  57. "xsec_token": "", # 通常为空
  58. "comment_count": interact_info.get('comment_count', 0),
  59. "images": image_urls,
  60. "like_count": interact_info.get('liked_count', 0),
  61. "body_text": note_card.get('desc', ''),
  62. "title": note_card.get('display_title', ''),
  63. "collect_count": interact_info.get('collected_count', 0),
  64. "channel_account_id": user.get('user_id', ''),
  65. "channel_account_name": user.get('nick_name', ''),
  66. "publish_timestamp": publish_ts_ms,
  67. "modify_timestamp": publish_ts_ms,
  68. "update_timestamp": int(time.time() * 1000),
  69. "publish_date": publish_date,
  70. "content_type": "note",
  71. "video": {} # 图文类型无视频
  72. },
  73. "question_data": {
  74. "target": original_feature, # 例如: "墨镜"
  75. "start_points": start_points, # 例如: ["墨镜", "猫咪服饰造型元素", "图片中猫咪佩戴墨镜"]
  76. "query": search_word # 例如: "猫咪服饰造型元素"
  77. }
  78. }
  79. class DeconstructionAPIClient:
  80. """解构分析 API 客户端"""
  81. def __init__(
  82. self,
  83. api_url: str = "http://192.168.245.150:7000/what/analysis/single",
  84. timeout: int = 30,
  85. max_retries: int = 3
  86. ):
  87. """
  88. 初始化 API 客户端
  89. Args:
  90. api_url: API 地址
  91. timeout: 超时时间(秒)
  92. max_retries: 最大重试次数
  93. """
  94. self.api_url = api_url
  95. self.timeout = timeout
  96. self.max_retries = max_retries
  97. def call_api(
  98. self,
  99. api_payload: Dict
  100. ) -> Dict:
  101. """
  102. 调用解构 API(带重试机制)
  103. Args:
  104. api_payload: API 请求数据
  105. Returns:
  106. {
  107. 'status': 'success' | 'failed',
  108. 'result': API响应数据(成功时),
  109. 'error': 错误信息(失败时)
  110. }
  111. """
  112. for attempt in range(self.max_retries):
  113. try:
  114. response = requests.post(
  115. self.api_url,
  116. json=api_payload,
  117. headers={'Content-Type': 'application/json'},
  118. timeout=self.timeout
  119. )
  120. if response.status_code == 200:
  121. return {
  122. 'status': 'success',
  123. 'result': response.json(),
  124. 'error': None
  125. }
  126. else:
  127. error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
  128. # 如果还有重试机会,继续重试
  129. if attempt < self.max_retries - 1:
  130. wait_time = 2 ** attempt # 指数退避: 1s, 2s, 4s
  131. logger.warning(f" API 调用失败,{wait_time}s 后重试 ({attempt + 1}/{self.max_retries})")
  132. time.sleep(wait_time)
  133. continue
  134. # 最后一次重试也失败
  135. return {
  136. 'status': 'failed',
  137. 'result': None,
  138. 'error': error_msg
  139. }
  140. except requests.Timeout:
  141. if attempt < self.max_retries - 1:
  142. wait_time = 2 ** attempt
  143. logger.warning(f" API 超时,{wait_time}s 后重试 ({attempt + 1}/{self.max_retries})")
  144. time.sleep(wait_time)
  145. continue
  146. return {
  147. 'status': 'failed',
  148. 'result': None,
  149. 'error': f'API timeout after {self.timeout}s'
  150. }
  151. except Exception as e:
  152. if attempt < self.max_retries - 1:
  153. wait_time = 2 ** attempt
  154. logger.warning(f" API 异常,{wait_time}s 后重试 ({attempt + 1}/{self.max_retries}): {e}")
  155. time.sleep(wait_time)
  156. continue
  157. return {
  158. 'status': 'failed',
  159. 'result': None,
  160. 'error': f'Exception: {str(e)}'
  161. }
  162. # 理论上不会到这里
  163. return {
  164. 'status': 'failed',
  165. 'result': None,
  166. 'error': 'Max retries exceeded'
  167. }
  168. def test_api_client():
  169. """测试 API 客户端"""
  170. # 模拟数据
  171. test_note = {
  172. 'id': '68ba3a27000000001c00f8fc'
  173. }
  174. test_note_card = {
  175. 'display_title': '测试标题',
  176. 'desc': '测试内容',
  177. 'image_list': [
  178. 'https://example.com/image1.jpg',
  179. 'https://example.com/image2.jpg'
  180. ],
  181. 'user': {
  182. 'user_id': '123456',
  183. 'nick_name': '测试用户'
  184. },
  185. 'interact_info': {
  186. 'liked_count': 100,
  187. 'collected_count': 50,
  188. 'comment_count': 10
  189. },
  190. 'publish_timestamp': 1640000000
  191. }
  192. test_evaluation = {
  193. '综合得分': 9.0,
  194. '关键匹配点': ['测试匹配点1', '测试匹配点2']
  195. }
  196. # 数据映射测试
  197. api_payload = map_note_to_api_format(
  198. note=test_note,
  199. note_card=test_note_card,
  200. evaluation=test_evaluation,
  201. search_word='测试搜索词',
  202. original_feature='测试特征',
  203. start_points=['起点1', '起点2']
  204. )
  205. print("API Payload:")
  206. import json
  207. print(json.dumps(api_payload, ensure_ascii=False, indent=2))
  208. # API 调用测试(需要实际 API 服务)
  209. # client = DeconstructionAPIClient()
  210. # result = client.call_api(api_payload)
  211. # print("\nAPI Result:")
  212. # print(json.dumps(result, ensure_ascii=False, indent=2))
  213. if __name__ == '__main__':
  214. logging.basicConfig(
  215. level=logging.INFO,
  216. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  217. )
  218. test_api_client()