"""Shared crawapi HTTP base (V3-M1A). Extracted verbatim from douyin.py so multiple platform clients (douyin / shipinhao) reuse the same HTTP post + rate limiting + rate-limit error classification + env-file helpers, instead of each duplicating them. Pure refactor: behaviour is identical to the original douyin implementation. """ from __future__ import annotations import os import time from pathlib import Path from typing import Any, Callable from urllib.parse import urljoin import httpx from content_agent.errors import ContentAgentError, ErrorCode RATE_LIMIT_MESSAGE_TOKENS = ("限流", "请求频繁", "rate limit", "too many requests") class CrawapiTransientError(RuntimeError): """Retryable crawapi failure (network/timeout, or a platform-declared transient business code such as 视频号 25011). Subclasses RuntimeError so existing `except RuntimeError` handlers keep working unchanged.""" class RateLimiter: def __init__( self, min_interval_seconds: float = 12.0, now_fn: Callable[[], float] = time.monotonic, sleep_fn: Callable[[float], None] = time.sleep, ) -> None: self.min_interval_seconds = min_interval_seconds self.now_fn = now_fn self.sleep_fn = sleep_fn self._last_call_by_bucket: dict[str, float] = {} def wait(self, bucket: str) -> None: last = self._last_call_by_bucket.get(bucket) if last is not None: remaining = self.min_interval_seconds - (self.now_fn() - last) if remaining > 0: self.sleep_fn(remaining) self._last_call_by_bucket[bucket] = self.now_fn() def is_rate_limit_business_error( code: Any, data: dict[str, Any], *, business_codes: set[str] ) -> bool: if str(code) in business_codes: return True message = str(data.get("msg") or data.get("message") or "").lower() return any(token in message for token in RATE_LIMIT_MESSAGE_TOKENS) def post_crawapi_json( *, http_client: Any, base_url: str, path: str, payload: dict[str, Any], operation: str, timeout_seconds: float, rate_limiter: RateLimiter | None = None, rate_limit_bucket: str | None = None, business_codes: set[str], transient_business_codes: set[str] = frozenset(), ) -> dict[str, Any]: if rate_limit_bucket and rate_limiter: rate_limiter.wait(rate_limit_bucket) url = urljoin(base_url, path) try: response = http_client.post( url, json=payload, headers={"Content-Type": "application/json"}, timeout=timeout_seconds, ) response.raise_for_status() data = response.json() except httpx.HTTPStatusError as exc: status_code = exc.response.status_code if exc.response is not None else "unknown" if status_code == 429: raise ContentAgentError( ErrorCode.PLATFORM_RATE_LIMITED, f"crawapi {operation} failed: rate_limited", {"operation": operation, "status_code": 429}, ) from exc raise RuntimeError(f"crawapi {operation} failed: HTTP {status_code}") from exc except httpx.HTTPError as exc: raise CrawapiTransientError(f"crawapi {operation} failed: network_error") from exc except ValueError as exc: raise RuntimeError(f"crawapi {operation} failed: bad_json") from exc if not isinstance(data, dict): raise RuntimeError(f"crawapi {operation} failed: bad_response") code = data.get("code") if code is not None and code not in (0, "0"): if is_rate_limit_business_error(code, data, business_codes=business_codes): raise ContentAgentError( ErrorCode.PLATFORM_RATE_LIMITED, f"crawapi {operation} failed: rate_limited", {"operation": operation, "business_code": str(code)}, ) if str(code) in transient_business_codes: raise CrawapiTransientError( f"crawapi {operation} failed: transient_business_error code={code}" ) raise RuntimeError(f"crawapi {operation} failed: business_error") return data def _load_env_file(env_path: str | Path) -> dict[str, str]: path = Path(env_path) if not path.exists(): return {} env: dict[str, str] = {} for line in path.read_text(encoding="utf-8").splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#") or "=" not in stripped: continue key, value = stripped.split("=", 1) env[key.strip()] = value.strip().strip('"').strip("'") return env def _env( key: str, file_env: dict[str, str], default: str | None = None, required: bool = False, ) -> str: value = file_env.get(key) or os.getenv(key) or default if required and not value: raise RuntimeError(f"missing required env: {key}") return value or "" def _optional_positive_int(value: str) -> int | None: try: parsed = int(value) except ValueError: return None return parsed if parsed > 0 else None def content_format(raw_content_type: str) -> str: if "图文" in raw_content_type: return "image_text" if "文本" in raw_content_type: return "text" if "直播" in raw_content_type: return "live" return "video" def score_from_statistics(statistics: dict[str, Any]) -> int: digg = int(statistics.get("digg_count") or 0) comment = int(statistics.get("comment_count") or 0) share = int(statistics.get("share_count") or 0) weighted = digg + comment * 3 + share * 4 if weighted >= 3000: return 72 if weighted >= 1000: return 62 if weighted >= 300: return 55 return 45