""" AIGC plan helpers. This module contains small, side-effect-free helpers for building requests and validating inputs related to AIGC plans. """ from __future__ import annotations import json import logging import os import sys from typing import Any, Dict, Optional, Tuple import requests logger = logging.getLogger(__name__) _AIGC_BASE_URL = "https://aigc-api.aiddit.com" _GET_PRODUCE_PLAN_DETAIL_BY_ID_URL = f"{_AIGC_BASE_URL}/aigc/produce/plan/detail" _PRODUCE_PLAN_SAVE_URL = f"{_AIGC_BASE_URL}/aigc/produce/plan/save" _DEFAULT_TIMEOUT_SECONDS = 60.0 # Keep consistent with `tools/aigc_platform_api.py` to reduce config friction. _DEFAULT_TOKEN = "8bf14f27fc3a486788f3383452422d72" def _get_aigc_token() -> str: token = (os.getenv("AIGC_TOKEN") or "").strip() return token or _DEFAULT_TOKEN def _post_aigc(url: str, params: Any, *, timeout_seconds: float) -> Dict[str, Any]: payload = {"baseInfo": {"token": _get_aigc_token()}, "params": params} try: resp = requests.post( url=url, json=payload, headers={"Content-Type": "application/json"}, timeout=timeout_seconds, ) resp.raise_for_status() data = resp.json() if isinstance(data, dict): return data logger.error("AIGC response json is not dict. url=%s", url) return {} except Exception as e: logger.error( "Invoke AIGC platform failed. url=%s payload=%s err=%s", url, json.dumps(payload, ensure_ascii=False), str(e), ) return {} def build_produce_plan_detail_query(produce_plan_id: str) -> Dict[str, str]: """ Build query params for "produce plan detail by id". This follows the AIGC platform API contract used in `tools/aigc_platform_api.py` (see `find_produce_plan_info_by_id`), where the request params payload is `{"id": }`. Args: produce_plan_id: AIGC produce plan id (non-empty string). Returns: A dict payload to be used as "params" in the API request. Raises: ValueError: If `produce_plan_id` is empty or not a string. """ if not isinstance(produce_plan_id, str): raise ValueError(f"produce_plan_id must be a string, got: {type(produce_plan_id)!r}") plan_id = produce_plan_id.strip() if not plan_id: raise ValueError("produce_plan_id must be a non-empty string") return {"id": plan_id} def query_produce_plan_detail_by_id(produce_plan_id: str) -> Tuple[Optional[Dict[str, Any]], str]: """ Query produce plan detail by id from AIGC platform. This performs the same API call as `tools/aigc_platform_api.py::find_produce_plan_info_by_id`, but is implemented as a standalone utility for reuse. Args: produce_plan_id: AIGC produce plan id (non-empty string). Returns: (data, msg) - data: response["data"] dict when success; otherwise None - msg: empty string on success; otherwise an error message """ try: params = build_produce_plan_detail_query(produce_plan_id) except ValueError as e: return None, str(e) response_json = _post_aigc( _GET_PRODUCE_PLAN_DETAIL_BY_ID_URL, params, timeout_seconds=_DEFAULT_TIMEOUT_SECONDS, ) if not response_json: return None, "AIGC接口调用失败:空响应" if response_json.get("code") != 0: return None, str(response_json.get("msg") or "获取生成计划详情异常") data = response_json.get("data") or {} if not isinstance(data, dict) or not data: return None, str(response_json.get("msg") or "获取生成计划详情异常") return data, "" def shrink_video_group_input_sources(plan_detail: Dict[str, Any]) -> int: """ Mutate plan detail in-place: For each item in inputSourceGroups where groupName == "视频", keep only the first element of inputSources. Returns: Number of groups modified. """ groups = plan_detail.get("inputSourceGroups") if not isinstance(groups, list) or not groups: return 0 modified = 0 for group in groups: if not isinstance(group, dict): continue if group.get("groupName") != "视频": continue sources = group.get("inputSources") if not isinstance(sources, list): group["inputSources"] = [] modified += 1 continue if len(sources) <= 1: continue group["inputSources"] = [sources[0]] modified += 1 return modified def save_produce_plan(plan_detail: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]], str]: """ Save (update) a produce plan detail back to AIGC platform. This mirrors `tools/aigc_platform_api.py` behavior which calls `/aigc/produce/plan/save` with the full plan detail object. Args: plan_detail: The (possibly modified) plan detail dict returned by detail API. Returns: (data, msg) - data: response["data"] dict when success; otherwise None - msg: empty string on success; otherwise an error message """ if not isinstance(plan_detail, dict) or not plan_detail: return None, "plan_detail 参数无效:必须是非空 dict" response_json = _post_aigc( _PRODUCE_PLAN_SAVE_URL, plan_detail, timeout_seconds=_DEFAULT_TIMEOUT_SECONDS, ) if not response_json: return None, "AIGC接口调用失败:空响应" if response_json.get("code") != 0: return None, str(response_json.get("msg") or "保存生成计划异常") data = response_json.get("data") or {} if not isinstance(data, dict) or not data: return None, str(response_json.get("msg") or "保存生成计划异常") return data, "" def main(argv: list[str]) -> int: """ CLI for quick manual testing. Usage: python3 examples/content_finder/utils/aigc_plan.py Env: - AIGC_TOKEN: optional; overrides default token - PRODUCE_PLAN_ID: optional fallback when arg is not provided """ produce_plan_id = (argv[1] if len(argv) > 1 else "").strip() or (os.getenv("PRODUCE_PLAN_ID") or "").strip() if not produce_plan_id: print( "Missing produce_plan_id.\n" "Usage: python3 examples/content_finder/utils/aigc_plan.py \n" "Or set env PRODUCE_PLAN_ID.", file=sys.stderr, ) return 2 data, msg = query_produce_plan_detail_by_id(produce_plan_id) if msg: print(f"Error: {msg}", file=sys.stderr) return 1 modified_groups = shrink_video_group_input_sources(data) if modified_groups: saved, save_msg = save_produce_plan(data) if save_msg: print(f"Error: 保存生成计划失败: {save_msg}", file=sys.stderr) return 1 print(f"Modified groups: {modified_groups}", file=sys.stderr) print(json.dumps(saved, ensure_ascii=False, indent=2)) return 0 print(json.dumps(data, ensure_ascii=False, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main(sys.argv))