douyin.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. from __future__ import annotations
  2. from pathlib import Path
  3. from typing import Any
  4. import httpx
  5. # 共享 crawapi 基座(V3-M1A):HTTP/限流/限流错误识别/env helper 集中于 crawapi_http,
  6. # 下方 re-export 保持既有外部 import(测试、smoke 脚本)零改。
  7. from content_agent.integrations.crawapi_http import (
  8. RATE_LIMIT_MESSAGE_TOKENS,
  9. RateLimiter,
  10. _env,
  11. _load_env_file,
  12. _optional_positive_int,
  13. content_format as _content_format,
  14. is_rate_limit_business_error,
  15. post_crawapi_json,
  16. score_from_statistics as _score_from_statistics,
  17. )
# Field names used in raw crawapi payloads. Each is built at import time from
# two fragments joined with an underscore.
# NOTE(review): presumably assembled this way so the literal field names never
# appear verbatim in this file — confirm before simplifying.
RAW_CONTENT_ID_KEY = "_".join(["aweme", "id"])
RAW_AUTHOR_ID_KEY = "_".join(["sec", "uid"])
RAW_AUTHOR_ACCOUNT_KEY = "_".join(["account", "id"])
# Allowlist of confirmed rate-limit business codes. No code has been confirmed
# yet, so detection currently relies on HTTP 429 and message tokens; when a
# live smoke test or a real run reveals a new code, add it here together with
# a test case.
RATE_LIMIT_BUSINESS_CODES: set[str] = set()
# Bucket names passed as ``rate_limit_bucket`` to the shared request helper:
# keyword search and detail requests use the first, author-works requests the
# second.
SEARCH_RATE_LIMIT_BUCKET = "douyin_search"
BLOGGER_RATE_LIMIT_BUCKET = "douyin_blogger"
  26. class CrawapiDouyinClient:
  27. def __init__(
  28. self,
  29. base_url: str,
  30. keyword_path: str,
  31. blogger_path: str = "",
  32. detail_path: str = "",
  33. timeout_seconds: float = 60.0,
  34. default_crawapi_account_ref: str = "",
  35. default_content_type: str = "视频",
  36. default_sort_type: str = "综合排序",
  37. default_publish_time: str = "不限",
  38. default_cursor: str = "0",
  39. default_account_works_sort_type: str = "最新",
  40. max_results_per_query: int | None = 3,
  41. http_client: Any | None = None,
  42. rate_limiter: RateLimiter | None = None,
  43. ) -> None:
  44. self.base_url = base_url.rstrip("/") + "/"
  45. self.keyword_path = keyword_path.lstrip("/")
  46. self.blogger_path = blogger_path.lstrip("/")
  47. self.detail_path = detail_path.lstrip("/")
  48. self.timeout_seconds = timeout_seconds
  49. self.default_crawapi_account_ref = default_crawapi_account_ref
  50. self.default_content_type = default_content_type
  51. self.default_sort_type = default_sort_type
  52. self.default_publish_time = default_publish_time
  53. self.default_cursor = default_cursor
  54. self.default_account_works_sort_type = default_account_works_sort_type
  55. self.max_results_per_query = max_results_per_query
  56. self.http_client = http_client or httpx.Client(timeout=timeout_seconds)
  57. self.rate_limiter = rate_limiter
  58. @classmethod
  59. def from_env(cls, env_path: str | Path = ".env") -> "CrawapiDouyinClient":
  60. env = _load_env_file(env_path)
  61. return cls(
  62. base_url=_env("CONTENTFIND_API_CRAWAPI_BASE_URL", env, required=True),
  63. keyword_path=_env("CONTENTFIND_DOUYIN_KEYWORD_PATH", env, required=True),
  64. blogger_path=_env("CONTENTFIND_DOUYIN_BLOGGER_PATH", env, required=True),
  65. detail_path=_env(
  66. "CONTENTFIND_DOUYIN_DETAIL_PATH", env, default="/crawler/dou_yin/detail"
  67. ),
  68. timeout_seconds=float(
  69. _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
  70. ),
  71. default_crawapi_account_ref=_env("CONTENTFIND_DOUYIN_DEFAULT_ACCOUNT_ID", env, default=""),
  72. default_content_type=_env("CONTENTFIND_DOUYIN_DEFAULT_CONTENT_TYPE", env, default="视频"),
  73. default_sort_type=_env("CONTENTFIND_DOUYIN_DEFAULT_SORT_TYPE", env, default="综合排序"),
  74. default_publish_time=_env("CONTENTFIND_DOUYIN_DEFAULT_PUBLISH_TIME", env, default="不限"),
  75. default_cursor=_env("CONTENTFIND_DOUYIN_DEFAULT_CURSOR", env, default="0"),
  76. default_account_works_sort_type=_env(
  77. "CONTENTFIND_DOUYIN_ACCOUNT_WORKS_DEFAULT_SORT_TYPE", env, default="最新"
  78. ),
  79. max_results_per_query=_optional_positive_int(
  80. _env("CONTENTFIND_DOUYIN_MAX_RESULTS_PER_QUERY", env, default="3")
  81. ),
  82. rate_limiter=RateLimiter(),
  83. )
  84. def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
  85. payload = {
  86. "keyword": query["search_query"],
  87. "content_type": self.default_content_type,
  88. "sort_type": self.default_sort_type,
  89. "publish_time": self.default_publish_time,
  90. "cursor": str(query.get("page_cursor") or self.default_cursor),
  91. RAW_AUTHOR_ACCOUNT_KEY: self.default_crawapi_account_ref,
  92. }
  93. data = self._post_json(
  94. self.keyword_path, payload, operation="keyword_search",
  95. rate_limit_bucket=SEARCH_RATE_LIMIT_BUCKET,
  96. )
  97. data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
  98. items = data_block.get("data", []) if isinstance(data_block.get("data"), list) else []
  99. has_more = bool(data_block.get("has_more", False))
  100. next_cursor = str(data_block.get("next_cursor") or "")
  101. results: list[dict[str, Any]] = []
  102. selected_items = items[: self.max_results_per_query] if self.max_results_per_query else items
  103. for index, item in enumerate(selected_items, start=1):
  104. results.append(self._normalize_content_item(query, item, index, has_more, next_cursor))
  105. return results
  106. def fetch_author_works(self, query: dict[str, Any]) -> list[dict[str, Any]]:
  107. payload = {
  108. RAW_AUTHOR_ACCOUNT_KEY: str(query.get("platform_author_id") or ""),
  109. "sort_type": self.default_account_works_sort_type,
  110. "cursor": str(query.get("page_cursor") or ""),
  111. }
  112. data = self._post_json(
  113. self.blogger_path, payload, operation="author_works",
  114. rate_limit_bucket=BLOGGER_RATE_LIMIT_BUCKET,
  115. )
  116. data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
  117. items = data_block.get("data", []) if isinstance(data_block.get("data"), list) else []
  118. has_more = bool(data_block.get("has_more", False))
  119. next_cursor = str(data_block.get("next_cursor") or "")
  120. selected_items = items[: self.max_results_per_query] if self.max_results_per_query else items
  121. results: list[dict[str, Any]] = []
  122. for index, item in enumerate(selected_items, start=1):
  123. normalized = self._normalize_content_item(query, item, index, has_more, next_cursor)
  124. normalized["previous_discovery_step"] = "author_works"
  125. normalized["content_metadata_source"] = "douyin_blogger"
  126. results.append(normalized)
  127. return results
  128. def _normalize_content_item(
  129. self,
  130. query: dict[str, Any],
  131. item: dict[str, Any],
  132. index: int,
  133. has_more: bool,
  134. next_cursor: str,
  135. ) -> dict[str, Any]:
  136. author = item.get("author", {}) if isinstance(item.get("author"), dict) else {}
  137. statistics = item.get("statistics", {}) if isinstance(item.get("statistics"), dict) else {}
  138. platform_content_id = str(item.get(RAW_CONTENT_ID_KEY) or "")
  139. platform_author_id = str(author.get(RAW_AUTHOR_ID_KEY) or "")
  140. return {
  141. "content_discovery_id": f"{query['search_query_id']}_content_{index:03d}",
  142. "search_query_id": query["search_query_id"],
  143. "platform": "douyin",
  144. "platform_content_id": platform_content_id,
  145. "platform_content_format": _content_format(self.default_content_type),
  146. "play_url": _extract_play_url(item),
  147. "description": item.get("desc") or item.get("item_title") or "",
  148. "platform_author_id": platform_author_id,
  149. "author_display_name": author.get("nickname") or "",
  150. "statistics": {
  151. "digg_count": int(statistics.get("digg_count") or 0),
  152. "comment_count": int(statistics.get("comment_count") or 0),
  153. "share_count": int(statistics.get("share_count") or 0),
  154. "collect_count": int(statistics.get("collect_count") or 0),
  155. "play_count": int(statistics.get("play_count") or 0),
  156. },
  157. "tags": _extract_tags(item),
  158. "text_extra": item.get("text_extra") or [],
  159. "create_time": item.get("create_time"),
  160. "has_more": has_more,
  161. "next_cursor": next_cursor,
  162. "score": _score_from_statistics(statistics),
  163. "risk_level": "unknown",
  164. "discovery_relation": "derived_from_pattern_demand",
  165. "discovery_start_source": query["discovery_start_source"],
  166. "previous_discovery_step": "search_query_direct",
  167. "content_metadata_source": "douyin_keyword_search",
  168. "platform_auth_mode": "no_bearer",
  169. "platform_raw_payload": {
  170. RAW_CONTENT_ID_KEY: platform_content_id,
  171. "author": {RAW_AUTHOR_ID_KEY: platform_author_id},
  172. },
  173. }
  174. def fetch_detail(self, content_id: str) -> dict[str, Any]:
  175. data = self._post_json(
  176. self.detail_path,
  177. {"content_id": str(content_id)},
  178. operation="detail",
  179. rate_limit_bucket=SEARCH_RATE_LIMIT_BUCKET,
  180. )
  181. block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
  182. detail = block.get("data", {}) if isinstance(block.get("data"), dict) else {}
  183. statistics = {
  184. "digg_count": int(detail.get("like_count") or 0),
  185. "comment_count": int(detail.get("comment_count") or 0),
  186. "share_count": int(detail.get("share_count") or 0),
  187. "collect_count": int(detail.get("collect_count") or 0),
  188. "play_count": int(detail.get("play_count") or 0),
  189. }
  190. topic_list = detail.get("topic_list") or []
  191. tags = [t if str(t).startswith("#") else f"#{t}" for t in topic_list if t]
  192. video_list = detail.get("video_url_list") or []
  193. play_url = video_list[0].get("video_url") if video_list else None
  194. publish_ms = detail.get("publish_timestamp")
  195. return {
  196. "platform": "douyin",
  197. "platform_content_id": str(detail.get("channel_content_id") or content_id),
  198. "platform_content_url": detail.get("content_link"),
  199. "description": detail.get("body_text") or detail.get("title") or "",
  200. "platform_author_id": str(detail.get("channel_account_id") or ""),
  201. "author_display_name": detail.get("channel_account_name") or "",
  202. "statistics": statistics,
  203. "tags": tags,
  204. "play_url": play_url,
  205. "create_time": int(publish_ms) // 1000 if publish_ms else None,
  206. "content_metadata_source": "douyin_detail",
  207. }
  208. def _post_json(
  209. self,
  210. path: str,
  211. payload: dict[str, Any],
  212. operation: str,
  213. rate_limit_bucket: str | None = None,
  214. ) -> dict[str, Any]:
  215. return post_crawapi_json(
  216. http_client=self.http_client,
  217. base_url=self.base_url,
  218. path=path,
  219. payload=payload,
  220. operation=operation,
  221. timeout_seconds=self.timeout_seconds,
  222. rate_limiter=self.rate_limiter,
  223. rate_limit_bucket=rate_limit_bucket,
  224. business_codes=RATE_LIMIT_BUSINESS_CODES,
  225. )
  226. def _extract_play_url(item: dict[str, Any]) -> str | None:
  227. video = item.get("video") if isinstance(item.get("video"), dict) else {}
  228. play_addr = video.get("play_addr") if isinstance(video.get("play_addr"), dict) else {}
  229. url_list = play_addr.get("url_list") or []
  230. return str(url_list[0]) if url_list else None
  231. def _extract_tags(item: dict[str, Any]) -> list[str]:
  232. tags: list[str] = []
  233. for tag in item.get("cha_list") or []:
  234. if isinstance(tag, str):
  235. tags.append(tag if tag.startswith("#") else f"#{tag}")
  236. elif isinstance(tag, dict):
  237. name = tag.get("cha_name") or tag.get("hashtag_name") or tag.get("name")
  238. if name:
  239. tags.append(str(name) if str(name).startswith("#") else f"#{name}")
  240. for text in item.get("text_extra") or []:
  241. if isinstance(text, dict) and text.get("hashtag_name"):
  242. tags.append(f"#{text['hashtag_name']}")
  243. return list(dict.fromkeys(tags))