| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- """Shared crawapi HTTP base (V3-M1A).
- Extracted verbatim from douyin.py so multiple platform clients (douyin /
- shipinhao) reuse the same HTTP post + rate limiting + rate-limit error
- classification + env-file helpers, instead of each duplicating them.
- Pure refactor: behaviour is identical to the original douyin implementation.
- """
- from __future__ import annotations
- import os
- import time
- from pathlib import Path
- from typing import Any, Callable
- from urllib.parse import urljoin
- import httpx
- from content_agent.errors import ContentAgentError, ErrorCode
# Lower-case substrings that mark a crawapi business-error message as a
# rate-limit response ("限流" = rate limiting, "请求频繁" = requests too
# frequent).
RATE_LIMIT_MESSAGE_TOKENS = (
    "限流",
    "请求频繁",
    "rate limit",
    "too many requests",
)
class CrawapiTransientError(RuntimeError):
    """A crawapi failure that is worth retrying.

    Covers network/timeout problems as well as business codes a platform
    declares transient (for example code 25011 on 视频号 / WeChat Channels).
    It derives from RuntimeError so that handlers written as
    ``except RuntimeError`` continue to catch it without modification.
    """
class RateLimiter:
    """Enforce a minimum delay between consecutive calls, tracked per bucket.

    Each bucket name keeps the clock reading taken at the end of its most
    recent ``wait``; buckets are independent of one another. The clock and
    the sleep function are injectable.
    """

    def __init__(
        self,
        min_interval_seconds: float = 12.0,
        now_fn: Callable[[], float] = time.monotonic,
        sleep_fn: Callable[[float], None] = time.sleep,
    ) -> None:
        """Store the interval plus the clock and sleep callables.

        Args:
            min_interval_seconds: Required gap between calls on one bucket.
            now_fn: Returns the current time in seconds.
            sleep_fn: Blocks for the given number of seconds.
        """
        self._last_call_by_bucket: dict[str, float] = {}
        self.min_interval_seconds = min_interval_seconds
        self.now_fn = now_fn
        self.sleep_fn = sleep_fn

    def wait(self, bucket: str) -> None:
        """Sleep until ``bucket`` may be used again, then record the call."""
        previous = self._last_call_by_bucket.get(bucket)
        if previous is not None:
            elapsed = self.now_fn() - previous
            shortfall = self.min_interval_seconds - elapsed
            if shortfall > 0:
                self.sleep_fn(shortfall)
        # The timestamp is read after any sleep, so the next interval is
        # measured from the moment this call is released.
        self._last_call_by_bucket[bucket] = self.now_fn()
def is_rate_limit_business_error(
    code: Any, data: dict[str, Any], *, business_codes: set[str]
) -> bool:
    """Decide whether a crawapi business error represents rate limiting.

    Args:
        code: Business code from the response; compared as ``str(code)``.
        data: Decoded response body; its ``msg`` (or, failing that,
            ``message``) field is inspected.
        business_codes: String codes that always count as rate limiting.

    Returns:
        True when the code is listed in ``business_codes`` or when the
        lower-cased message contains any RATE_LIMIT_MESSAGE_TOKENS entry.
    """
    if str(code) in business_codes:
        return True

    raw_message = data.get("msg") or data.get("message") or ""
    lowered = str(raw_message).lower()
    for token in RATE_LIMIT_MESSAGE_TOKENS:
        if token in lowered:
            return True
    return False
def post_crawapi_json(
    *,
    http_client: Any,
    base_url: str,
    path: str,
    payload: dict[str, Any],
    operation: str,
    timeout_seconds: float,
    rate_limiter: RateLimiter | None = None,
    rate_limit_bucket: str | None = None,
    business_codes: set[str],
    transient_business_codes: set[str] = frozenset(),
) -> dict[str, Any]:
    """POST ``payload`` as JSON to a crawapi endpoint and return the body.

    When both ``rate_limit_bucket`` and ``rate_limiter`` are truthy, the
    limiter's ``wait`` is called for that bucket before the request is sent.
    The target URL is ``urljoin(base_url, path)``.

    Args:
        http_client: Object exposing an httpx-style ``post`` method.
        base_url: Base URL of the crawapi service.
        path: Endpoint path joined onto ``base_url``.
        payload: JSON-serialisable request body.
        operation: Label embedded in every error message.
        timeout_seconds: Timeout passed to ``http_client.post``.
        rate_limiter: Optional limiter consulted before the request.
        rate_limit_bucket: Bucket name handed to the limiter.
        business_codes: Business codes classified as rate limiting.
        transient_business_codes: Business codes classified as retryable.

    Returns:
        The decoded JSON object, when its ``code`` field is missing/None,
        ``0`` or ``"0"``.

    Raises:
        ContentAgentError: With ``ErrorCode.PLATFORM_RATE_LIMITED`` on HTTP
            429 or on a business error classified as rate limiting.
        CrawapiTransientError: On any other ``httpx.HTTPError`` or on a
            business code listed in ``transient_business_codes``.
        RuntimeError: On any other HTTP status error, an undecodable body,
            a non-object body, or any other business code.
    """
    if rate_limit_bucket and rate_limiter:
        rate_limiter.wait(rate_limit_bucket)

    endpoint = urljoin(base_url, path)
    try:
        reply = http_client.post(
            endpoint,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=timeout_seconds,
        )
        reply.raise_for_status()
        body = reply.json()
    except httpx.HTTPStatusError as exc:
        # Must be handled before httpx.HTTPError, which is its base class.
        failed_response = exc.response
        status_code = (
            "unknown" if failed_response is None else failed_response.status_code
        )
        if status_code != 429:
            raise RuntimeError(
                f"crawapi {operation} failed: HTTP {status_code}"
            ) from exc
        raise ContentAgentError(
            ErrorCode.PLATFORM_RATE_LIMITED,
            f"crawapi {operation} failed: rate_limited",
            {"operation": operation, "status_code": 429},
        ) from exc
    except httpx.HTTPError as exc:
        raise CrawapiTransientError(
            f"crawapi {operation} failed: network_error"
        ) from exc
    except ValueError as exc:
        raise RuntimeError(f"crawapi {operation} failed: bad_json") from exc

    if not isinstance(body, dict):
        raise RuntimeError(f"crawapi {operation} failed: bad_response")

    code = body.get("code")
    if code is None or code in (0, "0"):
        return body

    # Rate-limit classification takes precedence over the transient check.
    if is_rate_limit_business_error(code, body, business_codes=business_codes):
        raise ContentAgentError(
            ErrorCode.PLATFORM_RATE_LIMITED,
            f"crawapi {operation} failed: rate_limited",
            {"operation": operation, "business_code": str(code)},
        )
    if str(code) in transient_business_codes:
        raise CrawapiTransientError(
            f"crawapi {operation} failed: transient_business_error code={code}"
        )
    raise RuntimeError(f"crawapi {operation} failed: business_error")
def _load_env_file(env_path: str | Path) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` env file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys and values are whitespace-stripped, and a value wrapped in
    one matching pair of single or double quotes has that pair removed.
    When a key appears more than once, the last occurrence wins.

    Args:
        env_path: Location of the env file.

    Returns:
        The parsed mapping, or an empty dict when ``env_path`` does not
        point at a regular file.
    """
    path = Path(env_path)
    # A directory is treated like a missing file instead of crashing in
    # read_text().
    if not path.is_file():
        return {}
    try:
        # utf-8-sig drops a leading BOM, which would otherwise end up glued
        # to the first key; BOM-less files decode exactly as with utf-8.
        text = path.read_text(encoding="utf-8-sig")
    except FileNotFoundError:
        # The file vanished between the check above and the read.
        return {}

    env: dict[str, str] = {}
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        key, separator, value = stripped.partition("=")
        if not separator:
            continue
        value = value.strip()
        # Remove only one matching pair of surrounding quotes, so quote
        # characters that belong to the value itself are preserved.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        env[key.strip()] = value
    return env
def _env(
    key: str,
    file_env: dict[str, str],
    default: str | None = None,
    required: bool = False,
) -> str:
    """Resolve a setting from the env file, the process environment or a default.

    The first non-empty value wins, looked up in this order: ``file_env``,
    ``os.getenv``, ``default``.

    Args:
        key: Setting name.
        file_env: Mapping parsed from an env file.
        default: Fallback used when neither source has a non-empty value.
        required: Raise instead of returning an empty string.

    Returns:
        The resolved value, or ``""`` when nothing non-empty was found.

    Raises:
        RuntimeError: If ``required`` is true and no non-empty value exists.
    """
    resolved = file_env.get(key)
    if not resolved:
        resolved = os.getenv(key)
    if not resolved:
        resolved = default

    if resolved:
        return resolved
    if required:
        raise RuntimeError(f"missing required env: {key}")
    return ""
def _optional_positive_int(value: str) -> int | None:
    """Convert ``value`` to a strictly positive int, or return None.

    None is returned both when ``int(value)`` raises ValueError and when the
    parsed number is zero or negative.
    """
    try:
        number = int(value)
    except ValueError:
        return None
    if number <= 0:
        return None
    return number
def content_format(raw_content_type: str) -> str:
    """Map a raw content-type label to a normalized format name.

    Markers are tested in order and the first one contained in
    ``raw_content_type`` decides the result: ``图文`` -> ``"image_text"``,
    ``文本`` -> ``"text"``, ``直播`` -> ``"live"``. Any label without a marker
    maps to ``"video"``.
    """
    marker_formats = (
        ("图文", "image_text"),
        ("文本", "text"),
        ("直播", "live"),
    )
    for marker, format_name in marker_formats:
        if marker in raw_content_type:
            return format_name
    return "video"
def score_from_statistics(statistics: dict[str, Any]) -> int:
    """Turn engagement counters into a coarse score.

    The weighted total is ``digg_count + 3 * comment_count +
    4 * share_count``; missing or falsy counters count as 0 and every
    counter is passed through ``int()``.

    Returns:
        72 for a total of at least 3000, 62 for at least 1000, 55 for at
        least 300, otherwise 45.
    """
    likes, comments, shares = (
        int(statistics.get(field) or 0)
        for field in ("digg_count", "comment_count", "share_count")
    )
    weighted = likes + comments * 3 + shares * 4

    # Ordered from the highest threshold down; the first match wins.
    score_tiers = ((3000, 72), (1000, 62), (300, 55))
    for threshold, score in score_tiers:
        if weighted >= threshold:
            return score
    return 45
|