| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- """视频号(shipinhao)接入 client (V3-M1C).
- 复用 crawapi_http 共享基座(HTTP/限流/env)。search 对暂时性故障(25011/网络/
- 超时)按 platform_profiles/shipinhao.json 的口径重试(3 次、退避 1-2-4s),试满
- 抛 ContentAgentError 走既有失败通道。归一化输出与抖音同构(canonical 键集合一致)。
- blogger/account_info 上游 blocked,fetch_author_works 返回 [] 不请求、不抛。
- """
- from __future__ import annotations
- import re
- import time
- from pathlib import Path
- from typing import Any, Callable
- from content_agent.errors import ContentAgentError, ErrorCode
- from content_agent.integrations.crawapi_http import (
- CrawapiTransientError,
- RateLimiter,
- _env,
- _load_env_file,
- content_format,
- post_crawapi_json,
- score_from_statistics,
- )
# Rate-limiter bucket key shared by every shipinhao keyword-search request.
SEARCH_RATE_LIMIT_BUCKET: str = "shipinhao_search"
# Upstream business codes that post_crawapi_json should treat as transient
# (retryable) rather than terminal failures.
TRANSIENT_BUSINESS_CODES: set[str] = {"25011"}
# Title hashtag pattern: "#" followed by a run of characters that are not
# whitespace, "#", "@", an ASCII "(" or a full-width "(".
_TAG_RE: re.Pattern[str] = re.compile(r"#([^\s#@((]+)")
def _retry_transient(
    fn: Callable[[], Any],
    *,
    attempts: int,
    backoff_seconds: tuple[int, ...],
    sleep_fn: Callable[[float], None],
) -> Any:
    """Call ``fn``, retrying while it raises ``CrawapiTransientError``.

    Between failed attempts ``sleep_fn`` is called with
    ``backoff_seconds[attempt]``. The last entry is reused once the tuple is
    shorter than the number of retries. An empty ``backoff_seconds`` retries
    without sleeping.

    Args:
        fn: Zero-argument callable performing one attempt.
        attempts: Total number of attempts; must be at least 1.
        backoff_seconds: Per-retry delays passed to ``sleep_fn``.
        sleep_fn: Sleep function (injectable for tests).

    Returns:
        Whatever ``fn`` returns on its first successful attempt.

    Raises:
        ValueError: If ``attempts`` is less than 1. Without this check the
            loop would not run and the function would silently return None.
        CrawapiTransientError: Re-raised from the final attempt.
    """
    if attempts < 1:
        raise ValueError(f"attempts must be >= 1, got {attempts}")
    for attempt in range(attempts):
        try:
            return fn()
        except CrawapiTransientError:
            if attempt == attempts - 1:
                raise
            # Guard: indexing an empty tuple here would raise IndexError
            # from inside the retry handler and mask the transient error.
            if backoff_seconds:
                sleep_fn(backoff_seconds[min(attempt, len(backoff_seconds) - 1)])
def _normalize_shipinhao_item(
    query: dict[str, Any],
    item: dict[str, Any],
    index: int,
    has_more: bool,
    next_cursor: str,
) -> dict[str, Any]:
    """Map one raw shipinhao search hit onto the canonical discovery record.

    Count fields are coerced with ``int(value or 0)``. Tags come from
    ``topic_list``: empty entries are dropped and the rest are ``#``-prefixed.
    When no topics remain, tags fall back to hashtags parsed out of the title
    with ``_TAG_RE``. Tags are de-duplicated in first-seen order.

    Args:
        query: Search-query record. ``search_query_id`` and
            ``discovery_start_source`` are read from it.
        item: One raw upstream result item.
        index: Position of the item within the page; it is zero-padded to
            three digits in ``content_discovery_id``.
        has_more: Page-level pagination flag, copied onto the record.
        next_cursor: Page-level cursor, copied onto the record.

    Returns:
        The normalized record dict.
    """
    caption = item.get("title") or ""

    # (canonical key, upstream key) pairs; tuple order fixes the key order.
    count_fields = (
        ("digg_count", "like_count"),
        ("comment_count", "comment_count"),
        ("share_count", "share_count"),
        ("collect_count", "collect_count"),
        ("play_count", "play_count"),
    )
    stats = {canonical: int(item.get(raw) or 0) for canonical, raw in count_fields}

    hashtags: list[Any] = []
    for topic in item.get("topic_list") or []:
        if not topic:
            continue
        hashtags.append(topic if str(topic).startswith("#") else f"#{topic}")
    if not hashtags:
        # No structured topics: fall back to "#tag" runs embedded in the title.
        hashtags = [f"#{match}" for match in _TAG_RE.findall(caption)]

    play_url = None
    videos = item.get("video_url_list") or []
    if videos:
        play_url = videos[0].get("video_url")

    content_id = str(item.get("channel_content_id") or "")
    author_id = str(item.get("channel_account_id") or "")
    # Divided by 1000 below, i.e. the value is handled as milliseconds.
    publish_ms = item.get("publish_timestamp")

    query_id = query["search_query_id"]
    return {
        "content_discovery_id": f"{query_id}_content_{index:03d}",
        "search_query_id": query_id,
        "platform": "shipinhao",
        "platform_content_id": content_id,
        "platform_content_format": content_format(item.get("content_type") or "video"),
        "play_url": play_url,
        "description": caption,
        "platform_author_id": author_id,
        "author_display_name": item.get("channel_account_name") or "",
        "statistics": stats,
        "tags": list(dict.fromkeys(hashtags)),
        "text_extra": [],
        "create_time": (int(publish_ms) // 1000) if publish_ms else None,
        "has_more": has_more,
        "next_cursor": next_cursor,
        "score": score_from_statistics(stats),
        "risk_level": "unknown",
        "discovery_relation": "derived_from_pattern_demand",
        "discovery_start_source": query["discovery_start_source"],
        "previous_discovery_step": "search_query_direct",
        "content_metadata_source": "shipinhao_keyword_search",
        "platform_auth_mode": "no_bearer",
        "platform_raw_payload": {
            "channel_content_id": content_id,
            "channel_account_id": author_id,
        },
    }
class CrawapiShipinhaoClient:
    """Shipinhao (WeChat Channels) client on top of the shared crawapi HTTP base.

    ``search`` posts to the keyword-search endpoint. It retries
    ``CrawapiTransientError`` failures with backoff and raises
    ``ContentAgentError(PLATFORM_REQUEST_FAILED)`` once retries are exhausted.
    ``fetch_author_works`` always returns ``[]`` without issuing a request.

    When no ``http_client`` is injected, the instance creates its own
    ``httpx.Client``. Release it with ``close()`` or by using the instance as
    a context manager. An injected client is never closed here; its lifetime
    belongs to the caller.
    """

    def __init__(
        self,
        base_url: str,
        keyword_path: str = "/crawler/shi_pin_hao/keyword",
        timeout_seconds: float = 60.0,
        max_results_per_query: int | None = None,
        max_attempts: int = 3,
        backoff_seconds: tuple[int, ...] = (1, 2, 4),
        http_client: Any | None = None,
        rate_limiter: RateLimiter | None = None,
        sleep_fn: Callable[[float], None] = time.sleep,
    ) -> None:
        """Configure the client.

        Args:
            base_url: API root. It is normalized to end with exactly one "/".
            keyword_path: Keyword-search path relative to ``base_url``.
                Leading "/" characters are stripped.
            timeout_seconds: Per-request timeout. Also used for the
                self-created httpx client.
            max_results_per_query: Cap on items returned per search page.
                None or 0 means no cap.
            max_attempts: Total attempts per search before giving up.
            backoff_seconds: Delays slept between retries.
            http_client: Optional pre-built HTTP client (not closed by us).
            rate_limiter: Optional limiter forwarded to post_crawapi_json.
            sleep_fn: Sleep function used for retry backoff.
        """
        self.base_url = base_url.rstrip("/") + "/"
        self.keyword_path = keyword_path.lstrip("/")
        self.timeout_seconds = timeout_seconds
        self.max_results_per_query = max_results_per_query
        self.max_attempts = max_attempts
        self.backoff_seconds = backoff_seconds
        # Only a client we create ourselves is ours to close. httpx is
        # imported lazily so an injected client does not require it.
        self._owns_http_client = http_client is None
        if http_client is None:
            import httpx

            http_client = httpx.Client(timeout=timeout_seconds)
        self.http_client = http_client
        self.rate_limiter = rate_limiter
        self.sleep_fn = sleep_fn

    def close(self) -> None:
        """Close the HTTP client if this instance created it; safe to repeat."""
        if getattr(self, "_owns_http_client", False):
            self.http_client.close()
            self._owns_http_client = False

    def __enter__(self) -> "CrawapiShipinhaoClient":
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        self.close()

    @classmethod
    def from_env(cls, env_path: str | Path = ".env") -> "CrawapiShipinhaoClient":
        """Build a client from an env file plus process environment.

        Reads ``CONTENTFIND_API_CRAWAPI_BASE_URL`` (required) and
        ``CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS`` (default "60"). It also
        installs a ``RateLimiter(min_interval_seconds=15.0)``.
        """
        env = _load_env_file(env_path)
        return cls(
            base_url=_env("CONTENTFIND_API_CRAWAPI_BASE_URL", env, required=True),
            timeout_seconds=float(
                _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
            ),
            rate_limiter=RateLimiter(min_interval_seconds=15.0),
        )

    def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        """Fetch one keyword-search page and return normalized items.

        Args:
            query: Must contain ``search_query`` (the keyword). It may contain
                ``page_cursor``. ``search_query_id`` and
                ``discovery_start_source`` are read during normalization.

        Returns:
            Normalized records, at most ``max_results_per_query`` when that is
            truthy. A malformed or empty response yields ``[]``.

        Raises:
            ContentAgentError: ``PLATFORM_REQUEST_FAILED`` once every one of
                ``max_attempts`` attempts failed with CrawapiTransientError.
        """
        payload = {
            "keyword": query["search_query"],
            "cursor": str(query.get("page_cursor") or ""),
        }

        def _call() -> dict[str, Any]:
            return post_crawapi_json(
                http_client=self.http_client,
                base_url=self.base_url,
                path=self.keyword_path,
                payload=payload,
                operation="keyword_search",
                timeout_seconds=self.timeout_seconds,
                rate_limiter=self.rate_limiter,
                rate_limit_bucket=SEARCH_RATE_LIMIT_BUCKET,
                business_codes=set(),
                transient_business_codes=TRANSIENT_BUSINESS_CODES,
            )

        try:
            data = _retry_transient(
                _call,
                attempts=self.max_attempts,
                backoff_seconds=self.backoff_seconds,
                sleep_fn=self.sleep_fn,
            )
        except CrawapiTransientError as exc:
            raise ContentAgentError(
                ErrorCode.PLATFORM_REQUEST_FAILED,
                "shipinhao search exhausted after retries",
                {"operation": "keyword_search", "max_attempts": self.max_attempts},
            ) from exc

        # Envelope as read here: {"data": {"data": [...], "has_more": ...,
        # "next_cursor": ...}}. Any malformed level degrades to an empty page.
        block = data.get("data") if isinstance(data, dict) else None
        if not isinstance(block, dict):
            block = {}
        raw_items = block.get("data")
        # Drop non-dict entries so one bad item cannot crash normalization.
        items = (
            [entry for entry in raw_items if isinstance(entry, dict)]
            if isinstance(raw_items, list)
            else []
        )
        has_more = bool(block.get("has_more", False))
        next_cursor = str(block.get("next_cursor") or "")
        limit = self.max_results_per_query
        # Falsy limit (None or 0) means "no cap", as in the original contract.
        selected = items[:limit] if limit else items
        return [
            _normalize_shipinhao_item(query, item, index, has_more, next_cursor)
            for index, item in enumerate(selected, start=1)
        ]

    def fetch_author_works(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        """Return ``[]`` without issuing any request.

        The upstream blogger endpoint is blocked (code 25011), so author walks
        degrade gracefully instead of failing.
        """
        return []
|