Procházet zdrojové kódy

feat(M5): blogger author-works contract, rate limiting, category match v2 parsing

- M5A: fetch_author_works 改打 POST /crawler/dou_yin/blogger,payload 固定三字段
  (account_id=platform_author_id/sort_type/cursor,06 计划 live smoke 已验合同);
  归一标 content_metadata_source=douyin_blogger;.env.example 补 blogger 两变量;
  registry PLT_DOUYIN_AUTHOR_WORKS current_code_refs 填实、access_mode→read
- M5B: 新增 PLATFORM_RATE_LIMITED 与 RateLimiter(12s,可注入 now_fn/sleep_fn);
  搜索链共用 douyin_search bucket,blogger 独立 douyin_blogger bucket,portrait 不限流;
  限流识别三条件(HTTP 429 / RATE_LIMIT_BUSINESS_CODES 空集白名单 / 4 个 message token),
  强制登录、普通 500、业务失败、bad_json 保持普通 RuntimeError
- M5C: 分类树 v2 解析支持 items[].matches[].path|category_path(list 型)与
  items[].matched_paths;多来源逐项累加不短路;term 继承;(term,path) 去重保序;
  旧 data 结构兼容;归一输出合同不变
- M5D: brief 逐字 23 个测试(douyin 14/platform_access 2/category 7)+
  scripts/smoke_douyin_blogger.py(人工 live smoke,只输出脱敏摘要,不进默认 pytest)

279 passed in 2.4s(零真实网络/零真实 sleep);config gate/schema registry pass;
回放快照零漂移;双岗交叉验收通过(审核岗发现的 or 短路吞 matches 问题已修复并补组合用例)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Sam Lee před 3 dny
rodič
revize
7e86c9a77b

+ 2 - 0
.env.example

@@ -34,12 +34,14 @@ CONTENTFIND_API_CRAWAPI_BASE_URL=http://crawapi.piaoquantv.com
 CONTENTFIND_API_CRAWAPI_KEY=<fill-if-required>
 CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS=60
 CONTENTFIND_DOUYIN_KEYWORD_PATH=/crawler/dou_yin/keyword
+CONTENTFIND_DOUYIN_BLOGGER_PATH=/crawler/dou_yin/blogger
 CONTENTFIND_DOUYIN_VIDEO_LIKE_PORTRAIT_PATH=/crawler/dou_yin/re_dian_bao/video_like_portrait
 CONTENTFIND_DOUYIN_DEFAULT_ACCOUNT_ID=771431222
 CONTENTFIND_DOUYIN_DEFAULT_CONTENT_TYPE=视频
 CONTENTFIND_DOUYIN_DEFAULT_SORT_TYPE=综合排序
 CONTENTFIND_DOUYIN_DEFAULT_PUBLISH_TIME=不限
 CONTENTFIND_DOUYIN_DEFAULT_CURSOR=0
+CONTENTFIND_DOUYIN_ACCOUNT_WORKS_DEFAULT_SORT_TYPE=最新
 CONTENTFIND_DOUYIN_MAX_RESULTS_PER_QUERY=3
 
 # Pattern recall / decode / category tree

+ 40 - 31
content_agent/business_modules/content_discovery/pattern_recall/category_match.py

@@ -51,46 +51,55 @@ def match_decode_terms(
 def _extract_path_matches(response: Any) -> list[dict[str, Any]]:
     rows = _candidate_rows(response)
     matches: list[dict[str, Any]] = []
+    seen: set[tuple[str, str]] = set()
+
+    def _append(term: Any, category_path: Any, score: Any, raw: Any) -> None:
+        normalized = _normalize_category_path(category_path)
+        if not normalized:
+            return
+        key = (str(term or "").strip(), normalized)
+        if key in seen:
+            return
+        seen.add(key)
+        matches.append({"term": key[0], "category_path": normalized, "score": score, "raw": raw})
+
     for row in rows:
         if not isinstance(row, dict):
             continue
         term = row.get("term") or row.get("query") or row.get("source_term") or row.get("item")
-        paths = row.get("paths") or row.get("matched_paths") or row.get("categories") or row.get("results")
-        if paths is None and (row.get("category_path") or row.get("path")):
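+        # One item may carry several sources at once (e.g. v2 `matches` plus `matched_paths`).
+        # Collect from every key, in the tuple order below, rather than the first non-empty one.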
+        paths = [
+            path
+            for key in ("paths", "matched_paths", "matches", "categories", "results")
+            for path in _as_list(row.get(key))
+        ]
+        if not paths and (row.get("category_path") or row.get("path")):
             paths = [row]
-        for path in _as_list(paths):
-            if isinstance(path, str):
-                category_path = path.strip()
-                if category_path:
-                    matches.append(
-                        {
-                            "term": str(term or "").strip(),
-                            "category_path": category_path,
-                            "score": row.get("score"),
-                            "raw": path,
-                        }
-                    )
-                continue
-            if not isinstance(path, dict):
-                continue
-            category_path = (
-                path.get("category_path")
-                or path.get("path")
-                or path.get("full_path")
-                or path.get("categoryPath")
-            )
-            if category_path:
-                matches.append(
-                    {
-                        "term": str(term or "").strip(),
-                        "category_path": str(category_path),
-                        "score": path.get("score"),
-                        "raw": path,
-                    }
+        for path in paths:
+            if isinstance(path, dict):
+                _append(
+                    path.get("term") or term,
+                    path.get("category_path")
+                    or path.get("path")
+                    or path.get("full_path")
+                    or path.get("categoryPath"),
+                    path.get("score"),
+                    path,
                 )
+            elif isinstance(path, (str, list)):
+                _append(term, path, row.get("score"), path)
     return matches
 
 
+def _normalize_category_path(value: Any) -> str:
+    if isinstance(value, list):
+        parts = [str(part).strip() for part in value if str(part).strip()]
+        return "/" + "/".join(parts) if parts else ""
+    if value is None:
+        return ""
+    return str(value).strip()
+
+
 def _candidate_rows(response: Any) -> list[Any]:
     if isinstance(response, list):
         return response

+ 2 - 0
content_agent/errors.py

@@ -17,6 +17,7 @@ class ErrorCode(StrEnum):
     POLICY_BUNDLE_NOT_FOUND = "POLICY_BUNDLE_NOT_FOUND"
     PLATFORM_CONFIG_MISSING = "PLATFORM_CONFIG_MISSING"
     PLATFORM_REQUEST_FAILED = "PLATFORM_REQUEST_FAILED"
+    PLATFORM_RATE_LIMITED = "PLATFORM_RATE_LIMITED"
     QUERY_GENERATION_FAILED = "QUERY_GENERATION_FAILED"
     CONFIG_RULE_PACK_DISPATCH_CONFLICT = "CONFIG_RULE_PACK_DISPATCH_CONFLICT"
 
@@ -111,6 +112,7 @@ def _safe_message(error_code: ErrorCode) -> str:
         ErrorCode.POLICY_BUNDLE_NOT_FOUND: "policy bundle not found",
         ErrorCode.PLATFORM_CONFIG_MISSING: "platform config missing",
         ErrorCode.PLATFORM_REQUEST_FAILED: "platform request failed",
+        ErrorCode.PLATFORM_RATE_LIMITED: "platform rate limited",
         ErrorCode.QUERY_GENERATION_FAILED: "query generation failed",
         ErrorCode.CONFIG_RULE_PACK_DISPATCH_CONFLICT: "rule pack dispatch conflict in config",
     }

+ 84 - 8
content_agent/integrations/douyin.py

@@ -2,16 +2,48 @@ from __future__ import annotations
 
 import os
 import re
+import time
 from pathlib import Path
-from typing import Any
+from typing import Any, Callable
 from urllib.parse import urljoin
 
 import httpx
 
+from content_agent.errors import ContentAgentError, ErrorCode
+
 RAW_CONTENT_ID_KEY = "_".join(["aweme", "id"])
 RAW_AUTHOR_ID_KEY = "_".join(["sec", "uid"])
 RAW_AUTHOR_ACCOUNT_KEY = "_".join(["account", "id"])
 
+# 已证实的限流 business code 白名单。当前没有任何已证实的限流 code,
+# 识别先依靠 HTTP 429 与 message token;live smoke / 真实运行发现新 code 后补入并加用例。
+RATE_LIMIT_BUSINESS_CODES: set[str] = set()
+RATE_LIMIT_MESSAGE_TOKENS = ("限流", "请求频繁", "rate limit", "too many requests")
+
+SEARCH_RATE_LIMIT_BUCKET = "douyin_search"
+BLOGGER_RATE_LIMIT_BUCKET = "douyin_blogger"
+
+
+class RateLimiter:
+    def __init__(
+        self,
+        min_interval_seconds: float = 12.0,
+        now_fn: Callable[[], float] = time.monotonic,
+        sleep_fn: Callable[[float], None] = time.sleep,
+    ) -> None:
+        self.min_interval_seconds = min_interval_seconds
+        self.now_fn = now_fn
+        self.sleep_fn = sleep_fn
+        self._last_call_by_bucket: dict[str, float] = {}
+
+    def wait(self, bucket: str) -> None:
+        last = self._last_call_by_bucket.get(bucket)
+        if last is not None:
+            remaining = self.min_interval_seconds - (self.now_fn() - last)
+            if remaining > 0:
+                self.sleep_fn(remaining)
+        self._last_call_by_bucket[bucket] = self.now_fn()
+
 
 class CrawapiDouyinClient:
     def __init__(
@@ -19,26 +51,32 @@ class CrawapiDouyinClient:
         base_url: str,
         keyword_path: str,
         content_portrait_path: str,
+        blogger_path: str = "",
         timeout_seconds: float = 60.0,
         default_crawapi_account_ref: str = "",
         default_content_type: str = "视频",
         default_sort_type: str = "综合排序",
         default_publish_time: str = "不限",
         default_cursor: str = "0",
+        default_account_works_sort_type: str = "最新",
         max_results_per_query: int | None = 3,
         http_client: Any | None = None,
+        rate_limiter: RateLimiter | None = None,
     ) -> None:
         self.base_url = base_url.rstrip("/") + "/"
         self.keyword_path = keyword_path.lstrip("/")
         self.content_portrait_path = content_portrait_path.lstrip("/")
+        self.blogger_path = blogger_path.lstrip("/")
         self.timeout_seconds = timeout_seconds
         self.default_crawapi_account_ref = default_crawapi_account_ref
         self.default_content_type = default_content_type
         self.default_sort_type = default_sort_type
         self.default_publish_time = default_publish_time
         self.default_cursor = default_cursor
+        self.default_account_works_sort_type = default_account_works_sort_type
         self.max_results_per_query = max_results_per_query
         self.http_client = http_client or httpx.Client(timeout=timeout_seconds)
+        self.rate_limiter = rate_limiter
 
     @classmethod
     def from_env(cls, env_path: str | Path = ".env") -> "CrawapiDouyinClient":
@@ -49,6 +87,7 @@ class CrawapiDouyinClient:
             content_portrait_path=_env(
                 "CONTENTFIND_DOUYIN_VIDEO_LIKE_PORTRAIT_PATH", env, required=True
             ),
+            blogger_path=_env("CONTENTFIND_DOUYIN_BLOGGER_PATH", env, required=True),
             timeout_seconds=float(
                 _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
             ),
@@ -57,9 +96,13 @@ class CrawapiDouyinClient:
             default_sort_type=_env("CONTENTFIND_DOUYIN_DEFAULT_SORT_TYPE", env, default="综合排序"),
             default_publish_time=_env("CONTENTFIND_DOUYIN_DEFAULT_PUBLISH_TIME", env, default="不限"),
             default_cursor=_env("CONTENTFIND_DOUYIN_DEFAULT_CURSOR", env, default="0"),
+            default_account_works_sort_type=_env(
+                "CONTENTFIND_DOUYIN_ACCOUNT_WORKS_DEFAULT_SORT_TYPE", env, default="最新"
+            ),
             max_results_per_query=_optional_positive_int(
                 _env("CONTENTFIND_DOUYIN_MAX_RESULTS_PER_QUERY", env, default="3")
             ),
+            rate_limiter=RateLimiter(),
         )
 
     def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
@@ -71,7 +114,10 @@ class CrawapiDouyinClient:
             "cursor": str(query.get("page_cursor") or self.default_cursor),
             RAW_AUTHOR_ACCOUNT_KEY: self.default_crawapi_account_ref,
         }
-        data = self._post_json(self.keyword_path, payload, operation="keyword_search")
+        data = self._post_json(
+            self.keyword_path, payload, operation="keyword_search",
+            rate_limit_bucket=SEARCH_RATE_LIMIT_BUCKET,
+        )
         data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
         items = data_block.get("data", []) if isinstance(data_block.get("data"), list) else []
         has_more = bool(data_block.get("has_more", False))
@@ -87,13 +133,15 @@ class CrawapiDouyinClient:
         return results
 
     def fetch_author_works(self, query: dict[str, Any]) -> list[dict[str, Any]]:
-        author_id = str(query.get("platform_author_id") or "")
         payload = {
-            RAW_AUTHOR_ID_KEY: author_id,
-            "cursor": str(query.get("page_cursor") or self.default_cursor),
-            RAW_AUTHOR_ACCOUNT_KEY: self.default_crawapi_account_ref,
+            RAW_AUTHOR_ACCOUNT_KEY: str(query.get("platform_author_id") or ""),
+            "sort_type": self.default_account_works_sort_type,
+            "cursor": str(query.get("page_cursor") or ""),
         }
-        data = self._post_json(self.keyword_path, payload, operation="author_works")
+        data = self._post_json(
+            self.blogger_path, payload, operation="author_works",
+            rate_limit_bucket=BLOGGER_RATE_LIMIT_BUCKET,
+        )
         data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
         items = data_block.get("data", []) if isinstance(data_block.get("data"), list) else []
         has_more = bool(data_block.get("has_more", False))
@@ -104,6 +152,7 @@ class CrawapiDouyinClient:
         for index, item in enumerate(selected_items, start=1):
             normalized = self._normalize_content_item(query, item, index, has_more, next_cursor)
             normalized["previous_discovery_step"] = "author_works"
+            normalized["content_metadata_source"] = "douyin_blogger"
             portrait = self._fetch_content_portrait(normalized["platform_content_id"])
             normalized.update(portrait)
             results.append(normalized)
@@ -198,7 +247,15 @@ class CrawapiDouyinClient:
             "age_50_plus_tgi": age_50_tgi,
         }
 
-    def _post_json(self, path: str, payload: dict[str, Any], operation: str) -> dict[str, Any]:
+    def _post_json(
+        self,
+        path: str,
+        payload: dict[str, Any],
+        operation: str,
+        rate_limit_bucket: str | None = None,
+    ) -> dict[str, Any]:
+        if rate_limit_bucket and self.rate_limiter:
+            self.rate_limiter.wait(rate_limit_bucket)
         url = urljoin(self.base_url, path)
         try:
             response = self.http_client.post(
@@ -211,6 +268,12 @@ class CrawapiDouyinClient:
             data = response.json()
         except httpx.HTTPStatusError as exc:
             status_code = exc.response.status_code if exc.response is not None else "unknown"
+            if status_code == 429:
+                raise ContentAgentError(
+                    ErrorCode.PLATFORM_RATE_LIMITED,
+                    f"crawapi {operation} failed: rate_limited",
+                    {"operation": operation, "status_code": 429},
+                ) from exc
             raise RuntimeError(f"crawapi {operation} failed: HTTP {status_code}") from exc
         except httpx.HTTPError as exc:
             raise RuntimeError(f"crawapi {operation} failed: network_error") from exc
@@ -220,10 +283,23 @@ class CrawapiDouyinClient:
             raise RuntimeError(f"crawapi {operation} failed: bad_response")
         code = data.get("code")
         if code is not None and code not in (0, "0"):
+            if _is_rate_limit_business_error(code, data):
+                raise ContentAgentError(
+                    ErrorCode.PLATFORM_RATE_LIMITED,
+                    f"crawapi {operation} failed: rate_limited",
+                    {"operation": operation, "business_code": str(code)},
+                )
             raise RuntimeError(f"crawapi {operation} failed: business_error")
         return data
 
 
+def _is_rate_limit_business_error(code: Any, data: dict[str, Any]) -> bool:
+    if str(code) in RATE_LIMIT_BUSINESS_CODES:
+        return True
+    message = str(data.get("msg") or data.get("message") or "").lower()
+    return any(token in message for token in RATE_LIMIT_MESSAGE_TOKENS)
+
+
 def _load_env_file(env_path: str | Path) -> dict[str, str]:
     path = Path(env_path)
     if not path.exists():

+ 77 - 0
scripts/smoke_douyin_blogger.py

@@ -0,0 +1,77 @@
+"""Manual live smoke for the Crawapi douyin blogger contract (V2-M5D).
+
+Hits POST <CONTENTFIND_API_CRAWAPI_BASE_URL><CONTENTFIND_DOUYIN_BLOGGER_PATH>
+with the fixed three-field payload (account_id / sort_type / cursor) and prints
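+a redacted summary only — never the raw response, never any credential.
+Exits 0 only when the call returns HTTP 200, business code 0, and at least one
+item; an empty result list counts as a failure and exits 1.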
+
+Not part of default pytest. Run manually:
+
+    uv run python scripts/smoke_douyin_blogger.py --author-id '<sec_uid>'
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+from pathlib import Path
+
+import httpx
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT))
+
+from content_agent.integrations.douyin import _env, _load_env_file  # noqa: E402
+
+
+def main() -> int:
+    args = _parse_args()
+    env = _load_env_file(args.env_file)
+    base_url = _env("CONTENTFIND_API_CRAWAPI_BASE_URL", env, required=True)
+    blogger_path = _env("CONTENTFIND_DOUYIN_BLOGGER_PATH", env, required=True)
+    sort_type = args.sort_type or _env(
+        "CONTENTFIND_DOUYIN_ACCOUNT_WORKS_DEFAULT_SORT_TYPE", env, default="最新"
+    )
+
+    url = base_url.rstrip("/") + "/" + blogger_path.lstrip("/")
+    payload = {"account_id": args.author_id, "sort_type": sort_type, "cursor": args.cursor}
+    response = httpx.post(
+        url, json=payload, headers={"Content-Type": "application/json"}, timeout=60.0
+    )
+    try:
+        data = response.json()
+    except ValueError:
+        data = {}
+    if not isinstance(data, dict):
+        data = {}
+    data_block = data.get("data") if isinstance(data.get("data"), dict) else {}
+    items = data_block.get("data") if isinstance(data_block.get("data"), list) else []
+
+    summary = {
+        "endpoint": "/" + blogger_path.lstrip("/"),
+        "http_status": response.status_code,
+        "business_code": data.get("code"),
+        "result_count": len(items),
+        "has_more": bool(data_block.get("has_more", False)),
+        "next_cursor_present": bool(data_block.get("next_cursor")),
+    }
+    print(json.dumps(summary, ensure_ascii=False, indent=2))
+    ok = (
+        summary["http_status"] == 200
+        and summary["business_code"] in (0, "0")
+        and summary["result_count"] > 0
+    )
+    return 0 if ok else 1
+
+
+def _parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument("--author-id", required=True, help="author platform_author_id (sec_uid)")
+    parser.add_argument("--sort-type", default=None, help="override sort_type (default from env)")
+    parser.add_argument("--cursor", default="", help="page cursor, empty for first page")
+    parser.add_argument("--env-file", default=str(ROOT / ".env"))
+    return parser.parse_args()
+
+
+if __name__ == "__main__":
+    sys.exit(main())

+ 4 - 2
tech_documents/数据接口与来源/external_data_sources_registry.json

@@ -238,10 +238,12 @@
       "status": ["verified"],
       "system": "Crawapi Douyin",
       "table_or_endpoint": "/crawler/dou_yin/blogger",
-      "access_mode": "planned_read",
+      "access_mode": "read",
       "usage_stages": ["walk_strategy", "content_discovery"],
       "owner_module": "平台接入模块",
-      "current_code_refs": [],
+      "current_code_refs": [
+        "content_agent/integrations/douyin.py::CrawapiDouyinClient.fetch_author_works"
+      ],
       "required_env_vars": ["CONTENTFIND_API_CRAWAPI_BASE_URL", "CONTENTFIND_DOUYIN_*"],
       "input_fields": ["account_id", "sort_type", "cursor"],
       "output_fields": ["platform_content_id", "desc", "author", "statistics", "has_more", "next_cursor"],

+ 214 - 2
tests/test_douyin_client.py

@@ -1,11 +1,13 @@
 import httpx
 import pytest
 
+from content_agent.errors import ContentAgentError, ErrorCode
 from content_agent.integrations.douyin import (
     RAW_AUTHOR_ACCOUNT_KEY,
     RAW_AUTHOR_ID_KEY,
     RAW_CONTENT_ID_KEY,
     CrawapiDouyinClient,
+    RateLimiter,
 )
 
 
@@ -30,13 +32,15 @@ def _response(status_code, data):
     )
 
 
-def _client(responses):
+def _client(responses, rate_limiter=None):
     return CrawapiDouyinClient(
         base_url="http://crawapi.test",
         keyword_path="/crawler/dou_yin/keyword",
         content_portrait_path="/crawler/dou_yin/re_dian_bao/video_like_portrait",
+        blogger_path="/crawler/dou_yin/blogger",
         default_crawapi_account_ref="771431222",
         http_client=FakeHttpClient(responses),
+        rate_limiter=rate_limiter,
     )
 
 
@@ -164,9 +168,10 @@ def test_douyin_fetch_author_works_maps_fake_response():
         }
     )
 
+    # M5A 受控变化: 作者作品改打 blogger 接口,payload 用 account_id 三字段合同。
     assert results[0]["search_query_id"] == "author_001"
     assert results[0]["previous_discovery_step"] == "author_works"
-    assert client.http_client.requests[0]["json"][RAW_AUTHOR_ID_KEY] == "MS4wLjABAAAA001"
+    assert client.http_client.requests[0]["json"][RAW_AUTHOR_ACCOUNT_KEY] == "MS4wLjABAAAA001"
 
 
 def test_douyin_keyword_search_http_error_is_sanitized():
@@ -364,3 +369,210 @@ def test_douyin_portrait_supports_dimensions_shape_and_excludes_41_to_50():
     assert result["age_50_plus_level"] == "weak"
     assert result["age_distribution"][0]["is_50_plus"] is False
     assert result["age_distribution"][1]["is_50_plus"] is True
+
+
+def _author_query(author_id="MS4wLjABAAAA001", **extra):
+    return {
+        "search_query_id": "author_001",
+        "search_query": "作者作品",
+        "platform_author_id": author_id,
+        "discovery_start_source": "pattern_itemset",
+        **extra,
+    }
+
+
+def _blogger_response(items=None, has_more=True, next_cursor="20"):
+    return _response(
+        200,
+        {
+            "code": 0,
+            "data": {"data": items or [], "has_more": has_more, "next_cursor": next_cursor},
+        },
+    )
+
+
+class FakeRateLimiter:
+    def __init__(self):
+        self.buckets = []
+
+    def wait(self, bucket):
+        self.buckets.append(bucket)
+
+
+def test_fetch_author_works_posts_to_blogger_path():
+    client = _client([_blogger_response()])
+
+    client.fetch_author_works(_author_query())
+
+    assert client.http_client.requests[0]["url"].endswith("/crawler/dou_yin/blogger")
+
+
+def test_fetch_author_works_payload_uses_account_id_from_platform_author_id():
+    client = _client([_blogger_response()])
+
+    client.fetch_author_works(_author_query("MS4wLjABAAAA999"))
+
+    payload = client.http_client.requests[0]["json"]
+    assert payload == {
+        RAW_AUTHOR_ACCOUNT_KEY: "MS4wLjABAAAA999",
+        "sort_type": "最新",
+        "cursor": "",
+    }
+
+
+def test_fetch_author_works_uses_page_cursor():
+    client = _client([_blogger_response()])
+
+    client.fetch_author_works(_author_query(page_cursor="20"))
+
+    assert client.http_client.requests[0]["json"]["cursor"] == "20"
+
+
+def test_fetch_author_works_normalizes_author_work_fields():
+    client = _client(
+        [
+            _blogger_response(
+                items=[
+                    {
+                        RAW_CONTENT_ID_KEY: "7615247738577423001",
+                        "desc": "作者作品",
+                        "author": {"nickname": "作者", RAW_AUTHOR_ID_KEY: "MS4wLjABAAAA001"},
+                        "statistics": {"digg_count": 100},
+                        "create_time": 1733000000,
+                    }
+                ]
+            ),
+            _response(200, {"data": {"data": {"年龄": {}}}}),
+        ]
+    )
+
+    results = client.fetch_author_works(_author_query())
+
+    assert results[0]["platform_content_id"] == "7615247738577423001"
+    assert results[0]["platform_author_id"] == "MS4wLjABAAAA001"
+    assert results[0]["statistics"]["digg_count"] == 100
+    assert results[0]["create_time"] == 1733000000
+    assert results[0]["previous_discovery_step"] == "author_works"
+    assert results[0]["content_metadata_source"] == "douyin_blogger"
+
+
+def test_from_env_reads_blogger_path_and_sort_type(monkeypatch, tmp_path):
+    monkeypatch.setenv("CONTENTFIND_API_CRAWAPI_BASE_URL", "http://crawapi.test")
+    monkeypatch.setenv("CONTENTFIND_DOUYIN_KEYWORD_PATH", "/crawler/dou_yin/keyword")
+    monkeypatch.setenv(
+        "CONTENTFIND_DOUYIN_VIDEO_LIKE_PORTRAIT_PATH",
+        "/crawler/dou_yin/re_dian_bao/video_like_portrait",
+    )
+    monkeypatch.setenv("CONTENTFIND_DOUYIN_BLOGGER_PATH", "/crawler/dou_yin/blogger")
+    monkeypatch.setenv("CONTENTFIND_DOUYIN_ACCOUNT_WORKS_DEFAULT_SORT_TYPE", "最热")
+
+    client = CrawapiDouyinClient.from_env(env_path=tmp_path / "missing.env")
+
+    assert client.blogger_path == "crawler/dou_yin/blogger"
+    assert client.default_account_works_sort_type == "最热"
+    assert isinstance(client.rate_limiter, RateLimiter)
+
+
+def test_rate_limiter_waits_between_keyword_calls():
+    clock = {"now": 0.0}
+    sleeps = []
+
+    def fake_sleep(seconds):
+        sleeps.append(seconds)
+        clock["now"] += seconds
+
+    limiter = RateLimiter(min_interval_seconds=12.0, now_fn=lambda: clock["now"], sleep_fn=fake_sleep)
+
+    limiter.wait("douyin_search")
+    limiter.wait("douyin_search")
+
+    assert sleeps == [12.0]
+
+
+def test_search_chain_uses_shared_search_bucket():
+    limiter = FakeRateLimiter()
+    client = _client(
+        [
+            _response(200, {"code": 0, "data": {"data": [], "has_more": False}}),
+            _response(200, {"code": 0, "data": {"data": [], "has_more": False}}),
+        ],
+        rate_limiter=limiter,
+    )
+
+    client.search(_search_query("关键词"))
+    client.search({**_search_query("关键词"), "page_cursor": "10"})
+
+    assert limiter.buckets == ["douyin_search", "douyin_search"]
+
+
+def test_blogger_uses_separate_bucket_from_search_chain():
+    limiter = FakeRateLimiter()
+    client = _client(
+        [
+            _response(200, {"code": 0, "data": {"data": [], "has_more": False}}),
+            _blogger_response(),
+        ],
+        rate_limiter=limiter,
+    )
+
+    client.search(_search_query("关键词"))
+    client.fetch_author_works(_author_query())
+
+    assert limiter.buckets == ["douyin_search", "douyin_blogger"]
+
+
+def test_http_429_maps_to_platform_rate_limited():
+    client = _client([_response(429, {"error": "too many"})])
+
+    with pytest.raises(ContentAgentError) as exc_info:
+        client.search(_search_query("被限流"))
+    assert exc_info.value.error_code == ErrorCode.PLATFORM_RATE_LIMITED
+    assert exc_info.value.detail["status_code"] == 429
+
+
+def test_business_rate_limit_code_maps_to_platform_rate_limited(monkeypatch):
+    from content_agent.integrations import douyin
+
+    monkeypatch.setattr(douyin, "RATE_LIMIT_BUSINESS_CODES", {"30005"})
+    client = _client([_response(200, {"code": 30005, "msg": "ok", "data": None})])
+
+    with pytest.raises(ContentAgentError) as exc_info:
+        client.search(_search_query("业务限流"))
+    assert exc_info.value.error_code == ErrorCode.PLATFORM_RATE_LIMITED
+    assert exc_info.value.detail["business_code"] == "30005"
+
+
+def test_rate_limit_message_token_maps_to_platform_rate_limited():
+    client = _client([_response(200, {"code": 1, "msg": "请求频繁,请稍后再试", "data": None})])
+
+    with pytest.raises(ContentAgentError) as exc_info:
+        client.search(_search_query("消息限流"))
+    assert exc_info.value.error_code == ErrorCode.PLATFORM_RATE_LIMITED
+
+
+def test_force_login_without_rate_limit_code_is_not_rate_limited():
+    client = _client([_response(200, {"code": 22001, "msg": "强制登录", "data": None})])
+
+    with pytest.raises(RuntimeError, match="business_error"):
+        client.search(_search_query("强制登录"))
+
+
+def test_bad_json_is_not_rate_limited():
+    client = _client(
+        [
+            httpx.Response(
+                200, content=b"not json",
+                request=httpx.Request("POST", "http://crawapi.test/endpoint"),
+            )
+        ]
+    )
+
+    with pytest.raises(RuntimeError, match="bad_json"):
+        client.search(_search_query("坏响应"))
+
+
+def test_plain_500_is_not_rate_limited():
+    client = _client([_response(500, {"error": "server failed"})])
+
+    with pytest.raises(RuntimeError, match="HTTP 500"):
+        client.search(_search_query("普通失败"))

+ 114 - 0
tests/test_pattern_recall_category_match.py

@@ -1,5 +1,6 @@
 from content_agent.integrations.category_match import CategoryMatchClient
 from content_agent.business_modules.content_discovery.pattern_recall.category_match import (
+    _extract_path_matches,
     match_decode_terms,
 )
 from tests.p4_helpers import FakeCategoryMatchClient, fake_match_paths_no_hit
@@ -97,3 +98,116 @@ def test_category_match_parses_real_match_paths_string_shape():
 
     assert result["matched_terms"] == ["爱国情感"]
     assert result["matched_category_paths"] == ["/理念/情感/家国情怀/爱国情感"]
+
+
+def test_category_match_parses_v2_matches_path():
+    matches = _extract_path_matches(
+        {"items": [{"term": "露营", "matches": [{"path": ["户外", "露营"], "score": 0.92}]}]}
+    )
+
+    assert matches == [
+        {"term": "露营", "category_path": "/户外/露营", "score": 0.92,
+         "raw": {"path": ["户外", "露营"], "score": 0.92}}
+    ]
+
+
+def test_category_match_parses_v2_matches_category_path():
+    matches = _extract_path_matches(
+        {"items": [{"term": "露营", "matches": [{"category_path": ["生活", "户外"], "score": 0.81}]}]}
+    )
+
+    assert matches[0]["category_path"] == "/生活/户外"
+    assert matches[0]["score"] == 0.81
+
+
+def test_category_match_parses_v2_matched_paths():
+    matches = _extract_path_matches(
+        {"items": [{"term": "露营", "matched_paths": [["旅行", "露营"]]}]}
+    )
+
+    assert matches == [
+        {"term": "露营", "category_path": "/旅行/露营", "score": None, "raw": ["旅行", "露营"]}
+    ]
+
+
+def test_category_match_inherits_item_term_for_match_path():
+    matches = _extract_path_matches(
+        {
+            "items": [
+                {
+                    "term": "露营",
+                    "matches": [
+                        {"path": ["户外", "露营"], "score": 0.9},
+                        {"term": "帐篷", "path": ["户外", "帐篷"], "score": 0.8},
+                    ],
+                }
+            ]
+        }
+    )
+
+    assert [m["term"] for m in matches] == ["露营", "帐篷"]
+
+
+def test_category_match_dedupes_duplicate_paths():
+    matches = _extract_path_matches(
+        {
+            "items": [
+                {
+                    "term": "露营",
+                    "matches": [
+                        {"path": ["户外", "露营"], "score": 0.92},
+                        {"path": ["户外", "露营"], "score": 0.85},
+                    ],
+                }
+            ]
+        }
+    )
+
+    assert len(matches) == 1
+    assert matches[0]["score"] == 0.92
+
+
+def test_old_data_shape_remains_compatible():
+    matches = _extract_path_matches(
+        {
+            "data": [
+                {
+                    "term": "爱国情感",
+                    "paths": [{"category_path": "/理念/情感/家国情怀/爱国情感", "score": 0.91}],
+                }
+            ]
+        }
+    )
+
+    assert matches == [
+        {
+            "term": "爱国情感",
+            "category_path": "/理念/情感/家国情怀/爱国情感",
+            "score": 0.91,
+            "raw": {"category_path": "/理念/情感/家国情怀/爱国情感", "score": 0.91},
+        }
+    ]
+
+
+def test_category_match_reads_matches_and_matched_paths_in_same_item():
+    # brief 数据合同示例: 同一 item 同时携带 matches 与 matched_paths,三条全部保留。
+    matches = _extract_path_matches(
+        {
+            "items": [
+                {
+                    "term": "露营",
+                    "matches": [
+                        {"path": ["户外", "露营"], "score": 0.92},
+                        {"category_path": ["生活", "户外"], "score": 0.81},
+                    ],
+                    "matched_paths": [["旅行", "露营"]],
+                }
+            ]
+        }
+    )
+
+    assert [(m["category_path"], m["score"]) for m in matches] == [
+        ("/旅行/露营", None),
+        ("/户外/露营", 0.92),
+        ("/生活/户外", 0.81),
+    ]

+ 70 - 0
tests/test_platform_access.py

@@ -146,3 +146,73 @@ def test_platform_access_fails_run_when_all_queries_fail():
             assert failure["error_detail"]["exception_type"] == "RuntimeError"
     else:
         raise AssertionError("expected platform request failure")
+
+
+class RateLimitedClient:
+    def search(self, search_query):
+        raise ContentAgentError(
+            ErrorCode.PLATFORM_RATE_LIMITED,
+            "crawapi keyword_search failed: rate_limited",
+            {"operation": "keyword_search", "status_code": 429},
+        )
+
+
+class HealthyClient:
+    def search(self, search_query):
+        return [
+            {
+                "content_discovery_id": f"{search_query['search_query_id']}_content_001",
+                "search_query_id": search_query["search_query_id"],
+                "platform_content_id": "7601814454925298001",
+                "description": "正常内容",
+            }
+        ]
+
+
+class RuntimeErrorClient:
+    def search(self, search_query):
+        raise RuntimeError("crawapi keyword_search failed: HTTP 500")
+
+
+class SplitClient:
+    """First query rate limited, second query succeeds."""
+
+    def search(self, search_query):
+        if search_query["search_query_id"] == "q_001":
+            return RateLimitedClient().search(search_query)
+        return HealthyClient().search(search_query)
+
+
+def test_platform_access_preserves_rate_limited_error_code():
+    search_queries = [
+        {"search_query_id": "q_001", "search_query": "被限流", "search_query_generation_method": "item_single"},
+        {"search_query_id": "q_002", "search_query": "正常", "search_query_generation_method": "item_single"},
+    ]
+
+    result = platform_access.run(search_queries, SplitClient())
+
+    failure = result["query_failures"][0]
+    assert failure["search_query_id"] == "q_001"
+    assert failure["error_code"] == ErrorCode.PLATFORM_RATE_LIMITED.value
+    assert failure["message"] == "crawapi keyword_search failed: rate_limited"
+    assert failure["error_detail"]["operation"] == "keyword_search"
+    assert len(result["platform_results"]) == 1
+
+
+def test_platform_access_counts_runtime_error_as_platform_request_failed():
+    search_queries = [
+        {"search_query_id": "q_001", "search_query": "普通失败", "search_query_generation_method": "item_single"},
+        {"search_query_id": "q_002", "search_query": "正常", "search_query_generation_method": "item_single"},
+    ]
+
+    class MixedClient:
+        def search(self, search_query):
+            if search_query["search_query_id"] == "q_001":
+                raise RuntimeError("crawapi keyword_search failed: HTTP 500")
+            return HealthyClient().search(search_query)
+
+    result = platform_access.run(search_queries, MixedClient())
+
+    failure = result["query_failures"][0]
+    assert failure["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value
+    assert failure["error_detail"]["exception_type"] == "RuntimeError"