| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- from __future__ import annotations
- from pathlib import Path
- from typing import Any
- import httpx
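# Shared crawapi base (V3-M1A): HTTP, rate limiting, rate-limit error detection and
# env helpers are centralized in crawapi_http; the re-exports below keep existing
# external imports (tests, smoke scripts) working without changes.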
- from content_agent.integrations.crawapi_http import (
- RATE_LIMIT_MESSAGE_TOKENS,
- RateLimiter,
- _env,
- _load_env_file,
- _optional_positive_int,
- content_format as _content_format,
- is_rate_limit_business_error,
- post_crawapi_json,
- score_from_statistics as _score_from_statistics,
- )
# Raw payload field names used by the crawapi Douyin endpoints, assembled from parts.
RAW_CONTENT_ID_KEY = "_".join(("aweme", "id"))
RAW_AUTHOR_ID_KEY = "_".join(("sec", "uid"))
RAW_AUTHOR_ACCOUNT_KEY = "_".join(("account", "id"))

# Allow-list of confirmed rate-limit business codes. None has been confirmed so far,
# so detection currently relies on HTTP 429 and message tokens; when a live smoke run
# or real traffic reveals a new code, add it here together with a test case.
RATE_LIMIT_BUSINESS_CODES: set[str] = set()

# Rate-limiter bucket names passed along with each request.
SEARCH_RATE_LIMIT_BUCKET = "douyin_search"
BLOGGER_RATE_LIMIT_BUCKET = "douyin_blogger"
class CrawapiDouyinClient:
    """Client for the crawapi Douyin endpoints: keyword search, author works, detail.

    Every request is sent through ``post_crawapi_json``; each response is reduced
    to plain dictionaries with a fixed set of keys. The instance can be used as a
    context manager; ``close()`` only closes an HTTP client that the instance
    created itself.
    """

    def __init__(
        self,
        base_url: str,
        keyword_path: str,
        blogger_path: str = "",
        detail_path: str = "",
        timeout_seconds: float = 60.0,
        default_crawapi_account_ref: str = "",
        default_content_type: str = "视频",
        default_sort_type: str = "综合排序",
        default_publish_time: str = "不限",
        default_cursor: str = "0",
        default_account_works_sort_type: str = "最新",
        max_results_per_query: int | None = 3,
        http_client: Any | None = None,
        rate_limiter: RateLimiter | None = None,
    ) -> None:
        """Store endpoint configuration and request defaults.

        Args:
            base_url: Service root; stored with exactly one trailing slash.
            keyword_path: Keyword-search endpoint path; leading slashes are removed.
            blogger_path: Author-works endpoint path; leading slashes are removed.
            detail_path: Detail endpoint path; leading slashes are removed.
            timeout_seconds: Timeout handed to the HTTP client and to each request.
            default_crawapi_account_ref: Value sent in the account-id field of
                keyword-search payloads.
            default_content_type: ``content_type`` sent with keyword searches and
                used to derive ``platform_content_format``.
            default_sort_type: ``sort_type`` sent with keyword searches.
            default_publish_time: ``publish_time`` sent with keyword searches.
            default_cursor: Search cursor used when the query has no ``page_cursor``.
            default_account_works_sort_type: ``sort_type`` sent with author-works
                requests.
            max_results_per_query: Maximum number of items normalized per call;
                ``None`` or ``0`` disables the cap.
            http_client: Client object forwarded to ``post_crawapi_json``. When
                omitted, an ``httpx.Client`` is created and owned by this instance.
            rate_limiter: Optional limiter forwarded to ``post_crawapi_json``.
        """
        self.base_url = base_url.rstrip("/") + "/"
        self.keyword_path = keyword_path.lstrip("/")
        self.blogger_path = blogger_path.lstrip("/")
        self.detail_path = detail_path.lstrip("/")
        self.timeout_seconds = timeout_seconds
        self.default_crawapi_account_ref = default_crawapi_account_ref
        self.default_content_type = default_content_type
        self.default_sort_type = default_sort_type
        self.default_publish_time = default_publish_time
        self.default_cursor = default_cursor
        self.default_account_works_sort_type = default_account_works_sort_type
        self.max_results_per_query = max_results_per_query
        # Test against None rather than truthiness so that an injected client that
        # happens to be falsy is not silently replaced by a new httpx.Client.
        self._owns_http_client = http_client is None
        self.http_client = (
            httpx.Client(timeout=timeout_seconds) if http_client is None else http_client
        )
        self.rate_limiter = rate_limiter

    def close(self) -> None:
        """Close the HTTP client if this instance created it.

        Injected clients are left open because their lifetime belongs to the caller.
        """
        # getattr keeps this safe on instances built without running __init__.
        if getattr(self, "_owns_http_client", False):
            self.http_client.close()
            self._owns_http_client = False

    def __enter__(self) -> "CrawapiDouyinClient":
        """Return the client itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Release the owned HTTP client on leaving the ``with`` block."""
        self.close()

    @classmethod
    def from_env(cls, env_path: str | Path = ".env") -> "CrawapiDouyinClient":
        """Build a client from ``CONTENTFIND_*`` settings.

        The env file at ``env_path`` is parsed with ``_load_env_file`` and each
        setting is resolved through ``_env``. The base URL, keyword path and
        blogger path are requested with ``required=True``; the remaining settings
        fall back to the defaults given here. A new ``RateLimiter()`` is attached.

        NOTE(review): how ``_env`` combines the parsed file with the process
        environment, and what it raises for a missing required key, is defined in
        ``crawapi_http`` and is not visible from this module.
        """
        env = _load_env_file(env_path)
        return cls(
            base_url=_env("CONTENTFIND_API_CRAWAPI_BASE_URL", env, required=True),
            keyword_path=_env("CONTENTFIND_DOUYIN_KEYWORD_PATH", env, required=True),
            blogger_path=_env("CONTENTFIND_DOUYIN_BLOGGER_PATH", env, required=True),
            detail_path=_env(
                "CONTENTFIND_DOUYIN_DETAIL_PATH", env, default="/crawler/dou_yin/detail"
            ),
            timeout_seconds=float(
                _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
            ),
            default_crawapi_account_ref=_env(
                "CONTENTFIND_DOUYIN_DEFAULT_ACCOUNT_ID", env, default=""
            ),
            default_content_type=_env(
                "CONTENTFIND_DOUYIN_DEFAULT_CONTENT_TYPE", env, default="视频"
            ),
            default_sort_type=_env(
                "CONTENTFIND_DOUYIN_DEFAULT_SORT_TYPE", env, default="综合排序"
            ),
            default_publish_time=_env(
                "CONTENTFIND_DOUYIN_DEFAULT_PUBLISH_TIME", env, default="不限"
            ),
            default_cursor=_env("CONTENTFIND_DOUYIN_DEFAULT_CURSOR", env, default="0"),
            default_account_works_sort_type=_env(
                "CONTENTFIND_DOUYIN_ACCOUNT_WORKS_DEFAULT_SORT_TYPE", env, default="最新"
            ),
            max_results_per_query=_optional_positive_int(
                _env("CONTENTFIND_DOUYIN_MAX_RESULTS_PER_QUERY", env, default="3")
            ),
            rate_limiter=RateLimiter(),
        )

    def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        """Run a keyword search and return the normalized result items.

        Args:
            query: Must contain ``search_query``, ``search_query_id`` and
                ``discovery_start_source``; ``page_cursor`` is optional and falls
                back to ``default_cursor``.

        Returns:
            One normalized dictionary per returned item, capped at
            ``max_results_per_query`` when that is set.

        Raises:
            KeyError: If a required ``query`` key is missing.
        """
        payload = {
            "keyword": query["search_query"],
            "content_type": self.default_content_type,
            "sort_type": self.default_sort_type,
            "publish_time": self.default_publish_time,
            "cursor": str(query.get("page_cursor") or self.default_cursor),
            RAW_AUTHOR_ACCOUNT_KEY: self.default_crawapi_account_ref,
        }
        data = self._post_json(
            self.keyword_path,
            payload,
            operation="keyword_search",
            rate_limit_bucket=SEARCH_RATE_LIMIT_BUCKET,
        )
        items, has_more, next_cursor = self._unpack_page(data)
        return [
            self._normalize_content_item(query, item, index, has_more, next_cursor)
            for index, item in enumerate(items, start=1)
        ]

    def fetch_author_works(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        """Fetch an author's works and return them as normalized items.

        Args:
            query: ``platform_author_id`` is sent in the account-id field and
                ``page_cursor`` as the cursor (both default to ``""``). The
                normalization step additionally requires ``search_query_id`` and
                ``discovery_start_source``.

        Returns:
            Normalized dictionaries as produced for ``search``, with
            ``previous_discovery_step`` set to ``"author_works"`` and
            ``content_metadata_source`` set to ``"douyin_blogger"``.

        Raises:
            KeyError: If a key required by the normalization step is missing.
        """
        payload = {
            RAW_AUTHOR_ACCOUNT_KEY: str(query.get("platform_author_id") or ""),
            "sort_type": self.default_account_works_sort_type,
            "cursor": str(query.get("page_cursor") or ""),
        }
        data = self._post_json(
            self.blogger_path,
            payload,
            operation="author_works",
            rate_limit_bucket=BLOGGER_RATE_LIMIT_BUCKET,
        )
        items, has_more, next_cursor = self._unpack_page(data)
        results: list[dict[str, Any]] = []
        for index, item in enumerate(items, start=1):
            normalized = self._normalize_content_item(
                query, item, index, has_more, next_cursor
            )
            # Override the provenance fields that default to the keyword-search values.
            normalized["previous_discovery_step"] = "author_works"
            normalized["content_metadata_source"] = "douyin_blogger"
            results.append(normalized)
        return results

    def _unpack_page(
        self, data: dict[str, Any]
    ) -> tuple[list[dict[str, Any]], bool, str]:
        """Extract ``(items, has_more, next_cursor)`` from a list-style response.

        The payload is expected as ``{"data": {"data": [...], "has_more": ...,
        "next_cursor": ...}}``. Any level of the wrong type is treated as empty,
        non-dict list entries are dropped, and the list is truncated to
        ``max_results_per_query`` when that is set.
        """
        outer = data.get("data") if isinstance(data, dict) else None
        block = outer if isinstance(outer, dict) else {}
        raw_items = block.get("data")
        items = (
            [entry for entry in raw_items if isinstance(entry, dict)]
            if isinstance(raw_items, list)
            else []
        )
        if self.max_results_per_query:
            items = items[: self.max_results_per_query]
        has_more = bool(block.get("has_more", False))
        next_cursor = str(block.get("next_cursor") or "")
        return items, has_more, next_cursor

    @staticmethod
    def _count(source: dict[str, Any], key: str) -> int:
        """Return ``source[key]`` as an int, treating a missing or falsy value as 0."""
        return int(source.get(key) or 0)

    def _normalize_content_item(
        self,
        query: dict[str, Any],
        item: dict[str, Any],
        index: int,
        has_more: bool,
        next_cursor: str,
    ) -> dict[str, Any]:
        """Convert one raw list item into the normalized discovery record.

        Args:
            query: Supplies ``search_query_id`` and ``discovery_start_source``.
            item: Raw item from the response list.
            index: 1-based position, used to build ``content_discovery_id``.
            has_more: Pagination flag copied onto the record.
            next_cursor: Pagination cursor copied onto the record.

        Returns:
            The normalized record. Provenance fields carry the keyword-search
            defaults; callers may override them.
        """
        raw_author = item.get("author")
        author = raw_author if isinstance(raw_author, dict) else {}
        raw_statistics = item.get("statistics")
        statistics = raw_statistics if isinstance(raw_statistics, dict) else {}
        platform_content_id = str(item.get(RAW_CONTENT_ID_KEY) or "")
        platform_author_id = str(author.get(RAW_AUTHOR_ID_KEY) or "")
        return {
            "content_discovery_id": f"{query['search_query_id']}_content_{index:03d}",
            "search_query_id": query["search_query_id"],
            "platform": "douyin",
            "platform_content_id": platform_content_id,
            "platform_content_format": _content_format(self.default_content_type),
            "play_url": _extract_play_url(item),
            "description": item.get("desc") or item.get("item_title") or "",
            "platform_author_id": platform_author_id,
            "author_display_name": author.get("nickname") or "",
            "statistics": {
                "digg_count": self._count(statistics, "digg_count"),
                "comment_count": self._count(statistics, "comment_count"),
                "share_count": self._count(statistics, "share_count"),
                "collect_count": self._count(statistics, "collect_count"),
                "play_count": self._count(statistics, "play_count"),
            },
            "tags": _extract_tags(item),
            "text_extra": item.get("text_extra") or [],
            "create_time": item.get("create_time"),
            "has_more": has_more,
            "next_cursor": next_cursor,
            "score": _score_from_statistics(statistics),
            "risk_level": "unknown",
            "discovery_relation": "derived_from_pattern_demand",
            "discovery_start_source": query["discovery_start_source"],
            "previous_discovery_step": "search_query_direct",
            "content_metadata_source": "douyin_keyword_search",
            "platform_auth_mode": "no_bearer",
            # Only the two raw identifiers are retained, not the full upstream item.
            "platform_raw_payload": {
                RAW_CONTENT_ID_KEY: platform_content_id,
                "author": {RAW_AUTHOR_ID_KEY: platform_author_id},
            },
        }

    def fetch_detail(self, content_id: str) -> dict[str, Any]:
        """Fetch one content item's detail and return it in normalized form.

        Args:
            content_id: Identifier sent as ``content_id``; also the fallback for
                ``platform_content_id`` when the response has no
                ``channel_content_id``.

        Returns:
            A dictionary with platform, ids, description, author, statistics,
            tags, ``play_url`` (``video_url`` of the first ``video_url_list``
            entry, or ``None``) and ``create_time`` (``publish_timestamp``
            integer-divided by 1000, or ``None`` when absent).
        """
        data = self._post_json(
            self.detail_path,
            {"content_id": str(content_id)},
            operation="detail",
            rate_limit_bucket=SEARCH_RATE_LIMIT_BUCKET,
        )
        block = data.get("data") if isinstance(data, dict) else None
        detail = block.get("data") if isinstance(block, dict) else None
        if not isinstance(detail, dict):
            detail = {}

        statistics = {
            # The detail endpoint reports likes as "like_count".
            "digg_count": self._count(detail, "like_count"),
            "comment_count": self._count(detail, "comment_count"),
            "share_count": self._count(detail, "share_count"),
            "collect_count": self._count(detail, "collect_count"),
            "play_count": self._count(detail, "play_count"),
        }

        topics = detail.get("topic_list")
        if not isinstance(topics, (list, tuple)):
            topics = []
        # Always emit strings, adding the leading "#" only when it is missing.
        tags = [
            str(topic) if str(topic).startswith("#") else f"#{topic}"
            for topic in topics
            if topic
        ]

        # Guard both the container and its first entry so a malformed payload
        # yields play_url=None instead of raising.
        videos = detail.get("video_url_list")
        first_video = videos[0] if isinstance(videos, list) and videos else None
        play_url = first_video.get("video_url") if isinstance(first_video, dict) else None

        publish_ms = detail.get("publish_timestamp")
        return {
            "platform": "douyin",
            "platform_content_id": str(detail.get("channel_content_id") or content_id),
            "platform_content_url": detail.get("content_link"),
            "description": detail.get("body_text") or detail.get("title") or "",
            "platform_author_id": str(detail.get("channel_account_id") or ""),
            "author_display_name": detail.get("channel_account_name") or "",
            "statistics": statistics,
            "tags": tags,
            "play_url": play_url,
            "create_time": int(publish_ms) // 1000 if publish_ms else None,
            "content_metadata_source": "douyin_detail",
        }

    def _post_json(
        self,
        path: str,
        payload: dict[str, Any],
        operation: str,
        rate_limit_bucket: str | None = None,
    ) -> dict[str, Any]:
        """POST ``payload`` to ``path`` through ``post_crawapi_json``.

        Forwards this client's HTTP client, base URL, timeout and rate limiter,
        together with the module-level ``RATE_LIMIT_BUSINESS_CODES``.
        """
        return post_crawapi_json(
            http_client=self.http_client,
            base_url=self.base_url,
            path=path,
            payload=payload,
            operation=operation,
            timeout_seconds=self.timeout_seconds,
            rate_limiter=self.rate_limiter,
            rate_limit_bucket=rate_limit_bucket,
            business_codes=RATE_LIMIT_BUSINESS_CODES,
        )
def _extract_play_url(item: dict[str, Any]) -> str | None:
    """Return the first usable URL from ``item["video"]["play_addr"]["url_list"]``.

    Args:
        item: Raw content item.

    Returns:
        The first non-empty entry of the URL list as a string, or ``None`` when
        any level of the path is missing, has the wrong type, or the list holds
        no usable entry.
    """
    video = item.get("video")
    play_addr = video.get("play_addr") if isinstance(video, dict) else None
    url_list = play_addr.get("url_list") if isinstance(play_addr, dict) else None
    # A string would be indexed per character and a dict would raise on [0],
    # so only real sequences are accepted.
    if not isinstance(url_list, (list, tuple)):
        return None
    for url in url_list:
        # Skip None/empty entries; str(None) would leak the literal "None" as a URL.
        if url:
            return str(url)
    return None
def _extract_tags(item: dict[str, Any]) -> list[str]:
    """Collect de-duplicated hashtags from ``cha_list`` and ``text_extra``.

    ``cha_list`` entries may be strings or dictionaries (the first non-empty of
    ``cha_name``, ``hashtag_name``, ``name`` is used); ``text_extra`` entries
    contribute their ``hashtag_name``. Every tag is returned with exactly the
    leading ``#`` it already had, or one added when it had none.

    Args:
        item: Raw content item.

    Returns:
        Hashtags in first-seen order (``cha_list`` before ``text_extra``),
        without duplicates and without empty entries.
    """

    def as_hashtag(value: Any) -> str | None:
        """Normalize one tag name; empty or missing names yield ``None``."""
        if not value:
            return None
        text = str(value)
        return text if text.startswith("#") else f"#{text}"

    def as_list(value: Any) -> list[Any]:
        """Return ``value`` when it is a list, otherwise an empty list."""
        return value if isinstance(value, list) else []

    names: list[Any] = []
    for entry in as_list(item.get("cha_list")):
        if isinstance(entry, str):
            names.append(entry)
        elif isinstance(entry, dict):
            names.append(
                entry.get("cha_name") or entry.get("hashtag_name") or entry.get("name")
            )
    for entry in as_list(item.get("text_extra")):
        if isinstance(entry, dict):
            names.append(entry.get("hashtag_name"))

    hashtags = (as_hashtag(name) for name in names)
    # dict.fromkeys removes duplicates while keeping first-seen order.
    return list(dict.fromkeys(tag for tag in hashtags if tag is not None))
|