decode_api.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. from __future__ import annotations
  2. import os
  3. from pathlib import Path
  4. from typing import Any
  5. from urllib.parse import urljoin
  6. import httpx
  7. SENSITIVE_KEYS = {
  8. "password",
  9. "token",
  10. "access_token",
  11. "refresh_token",
  12. "api_key",
  13. "apikey",
  14. "secret",
  15. "dsn",
  16. "authorization",
  17. "cookie",
  18. "session",
  19. "credential",
  20. }
  21. class AigcDecodeClient:
  22. def __init__(
  23. self,
  24. *,
  25. base_url: str,
  26. token: str,
  27. submit_path: str = "/aigc/api/task/decode",
  28. result_path: str = "/aigc/api/task/decode/result",
  29. config_id: int = 58,
  30. timeout_seconds: float = 60.0,
  31. http_client: Any | None = None,
  32. ) -> None:
  33. self.base_url = base_url.rstrip("/") + "/"
  34. self.token = token
  35. self.submit_path = submit_path.lstrip("/")
  36. self.result_path = result_path.lstrip("/")
  37. self.config_id = config_id
  38. self.timeout_seconds = timeout_seconds
  39. self.http_client = http_client or httpx.Client(timeout=timeout_seconds)
  40. @classmethod
  41. def from_env(cls, env_path: str | Path = ".env") -> "AigcDecodeClient":
  42. env = _merged_env(env_path)
  43. return cls(
  44. base_url=_env("CONTENTFIND_API_AIGC_BASE_URL", env, required=True),
  45. token=_env("CONTENTFIND_API_AIGC_TOKEN", env, default=env.get("AIGC_TOKEN"), required=True),
  46. submit_path=_env(
  47. "CONTENTFIND_AIGC_DECODE_SUBMIT_PATH",
  48. env,
  49. default="/aigc/api/task/decode",
  50. ),
  51. result_path=_env(
  52. "CONTENTFIND_AIGC_DECODE_RESULT_PATH",
  53. env,
  54. default="/aigc/api/task/decode/result",
  55. ),
  56. config_id=int(_env("CONTENTFIND_AIGC_DECODE_CONFIG_ID", env, default="58")),
  57. timeout_seconds=float(_env("CONTENTFIND_API_AIGC_TIMEOUT_SECONDS", env, default="60")),
  58. )
  59. def submit_decode(
  60. self,
  61. content: dict[str, Any],
  62. media: dict[str, Any],
  63. source_context: dict[str, Any],
  64. ) -> dict[str, Any]:
  65. payload = {
  66. "params": {
  67. "configId": self.config_id,
  68. "skipCompleted": False,
  69. "posts": [_post_payload(content, media, source_context)],
  70. }
  71. }
  72. response = self._post_json(self.submit_path, payload)
  73. return {
  74. "request": redact_sensitive_payload(payload),
  75. "response": redact_sensitive_payload(response),
  76. "decode_task_id": _extract_decode_task_id(response),
  77. "raw_response": response,
  78. }
  79. def get_decode_result(self, decode_task_id: str) -> dict[str, Any]:
  80. payload = {"params": {"configId": self.config_id, "channelContentIds": [decode_task_id]}}
  81. response = self._post_json(self.result_path, payload)
  82. return {
  83. "request": redact_sensitive_payload(payload),
  84. "response": redact_sensitive_payload(response),
  85. "raw_response": response,
  86. }
  87. def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
  88. url = urljoin(self.base_url, path)
  89. try:
  90. response = self.http_client.post(
  91. url,
  92. json=payload,
  93. headers={
  94. "Content-Type": "application/json",
  95. "Authorization": f"Bearer {self.token}",
  96. },
  97. timeout=self.timeout_seconds,
  98. )
  99. response.raise_for_status()
  100. data = response.json()
  101. except httpx.HTTPStatusError as exc:
  102. status_code = exc.response.status_code if exc.response is not None else "unknown"
  103. raise RuntimeError(f"aigc decode failed: HTTP {status_code}") from exc
  104. except httpx.HTTPError as exc:
  105. raise RuntimeError("aigc decode failed: network_error") from exc
  106. except ValueError as exc:
  107. raise RuntimeError("aigc decode failed: bad_json") from exc
  108. if not isinstance(data, dict):
  109. raise RuntimeError("aigc decode failed: bad_response")
  110. return data
  111. def redact_sensitive_payload(value: Any) -> Any:
  112. if isinstance(value, dict):
  113. result: dict[str, Any] = {}
  114. for key, child in value.items():
  115. if str(key).lower() in SENSITIVE_KEYS:
  116. result[f"{key}_redacted"] = "<redacted>"
  117. else:
  118. result[key] = redact_sensitive_payload(child)
  119. return result
  120. if isinstance(value, list):
  121. return [redact_sensitive_payload(item) for item in value]
  122. return value
  123. def _post_payload(
  124. content: dict[str, Any],
  125. media: dict[str, Any],
  126. source_context: dict[str, Any],
  127. ) -> dict[str, Any]:
  128. evidence_pack = source_context.get("ext_data", {}).get("evidence_pack", {})
  129. description = content.get("description") or ""
  130. return {
  131. "channelContentId": content.get("platform_content_id"),
  132. "title": description,
  133. "bodyText": description,
  134. "images": [],
  135. "video": media.get("play_url"),
  136. "contentModal": 4,
  137. "channel": 2,
  138. "mergeLeve1": "",
  139. "mergeLeve2": source_context.get("merge_leve2") or source_context.get("name") or "",
  140. "metadata": {
  141. "tags": content.get("tags", []),
  142. "platform": content.get("platform", "douyin"),
  143. "source_post_id": evidence_pack.get("source_post_id"),
  144. "pattern_execution_id": evidence_pack.get("pattern_execution_id"),
  145. },
  146. }
  147. def _extract_decode_task_id(response: dict[str, Any]) -> str | None:
  148. candidates = [
  149. response.get("taskId"),
  150. response.get("task_id"),
  151. response.get("data", {}).get("taskId") if isinstance(response.get("data"), dict) else None,
  152. response.get("data", {}).get("task_id") if isinstance(response.get("data"), dict) else None,
  153. ]
  154. data = response.get("data")
  155. if isinstance(data, list):
  156. for item in data:
  157. if not isinstance(item, dict):
  158. continue
  159. candidates.extend(
  160. [
  161. item.get("taskId"),
  162. item.get("task_id"),
  163. item.get("channelContentId"),
  164. item.get("channel_content_id"),
  165. ]
  166. )
  167. for candidate in candidates:
  168. if candidate:
  169. return str(candidate)
  170. return None
  171. def _merged_env(env_path: str | Path) -> dict[str, str]:
  172. env = _load_env_file(env_path)
  173. env.update({key: value for key, value in os.environ.items() if value})
  174. return env
  175. def _load_env_file(env_path: str | Path) -> dict[str, str]:
  176. path = Path(env_path)
  177. if not path.exists():
  178. return {}
  179. env: dict[str, str] = {}
  180. for line in path.read_text(encoding="utf-8").splitlines():
  181. stripped = line.strip()
  182. if not stripped or stripped.startswith("#") or "=" not in stripped:
  183. continue
  184. key, value = stripped.split("=", 1)
  185. env[key.strip()] = value.strip().strip('"').strip("'")
  186. return env
  187. def _env(
  188. key: str,
  189. file_env: dict[str, str],
  190. default: str | None = None,
  191. required: bool = False,
  192. ) -> str:
  193. value = file_env.get(key) or default
  194. if required and not value:
  195. raise RuntimeError(f"missing required env: {key}")
  196. return value or ""