shipinhao.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
"""Shipinhao (WeChat Channels) integration client (V3-M1C).

Reuses the shared crawapi_http foundation (HTTP / rate limiting / env).
``search`` retries transient failures (business code 25011, network errors,
timeouts) following the policy in platform_profiles/shipinhao.json
(3 attempts, backoff 1-2-4s -- with 3 attempts only the 1s and 2s delays are
actually slept). Once all attempts are used it raises ContentAgentError
through the existing failure path. Normalized output mirrors the Douyin
client (identical canonical key set). The upstream blogger/account_info
endpoints are blocked, so fetch_author_works returns [] without sending a
request or raising.
"""
  7. from __future__ import annotations
  8. import re
  9. import time
  10. from pathlib import Path
  11. from typing import Any, Callable
  12. from content_agent.errors import ContentAgentError, ErrorCode
  13. from content_agent.integrations.crawapi_http import (
  14. CrawapiTransientError,
  15. RateLimiter,
  16. _env,
  17. _load_env_file,
  18. content_format,
  19. post_crawapi_json,
  20. score_from_statistics,
  21. )
  22. SEARCH_RATE_LIMIT_BUCKET = "shipinhao_search"
  23. TRANSIENT_BUSINESS_CODES = {"25011"}
  24. _TAG_RE = re.compile(r"#([^\s#@((]+)")
  25. def _retry_transient(
  26. fn: Callable[[], Any],
  27. *,
  28. attempts: int,
  29. backoff_seconds: tuple[int, ...],
  30. sleep_fn: Callable[[float], None],
  31. ) -> Any:
  32. for attempt in range(attempts):
  33. try:
  34. return fn()
  35. except CrawapiTransientError:
  36. if attempt == attempts - 1:
  37. raise
  38. sleep_fn(backoff_seconds[min(attempt, len(backoff_seconds) - 1)])
  39. def _normalize_shipinhao_item(
  40. query: dict[str, Any],
  41. item: dict[str, Any],
  42. index: int,
  43. has_more: bool,
  44. next_cursor: str,
  45. ) -> dict[str, Any]:
  46. title = item.get("title") or ""
  47. statistics = {
  48. "digg_count": int(item.get("like_count") or 0),
  49. "comment_count": int(item.get("comment_count") or 0),
  50. "share_count": int(item.get("share_count") or 0),
  51. "collect_count": int(item.get("collect_count") or 0),
  52. "play_count": int(item.get("play_count") or 0),
  53. }
  54. topic_list = item.get("topic_list") or []
  55. tags = [t if str(t).startswith("#") else f"#{t}" for t in topic_list if t]
  56. if not tags:
  57. tags = [f"#{m}" for m in _TAG_RE.findall(title)]
  58. video_list = item.get("video_url_list") or []
  59. play_url = video_list[0].get("video_url") if video_list else None
  60. platform_content_id = str(item.get("channel_content_id") or "")
  61. platform_author_id = str(item.get("channel_account_id") or "")
  62. publish_ms = item.get("publish_timestamp")
  63. return {
  64. "content_discovery_id": f"{query['search_query_id']}_content_{index:03d}",
  65. "search_query_id": query["search_query_id"],
  66. "platform": "shipinhao",
  67. "platform_content_id": platform_content_id,
  68. "platform_content_format": content_format(item.get("content_type") or "video"),
  69. "play_url": play_url,
  70. "description": title,
  71. "platform_author_id": platform_author_id,
  72. "author_display_name": item.get("channel_account_name") or "",
  73. "statistics": statistics,
  74. "tags": list(dict.fromkeys(tags)),
  75. "text_extra": [],
  76. "create_time": int(publish_ms) // 1000 if publish_ms else None,
  77. "has_more": has_more,
  78. "next_cursor": next_cursor,
  79. "score": score_from_statistics(statistics),
  80. "risk_level": "unknown",
  81. "discovery_relation": "derived_from_pattern_demand",
  82. "discovery_start_source": query["discovery_start_source"],
  83. "previous_discovery_step": "search_query_direct",
  84. "content_metadata_source": "shipinhao_keyword_search",
  85. "platform_auth_mode": "no_bearer",
  86. "platform_raw_payload": {
  87. "channel_content_id": platform_content_id,
  88. "channel_account_id": platform_author_id,
  89. },
  90. }
  91. class CrawapiShipinhaoClient:
  92. def __init__(
  93. self,
  94. base_url: str,
  95. keyword_path: str = "/crawler/shi_pin_hao/keyword",
  96. timeout_seconds: float = 60.0,
  97. max_results_per_query: int | None = None,
  98. max_attempts: int = 3,
  99. backoff_seconds: tuple[int, ...] = (1, 2, 4),
  100. http_client: Any | None = None,
  101. rate_limiter: RateLimiter | None = None,
  102. sleep_fn: Callable[[float], None] = time.sleep,
  103. ) -> None:
  104. import httpx
  105. self.base_url = base_url.rstrip("/") + "/"
  106. self.keyword_path = keyword_path.lstrip("/")
  107. self.timeout_seconds = timeout_seconds
  108. self.max_results_per_query = max_results_per_query
  109. self.max_attempts = max_attempts
  110. self.backoff_seconds = backoff_seconds
  111. self.http_client = http_client or httpx.Client(timeout=timeout_seconds)
  112. self.rate_limiter = rate_limiter
  113. self.sleep_fn = sleep_fn
  114. @classmethod
  115. def from_env(cls, env_path: str | Path = ".env") -> "CrawapiShipinhaoClient":
  116. env = _load_env_file(env_path)
  117. return cls(
  118. base_url=_env("CONTENTFIND_API_CRAWAPI_BASE_URL", env, required=True),
  119. timeout_seconds=float(
  120. _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
  121. ),
  122. rate_limiter=RateLimiter(min_interval_seconds=15.0),
  123. )
  124. def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
  125. payload = {
  126. "keyword": query["search_query"],
  127. "cursor": str(query.get("page_cursor") or ""),
  128. }
  129. def _call() -> dict[str, Any]:
  130. return post_crawapi_json(
  131. http_client=self.http_client,
  132. base_url=self.base_url,
  133. path=self.keyword_path,
  134. payload=payload,
  135. operation="keyword_search",
  136. timeout_seconds=self.timeout_seconds,
  137. rate_limiter=self.rate_limiter,
  138. rate_limit_bucket=SEARCH_RATE_LIMIT_BUCKET,
  139. business_codes=set(),
  140. transient_business_codes=TRANSIENT_BUSINESS_CODES,
  141. )
  142. try:
  143. data = _retry_transient(
  144. _call,
  145. attempts=self.max_attempts,
  146. backoff_seconds=self.backoff_seconds,
  147. sleep_fn=self.sleep_fn,
  148. )
  149. except CrawapiTransientError as exc:
  150. raise ContentAgentError(
  151. ErrorCode.PLATFORM_REQUEST_FAILED,
  152. "shipinhao search exhausted after retries",
  153. {"operation": "keyword_search", "max_attempts": self.max_attempts},
  154. ) from exc
  155. block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
  156. items = block.get("data", []) if isinstance(block.get("data"), list) else []
  157. has_more = bool(block.get("has_more", False))
  158. next_cursor = str(block.get("next_cursor") or "")
  159. selected = items[: self.max_results_per_query] if self.max_results_per_query else items
  160. return [
  161. _normalize_shipinhao_item(query, item, index, has_more, next_cursor)
  162. for index, item in enumerate(selected, start=1)
  163. ]
  164. def fetch_author_works(self, query: dict[str, Any]) -> list[dict[str, Any]]:
  165. # 上游 blogger 接口 blocked(code=25011),不发请求、不抛,游走自然退化。
  166. return []