"""一次性:把某个已完成 run 的入池视频(ADD_TO_CONTENT_POOL)推进 AIGC 生成计划。 支持抖音/快手(--platform,默认 douyin);对半分流:前一半 → 生成计划 _1,后一半 → _2(env 配置)。 四条保险: 1. 默认 dry-run:不加 --live 只打印分流,不调任何接口。 2. 安全闸:--live 时若 CAN_NOT_CREATE_PLAN 仍为真值则拒绝(除非 --force)。 3. 不去重:真发只跑一次(别跑两次,会建重复爬取计划);用日志当"推过没"的依据。 4. 记账:每份结果打屏幕 + 落 e2e_logs/aigc_push__.jsonl。 用法: dry-run: python -m scripts.push_runs_to_aigc --run --platform douyin 真发: CAN_NOT_CREATE_PLAN=false python -m scripts.push_runs_to_aigc --run --platform kuaishou --live """ from __future__ import annotations import argparse import json import sys from datetime import datetime, timezone from pathlib import Path from content_agent.integrations.aigc_platform import AigcPlatformClient from content_agent.integrations.database_runtime import ContentSupplyDbConfig from content_agent.run_service import _merged_project_env # 平台 → 计划名/标签里用的中文名。 _PLATFORM_LABEL = {"douyin": "抖音", "kuaishou": "快手"} _POOL_SQL = """ SELECT di.platform_content_id AS content_id FROM content_agent_rule_decisions rd JOIN content_agent_discovered_content_items di ON rd.run_id = di.run_id AND rd.decision_target_id = di.platform_content_id WHERE rd.run_id = %s AND di.platform = %s AND rd.decision_action = 'ADD_TO_CONTENT_POOL' ORDER BY di.id """ def _pooled_content_ids(run_id: str, platform: str) -> list[str]: conn = ContentSupplyDbConfig.from_env().connect() with conn: with conn.cursor() as cur: cur.execute(_POOL_SQL, (run_id, platform)) return [str(row["content_id"]) for row in cur.fetchall()] def _split_half(items: list[str]) -> tuple[list[str], list[str]]: mid = (len(items) + 1) // 2 # 前一半向上取整(奇数时前半多 1) return items[:mid], items[mid:] def main() -> None: parser = argparse.ArgumentParser(description="把某 run 入池视频对半推进 AIGC 生成计划") parser.add_argument("--run", required=True, help="run_id") parser.add_argument("--platform", choices=("douyin", "kuaishou"), default="douyin", help="平台(默认 douyin)") parser.add_argument("--live", action="store_true", help="真发(否则 dry-run 只打印)") parser.add_argument("--force", action="store_true", help="越过 CAN_NOT_CREATE_PLAN 安全闸") args = parser.parse_args() platform = args.platform plat_cn = _PLATFORM_LABEL[platform] env = _merged_project_env() key_1 = f"CONTENTFIND_AIGC_PRODUCE_PLAN_ID_{platform.upper()}_1" key_2 = f"CONTENTFIND_AIGC_PRODUCE_PLAN_ID_{platform.upper()}_2" plan_id_1 = env.get(key_1) plan_id_2 = env.get(key_2) if not (plan_id_1 and plan_id_2): sys.exit(f"缺 {key_1} / {key_2}(看 .env)") content_ids = _pooled_content_ids(args.run, platform) if not content_ids: sys.exit(f"{args.run} 没有{plat_cn}入池(ADD_TO_CONTENT_POOL)视频") half_1, half_2 = _split_half(content_ids) parts = [ {"half": "1", "produce_plan_id": plan_id_1, "content_ids": half_1}, {"half": "2", "produce_plan_id": plan_id_2, "content_ids": half_2}, ] ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") log_path = Path("e2e_logs") / f"aigc_push_{args.run}_{ts}.jsonl" log_path.parent.mkdir(parents=True, exist_ok=True) def log(record: dict) -> None: record["ts"] = datetime.now(timezone.utc).isoformat() line = json.dumps(record, ensure_ascii=False) print(line) with log_path.open("a", encoding="utf-8") as handle: handle.write(line + "\n") print( f"run={args.run} {plat_cn}入池 {len(content_ids)} 个 → " f"前 {len(half_1)} 进 {plan_id_1} / 后 {len(half_2)} 进 {plan_id_2}" ) # --- 保险①:dry-run 默认,只打印不调接口 --- if not args.live: for part in parts: log( { "mode": "dry_run", "run_id": args.run, "platform": platform, "half": part["half"], "produce_plan_id": part["produce_plan_id"], "content_count": len(part["content_ids"]), "content_ids": part["content_ids"], } ) print(f"[dry-run] 未调任何接口。核对无误后:设 CAN_NOT_CREATE_PLAN=false + 加 --live 真发。日志:{log_path}") return # --- 保险②:CAN_NOT_CREATE_PLAN 安全闸 --- can_not = str(env.get("CAN_NOT_CREATE_PLAN", "true")).strip().lower() if can_not in ("1", "true", "yes") and not args.force: sys.exit("安全闸 CAN_NOT_CREATE_PLAN 为真:拒绝真发。要发请设 CAN_NOT_CREATE_PLAN=false,或加 --force。") # --- 保险③:别跑两次,真发前交互确认 --- if input("⚠️ 真发模式:将建 2 个爬取计划并绑进生成计划(别跑两次!)。输入 yes 继续:").strip() != "yes": sys.exit("已取消。") # --- 保险④:逐份真调接口 + 记账 --- client = AigcPlatformClient.from_env(env) for part in parts: record = { "mode": "live", "run_id": args.run, "platform": platform, "half": part["half"], "produce_plan_id": part["produce_plan_id"], "content_count": len(part["content_ids"]), } try: name = f"【内容寻找Agent自动创建】{plat_cn}视频直接抓取-{ts}-{args.run}-{part['half']}" label = f"原始帖子-视频-{plat_cn}-内容添加计划-{args.run}-{part['half']}" crawler_plan_id = client.create_crawler_plan(part["content_ids"], platform=platform, name=name) client.bind_crawler_to_produce( crawler_plan_id=crawler_plan_id, produce_plan_id=part["produce_plan_id"], label=label, platform=platform, ) record.update({"crawler_plan_id": crawler_plan_id, "status": "ok"}) except Exception as exc: # noqa: BLE001 — 单份失败不连累另一份,记账后继续 record.update({"status": "failed", "error": str(exc)}) log(record) print(f"完成。日志:{log_path}") if __name__ == "__main__": main()