| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430 |
- import httpx
- import pytest
- from content_agent.errors import ContentAgentError, ErrorCode
- from content_agent.integrations.douyin import (
- RAW_AUTHOR_ACCOUNT_KEY,
- RAW_AUTHOR_ID_KEY,
- RAW_CONTENT_ID_KEY,
- CrawapiDouyinClient,
- RateLimiter,
- )
class FakeHttpClient:
    """Scripted stand-in for an HTTP client that replays canned responses.

    Every ``post`` call is recorded in ``requests`` and answered with the next
    queued entry. A queued exception instance is raised instead of returned.
    """

    def __init__(self, responses):
        # Copy into a fresh list so ``post`` never mutates the caller's sequence.
        self.responses = [*responses]
        self.requests = []

    def post(self, url, json, headers, timeout):
        """Record the call arguments, then return or raise the next scripted entry."""
        recorded_call = dict(url=url, json=json, headers=headers, timeout=timeout)
        self.requests.append(recorded_call)
        scripted = self.responses.pop(0)
        if not isinstance(scripted, Exception):
            return scripted
        raise scripted
def _response(status_code, data):
    """Build an ``httpx.Response`` with a JSON body attached to a dummy POST request."""
    dummy_request = httpx.Request("POST", "http://crawapi.test/endpoint")
    return httpx.Response(status_code, json=data, request=dummy_request)
def _client(responses, rate_limiter=None):
    """Create a ``CrawapiDouyinClient`` wired to a ``FakeHttpClient``.

    ``responses`` are the scripted replies handed to the fake HTTP client;
    ``rate_limiter`` is forwarded to the client unchanged.
    """
    fake_http = FakeHttpClient(responses)
    return CrawapiDouyinClient(
        base_url="http://crawapi.test",
        keyword_path="/crawler/dou_yin/keyword",
        blogger_path="/crawler/dou_yin/blogger",
        default_crawapi_account_ref="771431222",
        http_client=fake_http,
        rate_limiter=rate_limiter,
    )
- def _search_query(text="早上好祝福视频"):
- return {
- "search_query_id": "q_001",
- "search_query": text,
- "discovery_start_source": "pattern_itemset",
- }
def test_douyin_keyword_search_maps_content_fields():
    """One keyword hit is mapped to a discovery record using a single request."""
    content_id = "7615247738577423622"
    author_id = "MS4wLjABAAAAdYc"
    raw_item = {
        RAW_CONTENT_ID_KEY: content_id,
        "desc": "早睡早起早上好",
        "author": {
            "nickname": "让你心情变浅的兔子",
            RAW_AUTHOR_ID_KEY: author_id,
        },
        "statistics": {
            "digg_count": 1931,
            "comment_count": 45,
            "share_count": 1968,
            "collect_count": 462,
        },
        "text_extra": [{"hashtag_name": "早上好"}],
    }
    envelope = {"data": {"data": [raw_item], "has_more": True, "next_cursor": "10"}}
    client = _client([_response(200, envelope)])

    results = client.search(_search_query())

    assert len(results) == 1
    hit = results[0]
    assert hit["content_discovery_id"] == "q_001_content_001"
    assert hit["platform_content_id"] == content_id
    assert hit["platform_author_id"] == author_id
    assert hit["tags"] == ["#早上好"]
    assert hit["has_more"] is True
    assert hit["next_cursor"] == "10"
    assert hit["discovery_relation"] == "derived_from_pattern_demand"
    assert hit["platform_auth_mode"] == "no_bearer"
    assert hit["platform_raw_payload"][RAW_CONTENT_ID_KEY] == content_id

    sent = client.http_client.requests
    assert sent[0]["json"][RAW_AUTHOR_ACCOUNT_KEY] == "771431222"
    # V3 cleanup: the profile call chain was removed, so searching one content
    # item sends exactly one keyword request and no follow-up profile request.
    assert len(sent) == 1
    assert sent[0]["url"].endswith("/crawler/dou_yin/keyword")
def test_douyin_keyword_search_returns_empty_list():
    """An empty item list in the response yields an empty result list."""
    empty_page = {"data": {"data": [], "has_more": False}}
    client = _client([_response(200, empty_page)])

    found = client.search(_search_query("无结果"))

    assert found == []
def test_douyin_keyword_search_uses_explicit_page_cursor():
    """A ``page_cursor`` on the query is sent as the request's ``cursor`` field."""
    empty_page = {"data": {"data": [], "has_more": False}}
    client = _client([_response(200, empty_page)])
    paged_query = {**_search_query("下一页"), "page_cursor": "20"}

    client.search(paged_query)

    sent_payload = client.http_client.requests[0]["json"]
    assert sent_payload["cursor"] == "20"
def test_douyin_fetch_author_works_maps_fake_response():
    """``fetch_author_works`` maps a scripted reply and issues one request."""
    author_id = "MS4wLjABAAAA001"
    raw_work = {
        RAW_CONTENT_ID_KEY: "7615247738577423001",
        "desc": "作者作品",
        "author": {
            "nickname": "作者",
            RAW_AUTHOR_ID_KEY: author_id,
        },
        "statistics": {"digg_count": 100},
    }
    envelope = {"data": {"data": [raw_work], "has_more": False}}
    client = _client([_response(200, envelope)])
    author_query = {
        "search_query_id": "author_001",
        "search_query": "作者作品",
        "platform_author_id": author_id,
        "discovery_start_source": "pattern_itemset",
    }

    results = client.fetch_author_works(author_query)

    # M5A controlled change: author works now go to the blogger endpoint, and
    # the payload follows the three-field account_id contract.
    first = results[0]
    assert first["search_query_id"] == "author_001"
    assert first["previous_discovery_step"] == "author_works"
    sent = client.http_client.requests
    assert sent[0]["json"][RAW_AUTHOR_ACCOUNT_KEY] == author_id
    assert len(sent) == 1
def test_douyin_keyword_search_http_error_is_sanitized():
    """An HTTP 500 reply makes ``search`` raise ``RuntimeError`` naming the status."""
    server_error = _response(500, {"error": "server failed"})
    client = _client([server_error])
    expected_message = "keyword_search failed: HTTP 500"

    with pytest.raises(RuntimeError, match=expected_message):
        client.search(_search_query("接口失败"))
def test_douyin_keyword_search_business_error_is_failed():
    """A 200 reply carrying business code 22001 raises a ``business_error`` failure."""
    business_failure = {"code": 22001, "msg": "强制登录", "data": None}
    client = _client([_response(200, business_failure)])
    expected_message = "keyword_search failed: business_error"

    with pytest.raises(RuntimeError, match=expected_message):
        client.search(_search_query("账号态失败"))
def test_douyin_keyword_search_network_error_is_sanitized():
    """An ``httpx.ConnectError`` from the transport raises a ``network_error`` failure."""
    transport_failure = httpx.ConnectError("network failed")
    client = _client([transport_failure])
    expected_message = "keyword_search failed: network_error"

    with pytest.raises(RuntimeError, match=expected_message):
        client.search(_search_query("网络失败"))
def test_douyin_keyword_search_bad_json_is_sanitized():
    """A 200 reply whose body is not JSON raises a ``bad_json`` failure."""
    non_json_reply = httpx.Response(
        200,
        content=b"not json",
        request=httpx.Request("POST", "http://crawapi.test/endpoint"),
    )
    fake_http = FakeHttpClient([non_json_reply])
    client = CrawapiDouyinClient(
        base_url="http://crawapi.test",
        keyword_path="/crawler/dou_yin/keyword",
        http_client=fake_http,
    )
    expected_message = "keyword_search failed: bad_json"

    with pytest.raises(RuntimeError, match=expected_message):
        client.search(_search_query("坏 JSON"))
def test_douyin_keyword_search_can_limit_results_per_query():
    """With ``max_results_per_query=1`` only the first of two items is returned."""
    raw_items = [
        {RAW_CONTENT_ID_KEY: "1", "author": {}, "statistics": {}},
        {RAW_CONTENT_ID_KEY: "2", "author": {}, "statistics": {}},
    ]
    fake_http = FakeHttpClient([_response(200, {"data": {"data": raw_items}})])
    client = CrawapiDouyinClient(
        base_url="http://crawapi.test",
        keyword_path="/crawler/dou_yin/keyword",
        max_results_per_query=1,
        http_client=fake_http,
    )

    results = client.search(_search_query("限量"))

    returned_ids = [entry["platform_content_id"] for entry in results]
    assert returned_ids == ["1"]
    assert len(client.http_client.requests) == 1
- def _author_query(author_id="MS4wLjABAAAA001", **extra):
- return {
- "search_query_id": "author_001",
- "search_query": "作者作品",
- "platform_author_id": author_id,
- "discovery_start_source": "pattern_itemset",
- **extra,
- }
def _blogger_response(items=None, has_more=True, next_cursor="20"):
    """Build a 200 reply with ``code`` 0 wrapping one page of works.

    A falsy ``items`` value is replaced by an empty list.
    """
    page = {"data": items or [], "has_more": has_more, "next_cursor": next_cursor}
    envelope = {"code": 0, "data": page}
    return _response(200, envelope)
class FakeRateLimiter:
    """Rate-limiter double that never blocks and only records bucket names."""

    def __init__(self):
        # Bucket names in the order ``wait`` was called.
        self.buckets = []

    def wait(self, bucket):
        """Record ``bucket`` and return immediately."""
        self.buckets += [bucket]
def test_fetch_author_works_posts_to_blogger_path():
    """``fetch_author_works`` posts to a URL ending with the blogger path."""
    client = _client([_blogger_response()])

    client.fetch_author_works(_author_query())

    requested_url = client.http_client.requests[0]["url"]
    assert requested_url.endswith("/crawler/dou_yin/blogger")
def test_fetch_author_works_payload_uses_account_id_from_platform_author_id():
    """The blogger payload is exactly account key, ``sort_type`` and ``cursor``."""
    author_id = "MS4wLjABAAAA999"
    client = _client([_blogger_response()])

    client.fetch_author_works(_author_query(author_id))

    payload = client.http_client.requests[0]["json"]
    expected_payload = {
        RAW_AUTHOR_ACCOUNT_KEY: author_id,
        "sort_type": "最新",
        "cursor": "",
    }
    assert payload == expected_payload
def test_fetch_author_works_uses_page_cursor():
    """A ``page_cursor`` on the author query is sent as the ``cursor`` field."""
    client = _client([_blogger_response()])
    paged_query = _author_query(page_cursor="20")

    client.fetch_author_works(paged_query)

    sent_payload = client.http_client.requests[0]["json"]
    assert sent_payload["cursor"] == "20"
def test_fetch_author_works_normalizes_author_work_fields():
    """Fields of a blogger work item are carried over onto the result record."""
    raw_work = {
        RAW_CONTENT_ID_KEY: "7615247738577423001",
        "desc": "作者作品",
        "author": {"nickname": "作者", RAW_AUTHOR_ID_KEY: "MS4wLjABAAAA001"},
        "statistics": {"digg_count": 100},
        "create_time": 1733000000,
    }
    client = _client([_blogger_response(items=[raw_work])])

    results = client.fetch_author_works(_author_query())

    work = results[0]
    assert work["platform_content_id"] == "7615247738577423001"
    assert work["platform_author_id"] == "MS4wLjABAAAA001"
    assert work["statistics"]["digg_count"] == 100
    assert work["create_time"] == 1733000000
    assert work["previous_discovery_step"] == "author_works"
    assert work["content_metadata_source"] == "douyin_blogger"
    assert len(client.http_client.requests) == 1
def test_from_env_reads_blogger_path_and_sort_type(monkeypatch, tmp_path):
    """``from_env`` picks up the blogger path and default sort type from env vars."""
    env_overrides = {
        "CONTENTFIND_API_CRAWAPI_BASE_URL": "http://crawapi.test",
        "CONTENTFIND_DOUYIN_KEYWORD_PATH": "/crawler/dou_yin/keyword",
        "CONTENTFIND_DOUYIN_BLOGGER_PATH": "/crawler/dou_yin/blogger",
        "CONTENTFIND_DOUYIN_ACCOUNT_WORKS_DEFAULT_SORT_TYPE": "最热",
    }
    for name, value in env_overrides.items():
        monkeypatch.setenv(name, value)

    # The env file path is named "missing.env" and is never created by this test.
    client = CrawapiDouyinClient.from_env(env_path=tmp_path / "missing.env")

    # The expected path has no leading slash although the env value has one.
    assert client.blogger_path == "crawler/dou_yin/blogger"
    assert client.default_account_works_sort_type == "最热"
    assert isinstance(client.rate_limiter, RateLimiter)
def test_rate_limiter_waits_between_keyword_calls():
    """Two back-to-back waits on one bucket produce a single 12-second sleep."""
    # One-element list used as a mutable fake clock shared with the closures.
    current_time = [0.0]
    recorded_sleeps = []

    def advance_clock(seconds):
        # Sleeping is simulated by recording the request and moving the clock.
        recorded_sleeps.append(seconds)
        current_time[0] += seconds

    limiter = RateLimiter(
        min_interval_seconds=12.0,
        now_fn=lambda: current_time[0],
        sleep_fn=advance_clock,
    )

    for _ in range(2):
        limiter.wait("douyin_search")

    assert recorded_sleeps == [12.0]
def test_search_chain_uses_shared_search_bucket():
    """A first-page and a paged search both wait on the ``douyin_search`` bucket."""
    empty_page = {"code": 0, "data": {"data": [], "has_more": False}}
    limiter = FakeRateLimiter()
    scripted = [_response(200, empty_page) for _ in range(2)]
    client = _client(scripted, rate_limiter=limiter)

    client.search(_search_query("关键词"))
    client.search({**_search_query("关键词"), "page_cursor": "10"})

    assert limiter.buckets == ["douyin_search", "douyin_search"]
def test_blogger_uses_separate_bucket_from_search_chain():
    """Search waits on ``douyin_search``; author works wait on ``douyin_blogger``."""
    empty_search_page = {"code": 0, "data": {"data": [], "has_more": False}}
    limiter = FakeRateLimiter()
    scripted = [_response(200, empty_search_page), _blogger_response()]
    client = _client(scripted, rate_limiter=limiter)

    client.search(_search_query("关键词"))
    client.fetch_author_works(_author_query())

    assert limiter.buckets == ["douyin_search", "douyin_blogger"]
def test_http_429_maps_to_platform_rate_limited():
    """HTTP 429 raises ``ContentAgentError`` with ``PLATFORM_RATE_LIMITED``."""
    throttled = _response(429, {"error": "too many"})
    client = _client([throttled])

    with pytest.raises(ContentAgentError) as exc_info:
        client.search(_search_query("被限流"))

    # Checked after the ``with`` block exits, once the exception is captured.
    error = exc_info.value
    assert error.error_code == ErrorCode.PLATFORM_RATE_LIMITED
    assert error.detail["status_code"] == 429
def test_business_rate_limit_code_maps_to_platform_rate_limited(monkeypatch):
    """A business code listed in ``RATE_LIMIT_BUSINESS_CODES`` is rate-limited."""
    from content_agent.integrations import douyin

    # Patch the module-level set so code "30005" counts as a rate-limit code.
    monkeypatch.setattr(douyin, "RATE_LIMIT_BUSINESS_CODES", {"30005"})
    reply_body = {"code": 30005, "msg": "ok", "data": None}
    client = _client([_response(200, reply_body)])

    with pytest.raises(ContentAgentError) as exc_info:
        client.search(_search_query("业务限流"))

    error = exc_info.value
    assert error.error_code == ErrorCode.PLATFORM_RATE_LIMITED
    # The reply carries the code as an int; the detail is expected as a string.
    assert error.detail["business_code"] == "30005"
def test_rate_limit_message_token_maps_to_platform_rate_limited():
    """A business error with this ``msg`` text raises ``PLATFORM_RATE_LIMITED``."""
    reply_body = {"code": 1, "msg": "请求频繁,请稍后再试", "data": None}
    client = _client([_response(200, reply_body)])

    with pytest.raises(ContentAgentError) as exc_info:
        client.search(_search_query("消息限流"))

    assert exc_info.value.error_code == ErrorCode.PLATFORM_RATE_LIMITED
def test_force_login_without_rate_limit_code_is_not_rate_limited():
    """Business code 22001 raises a ``RuntimeError`` matching ``business_error``."""
    reply_body = {"code": 22001, "msg": "强制登录", "data": None}
    client = _client([_response(200, reply_body)])

    with pytest.raises(RuntimeError, match="business_error"):
        client.search(_search_query("强制登录"))
def test_bad_json_is_not_rate_limited():
    """A non-JSON 200 body raises a ``RuntimeError`` matching ``bad_json``."""
    non_json_reply = httpx.Response(
        200,
        content=b"not json",
        request=httpx.Request("POST", "http://crawapi.test/endpoint"),
    )
    client = _client([non_json_reply])

    with pytest.raises(RuntimeError, match="bad_json"):
        client.search(_search_query("坏响应"))
def test_plain_500_is_not_rate_limited():
    """An HTTP 500 reply raises a ``RuntimeError`` matching ``HTTP 500``."""
    server_error = _response(500, {"error": "server failed"})
    client = _client([server_error])

    with pytest.raises(RuntimeError, match="HTTP 500"):
        client.search(_search_query("普通失败"))
|