aigc_plan.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. """
  2. AIGC plan helpers.
  3. This module contains small, side-effect-free helpers for building requests and
  4. validating inputs related to AIGC plans.
  5. """
  6. from __future__ import annotations
  7. import json
  8. import logging
  9. import os
  10. import sys
  11. from typing import Any, Dict, Optional, Tuple
  12. import requests
  13. logger = logging.getLogger(__name__)
  14. _AIGC_BASE_URL = "https://aigc-api.aiddit.com"
  15. _GET_PRODUCE_PLAN_DETAIL_BY_ID_URL = f"{_AIGC_BASE_URL}/aigc/produce/plan/detail"
  16. _PRODUCE_PLAN_SAVE_URL = f"{_AIGC_BASE_URL}/aigc/produce/plan/save"
  17. _DEFAULT_TIMEOUT_SECONDS = 60.0
  18. # Keep consistent with `tools/aigc_platform_api.py` to reduce config friction.
  19. _DEFAULT_TOKEN = "8bf14f27fc3a486788f3383452422d72"
  20. def _get_aigc_token() -> str:
  21. token = (os.getenv("AIGC_TOKEN") or "").strip()
  22. return token or _DEFAULT_TOKEN
  23. def _post_aigc(url: str, params: Any, *, timeout_seconds: float) -> Dict[str, Any]:
  24. payload = {"baseInfo": {"token": _get_aigc_token()}, "params": params}
  25. try:
  26. resp = requests.post(
  27. url=url,
  28. json=payload,
  29. headers={"Content-Type": "application/json"},
  30. timeout=timeout_seconds,
  31. )
  32. resp.raise_for_status()
  33. data = resp.json()
  34. if isinstance(data, dict):
  35. return data
  36. logger.error("AIGC response json is not dict. url=%s", url)
  37. return {}
  38. except Exception as e:
  39. logger.error(
  40. "Invoke AIGC platform failed. url=%s payload=%s err=%s",
  41. url,
  42. json.dumps(payload, ensure_ascii=False),
  43. str(e),
  44. )
  45. return {}
  46. def build_produce_plan_detail_query(produce_plan_id: str) -> Dict[str, str]:
  47. """
  48. Build query params for "produce plan detail by id".
  49. This follows the AIGC platform API contract used in
  50. `tools/aigc_platform_api.py` (see `find_produce_plan_info_by_id`), where the
  51. request params payload is `{"id": <produce_plan_id>}`.
  52. Args:
  53. produce_plan_id: AIGC produce plan id (non-empty string).
  54. Returns:
  55. A dict payload to be used as "params" in the API request.
  56. Raises:
  57. ValueError: If `produce_plan_id` is empty or not a string.
  58. """
  59. if not isinstance(produce_plan_id, str):
  60. raise ValueError(f"produce_plan_id must be a string, got: {type(produce_plan_id)!r}")
  61. plan_id = produce_plan_id.strip()
  62. if not plan_id:
  63. raise ValueError("produce_plan_id must be a non-empty string")
  64. return {"id": plan_id}
  65. def query_produce_plan_detail_by_id(produce_plan_id: str) -> Tuple[Optional[Dict[str, Any]], str]:
  66. """
  67. Query produce plan detail by id from AIGC platform.
  68. This performs the same API call as `tools/aigc_platform_api.py::find_produce_plan_info_by_id`,
  69. but is implemented as a standalone utility for reuse.
  70. Args:
  71. produce_plan_id: AIGC produce plan id (non-empty string).
  72. Returns:
  73. (data, msg)
  74. - data: response["data"] dict when success; otherwise None
  75. - msg: empty string on success; otherwise an error message
  76. """
  77. try:
  78. params = build_produce_plan_detail_query(produce_plan_id)
  79. except ValueError as e:
  80. return None, str(e)
  81. response_json = _post_aigc(
  82. _GET_PRODUCE_PLAN_DETAIL_BY_ID_URL,
  83. params,
  84. timeout_seconds=_DEFAULT_TIMEOUT_SECONDS,
  85. )
  86. if not response_json:
  87. return None, "AIGC接口调用失败:空响应"
  88. if response_json.get("code") != 0:
  89. return None, str(response_json.get("msg") or "获取生成计划详情异常")
  90. data = response_json.get("data") or {}
  91. if not isinstance(data, dict) or not data:
  92. return None, str(response_json.get("msg") or "获取生成计划详情异常")
  93. return data, ""
  94. def shrink_video_group_input_sources(plan_detail: Dict[str, Any]) -> int:
  95. """
  96. Mutate plan detail in-place:
  97. For each item in inputSourceGroups where groupName == "视频",
  98. keep only the first element of inputSources.
  99. Returns:
  100. Number of groups modified.
  101. """
  102. groups = plan_detail.get("inputSourceGroups")
  103. if not isinstance(groups, list) or not groups:
  104. return 0
  105. modified = 0
  106. for group in groups:
  107. if not isinstance(group, dict):
  108. continue
  109. if group.get("groupName") != "视频":
  110. continue
  111. sources = group.get("inputSources")
  112. if not isinstance(sources, list):
  113. group["inputSources"] = []
  114. modified += 1
  115. continue
  116. if len(sources) <= 1:
  117. continue
  118. group["inputSources"] = [sources[0]]
  119. modified += 1
  120. return modified
  121. def save_produce_plan(plan_detail: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], str]:
  122. """
  123. Save (update) a produce plan detail back to AIGC platform.
  124. This mirrors `tools/aigc_platform_api.py` behavior which calls
  125. `/aigc/produce/plan/save` with the full plan detail object.
  126. Args:
  127. plan_detail: The (possibly modified) plan detail dict returned by detail API.
  128. Returns:
  129. (data, msg)
  130. - data: response["data"] dict when success; otherwise None
  131. - msg: empty string on success; otherwise an error message
  132. """
  133. if not isinstance(plan_detail, dict) or not plan_detail:
  134. return None, "plan_detail 参数无效:必须是非空 dict"
  135. response_json = _post_aigc(
  136. _PRODUCE_PLAN_SAVE_URL,
  137. plan_detail,
  138. timeout_seconds=_DEFAULT_TIMEOUT_SECONDS,
  139. )
  140. if not response_json:
  141. return None, "AIGC接口调用失败:空响应"
  142. if response_json.get("code") != 0:
  143. return None, str(response_json.get("msg") or "保存生成计划异常")
  144. data = response_json.get("data") or {}
  145. if not isinstance(data, dict) or not data:
  146. return None, str(response_json.get("msg") or "保存生成计划异常")
  147. return data, ""
  148. def main(argv: list[str]) -> int:
  149. """
  150. CLI for quick manual testing.
  151. Usage:
  152. python3 examples/content_finder/utils/aigc_plan.py <produce_plan_id>
  153. Env:
  154. - AIGC_TOKEN: optional; overrides default token
  155. - PRODUCE_PLAN_ID: optional fallback when arg is not provided
  156. """
  157. produce_plan_id = (argv[1] if len(argv) > 1 else "").strip() or (os.getenv("PRODUCE_PLAN_ID") or "").strip()
  158. if not produce_plan_id:
  159. print(
  160. "Missing produce_plan_id.\n"
  161. "Usage: python3 examples/content_finder/utils/aigc_plan.py <produce_plan_id>\n"
  162. "Or set env PRODUCE_PLAN_ID.",
  163. file=sys.stderr,
  164. )
  165. return 2
  166. data, msg = query_produce_plan_detail_by_id(produce_plan_id)
  167. if msg:
  168. print(f"Error: {msg}", file=sys.stderr)
  169. return 1
  170. modified_groups = shrink_video_group_input_sources(data)
  171. if modified_groups:
  172. saved, save_msg = save_produce_plan(data)
  173. if save_msg:
  174. print(f"Error: 保存生成计划失败: {save_msg}", file=sys.stderr)
  175. return 1
  176. print(f"Modified groups: {modified_groups}", file=sys.stderr)
  177. print(json.dumps(saved, ensure_ascii=False, indent=2))
  178. return 0
  179. print(json.dumps(data, ensure_ascii=False, indent=2))
  180. return 0
  181. if __name__ == "__main__":
  182. raise SystemExit(main(sys.argv))