|
|
@@ -1,6 +1,5 @@
|
|
|
from __future__ import annotations
|
|
|
|
|
|
-import re
|
|
|
from pathlib import Path
|
|
|
from typing import Any
|
|
|
|
|
|
@@ -37,7 +36,6 @@ class CrawapiDouyinClient:
|
|
|
self,
|
|
|
base_url: str,
|
|
|
keyword_path: str,
|
|
|
- content_portrait_path: str,
|
|
|
blogger_path: str = "",
|
|
|
detail_path: str = "",
|
|
|
timeout_seconds: float = 60.0,
|
|
|
@@ -53,7 +51,6 @@ class CrawapiDouyinClient:
|
|
|
) -> None:
|
|
|
self.base_url = base_url.rstrip("/") + "/"
|
|
|
self.keyword_path = keyword_path.lstrip("/")
|
|
|
- self.content_portrait_path = content_portrait_path.lstrip("/")
|
|
|
self.blogger_path = blogger_path.lstrip("/")
|
|
|
self.detail_path = detail_path.lstrip("/")
|
|
|
self.timeout_seconds = timeout_seconds
|
|
|
@@ -73,9 +70,6 @@ class CrawapiDouyinClient:
|
|
|
return cls(
|
|
|
base_url=_env("CONTENTFIND_API_CRAWAPI_BASE_URL", env, required=True),
|
|
|
keyword_path=_env("CONTENTFIND_DOUYIN_KEYWORD_PATH", env, required=True),
|
|
|
- content_portrait_path=_env(
|
|
|
- "CONTENTFIND_DOUYIN_VIDEO_LIKE_PORTRAIT_PATH", env, required=True
|
|
|
- ),
|
|
|
blogger_path=_env("CONTENTFIND_DOUYIN_BLOGGER_PATH", env, required=True),
|
|
|
detail_path=_env(
|
|
|
"CONTENTFIND_DOUYIN_DETAIL_PATH", env, default="/crawler/dou_yin/detail"
|
|
|
@@ -118,10 +112,7 @@ class CrawapiDouyinClient:
|
|
|
results: list[dict[str, Any]] = []
|
|
|
selected_items = items[: self.max_results_per_query] if self.max_results_per_query else items
|
|
|
for index, item in enumerate(selected_items, start=1):
|
|
|
- normalized = self._normalize_content_item(query, item, index, has_more, next_cursor)
|
|
|
- portrait = self._fetch_content_portrait(normalized["platform_content_id"])
|
|
|
- normalized.update(portrait)
|
|
|
- results.append(normalized)
|
|
|
+ results.append(self._normalize_content_item(query, item, index, has_more, next_cursor))
|
|
|
return results
|
|
|
|
|
|
def fetch_author_works(self, query: dict[str, Any]) -> list[dict[str, Any]]:
|
|
|
@@ -145,8 +136,6 @@ class CrawapiDouyinClient:
|
|
|
normalized = self._normalize_content_item(query, item, index, has_more, next_cursor)
|
|
|
normalized["previous_discovery_step"] = "author_works"
|
|
|
normalized["content_metadata_source"] = "douyin_blogger"
|
|
|
- portrait = self._fetch_content_portrait(normalized["platform_content_id"])
|
|
|
- normalized.update(portrait)
|
|
|
results.append(normalized)
|
|
|
return results
|
|
|
|
|
|
@@ -186,8 +175,6 @@ class CrawapiDouyinClient:
|
|
|
"next_cursor": next_cursor,
|
|
|
"score": _score_from_statistics(statistics),
|
|
|
"risk_level": "unknown",
|
|
|
- "pattern_recall": "pattern_recall_pending",
|
|
|
- "category_or_element_binding": "pattern_recall_pending",
|
|
|
"discovery_relation": "derived_from_pattern_demand",
|
|
|
"discovery_start_source": query["discovery_start_source"],
|
|
|
"previous_discovery_step": "search_query_direct",
|
|
|
@@ -199,47 +186,6 @@ class CrawapiDouyinClient:
|
|
|
},
|
|
|
}
|
|
|
|
|
|
- def _fetch_content_portrait(self, platform_content_id: str) -> dict[str, Any]:
|
|
|
- data = None
|
|
|
- for _ in range(2):
|
|
|
- try:
|
|
|
- data = self._post_json(
|
|
|
- self.content_portrait_path,
|
|
|
- {
|
|
|
- "content_id": platform_content_id,
|
|
|
- "need_age": True,
|
|
|
- "need_gender": True,
|
|
|
- "need_province": True,
|
|
|
- "need_city": False,
|
|
|
- "need_city_level": False,
|
|
|
- "need_phone_brand": False,
|
|
|
- "need_phone_price": False,
|
|
|
- },
|
|
|
- operation="content_portrait",
|
|
|
- )
|
|
|
- break
|
|
|
- except RuntimeError:
|
|
|
- continue
|
|
|
- if data is None:
|
|
|
- return {"portrait_available": False, "age_50_plus_level": "missing"}
|
|
|
-
|
|
|
- portrait = _extract_portrait_dimensions(data)
|
|
|
- age_distribution = _normalize_age_distribution(portrait.get("年龄"))
|
|
|
- if not age_distribution:
|
|
|
- return {"portrait_available": False, "age_50_plus_level": "missing"}
|
|
|
-
|
|
|
- age_50_ratio = sum(row["percentage"] for row in age_distribution if row["is_50_plus"])
|
|
|
- age_50_tgi = max(
|
|
|
- [row["preference"] for row in age_distribution if row["is_50_plus"]] or [0.0]
|
|
|
- )
|
|
|
- return {
|
|
|
- "portrait_available": True,
|
|
|
- "age_50_plus_level": _age_level(age_50_ratio, age_50_tgi),
|
|
|
- "age_distribution": age_distribution,
|
|
|
- "age_50_plus_ratio": age_50_ratio,
|
|
|
- "age_50_plus_tgi": age_50_tgi,
|
|
|
- }
|
|
|
-
|
|
|
def fetch_detail(self, content_id: str) -> dict[str, Any]:
|
|
|
data = self._post_json(
|
|
|
self.detail_path,
|
|
|
@@ -315,78 +261,3 @@ def _extract_tags(item: dict[str, Any]) -> list[str]:
|
|
|
if isinstance(text, dict) and text.get("hashtag_name"):
|
|
|
tags.append(f"#{text['hashtag_name']}")
|
|
|
return list(dict.fromkeys(tags))
|
|
|
-
|
|
|
-
|
|
|
-def _normalize_age_distribution(age_data: Any) -> list[dict[str, Any]]:
|
|
|
- rows: list[dict[str, Any]] = []
|
|
|
- items = age_data.items() if isinstance(age_data, dict) else []
|
|
|
- if isinstance(age_data, list):
|
|
|
- items = [(row.get("name"), row) for row in age_data if isinstance(row, dict)]
|
|
|
- for name, value in items:
|
|
|
- metrics = value if isinstance(value, dict) else {}
|
|
|
- label = str(name or metrics.get("name") or "")
|
|
|
- if not label:
|
|
|
- continue
|
|
|
- rows.append(
|
|
|
- {
|
|
|
- "name": label,
|
|
|
- "percentage": _to_float(metrics.get("percentage")),
|
|
|
- "preference": _to_float(metrics.get("preference")),
|
|
|
- "is_50_plus": _is_50_plus_label(label),
|
|
|
- }
|
|
|
- )
|
|
|
- return rows
|
|
|
-
|
|
|
-
|
|
|
-def _extract_portrait_dimensions(data: dict[str, Any]) -> dict[str, Any]:
|
|
|
- data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
|
|
|
- content_blocks = [
|
|
|
- data_block.get("data"),
|
|
|
- data_block.get("portrait"),
|
|
|
- data_block,
|
|
|
- data,
|
|
|
- ]
|
|
|
- for content_block in content_blocks:
|
|
|
- if not isinstance(content_block, dict):
|
|
|
- continue
|
|
|
- dimensions = content_block.get("dimensions")
|
|
|
- if isinstance(dimensions, dict):
|
|
|
- return dimensions
|
|
|
- portrait = content_block.get("portrait")
|
|
|
- if isinstance(portrait, dict) and isinstance(portrait.get("dimensions"), dict):
|
|
|
- return portrait["dimensions"]
|
|
|
- if "年龄" in content_block:
|
|
|
- return content_block
|
|
|
- return {}
|
|
|
-
|
|
|
-
|
|
|
-def _to_float(value: Any) -> float:
|
|
|
- if value is None:
|
|
|
- return 0.0
|
|
|
- if isinstance(value, (int, float)):
|
|
|
- return float(value)
|
|
|
- text = str(value).strip().replace("%", "")
|
|
|
- try:
|
|
|
- parsed = float(text)
|
|
|
- except ValueError:
|
|
|
- return 0.0
|
|
|
- return parsed / 100 if "%" in str(value) else parsed
|
|
|
-
|
|
|
-
|
|
|
-def _is_50_plus_label(label: str) -> bool:
|
|
|
- if "50+" in label or "50以上" in label or "50-" in label or "老年" in label:
|
|
|
- return True
|
|
|
- numbers = [int(value) for value in re.findall(r"\d+", label)]
|
|
|
- if not numbers:
|
|
|
- return False
|
|
|
- if "-" in label and numbers[0] < 50:
|
|
|
- return False
|
|
|
- return min(numbers) >= 50
|
|
|
-
|
|
|
-
|
|
|
-def _age_level(ratio: float, tgi: float) -> str:
|
|
|
- if ratio >= 0.25 or tgi >= 130:
|
|
|
- return "strong"
|
|
|
- if ratio >= 0.1 or tgi >= 100:
|
|
|
- return "medium"
|
|
|
- return "weak"
|