| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- from __future__ import annotations
- import os
- from pathlib import Path
- from typing import Any
- from urllib.parse import urljoin
- import httpx
# Lower-cased dict keys treated as secrets: ``redact_sensitive_payload``
# compares ``str(key).lower()`` against this set and masks matching entries.
SENSITIVE_KEYS = set(
    """
    password token access_token refresh_token api_key apikey
    secret dsn authorization cookie session credential
    """.split()
)
class AigcDecodeClient:
    """Client for the AIGC decode task HTTP API.

    Every request is a JSON ``POST`` authenticated with a bearer token.  The
    public request methods return redacted copies of the request/response
    (via ``redact_sensitive_payload``) together with the *unredacted*
    ``raw_response``.

    Instances may be used as a context manager or closed explicitly with
    :meth:`close`.  Only an HTTP client created by this instance is closed;
    an ``http_client`` supplied by the caller is left open for the caller to
    manage.
    """

    def __init__(
        self,
        *,
        base_url: str,
        token: str,
        submit_path: str = "/aigc/api/task/decode",
        result_path: str = "/aigc/api/task/decode/result",
        config_id: int = 58,
        timeout_seconds: float = 60.0,
        http_client: Any | None = None,
    ) -> None:
        """Configure the client.

        Args:
            base_url: Service root; a single trailing ``/`` is enforced.
            token: Bearer token sent in the ``Authorization`` header.
            submit_path: Path of the decode-submit endpoint, relative to
                ``base_url``.
            result_path: Path of the decode-result endpoint, relative to
                ``base_url``.
            config_id: Value sent as ``params.configId``.
            timeout_seconds: Per-request timeout.
            http_client: Object exposing ``post(url, json=, headers=,
                timeout=)``; when omitted an ``httpx.Client`` is created and
                owned by this instance.
        """
        # Trailing slash on the base plus slash-less paths makes ``urljoin``
        # append the endpoint to any path prefix already in ``base_url``
        # instead of replacing it.
        self.base_url = base_url.rstrip("/") + "/"
        self.token = token
        self.submit_path = submit_path.lstrip("/")
        self.result_path = result_path.lstrip("/")
        self.config_id = config_id
        self.timeout_seconds = timeout_seconds
        # Remember whether we created the HTTP client so ``close`` never shuts
        # down a client that the caller injected and may still be using.
        self._owns_http_client = http_client is None
        self.http_client = (
            httpx.Client(timeout=timeout_seconds) if http_client is None else http_client
        )

    def close(self) -> None:
        """Close the underlying HTTP client if this instance created it."""
        if self._owns_http_client:
            self.http_client.close()

    def __enter__(self) -> "AigcDecodeClient":
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        self.close()

    @classmethod
    def from_env(cls, env_path: str | Path = ".env") -> "AigcDecodeClient":
        """Build a client from ``env_path`` overlaid with ``os.environ``.

        ``CONTENTFIND_API_AIGC_BASE_URL`` and ``CONTENTFIND_API_AIGC_TOKEN``
        (falling back to ``AIGC_TOKEN``) are required.  The endpoint paths,
        config id and timeout can be overridden; otherwise the constructor's
        defaults are used.

        Raises:
            RuntimeError: If the base URL or the token is missing.
            ValueError: If the config id or timeout value is not numeric.
        """
        env = _merged_env(env_path)
        return cls(
            base_url=_env("CONTENTFIND_API_AIGC_BASE_URL", env, required=True),
            token=_env("CONTENTFIND_API_AIGC_TOKEN", env, default=env.get("AIGC_TOKEN"), required=True),
            submit_path=_env(
                "CONTENTFIND_AIGC_DECODE_SUBMIT_PATH",
                env,
                default="/aigc/api/task/decode",
            ),
            result_path=_env(
                "CONTENTFIND_AIGC_DECODE_RESULT_PATH",
                env,
                default="/aigc/api/task/decode/result",
            ),
            config_id=int(_env("CONTENTFIND_AIGC_DECODE_CONFIG_ID", env, default="58")),
            timeout_seconds=float(_env("CONTENTFIND_API_AIGC_TIMEOUT_SECONDS", env, default="60")),
        )

    def submit_decode(
        self,
        content: dict[str, Any],
        media: dict[str, Any],
        source_context: dict[str, Any],
    ) -> dict[str, Any]:
        """Submit one post for decoding.

        Returns:
            ``{"request": ..., "response": ..., "decode_task_id": ...,
            "raw_response": ...}``.  ``request``/``response`` are redacted;
            ``raw_response`` is NOT and must not be logged as-is.
            ``decode_task_id`` is ``None`` if no id could be found.

        Raises:
            RuntimeError: On HTTP, network or response-format errors.
        """
        payload = {
            "params": {
                "configId": self.config_id,
                "skipCompleted": False,
                "posts": [_post_payload(content, media, source_context)],
            }
        }
        response = self._post_json(self.submit_path, payload)
        return {
            "request": redact_sensitive_payload(payload),
            "response": redact_sensitive_payload(response),
            "decode_task_id": _extract_decode_task_id(response),
            "raw_response": response,
        }

    def get_decode_result(self, decode_task_id: str) -> dict[str, Any]:
        """Fetch the decode result for ``decode_task_id``.

        The id is sent as the single element of ``params.channelContentIds``.

        Returns:
            ``{"request": ..., "response": ..., "raw_response": ...}``; only
            ``request``/``response`` are redacted.

        Raises:
            RuntimeError: On HTTP, network or response-format errors.
        """
        payload = {"params": {"configId": self.config_id, "channelContentIds": [decode_task_id]}}
        response = self._post_json(self.result_path, payload)
        return {
            "request": redact_sensitive_payload(payload),
            "response": redact_sensitive_payload(response),
            "raw_response": response,
        }

    def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
        """POST ``payload`` as JSON to ``path`` and return the decoded object.

        Raises:
            RuntimeError: ``"HTTP <status>"`` for non-2xx replies,
                ``"network_error"`` for other transport errors, ``"bad_json"``
                when the body is not JSON and ``"bad_response"`` when the JSON
                is not an object.  The original exception is chained.
        """
        url = urljoin(self.base_url, path)
        try:
            response = self.http_client.post(
                url,
                json=payload,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {self.token}",
                },
                timeout=self.timeout_seconds,
            )
            response.raise_for_status()
            data = response.json()
        # HTTPStatusError subclasses HTTPError, so it must be matched first.
        except httpx.HTTPStatusError as exc:
            status_code = exc.response.status_code if exc.response is not None else "unknown"
            raise RuntimeError(f"aigc decode failed: HTTP {status_code}") from exc
        except httpx.HTTPError as exc:
            raise RuntimeError("aigc decode failed: network_error") from exc
        except ValueError as exc:
            raise RuntimeError("aigc decode failed: bad_json") from exc
        if not isinstance(data, dict):
            raise RuntimeError("aigc decode failed: bad_response")
        return data
def redact_sensitive_payload(value: Any) -> Any:
    """Return a copy of ``value`` with sensitive dict entries masked.

    Dicts and lists are rebuilt recursively.  A dict entry whose key, as a
    lower-cased string, is in ``SENSITIVE_KEYS`` is replaced by the key
    ``"<key>_redacted"`` mapped to ``"<redacted>"``; its original value is
    dropped.  All other values (scalars, tuples, other objects) are returned
    as-is without copying.
    """
    if isinstance(value, list):
        return [redact_sensitive_payload(element) for element in value]
    if not isinstance(value, dict):
        return value
    masked: dict[str, Any] = {}
    for name, inner in value.items():
        if str(name).lower() not in SENSITIVE_KEYS:
            masked[name] = redact_sensitive_payload(inner)
            continue
        masked[f"{name}_redacted"] = "<redacted>"
    return masked
- def _post_payload(
- content: dict[str, Any],
- media: dict[str, Any],
- source_context: dict[str, Any],
- ) -> dict[str, Any]:
- evidence_pack = source_context.get("ext_data", {}).get("evidence_pack", {})
- description = content.get("description") or ""
- return {
- "channelContentId": content.get("platform_content_id"),
- "title": description,
- "bodyText": description,
- "images": [],
- "video": media.get("play_url"),
- "contentModal": 4,
- "channel": 2,
- "mergeLeve1": "",
- "mergeLeve2": source_context.get("merge_leve2") or source_context.get("name") or "",
- "metadata": {
- "tags": content.get("tags", []),
- "platform": content.get("platform", "douyin"),
- "source_post_id": evidence_pack.get("source_post_id"),
- "pattern_execution_id": evidence_pack.get("pattern_execution_id"),
- },
- }
- def _extract_decode_task_id(response: dict[str, Any]) -> str | None:
- candidates = [
- response.get("taskId"),
- response.get("task_id"),
- response.get("data", {}).get("taskId") if isinstance(response.get("data"), dict) else None,
- response.get("data", {}).get("task_id") if isinstance(response.get("data"), dict) else None,
- ]
- data = response.get("data")
- if isinstance(data, list):
- for item in data:
- if not isinstance(item, dict):
- continue
- candidates.extend(
- [
- item.get("taskId"),
- item.get("task_id"),
- item.get("channelContentId"),
- item.get("channel_content_id"),
- ]
- )
- for candidate in candidates:
- if candidate:
- return str(candidate)
- return None
def _merged_env(env_path: str | Path) -> dict[str, str]:
    """Return the ``env_path`` file values overlaid with the process environment.

    Non-empty ``os.environ`` entries override values from the file; empty
    environment variables are skipped so they cannot blank out a file value.
    """
    merged = _load_env_file(env_path)
    for name, setting in os.environ.items():
        if setting:
            merged[name] = setting
    return merged
- def _load_env_file(env_path: str | Path) -> dict[str, str]:
- path = Path(env_path)
- if not path.exists():
- return {}
- env: dict[str, str] = {}
- for line in path.read_text(encoding="utf-8").splitlines():
- stripped = line.strip()
- if not stripped or stripped.startswith("#") or "=" not in stripped:
- continue
- key, value = stripped.split("=", 1)
- env[key.strip()] = value.strip().strip('"').strip("'")
- return env
- def _env(
- key: str,
- file_env: dict[str, str],
- default: str | None = None,
- required: bool = False,
- ) -> str:
- value = file_env.get(key) or default
- if required and not value:
- raise RuntimeError(f"missing required env: {key}")
- return value or ""
|