فهرست منبع

feat(v3-m1): 平台接入层双渠道(抖音+视频号)+共享 crawapi 基座

- M1A 共享基座 crawapi_http.py(纯重构抽取):RateLimiter/post_crawapi_json/限流错误识别/env helper/content_format/score_from_statistics;douyin.py 改 import+re-export,既有 26 单测零回归
- M1B 抖音 detail+play_url(加性):_extract_play_url + 归一化补 play_url + fetch_detail 端点 + CONTENTFIND_DOUYIN_DETAIL_PATH env
- M1C 视频号 shipinhao.py:复用基座、归一化与抖音同构、25011/网络暂时性故障重试(3次/1-2-4s,空结果不重试,试满抛ContentAgentError)、blogger blocked 返回[]不抛;新增 CrawapiTransientError(RuntimeError子类,douyin行为不变)
- M1D run_service._platform_client real 三分派(douyin/shipinhao/其他raise),mock不变
- 三原则守住:判定链/builder/media表/字段映射/profile 零改;共享避免冗余;双渠道 canonical 键集合严格一致
- 基线 314 -> 330 passed,M0/V2 回放零回归;测试验收岗 7/7 通过

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sam Lee 2 روز پیش
والد
کامیت
588786c9d1

+ 1 - 0
.env

@@ -83,6 +83,7 @@ CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS=60
 
 CONTENTFIND_DOUYIN_KEYWORD_PATH=/crawler/dou_yin/keyword
 CONTENTFIND_DOUYIN_BLOGGER_PATH=/crawler/dou_yin/blogger
+CONTENTFIND_DOUYIN_DETAIL_PATH=/crawler/dou_yin/detail
 CONTENTFIND_DOUYIN_VIDEO_LIKE_PORTRAIT_PATH=/crawler/dou_yin/re_dian_bao/video_like_portrait
 CONTENTFIND_DOUYIN_ACCOUNT_FANS_PORTRAIT_PATH=/crawler/dou_yin/re_dian_bao/account_fans_portrait
 CONTENTFIND_HOT_TOPIC_PATH=/crawler/jin_ri_re_bang/content_rank

+ 1 - 0
.env.example

@@ -38,6 +38,7 @@ CONTENTFIND_API_CRAWAPI_KEY=<fill-if-required>
 CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS=60
 CONTENTFIND_DOUYIN_KEYWORD_PATH=/crawler/dou_yin/keyword
 CONTENTFIND_DOUYIN_BLOGGER_PATH=/crawler/dou_yin/blogger
+CONTENTFIND_DOUYIN_DETAIL_PATH=/crawler/dou_yin/detail
 CONTENTFIND_DOUYIN_VIDEO_LIKE_PORTRAIT_PATH=/crawler/dou_yin/re_dian_bao/video_like_portrait
 CONTENTFIND_DOUYIN_DEFAULT_ACCOUNT_ID=771431222
 CONTENTFIND_DOUYIN_DEFAULT_CONTENT_TYPE=视频

+ 171 - 0
content_agent/integrations/crawapi_http.py

@@ -0,0 +1,171 @@
+"""Shared crawapi HTTP base (V3-M1A).
+
+Extracted verbatim from douyin.py so multiple platform clients (douyin /
+shipinhao) reuse the same HTTP post + rate limiting + rate-limit error
+classification + env-file helpers, instead of each duplicating them.
+Pure refactor: behaviour is identical to the original douyin implementation.
+"""
+
+from __future__ import annotations
+
+import os
+import time
+from pathlib import Path
+from typing import Any, Callable
+from urllib.parse import urljoin
+
+import httpx
+
+from content_agent.errors import ContentAgentError, ErrorCode
+
+RATE_LIMIT_MESSAGE_TOKENS = ("限流", "请求频繁", "rate limit", "too many requests")
+
+
+class CrawapiTransientError(RuntimeError):
+    """Signal a crawapi failure that the caller may retry.
+
+    Raised for transport-level request errors and for business codes that a
+    platform client declares transient (for example 视频号 25011). It derives
+    from RuntimeError so handlers written as `except RuntimeError` still
+    catch it.
+    """
+
+
+class RateLimiter:
+    """Enforce a minimum gap between consecutive calls within one bucket.
+
+    The clock and the sleep function are injectable so callers (and tests)
+    can drive time without really sleeping.
+    """
+
+    def __init__(
+        self,
+        min_interval_seconds: float = 12.0,
+        now_fn: Callable[[], float] = time.monotonic,
+        sleep_fn: Callable[[float], None] = time.sleep,
+    ) -> None:
+        self.min_interval_seconds = min_interval_seconds
+        self.now_fn = now_fn
+        self.sleep_fn = sleep_fn
+        # Clock reading (per now_fn) of the latest call, keyed by bucket name.
+        self._last_call_by_bucket: dict[str, float] = {}
+
+    def wait(self, bucket: str) -> None:
+        """Sleep off whatever remains of the interval for ``bucket``, then
+        record the current clock reading as that bucket's latest call."""
+        previous = self._last_call_by_bucket.get(bucket)
+        if previous is not None:
+            elapsed = self.now_fn() - previous
+            shortfall = self.min_interval_seconds - elapsed
+            if shortfall > 0:
+                self.sleep_fn(shortfall)
+        self._last_call_by_bucket[bucket] = self.now_fn()
+
+
+def is_rate_limit_business_error(
+    code: Any, data: dict[str, Any], *, business_codes: set[str]
+) -> bool:
+    """Decide whether a non-success crawapi response signals rate limiting.
+
+    Args:
+        code: Business code from the response; compared as a string.
+        data: Decoded response body; ``msg`` (or ``message``) is inspected.
+        business_codes: Codes the calling platform treats as rate limiting.
+
+    Returns:
+        True when the stringified code is in ``business_codes`` or the
+        lower-cased message contains one of RATE_LIMIT_MESSAGE_TOKENS.
+    """
+    if str(code) in business_codes:
+        return True
+    raw_message = data.get("msg") or data.get("message") or ""
+    lowered = str(raw_message).lower()
+    for token in RATE_LIMIT_MESSAGE_TOKENS:
+        if token in lowered:
+            return True
+    return False
+
+
+def post_crawapi_json(
+    *,
+    http_client: Any,
+    base_url: str,
+    path: str,
+    payload: dict[str, Any],
+    operation: str,
+    timeout_seconds: float,
+    rate_limiter: RateLimiter | None = None,
+    rate_limit_bucket: str | None = None,
+    business_codes: set[str],
+    # NOTE(review): the default is a frozenset although the annotation says set[str].
+    transient_business_codes: set[str] = frozenset(),
+) -> dict[str, Any]:
+    """POST ``payload`` as JSON to a crawapi endpoint and return the decoded body.
+
+    Args:
+        http_client: Object exposing ``post(url, json=, headers=, timeout=)``.
+        base_url: Base URL that ``path`` is joined onto with ``urljoin``.
+        path: Endpoint path.
+        payload: JSON body to send.
+        operation: Short label embedded in every error message.
+        timeout_seconds: Timeout passed to the HTTP call.
+        rate_limiter: Optional limiter; used only together with a bucket.
+        rate_limit_bucket: Bucket name handed to ``rate_limiter.wait``.
+        business_codes: Business codes classified as rate limiting.
+        transient_business_codes: Business codes classified as retryable.
+
+    Returns:
+        The decoded response dict when ``code`` is absent, ``0`` or ``"0"``.
+
+    Raises:
+        ContentAgentError: ``PLATFORM_RATE_LIMITED`` on HTTP 429 or when the
+            business error is classified as rate limiting.
+        CrawapiTransientError: On any other ``httpx.HTTPError`` or when the
+            business code is in ``transient_business_codes``.
+        RuntimeError: On other HTTP status errors, a ``ValueError`` raised
+            inside the request/decode step, a non-dict body, or any other
+            non-zero business code.
+    """
+    # Throttle only when both a bucket name and a limiter were supplied.
+    if rate_limit_bucket and rate_limiter:
+        rate_limiter.wait(rate_limit_bucket)
+    url = urljoin(base_url, path)
+    try:
+        response = http_client.post(
+            url,
+            json=payload,
+            headers={"Content-Type": "application/json"},
+            timeout=timeout_seconds,
+        )
+        response.raise_for_status()
+        data = response.json()
+    # HTTPStatusError must be handled before the broader HTTPError below.
+    except httpx.HTTPStatusError as exc:
+        status_code = exc.response.status_code if exc.response is not None else "unknown"
+        if status_code == 429:
+            raise ContentAgentError(
+                ErrorCode.PLATFORM_RATE_LIMITED,
+                f"crawapi {operation} failed: rate_limited",
+                {"operation": operation, "status_code": 429},
+            ) from exc
+        raise RuntimeError(f"crawapi {operation} failed: HTTP {status_code}") from exc
+    except httpx.HTTPError as exc:
+        # Remaining httpx errors are reported as retryable.
+        raise CrawapiTransientError(f"crawapi {operation} failed: network_error") from exc
+    except ValueError as exc:
+        raise RuntimeError(f"crawapi {operation} failed: bad_json") from exc
+    if not isinstance(data, dict):
+        raise RuntimeError(f"crawapi {operation} failed: bad_response")
+    code = data.get("code")
+    # A missing code, 0 and "0" all count as success.
+    if code is not None and code not in (0, "0"):
+        # Rate-limit classification wins over the transient classification.
+        if is_rate_limit_business_error(code, data, business_codes=business_codes):
+            raise ContentAgentError(
+                ErrorCode.PLATFORM_RATE_LIMITED,
+                f"crawapi {operation} failed: rate_limited",
+                {"operation": operation, "business_code": str(code)},
+            )
+        if str(code) in transient_business_codes:
+            raise CrawapiTransientError(
+                f"crawapi {operation} failed: transient_business_error code={code}"
+            )
+        raise RuntimeError(f"crawapi {operation} failed: business_error")
+    return data
+
+
+def _load_env_file(env_path: str | Path) -> dict[str, str]:
+    """Parse a ``KEY=VALUE`` env file into a dict.
+
+    Blank lines, lines starting with ``#`` and lines without ``=`` are
+    ignored. Keys and values are stripped of surrounding whitespace, and
+    values additionally of surrounding double then single quotes. A missing
+    file yields an empty dict.
+    """
+    source = Path(env_path)
+    if not source.exists():
+        return {}
+    parsed: dict[str, str] = {}
+    for raw_line in source.read_text(encoding="utf-8").splitlines():
+        entry = raw_line.strip()
+        is_assignment = bool(entry) and not entry.startswith("#") and "=" in entry
+        if not is_assignment:
+            continue
+        # Split on the first "=" only so values may themselves contain "=".
+        name, _, raw_value = entry.partition("=")
+        parsed[name.strip()] = raw_value.strip().strip('"').strip("'")
+    return parsed
+
+
+def _env(
+    key: str,
+    file_env: dict[str, str],
+    default: str | None = None,
+    required: bool = False,
+) -> str:
+    """Resolve a setting from the env file, then the process env, then ``default``.
+
+    Empty values count as missing at every level.
+
+    Returns:
+        The first non-empty value found, or ``""`` when there is none.
+
+    Raises:
+        RuntimeError: When ``required`` is true and no non-empty value exists.
+    """
+    resolved = file_env.get(key)
+    if not resolved:
+        resolved = os.getenv(key)
+    if not resolved:
+        resolved = default
+    if not resolved:
+        if required:
+            raise RuntimeError(f"missing required env: {key}")
+        return ""
+    return resolved
+
+
+def _optional_positive_int(value: str) -> int | None:
+    """Parse ``value`` as an int and return it only when it is above zero.
+
+    Text that ``int`` rejects with ValueError, zero and negative numbers all
+    yield None.
+    """
+    try:
+        number = int(value)
+    except ValueError:
+        return None
+    if number <= 0:
+        return None
+    return number
+
+
+def content_format(raw_content_type: str) -> str:
+    """Map a raw content-type label onto a canonical format name.
+
+    Markers are checked in a fixed order (image-text, text, live) and the
+    first one contained in the label wins; anything else is ``"video"``.
+    """
+    markers = (
+        ("图文", "image_text"),
+        ("文本", "text"),
+        ("直播", "live"),
+    )
+    for marker, canonical in markers:
+        if marker in raw_content_type:
+            return canonical
+    return "video"
+
+
+def score_from_statistics(statistics: dict[str, Any]) -> int:
+    """Turn engagement counters into a coarse score.
+
+    The weighted total is likes + 3 * comments + 4 * shares, read from
+    ``digg_count``, ``comment_count`` and ``share_count`` (missing or falsy
+    values count as 0). Totals of at least 3000 / 1000 / 300 score
+    72 / 62 / 55; anything lower scores 45.
+    """
+    likes = int(statistics.get("digg_count") or 0)
+    comments = int(statistics.get("comment_count") or 0)
+    shares = int(statistics.get("share_count") or 0)
+    engagement = likes + 3 * comments + 4 * shares
+    # Thresholds are listed from highest to lowest so the first match wins.
+    for floor, score in ((3000, 72), (1000, 62), (300, 55)):
+        if engagement >= floor:
+            return score
+    return 45

+ 71 - 127
content_agent/integrations/douyin.py

@@ -1,15 +1,24 @@
 from __future__ import annotations
 
-import os
 import re
-import time
 from pathlib import Path
-from typing import Any, Callable
-from urllib.parse import urljoin
+from typing import Any
 
 import httpx
 
-from content_agent.errors import ContentAgentError, ErrorCode
+# 共享 crawapi 基座(V3-M1A):HTTP/限流/限流错误识别/env helper 集中于 crawapi_http,
+# 下方 re-export 保持既有外部 import(测试、smoke 脚本)零改。
+from content_agent.integrations.crawapi_http import (
+    RATE_LIMIT_MESSAGE_TOKENS,
+    RateLimiter,
+    _env,
+    _load_env_file,
+    _optional_positive_int,
+    content_format as _content_format,
+    is_rate_limit_business_error,
+    post_crawapi_json,
+    score_from_statistics as _score_from_statistics,
+)
 
 RAW_CONTENT_ID_KEY = "_".join(["aweme", "id"])
 RAW_AUTHOR_ID_KEY = "_".join(["sec", "uid"])
@@ -18,33 +27,11 @@ RAW_AUTHOR_ACCOUNT_KEY = "_".join(["account", "id"])
 # 已证实的限流 business code 白名单。当前没有任何已证实的限流 code,
 # 识别先依靠 HTTP 429 与 message token;live smoke / 真实运行发现新 code 后补入并加用例。
 RATE_LIMIT_BUSINESS_CODES: set[str] = set()
-RATE_LIMIT_MESSAGE_TOKENS = ("限流", "请求频繁", "rate limit", "too many requests")
 
 SEARCH_RATE_LIMIT_BUCKET = "douyin_search"
 BLOGGER_RATE_LIMIT_BUCKET = "douyin_blogger"
 
 
-class RateLimiter:
-    def __init__(
-        self,
-        min_interval_seconds: float = 12.0,
-        now_fn: Callable[[], float] = time.monotonic,
-        sleep_fn: Callable[[float], None] = time.sleep,
-    ) -> None:
-        self.min_interval_seconds = min_interval_seconds
-        self.now_fn = now_fn
-        self.sleep_fn = sleep_fn
-        self._last_call_by_bucket: dict[str, float] = {}
-
-    def wait(self, bucket: str) -> None:
-        last = self._last_call_by_bucket.get(bucket)
-        if last is not None:
-            remaining = self.min_interval_seconds - (self.now_fn() - last)
-            if remaining > 0:
-                self.sleep_fn(remaining)
-        self._last_call_by_bucket[bucket] = self.now_fn()
-
-
 class CrawapiDouyinClient:
     def __init__(
         self,
@@ -52,6 +39,7 @@ class CrawapiDouyinClient:
         keyword_path: str,
         content_portrait_path: str,
         blogger_path: str = "",
+        detail_path: str = "",
         timeout_seconds: float = 60.0,
         default_crawapi_account_ref: str = "",
         default_content_type: str = "视频",
@@ -67,6 +55,7 @@ class CrawapiDouyinClient:
         self.keyword_path = keyword_path.lstrip("/")
         self.content_portrait_path = content_portrait_path.lstrip("/")
         self.blogger_path = blogger_path.lstrip("/")
+        self.detail_path = detail_path.lstrip("/")
         self.timeout_seconds = timeout_seconds
         self.default_crawapi_account_ref = default_crawapi_account_ref
         self.default_content_type = default_content_type
@@ -88,6 +77,9 @@ class CrawapiDouyinClient:
                 "CONTENTFIND_DOUYIN_VIDEO_LIKE_PORTRAIT_PATH", env, required=True
             ),
             blogger_path=_env("CONTENTFIND_DOUYIN_BLOGGER_PATH", env, required=True),
+            detail_path=_env(
+                "CONTENTFIND_DOUYIN_DETAIL_PATH", env, default="/crawler/dou_yin/detail"
+            ),
             timeout_seconds=float(
                 _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
             ),
@@ -176,6 +168,7 @@ class CrawapiDouyinClient:
             "platform": "douyin",
             "platform_content_id": platform_content_id,
             "platform_content_format": _content_format(self.default_content_type),
+            "play_url": _extract_play_url(item),
             "description": item.get("desc") or item.get("item_title") or "",
             "platform_author_id": platform_author_id,
             "author_display_name": author.get("nickname") or "",
@@ -247,6 +240,41 @@ class CrawapiDouyinClient:
             "age_50_plus_tgi": age_50_tgi,
         }
 
+    def fetch_detail(self, content_id: str) -> dict[str, Any]:
+        """Fetch one content item from the detail endpoint and normalize it.
+
+        Args:
+            content_id: Platform content id; sent to the endpoint as a string
+                and used as the fallback ``platform_content_id``.
+
+        Returns:
+            A dict with platform, content id/url, description, author id and
+            name, statistics, tags, play_url, create_time and
+            content_metadata_source. Missing counters become 0; ``play_url``
+            and ``create_time`` are None when unavailable.
+
+        Raises:
+            Whatever ``self._post_json`` raises for the request.
+        """
+        # The detail call is throttled through the same bucket as search.
+        data = self._post_json(
+            self.detail_path,
+            {"content_id": str(content_id)},
+            operation="detail",
+            rate_limit_bucket=SEARCH_RATE_LIMIT_BUCKET,
+        )
+        # The record is read from data["data"]["data"]; a level that is not a
+        # dict is treated as empty.
+        block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
+        detail = block.get("data", {}) if isinstance(block.get("data"), dict) else {}
+        statistics = {
+            "digg_count": int(detail.get("like_count") or 0),
+            "comment_count": int(detail.get("comment_count") or 0),
+            "share_count": int(detail.get("share_count") or 0),
+            "collect_count": int(detail.get("collect_count") or 0),
+            "play_count": int(detail.get("play_count") or 0),
+        }
+        topic_list = detail.get("topic_list") or []
+        tags = [t if str(t).startswith("#") else f"#{t}" for t in topic_list if t]
+        video_list = detail.get("video_url_list") or []
+        # Use the first entry only when it really is a dict, so a malformed
+        # upstream entry yields no play_url instead of an AttributeError.
+        first_video = video_list[0] if isinstance(video_list, list) and video_list else None
+        play_url = first_video.get("video_url") if isinstance(first_video, dict) else None
+        publish_ms = detail.get("publish_timestamp")
+        return {
+            "platform": "douyin",
+            "platform_content_id": str(detail.get("channel_content_id") or content_id),
+            "platform_content_url": detail.get("content_link"),
+            "description": detail.get("body_text") or detail.get("title") or "",
+            "platform_author_id": str(detail.get("channel_account_id") or ""),
+            "author_display_name": detail.get("channel_account_name") or "",
+            "statistics": statistics,
+            "tags": tags,
+            "play_url": play_url,
+            # Integer-divided by 1000 (presumably milliseconds to seconds).
+            "create_time": int(publish_ms) // 1000 if publish_ms else None,
+            "content_metadata_source": "douyin_detail",
+        }
+
     def _post_json(
         self,
         path: str,
@@ -254,84 +282,24 @@ class CrawapiDouyinClient:
         operation: str,
         rate_limit_bucket: str | None = None,
     ) -> dict[str, Any]:
-        if rate_limit_bucket and self.rate_limiter:
-            self.rate_limiter.wait(rate_limit_bucket)
-        url = urljoin(self.base_url, path)
-        try:
-            response = self.http_client.post(
-                url,
-                json=payload,
-                headers={"Content-Type": "application/json"},
-                timeout=self.timeout_seconds,
-            )
-            response.raise_for_status()
-            data = response.json()
-        except httpx.HTTPStatusError as exc:
-            status_code = exc.response.status_code if exc.response is not None else "unknown"
-            if status_code == 429:
-                raise ContentAgentError(
-                    ErrorCode.PLATFORM_RATE_LIMITED,
-                    f"crawapi {operation} failed: rate_limited",
-                    {"operation": operation, "status_code": 429},
-                ) from exc
-            raise RuntimeError(f"crawapi {operation} failed: HTTP {status_code}") from exc
-        except httpx.HTTPError as exc:
-            raise RuntimeError(f"crawapi {operation} failed: network_error") from exc
-        except ValueError as exc:
-            raise RuntimeError(f"crawapi {operation} failed: bad_json") from exc
-        if not isinstance(data, dict):
-            raise RuntimeError(f"crawapi {operation} failed: bad_response")
-        code = data.get("code")
-        if code is not None and code not in (0, "0"):
-            if _is_rate_limit_business_error(code, data):
-                raise ContentAgentError(
-                    ErrorCode.PLATFORM_RATE_LIMITED,
-                    f"crawapi {operation} failed: rate_limited",
-                    {"operation": operation, "business_code": str(code)},
-                )
-            raise RuntimeError(f"crawapi {operation} failed: business_error")
-        return data
-
-
-def _is_rate_limit_business_error(code: Any, data: dict[str, Any]) -> bool:
-    if str(code) in RATE_LIMIT_BUSINESS_CODES:
-        return True
-    message = str(data.get("msg") or data.get("message") or "").lower()
-    return any(token in message for token in RATE_LIMIT_MESSAGE_TOKENS)
-
-
-def _load_env_file(env_path: str | Path) -> dict[str, str]:
-    path = Path(env_path)
-    if not path.exists():
-        return {}
-    env: dict[str, str] = {}
-    for line in path.read_text(encoding="utf-8").splitlines():
-        stripped = line.strip()
-        if not stripped or stripped.startswith("#") or "=" not in stripped:
-            continue
-        key, value = stripped.split("=", 1)
-        env[key.strip()] = value.strip().strip('"').strip("'")
-    return env
-
-
-def _env(
-    key: str,
-    file_env: dict[str, str],
-    default: str | None = None,
-    required: bool = False,
-) -> str:
-    value = file_env.get(key) or os.getenv(key) or default
-    if required and not value:
-        raise RuntimeError(f"missing required env: {key}")
-    return value or ""
+        return post_crawapi_json(
+            http_client=self.http_client,
+            base_url=self.base_url,
+            path=path,
+            payload=payload,
+            operation=operation,
+            timeout_seconds=self.timeout_seconds,
+            rate_limiter=self.rate_limiter,
+            rate_limit_bucket=rate_limit_bucket,
+            business_codes=RATE_LIMIT_BUSINESS_CODES,
+        )
 
 
-def _optional_positive_int(value: str) -> int | None:
-    try:
-        parsed = int(value)
-    except ValueError:
-        return None
-    return parsed if parsed > 0 else None
+def _extract_play_url(item: dict[str, Any]) -> str | None:
+    """Return the first URL in ``item["video"]["play_addr"]["url_list"]``.
+
+    The URL is returned as a string. None is returned when ``video`` or
+    ``play_addr`` is not a dict, or when the URL list is missing or empty.
+    """
+    video = item.get("video")
+    if not isinstance(video, dict):
+        return None
+    play_addr = video.get("play_addr")
+    if not isinstance(play_addr, dict):
+        return None
+    candidates = play_addr.get("url_list") or []
+    if not candidates:
+        return None
+    return str(candidates[0])
 
 
 def _extract_tags(item: dict[str, Any]) -> list[str]:
@@ -349,30 +317,6 @@ def _extract_tags(item: dict[str, Any]) -> list[str]:
     return list(dict.fromkeys(tags))
 
 
-def _content_format(raw_content_type: str) -> str:
-    if "图文" in raw_content_type:
-        return "image_text"
-    if "文本" in raw_content_type:
-        return "text"
-    if "直播" in raw_content_type:
-        return "live"
-    return "video"
-
-
-def _score_from_statistics(statistics: dict[str, Any]) -> int:
-    digg = int(statistics.get("digg_count") or 0)
-    comment = int(statistics.get("comment_count") or 0)
-    share = int(statistics.get("share_count") or 0)
-    weighted = digg + comment * 3 + share * 4
-    if weighted >= 3000:
-        return 72
-    if weighted >= 1000:
-        return 62
-    if weighted >= 300:
-        return 55
-    return 45
-
-
 def _normalize_age_distribution(age_data: Any) -> list[dict[str, Any]]:
     rows: list[dict[str, Any]] = []
     items = age_data.items() if isinstance(age_data, dict) else []

+ 186 - 0
content_agent/integrations/shipinhao.py

@@ -0,0 +1,186 @@
+"""视频号(shipinhao)接入 client (V3-M1C).
+
+复用 crawapi_http 共享基座(HTTP/限流/env)。search 对暂时性故障(25011/网络/
+超时)按 platform_profiles/shipinhao.json 的口径重试(3 次、退避 1-2-4s),试满
+抛 ContentAgentError 走既有失败通道。归一化输出与抖音同构(canonical 键集合一致)。
+blogger/account_info 上游 blocked,fetch_author_works 返回 [] 不请求、不抛。
+"""
+
+from __future__ import annotations
+
+import re
+import time
+from pathlib import Path
+from typing import Any, Callable
+
+from content_agent.errors import ContentAgentError, ErrorCode
+from content_agent.integrations.crawapi_http import (
+    CrawapiTransientError,
+    RateLimiter,
+    _env,
+    _load_env_file,
+    content_format,
+    post_crawapi_json,
+    score_from_statistics,
+)
+
+SEARCH_RATE_LIMIT_BUCKET = "shipinhao_search"
+TRANSIENT_BUSINESS_CODES = {"25011"}
+_TAG_RE = re.compile(r"#([^\s#@((]+)")
+
+
+def _retry_transient(
+    fn: Callable[[], Any],
+    *,
+    attempts: int,
+    backoff_seconds: tuple[int, ...],
+    sleep_fn: Callable[[float], None],
+) -> Any:
+    """Call ``fn`` until it succeeds or the transient-failure budget runs out.
+
+    Args:
+        fn: Zero-argument callable to invoke.
+        attempts: Maximum number of calls; values below 1 are treated as 1.
+        backoff_seconds: Delay before each retry, indexed by the failed
+            attempt; the last entry is reused once the schedule is exhausted.
+            An empty schedule means retrying without any delay.
+        sleep_fn: Function used to wait between attempts.
+
+    Returns:
+        Whatever ``fn`` returns on its first successful call.
+
+    Raises:
+        CrawapiTransientError: Re-raised from the final failed attempt. Any
+            other exception from ``fn`` propagates immediately, unretried.
+    """
+    # Always make at least one call: a non-positive budget would otherwise
+    # skip the loop and silently hand None back to the caller.
+    total = max(attempts, 1)
+    for attempt in range(total):
+        try:
+            return fn()
+        except CrawapiTransientError:
+            if attempt == total - 1:
+                raise
+            # Guard the lookup: indexing an empty schedule would raise
+            # IndexError and mask the transient failure.
+            if backoff_seconds:
+                sleep_fn(backoff_seconds[min(attempt, len(backoff_seconds) - 1)])
+
+
+def _normalize_shipinhao_item(
+    query: dict[str, Any],
+    item: dict[str, Any],
+    index: int,
+    has_more: bool,
+    next_cursor: str,
+) -> dict[str, Any]:
+    """Convert one raw shipinhao search item into the canonical discovery dict.
+
+    Args:
+        query: Search query record; ``search_query_id`` and
+            ``discovery_start_source`` are read from it.
+        item: Raw item from the keyword-search response.
+        index: Position of the item, used as a zero-padded id suffix.
+        has_more: Paging flag copied into the result.
+        next_cursor: Paging cursor copied into the result.
+
+    Returns:
+        The normalized item. Missing counters become 0; tags come from
+        ``topic_list`` or, when that yields none, from ``#tag`` matches in
+        the title; ``play_url`` and ``create_time`` are None when absent.
+    """
+    title = item.get("title") or ""
+    statistics = {
+        "digg_count": int(item.get("like_count") or 0),
+        "comment_count": int(item.get("comment_count") or 0),
+        "share_count": int(item.get("share_count") or 0),
+        "collect_count": int(item.get("collect_count") or 0),
+        "play_count": int(item.get("play_count") or 0),
+    }
+    topic_list = item.get("topic_list") or []
+    tags = [t if str(t).startswith("#") else f"#{t}" for t in topic_list if t]
+    if not tags:
+        # No usable topics: fall back to hashtags embedded in the title.
+        tags = [f"#{m}" for m in _TAG_RE.findall(title)]
+    video_list = item.get("video_url_list") or []
+    # Use the first entry only when it really is a dict, so one malformed
+    # upstream entry yields no play_url instead of failing the whole search.
+    first_video = video_list[0] if isinstance(video_list, list) and video_list else None
+    play_url = first_video.get("video_url") if isinstance(first_video, dict) else None
+    platform_content_id = str(item.get("channel_content_id") or "")
+    platform_author_id = str(item.get("channel_account_id") or "")
+    publish_ms = item.get("publish_timestamp")
+    return {
+        "content_discovery_id": f"{query['search_query_id']}_content_{index:03d}",
+        "search_query_id": query["search_query_id"],
+        "platform": "shipinhao",
+        "platform_content_id": platform_content_id,
+        "platform_content_format": content_format(item.get("content_type") or "video"),
+        "play_url": play_url,
+        "description": title,
+        "platform_author_id": platform_author_id,
+        "author_display_name": item.get("channel_account_name") or "",
+        "statistics": statistics,
+        # dict.fromkeys removes duplicate tags while keeping first-seen order.
+        "tags": list(dict.fromkeys(tags)),
+        "text_extra": [],
+        # Integer-divided by 1000 (presumably milliseconds to seconds).
+        "create_time": int(publish_ms) // 1000 if publish_ms else None,
+        "has_more": has_more,
+        "next_cursor": next_cursor,
+        "score": score_from_statistics(statistics),
+        "risk_level": "unknown",
+        "pattern_recall": "pattern_recall_pending",
+        "category_or_element_binding": "pattern_recall_pending",
+        "discovery_relation": "derived_from_pattern_demand",
+        "discovery_start_source": query["discovery_start_source"],
+        "previous_discovery_step": "search_query_direct",
+        "content_metadata_source": "shipinhao_keyword_search",
+        "platform_auth_mode": "no_bearer",
+        "platform_raw_payload": {
+            "channel_content_id": platform_content_id,
+            "channel_account_id": platform_author_id,
+        },
+    }
+
+
+class CrawapiShipinhaoClient:
+    """Keyword-search client for shipinhao built on the shared crawapi helpers.
+
+    ``search`` retries transient failures and converts an exhausted retry
+    budget into a ContentAgentError; ``fetch_author_works`` is a no-op that
+    always returns an empty list.
+    """
+
+    def __init__(
+        self,
+        base_url: str,
+        keyword_path: str = "/crawler/shi_pin_hao/keyword",
+        timeout_seconds: float = 60.0,
+        max_results_per_query: int | None = None,
+        max_attempts: int = 3,
+        backoff_seconds: tuple[int, ...] = (1, 2, 4),
+        http_client: Any | None = None,
+        rate_limiter: RateLimiter | None = None,
+        sleep_fn: Callable[[float], None] = time.sleep,
+    ) -> None:
+        """Store the configuration; build an ``httpx.Client`` when no
+        ``http_client`` is supplied."""
+        import httpx
+
+        # Normalize to exactly one trailing slash on the base and no leading
+        # slash on the path before they are joined for the request.
+        self.base_url = base_url.rstrip("/") + "/"
+        self.keyword_path = keyword_path.lstrip("/")
+        self.timeout_seconds = timeout_seconds
+        self.max_results_per_query = max_results_per_query
+        self.max_attempts = max_attempts
+        self.backoff_seconds = backoff_seconds
+        self.http_client = http_client or httpx.Client(timeout=timeout_seconds)
+        self.rate_limiter = rate_limiter
+        self.sleep_fn = sleep_fn
+
+    @classmethod
+    def from_env(cls, env_path: str | Path = ".env") -> "CrawapiShipinhaoClient":
+        """Build a client from the env file / process env.
+
+        Reads the required base URL and the optional timeout (default 60),
+        and installs a RateLimiter with a 15 second minimum interval. All
+        other constructor arguments keep their defaults.
+        """
+        env = _load_env_file(env_path)
+        return cls(
+            base_url=_env("CONTENTFIND_API_CRAWAPI_BASE_URL", env, required=True),
+            timeout_seconds=float(
+                _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
+            ),
+            rate_limiter=RateLimiter(min_interval_seconds=15.0),
+        )
+
+    def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
+        """Run one keyword search and return the normalized items.
+
+        Args:
+            query: Search query record; ``search_query`` is the keyword and
+                ``page_cursor`` (optional) the paging cursor.
+
+        Returns:
+            Normalized items, truncated to ``max_results_per_query`` when that
+            is set. An empty or unexpectedly shaped response yields ``[]``.
+
+        Raises:
+            ContentAgentError: ``PLATFORM_REQUEST_FAILED`` when every attempt
+                ended in a CrawapiTransientError. Other errors raised by
+                ``post_crawapi_json`` propagate unchanged.
+        """
+        payload = {
+            "keyword": query["search_query"],
+            "cursor": str(query.get("page_cursor") or ""),
+        }
+
+        # One request attempt; wrapped in a closure so the retry helper can
+        # invoke it repeatedly.
+        def _call() -> dict[str, Any]:
+            return post_crawapi_json(
+                http_client=self.http_client,
+                base_url=self.base_url,
+                path=self.keyword_path,
+                payload=payload,
+                operation="keyword_search",
+                timeout_seconds=self.timeout_seconds,
+                rate_limiter=self.rate_limiter,
+                rate_limit_bucket=SEARCH_RATE_LIMIT_BUCKET,
+                business_codes=set(),
+                transient_business_codes=TRANSIENT_BUSINESS_CODES,
+            )
+
+        try:
+            data = _retry_transient(
+                _call,
+                attempts=self.max_attempts,
+                backoff_seconds=self.backoff_seconds,
+                sleep_fn=self.sleep_fn,
+            )
+        except CrawapiTransientError as exc:
+            raise ContentAgentError(
+                ErrorCode.PLATFORM_REQUEST_FAILED,
+                "shipinhao search exhausted after retries",
+                {"operation": "keyword_search", "max_attempts": self.max_attempts},
+            ) from exc
+
+        # Items are read from data["data"]["data"]; unexpected shapes are
+        # treated as empty.
+        block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
+        items = block.get("data", []) if isinstance(block.get("data"), list) else []
+        has_more = bool(block.get("has_more", False))
+        next_cursor = str(block.get("next_cursor") or "")
+        selected = items[: self.max_results_per_query] if self.max_results_per_query else items
+        return [
+            _normalize_shipinhao_item(query, item, index, has_more, next_cursor)
+            for index, item in enumerate(selected, start=1)
+        ]
+
+    def fetch_author_works(self, query: dict[str, Any]) -> list[dict[str, Any]]:
+        """Return an empty list without contacting the upstream service."""
+        # The upstream blogger endpoint is blocked (code=25011): send no
+        # request and raise nothing, so the walk degrades naturally.
+        return []

+ 8 - 2
content_agent/run_service.py

@@ -18,6 +18,7 @@ from content_agent.integrations.decode_api import AigcDecodeClient
 from content_agent.integrations.demand_source import DemandSourceService
 from content_agent.integrations.douyin import CrawapiDouyinClient
 from content_agent.integrations.mock_platform import MockPlatformClient
+from content_agent.integrations.shipinhao import CrawapiShipinhaoClient
 from content_agent.integrations.policy_json import JsonPolicyBundleStore
 from content_agent.integrations.query_variant import (
     MissingQueryVariantClient,
@@ -449,7 +450,12 @@ class RunService:
         if platform_mode == "mock":
             return MockPlatformClient()
         if platform_mode == "real":
-            if platform != "douyin":
+            real_clients = {
+                "douyin": CrawapiDouyinClient.from_env,
+                "shipinhao": CrawapiShipinhaoClient.from_env,
+            }
+            builder = real_clients.get(platform)
+            if builder is None:
                 raise ContentAgentError(
                     ErrorCode.INVALID_REQUEST,
                     "unsupported real platform",
@@ -457,7 +463,7 @@ class RunService:
                     status_code=400,
                 )
             try:
-                return CrawapiDouyinClient.from_env()
+                return builder()
             except Exception as exc:
                 raise ContentAgentError(
                     ErrorCode.PLATFORM_CONFIG_MISSING,

+ 81 - 0
tests/test_crawapi_http.py

@@ -0,0 +1,81 @@
+"""V3-M1A: shared crawapi HTTP base unit tests."""
+
+from __future__ import annotations
+
+import httpx
+import pytest
+
+from content_agent.errors import ContentAgentError, ErrorCode
+from content_agent.integrations.crawapi_http import (
+    RateLimiter,
+    is_rate_limit_business_error,
+    post_crawapi_json,
+)
+
+
+class FakeHttpClient:
+    """HTTP client stub that replays queued responses and records requests."""
+
+    def __init__(self, responses):
+        self.responses = list(responses)
+        # Every call's URL and JSON body, in call order.
+        self.requests = []
+
+    def post(self, url, json, headers, timeout):
+        """Record the call, then return the next queued response or raise it
+        when it is an exception instance."""
+        self.requests.append({"url": url, "json": json})
+        response = self.responses.pop(0)
+        if isinstance(response, Exception):
+            raise response
+        return response
+
+
+def _response(status_code, data):
+    """Build an httpx.Response with the given status and JSON body, bound to a
+    dummy POST request."""
+    return httpx.Response(
+        status_code, json=data, request=httpx.Request("POST", "http://crawapi.test/x")
+    )
+
+
+def _post(responses, **kwargs):
+    """Call post_crawapi_json against a FakeHttpClient with fixed arguments.
+
+    Only ``business_codes``, ``rate_limiter`` and ``rate_limit_bucket`` are
+    taken from ``kwargs``; ``transient_business_codes`` is not forwarded.
+    """
+    return post_crawapi_json(
+        http_client=FakeHttpClient(responses),
+        base_url="http://crawapi.test/",
+        path="x",
+        payload={},
+        operation="probe",
+        timeout_seconds=60.0,
+        business_codes=kwargs.get("business_codes", set()),
+        rate_limiter=kwargs.get("rate_limiter"),
+        rate_limit_bucket=kwargs.get("rate_limit_bucket"),
+    )
+
+
+def test_rate_limiter_waits_min_interval_between_same_bucket():
+    """Two back-to-back waits on one bucket sleep exactly one full interval."""
+    clock = {"now": 0.0}
+    sleeps: list[float] = []
+    # The fake sleep records the duration and advances the fake clock by it.
+    limiter = RateLimiter(
+        min_interval_seconds=12.0,
+        now_fn=lambda: clock["now"],
+        sleep_fn=lambda s: (sleeps.append(s), clock.__setitem__("now", clock["now"] + s)),
+    )
+    limiter.wait("b")
+    limiter.wait("b")
+    assert sleeps == [12.0]
+
+
+def test_http_429_maps_to_platform_rate_limited():
+    """An HTTP 429 response surfaces as ContentAgentError PLATFORM_RATE_LIMITED."""
+    with pytest.raises(ContentAgentError) as exc:
+        _post([_response(429, {"msg": "slow down"})])
+    assert exc.value.error_code == ErrorCode.PLATFORM_RATE_LIMITED
+
+
+def test_message_token_maps_to_platform_rate_limited():
+    """A non-zero business code whose message contains a rate-limit token is
+    reported as PLATFORM_RATE_LIMITED."""
+    with pytest.raises(ContentAgentError) as exc:
+        _post([_response(200, {"code": 50000, "msg": "请求频繁"})])
+    assert exc.value.error_code == ErrorCode.PLATFORM_RATE_LIMITED
+
+
+def test_bad_response_non_dict_raises_runtime_error():
+    """A JSON body that is not a dict raises RuntimeError mentioning bad_response."""
+    with pytest.raises(RuntimeError, match="bad_response"):
+        _post([_response(200, ["not", "a", "dict"])])
+
+
+def test_business_codes_param_classifies_rate_limit():
+    """With an empty message, classification depends only on ``business_codes``."""
+    assert is_rate_limit_business_error("30005", {}, business_codes={"30005"}) is True
+    assert is_rate_limit_business_error("30005", {}, business_codes=set()) is False

+ 73 - 0
tests/test_douyin_detail.py

@@ -0,0 +1,73 @@
+"""V3-M1B: douyin play_url extraction + fetch_detail normalization."""
+
+from __future__ import annotations
+
+import httpx
+
+from content_agent.integrations.douyin import CrawapiDouyinClient, _extract_play_url
+
+
+class FakeHttpClient:
+    """HTTP client stub that replays queued responses in order."""
+
+    def __init__(self, responses):
+        self.responses = list(responses)
+
+    def post(self, url, json, headers, timeout):
+        """Ignore the request details and return the next queued response."""
+        return self.responses.pop(0)
+
+
+def _response(data):
+    """Build a 200 httpx.Response carrying ``data`` as its JSON body."""
+    return httpx.Response(200, json=data, request=httpx.Request("POST", "http://crawapi.test/x"))
+
+
+def _client(responses):
+    """Build a CrawapiDouyinClient whose HTTP layer replays ``responses``."""
+    return CrawapiDouyinClient(
+        base_url="http://crawapi.test",
+        keyword_path="/crawler/dou_yin/keyword",
+        content_portrait_path="/crawler/dou_yin/re_dian_bao/video_like_portrait",
+        blogger_path="/crawler/dou_yin/blogger",
+        detail_path="/crawler/dou_yin/detail",
+        http_client=FakeHttpClient(responses),
+    )
+
+
+def test_extract_play_url_present():
+    """The first entry of video.play_addr.url_list is returned."""
+    item = {"video": {"play_addr": {"url_list": ["http://v/a", "http://v/b"]}}}
+    assert _extract_play_url(item) == "http://v/a"
+
+
+def test_extract_play_url_missing_returns_none():
+    """A missing video block or an empty url_list yields None."""
+    assert _extract_play_url({}) is None
+    assert _extract_play_url({"video": {"play_addr": {"url_list": []}}}) is None
+
+
+def test_fetch_detail_maps_canonical_fields():
+    """fetch_detail maps a detail payload onto the canonical fields.
+
+    Checks ids, '#'-prefixed tags, play_url, like count and the timestamp
+    conversion from the fixture's millisecond value to seconds.
+    """
+    detail_payload = {
+        "code": 0,
+        "data": {
+            "has_more": False,
+            "next_cursor": None,
+            "data": {
+                "channel_content_id": "7522164415848893735",
+                "content_link": "https://www.douyin.com/video/7522164415848893735",
+                "body_text": "原来彩虹真的是圆形的 #治愈系风景 #彩虹",
+                "topic_list": ["治愈系风景", "彩虹", "旅行"],
+                "video_url_list": [{"video_url": "https://www.douyin.com/aweme/v1/play/?video_id=x"}],
+                "channel_account_id": "MS4wLjABAAAA",
+                "channel_account_name": "源Dream",
+                "like_count": 5034215,
+                "comment_count": 121615,
+                "share_count": 2679616,
+                "collect_count": 232359,
+                "publish_timestamp": 1751515440000,
+            },
+        },
+    }
+    result = _client([_response(detail_payload)]).fetch_detail("7522164415848893735")
+    assert result["platform"] == "douyin"
+    assert result["platform_content_id"] == "7522164415848893735"
+    assert result["platform_author_id"] == "MS4wLjABAAAA"
+    assert result["tags"] == ["#治愈系风景", "#彩虹", "#旅行"]
+    assert result["play_url"] == "https://www.douyin.com/aweme/v1/play/?video_id=x"
+    assert result["statistics"]["digg_count"] == 5034215
+    assert result["create_time"] == 1751515440  # ms -> s
+    assert result["content_metadata_source"] == "douyin_detail"

+ 62 - 0
tests/test_dual_channel_normalization.py

@@ -0,0 +1,62 @@
+"""V3-M1D: dual-channel canonical isomorphism + real dispatch."""
+
+from __future__ import annotations
+
+import pytest
+
+from content_agent.errors import ContentAgentError, ErrorCode
+from content_agent.integrations.douyin import CrawapiDouyinClient
+from content_agent.integrations.shipinhao import (
+    CrawapiShipinhaoClient,
+    _normalize_shipinhao_item,
+)
+from content_agent.run_service import RunService
+
+# Minimal search-query payload handed as the first argument to both normalizers in this module.
+_QUERY = {"search_query_id": "q_001", "search_query": "彩虹", "discovery_start_source": "pattern_itemset"}
+
+
+def _douyin_client():
+    """Build a CrawapiDouyinClient with stub paths and a placeholder HTTP client."""
+    settings = {
+        "base_url": "http://crawapi.test",
+        "keyword_path": "/k",
+        "content_portrait_path": "/p",
+        "blogger_path": "/b",
+        "detail_path": "/d",
+        # A bare object stands in for the HTTP client in this helper.
+        "http_client": object(),
+    }
+    return CrawapiDouyinClient(**settings)
+
+
+def test_douyin_and_shipinhao_share_canonical_keys():
+    """Both normalizers should emit the same key set while tagging their own platform."""
+    raw_douyin = {
+        "aweme_id": "a1",
+        "author": {"sec_uid": "u1", "nickname": "n"},
+        "video": {"play_addr": {"url_list": ["http://v"]}},
+    }
+    raw_shipinhao = {
+        "channel_content_id": "c1",
+        "channel_account_id": "acc",
+        "title": "彩虹 #彩虹",
+        "video_url_list": [{"video_url": "http://v"}],
+    }
+    # Identical trailing positional arguments are fed to both normalizers.
+    shared_tail = (1, True, "12")
+    from_douyin = _douyin_client()._normalize_content_item(_QUERY, raw_douyin, *shared_tail)
+    from_shipinhao = _normalize_shipinhao_item(_QUERY, raw_shipinhao, *shared_tail)
+    assert set(from_douyin) == set(from_shipinhao)
+    assert from_douyin["platform"] == "douyin"
+    assert from_shipinhao["platform"] == "shipinhao"
+
+
+def test_shipinhao_real_dispatch_builds_client(monkeypatch):
+    """Real-mode dispatch for "shipinhao" should yield a CrawapiShipinhaoClient."""
+
+    def _bare_instance(cls):
+        # Replacement for from_env: allocate the class without running __init__.
+        return object.__new__(cls)
+
+    monkeypatch.setattr(CrawapiShipinhaoClient, "from_env", classmethod(_bare_instance))
+    bare_service = object.__new__(RunService)
+    dispatched = bare_service._platform_client("shipinhao", "real")
+    assert isinstance(dispatched, CrawapiShipinhaoClient)
+
+
+def test_unsupported_real_platform_raises():
+    """Real-mode dispatch for an unknown platform should raise INVALID_REQUEST."""
+    bare_service = object.__new__(RunService)
+    with pytest.raises(ContentAgentError) as excinfo:
+        bare_service._platform_client("bilibili", "real")
+    raised = excinfo.value
+    assert raised.error_code == ErrorCode.INVALID_REQUEST

+ 107 - 0
tests/test_shipinhao_client.py

@@ -0,0 +1,107 @@
+"""V3-M1C: 视频号 client search/normalization/retry/blocked tests."""
+
+from __future__ import annotations
+
+import httpx
+import pytest
+
+from content_agent.errors import ContentAgentError, ErrorCode
+from content_agent.integrations.shipinhao import CrawapiShipinhaoClient
+
+
+class FakeHttpClient:
+    """HTTP client stand-in that replays queued responses and records requests.
+
+    Each ``post`` call stores the url and json body in ``requests`` and hands
+    back the oldest remaining entry of ``responses``.
+    """
+
+    def __init__(self, responses):
+        # Copy so the caller's iterable is not consumed by later pops.
+        self.responses = list(responses)
+        self.requests = []
+
+    def post(self, url, json, headers, timeout):
+        """Record the call and return the next queued response (headers/timeout are ignored)."""
+        recorded = {"url": url, "json": json}
+        self.requests.append(recorded)
+        queue = self.responses
+        return queue.pop(0)
+
+
+def _response(status_code, data):
+    """Wrap *data* as the JSON body of an httpx.Response with the given status code."""
+    originating_request = httpx.Request("POST", "http://crawler.test/x")
+    return httpx.Response(status_code, json=data, request=originating_request)
+
+
+def _query():
+    """Return a fresh search-query dict used as input by the search tests."""
+    return dict(
+        search_query_id="q_001",
+        search_query="彩虹",
+        discovery_start_source="pattern_itemset",
+    )
+
+
+def _client(responses):
+    """Build a shipinhao client over canned responses.
+
+    Returns a ``(client, sleeps)`` pair; ``sleeps`` collects every value the
+    client passes to its injected ``sleep_fn``.
+    """
+    recorded_sleeps: list[float] = []
+    fake_http = FakeHttpClient(responses)
+    shipinhao = CrawapiShipinhaoClient(
+        base_url="http://crawler.test",
+        http_client=fake_http,
+        sleep_fn=recorded_sleeps.append,
+    )
+    return shipinhao, recorded_sleeps
+
+
+# Canned code-0 search response: one content item plus pagination fields.
+_SUCCESS = {
+    "code": 0,
+    "data": {
+        "has_more": True,
+        "next_cursor": 12,
+        "data": [
+            {
+                "channel_content_id": "finderobj_abc",
+                "title": "圆形彩虹 #彩虹 #见者好运",
+                "content_type": "video",
+                "video_url_list": [{"video_url": "https://findermp.video.qq.com/x"}],
+                "channel_account_id": "acc_123",
+                "channel_account_name": "掌上巴彦淖尔",
+                "like_count": 92,
+                # Milliseconds; the mapping test expects create_time in seconds.
+                "publish_timestamp": 1780904037000,
+            }
+        ],
+    },
+}
+# Canned code-25011 failure response with no data; the retry tests in this
+# module queue it to exercise the client's backoff behaviour.
+_FAIL_25011 = {"code": 25011, "msg": "视频号接口异常: 获取搜索结果失败", "data": None}
+
+
+def test_shipinhao_search_maps_canonical_fields():
+    """search should expose the first hit of the success payload under canonical names."""
+    client, _sleeps = _client([_response(200, _SUCCESS)])
+    hits = client.search(_query())
+    first_hit = hits[0]
+    assert first_hit["platform"] == "shipinhao"
+    assert first_hit["platform_content_id"] == "finderobj_abc"
+    assert first_hit["platform_author_id"] == "acc_123"
+    assert first_hit["author_display_name"] == "掌上巴彦淖尔"
+    # Hashtags are expected to be pulled out of the title text.
+    assert first_hit["tags"] == ["#彩虹", "#见者好运"]
+    assert first_hit["play_url"] == "https://findermp.video.qq.com/x"
+    like_total = first_hit["statistics"]["digg_count"]
+    assert like_total == 92
+    assert first_hit["create_time"] == 1780904037  # ms -> s
+    assert first_hit["has_more"] is True
+    # The payload carries the cursor as int 12; the string form is expected back.
+    assert first_hit["next_cursor"] == "12"
+
+
+def test_shipinhao_search_retries_on_25011_then_succeeds():
+    """A 25011 failure followed by a success should yield the hit after one backoff."""
+    queued = [_response(200, _FAIL_25011), _response(200, _SUCCESS)]
+    client, sleeps = _client(queued)
+    hits = client.search(_query())
+    assert len(hits) == 1
+    assert sleeps == [1]  # one backoff before the successful retry
+
+
+def test_shipinhao_search_does_not_retry_empty_result():
+    """A code-0 response with no items should return [] without any backoff."""
+    no_items = {"has_more": False, "next_cursor": "", "data": []}
+    empty_payload = {"code": 0, "data": no_items}
+    client, sleeps = _client([_response(200, empty_payload)])
+    hits = client.search(_query())
+    assert hits == []
+    assert sleeps == []
+
+
+def test_shipinhao_search_raises_after_exhausted():
+    """Three consecutive 25011 failures should raise PLATFORM_REQUEST_FAILED."""
+    # Build a distinct response object per attempt.
+    failures = [_response(200, _FAIL_25011) for _attempt in range(3)]
+    client, sleeps = _client(failures)
+    with pytest.raises(ContentAgentError) as excinfo:
+        client.search(_query())
+    raised = excinfo.value
+    assert raised.error_code == ErrorCode.PLATFORM_REQUEST_FAILED
+    assert sleeps == [1, 2]  # backoff before attempts 2 and 3
+
+
+def test_shipinhao_fetch_author_works_blocked_returns_empty():
+    """fetch_author_works should return [] even though no HTTP responses are queued."""
+    client, _sleeps = _client([])
+    author = {"platform_author_id": "acc_123"}
+    works = client.fetch_author_works(author)
+    assert works == []