| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Stage 7 API 客户端
- 用于调用深度解构分析 API
- """
- import time
- import logging
- import requests
- from datetime import datetime
- from typing import Dict, List, Any, Optional
- logger = logging.getLogger(__name__)
def map_note_to_api_format(
    note: Dict,
    note_card: Dict,
    evaluation: Dict,
    search_word: str,
    original_feature: str,
    start_points: List[str],
    processed_image_urls: Optional[List[str]] = None
) -> Dict:
    """
    Map Xiaohongshu note data to the request format expected by the API.

    Args:
        note: Raw note data; only its ``id`` is read.
        note_card: Note card info (title, description, image list, user,
            interaction counters and publish timestamp).
        evaluation: Evaluation result. Accepted for interface compatibility;
            it is not copied into the payload.
        search_word: Search word, sent as ``question_data.query``.
        original_feature: Original feature, sent as ``question_data.target``.
        start_points: Start points, sent as ``question_data.start_points``.
        processed_image_urls: Processed image URLs; when non-empty they are
            used instead of the card's ``image_list``.

    Returns:
        A dict with a ``post_data`` section and a ``question_data`` section.
    """
    # Build the Xiaohongshu link for this note.
    note_id = note.get('id', '')
    link = f"https://www.xiaohongshu.com/explore/{note_id}"

    # `or {}` also covers keys that are present but explicitly None.
    user = note_card.get('user') or {}
    interact_info = note_card.get('interact_info') or {}

    # The card timestamp is treated as seconds and the payload carries
    # milliseconds. Coerce first so that a numeric string is converted
    # instead of being repeated 1000 times by `str * int`; anything that is
    # not a finite number degrades to "no timestamp".
    raw_ts = note_card.get('publish_timestamp') or 0
    try:
        publish_ts = float(raw_ts)
        publish_ts_ms = int(publish_ts * 1000)
    except (TypeError, ValueError, OverflowError):
        publish_ts, publish_ts_ms = 0.0, 0

    # Human-readable publish date in local time; left empty when the
    # timestamp is missing or outside the range the platform supports.
    publish_date = ''
    if publish_ts:
        try:
            publish_date = datetime.fromtimestamp(publish_ts).strftime('%Y-%m-%d %H:%M:%S')
        except (OverflowError, OSError, ValueError):
            publish_date = ''

    # Prefer the processed image URLs; fall back to the card's own list.
    image_urls = processed_image_urls or note_card.get('image_list') or []

    return {
        "post_data": {
            "channel_content_id": note_id,
            "link": link,
            "xsec_token": "",  # usually empty
            "comment_count": interact_info.get('comment_count', 0),
            "images": image_urls,
            "like_count": interact_info.get('liked_count', 0),
            "body_text": note_card.get('desc', ''),
            "title": note_card.get('display_title', ''),
            "collect_count": interact_info.get('collected_count', 0),
            "channel_account_id": user.get('user_id', ''),
            "channel_account_name": user.get('nick_name', ''),
            "publish_timestamp": publish_ts_ms,
            # No separate modification time is read; the publish time is reused.
            "modify_timestamp": publish_ts_ms,
            "update_timestamp": int(time.time() * 1000),
            "publish_date": publish_date,
            "content_type": "note",
            "video": {}  # image-text notes carry no video
        },
        "question_data": {
            "target": original_feature,    # the feature being analysed
            "start_points": start_points,  # list of start-point strings
            "query": search_word           # the search word that found the note
        }
    }
class DeconstructionAPIClient:
    """Client for the deconstruction analysis API."""

    def __init__(
        self,
        api_url: str = "http://192.168.245.150:7000/what/analysis/single",
        timeout: int = 30,
        max_retries: int = 3
    ):
        """
        Store the connection settings used by ``call_api``.

        Args:
            api_url: Endpoint the payload is POSTed to.
            timeout: Per-request timeout in seconds.
            max_retries: Total number of attempts made before giving up.
        """
        self.api_url = api_url
        self.timeout = timeout
        self.max_retries = max_retries

    def call_api(
        self,
        api_payload: Dict
    ) -> Dict:
        """
        POST the payload to the deconstruction API, retrying on failure.

        Any non-200 response, timeout or other exception counts as a failed
        attempt. Between attempts the call sleeps ``2 ** attempt`` seconds
        (1s, 2s, ...); no sleep follows the final attempt.

        Args:
            api_payload: Request data, sent as the JSON body.

        Returns:
            {
                'status': 'success' | 'failed',
                'result': decoded JSON response (on success, else None),
                'error': error message (on failure, else None)
            }
        """
        for attempt in range(self.max_retries):
            wait_time = 2 ** attempt  # exponential back-off delay in seconds

            # Each branch either returns the success result or records the
            # error text plus the warning to log if another attempt remains.
            try:
                response = requests.post(
                    self.api_url,
                    json=api_payload,
                    headers={'Content-Type': 'application/json'},
                    timeout=self.timeout
                )
                if response.status_code == 200:
                    return {
                        'status': 'success',
                        'result': response.json(),
                        'error': None
                    }
                failure = f"HTTP {response.status_code}: {response.text[:200]}"
                retry_notice = f" API 调用失败,{wait_time}s 后重试 ({attempt + 1}/{self.max_retries})"
            except requests.Timeout:
                failure = f'API timeout after {self.timeout}s'
                retry_notice = f" API 超时,{wait_time}s 后重试 ({attempt + 1}/{self.max_retries})"
            except Exception as e:
                failure = f'Exception: {str(e)}'
                retry_notice = f" API 异常,{wait_time}s 后重试 ({attempt + 1}/{self.max_retries}): {e}"

            # The last attempt reports its own error instead of sleeping.
            if attempt >= self.max_retries - 1:
                return {
                    'status': 'failed',
                    'result': None,
                    'error': failure
                }

            logger.warning(retry_notice)
            time.sleep(wait_time)

        # Only reached when max_retries is zero or negative, i.e. the loop
        # body never ran.
        return {
            'status': 'failed',
            'result': None,
            'error': 'Max retries exceeded'
        }
def test_api_client():
    """Smoke-test the payload mapping with mock data and print the result."""
    import json

    # Mock note card: two images, one user, interaction counters and a
    # publish timestamp.
    sample_card = {
        'display_title': '测试标题',
        'desc': '测试内容',
        'image_list': [
            'https://example.com/image1.jpg',
            'https://example.com/image2.jpg'
        ],
        'user': {'user_id': '123456', 'nick_name': '测试用户'},
        'interact_info': {
            'liked_count': 100,
            'collected_count': 50,
            'comment_count': 10
        },
        'publish_timestamp': 1640000000
    }
    sample_note = {'id': '68ba3a27000000001c00f8fc'}
    sample_evaluation = {
        '综合得分': 9.0,
        '关键匹配点': ['测试匹配点1', '测试匹配点2']
    }

    # Data-mapping test.
    api_payload = map_note_to_api_format(
        note=sample_note,
        note_card=sample_card,
        evaluation=sample_evaluation,
        search_word='测试搜索词',
        original_feature='测试特征',
        start_points=['起点1', '起点2']
    )

    print("API Payload:")
    rendered = json.dumps(api_payload, ensure_ascii=False, indent=2)
    print(rendered)

    # API call test (needs a running API service):
    # client = DeconstructionAPIClient()
    # result = client.call_api(api_payload)
    # print("\nAPI Result:")
    # print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == '__main__':
    # When run as a script: enable INFO-level logging with a timestamped
    # format, then run the mock-data smoke test.
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)
    test_api_client()
|