```python
"""One-off script: push the pooled videos (ADD_TO_CONTENT_POOL) of a finished run into AIGC produce plans.

Supports Douyin and Kuaishou (--platform, default douyin). The videos are split in half:
the first half goes to produce plan _1 and the second half to produce plan _2 (plan IDs come
from CONTENTFIND_AIGC_PRODUCE_PLAN_ID_<PLATFORM>_1 / _2 in the env).

Four safeguards:
1. Dry-run by default: without --live the script only prints the split and calls no API.
2. Safety gate: with --live, refuse to run while CAN_NOT_CREATE_PLAN is truthy (unless --force).
3. No deduplication: run live only once (a second run creates duplicate crawler plans);
   the log files are the record of whether a run has already been pushed. --live also
   asks for an interactive "yes" before calling anything.
4. Bookkeeping: every result is printed and appended to e2e_logs/aigc_push_<run>_<ts>.jsonl.

Usage:
    dry-run: python -m scripts.push_runs_to_aigc --run <run_id> --platform douyin
    live:    CAN_NOT_CREATE_PLAN=false python -m scripts.push_runs_to_aigc --run <run_id> --platform kuaishou --live
"""
from __future__ import annotations

import argparse
import json
import sys
from datetime import datetime, timezone
from pathlib import Path

from content_agent.integrations.aigc_platform import AigcPlatformClient
from content_agent.integrations.database_runtime import ContentSupplyDbConfig
from content_agent.run_service import _merged_project_env

# Platform -> Chinese display name used in crawler plan names and labels
# (these strings are sent to the AIGC platform, so the values stay in Chinese).
_PLATFORM_LABEL = {"douyin": "抖音", "kuaishou": "快手"}

# Content IDs that the rule engine sent to the content pool for one run and platform,
# ordered by discovery (di.id) so the split into halves is deterministic.
_POOL_SQL = """
SELECT di.platform_content_id AS content_id
FROM content_agent_rule_decisions rd
JOIN content_agent_discovered_content_items di
  ON rd.run_id = di.run_id AND rd.decision_target_id = di.platform_content_id
WHERE rd.run_id = %s AND di.platform = %s AND rd.decision_action = 'ADD_TO_CONTENT_POOL'
ORDER BY di.id
"""


def _pooled_content_ids(run_id: str, platform: str) -> list[str]:
    conn = ContentSupplyDbConfig.from_env().connect()
    with conn:
        with conn.cursor() as cur:
            cur.execute(_POOL_SQL, (run_id, platform))
            # Rows are indexed by column name, so the connection must use a dict-style cursor.
            return [str(row["content_id"]) for row in cur.fetchall()]


def _split_half(items: list[str]) -> tuple[list[str], list[str]]:
    mid = (len(items) + 1) // 2  # round up: with an odd count the first half gets the extra item
    return items[:mid], items[mid:]


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Split a run's pooled videos in half and push them into AIGC produce plans"
    )
    parser.add_argument("--run", required=True, help="run_id")
    parser.add_argument("--platform", choices=("douyin", "kuaishou"), default="douyin", help="platform (default: douyin)")
    parser.add_argument("--live", action="store_true", help="call the API for real (otherwise dry-run: print only)")
    parser.add_argument("--force", action="store_true", help="bypass the CAN_NOT_CREATE_PLAN safety gate")
    args = parser.parse_args()

    platform = args.platform
    plat_cn = _PLATFORM_LABEL[platform]
    env = _merged_project_env()

    key_1 = f"CONTENTFIND_AIGC_PRODUCE_PLAN_ID_{platform.upper()}_1"
    key_2 = f"CONTENTFIND_AIGC_PRODUCE_PLAN_ID_{platform.upper()}_2"
    plan_id_1 = env.get(key_1)
    plan_id_2 = env.get(key_2)
    if not (plan_id_1 and plan_id_2):
        sys.exit(f"Missing {key_1} / {key_2} (check .env)")

    content_ids = _pooled_content_ids(args.run, platform)
    if not content_ids:
        sys.exit(f"Run {args.run} has no pooled (ADD_TO_CONTENT_POOL) {platform} videos")

    half_1, half_2 = _split_half(content_ids)
    parts = [
        {"half": "1", "produce_plan_id": plan_id_1, "content_ids": half_1},
        {"half": "2", "produce_plan_id": plan_id_2, "content_ids": half_2},
    ]

    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    log_path = Path("e2e_logs") / f"aigc_push_{args.run}_{ts}.jsonl"
    log_path.parent.mkdir(parents=True, exist_ok=True)

    def log(record: dict) -> None:
        # Timestamp the record, print it, and append it as one JSON line.
        record["ts"] = datetime.now(timezone.utc).isoformat()
        line = json.dumps(record, ensure_ascii=False)
        print(line)
        with log_path.open("a", encoding="utf-8") as handle:
            handle.write(line + "\n")

    print(
        f"run={args.run}: {len(content_ids)} pooled {platform} videos -> "
        f"first {len(half_1)} into {plan_id_1} / last {len(half_2)} into {plan_id_2}"
    )

    # --- Safeguard 1: dry-run by default; print only, call no API ---
    if not args.live:
        for part in parts:
            log(
                {
                    "mode": "dry_run",
                    "run_id": args.run,
                    "platform": platform,
                    "half": part["half"],
                    "produce_plan_id": part["produce_plan_id"],
                    "content_count": len(part["content_ids"]),
                    "content_ids": part["content_ids"],
                }
            )
        print(
            "[dry-run] No API was called. Once the split looks right, set "
            f"CAN_NOT_CREATE_PLAN=false and add --live to push for real. Log: {log_path}"
        )
        return

    # --- Safeguard 2: CAN_NOT_CREATE_PLAN safety gate (unset or 1/true/yes blocks the push) ---
    can_not = str(env.get("CAN_NOT_CREATE_PLAN", "true")).strip().lower()
    if can_not in ("1", "true", "yes") and not args.force:
        sys.exit("Safety gate CAN_NOT_CREATE_PLAN is true: refusing to push. Set CAN_NOT_CREATE_PLAN=false or pass --force.")

    # --- Safeguard 3: never run twice; confirm interactively before the live push ---
    n_plans = sum(1 for part in parts if part["content_ids"])
    prompt = (
        f"⚠️ LIVE mode: will create {n_plans} crawler plan(s) and bind them to produce plans "
        "(do NOT run this twice!). Type yes to continue: "
    )
    if input(prompt).strip() != "yes":
        sys.exit("Cancelled.")

    # --- Safeguard 4: call the API half by half and log every result ---
    client = AigcPlatformClient.from_env(env)
    for part in parts:
        record = {
            "mode": "live",
            "run_id": args.run,
            "platform": platform,
            "half": part["half"],
            "produce_plan_id": part["produce_plan_id"],
            "content_count": len(part["content_ids"]),
        }
        if not part["content_ids"]:
            # A run with one pooled video leaves half 2 empty: log it instead of creating an empty crawler plan.
            record["status"] = "skipped_empty"
            log(record)
            continue
        try:
            # Name and label are sent to the AIGC platform verbatim. In English:
            # "[Auto-created by Content Finder Agent] <platform> video direct crawl-<ts>-<run>-<half>"
            # "original post-video-<platform>-content add plan-<run>-<half>"
            name = f"【内容寻找Agent自动创建】{plat_cn}视频直接抓取-{ts}-{args.run}-{part['half']}"
            label = f"原始帖子-视频-{plat_cn}-内容添加计划-{args.run}-{part['half']}"
            crawler_plan_id = client.create_crawler_plan(part["content_ids"], platform=platform, name=name)
            # Record the ID before binding, so a failed bind still leaves the created plan in the log.
            record["crawler_plan_id"] = crawler_plan_id
            client.bind_crawler_to_produce(
                crawler_plan_id=crawler_plan_id,
                produce_plan_id=part["produce_plan_id"],
                label=label,
                platform=platform,
            )
            record["status"] = "ok"
        except Exception as exc:  # noqa: BLE001 - one failed half must not block the other; log and continue
            record.update({"status": "failed", "error": str(exc)})
        log(record)
    print(f"Done. Log: {log_path}")


if __name__ == "__main__":
    main()
```
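The split-and-assign step in `main()` (look up `CONTENTFIND_AIGC_PRODUCE_PLAN_ID_<PLATFORM>_1/_2`, then halve the pooled IDs with `_split_half`) can be pulled out into a small function that is easy to test without a database or `.env`. This is a sketch, not part of the script: `split_half` and `plan_parts` are hypothetical names, the env is a plain dict standing in for `_merged_project_env()`, and a missing key raises `KeyError` where the script calls `sys.exit`.

```python
def split_half(items: list[str]) -> tuple[list[str], list[str]]:
    # Same rule as _split_half: the first half rounds up, so it gets the extra item.
    mid = (len(items) + 1) // 2
    return items[:mid], items[mid:]


def plan_parts(content_ids: list[str], platform: str, env: dict[str, str]) -> list[dict]:
    """Hypothetical helper: pair each half with the produce plan ID configured in env."""
    keys = [f"CONTENTFIND_AIGC_PRODUCE_PLAN_ID_{platform.upper()}_{n}" for n in (1, 2)]
    missing = [key for key in keys if not env.get(key)]
    if missing:
        raise KeyError(f"missing {' / '.join(missing)}")
    halves = split_half(content_ids)
    return [
        {"half": str(n), "produce_plan_id": env[key], "content_ids": half}
        for n, (key, half) in enumerate(zip(keys, halves), start=1)
    ]


if __name__ == "__main__":
    demo_env = {
        "CONTENTFIND_AIGC_PRODUCE_PLAN_ID_DOUYIN_1": "plan-a",
        "CONTENTFIND_AIGC_PRODUCE_PLAN_ID_DOUYIN_2": "plan-b",
    }
    for part in plan_parts(["v1", "v2", "v3", "v4", "v5"], "douyin", demo_env):
        print(part)
```

With five IDs the first plan receives three and the second receives two. With a single ID the second half is empty, which is worth spotting in the dry-run output before going live.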
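The safety gate reads `CAN_NOT_CREATE_PLAN` as a string flag: an unset variable defaults to `"true"`, and only `1`, `true`, or `yes` (case-insensitive, surrounding whitespace ignored) block a live push, while `--force` overrides the gate. A minimal sketch of that check as a pure function, under the hypothetical name `live_push_allowed`; note that values such as `on` are not in the blocking set and therefore open the gate.

```python
from collections.abc import Mapping

# Values that keep the gate closed; mirrors the tuple checked in main().
_BLOCKING_VALUES = ("1", "true", "yes")


def live_push_allowed(env: Mapping[str, str], force: bool = False) -> bool:
    """Hypothetical helper: True when a --live run may call the AIGC API."""
    # An unset flag defaults to "true", so the gate stays closed unless configured otherwise.
    raw = str(env.get("CAN_NOT_CREATE_PLAN", "true")).strip().lower()
    return force or raw not in _BLOCKING_VALUES


if __name__ == "__main__":
    for value in (None, "true", " TRUE ", "1", "false", "0", "on"):
        env = {} if value is None else {"CAN_NOT_CREATE_PLAN": value}
        print(repr(value), live_push_allowed(env))
```

Defaulting to "blocked" means a missing `.env` entry can never trigger a live push by accident.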
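The live loop isolates the two halves: an exception while creating or binding one crawler plan is recorded as `failed`, and the loop moves on to the other half. The same pattern in isolation, as a sketch: `push_parts` is a hypothetical name, and `create_plan` and `bind` are hypothetical stand-ins for `AigcPlatformClient.create_crawler_plan` and `bind_crawler_to_produce`, whose real signatures take more arguments.

```python
from collections.abc import Callable


def push_parts(
    parts: list[dict],
    create_plan: Callable[[list[str]], str],
    bind: Callable[[str, str], None],
) -> list[dict]:
    """Push each half independently; a failure is recorded and the loop continues."""
    records = []
    for part in parts:
        record = {"half": part["half"], "produce_plan_id": part["produce_plan_id"]}
        try:
            crawler_plan_id = create_plan(part["content_ids"])
            # Record the ID before binding so a bind failure still shows which plan was created.
            record["crawler_plan_id"] = crawler_plan_id
            bind(crawler_plan_id, part["produce_plan_id"])
            record["status"] = "ok"
        except Exception as exc:  # broad on purpose: one half must not abort the other
            record.update(status="failed", error=str(exc))
        records.append(record)
    return records


if __name__ == "__main__":
    def fake_create(content_ids: list[str]) -> str:
        if "bad" in content_ids:
            raise RuntimeError("crawler plan rejected")
        return f"crawler-{len(content_ids)}"

    def fake_bind(crawler_plan_id: str, produce_plan_id: str) -> None:
        pass

    demo_parts = [
        {"half": "1", "produce_plan_id": "p1", "content_ids": ["bad"]},
        {"half": "2", "produce_plan_id": "p2", "content_ids": ["a", "b"]},
    ]
    for rec in push_parts(demo_parts, fake_create, fake_bind):
        print(rec)
```

Half 1 fails and half 2 still gets its crawler plan. Because the crawler plan ID is stored before `bind` runs, a record with `status="failed"` and a `crawler_plan_id` points to a created but unbound plan that needs manual cleanup rather than a rerun.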
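Because the script does not deduplicate, the JSONL logs are the only record of whether a run was already pushed. A sketch of checking them before a second `--live` run, assuming the layout the script writes (`e2e_logs/aigc_push_<run>_<ts>.jsonl`, one JSON object per line with `mode`, `run_id`, `platform`, `half`, `status`). `already_pushed_halves` is a hypothetical helper; the script itself never reads its logs back.

```python
from __future__ import annotations

import json
from pathlib import Path


def already_pushed_halves(log_dir: str | Path, run_id: str, platform: str) -> set[str]:
    """Hypothetical helper: halves of run_id that an earlier live push completed."""
    done: set[str] = set()
    for path in sorted(Path(log_dir).glob(f"aigc_push_{run_id}_*.jsonl")):
        with path.open(encoding="utf-8") as handle:
            for line in handle:
                if not line.strip():
                    continue
                record = json.loads(line)
                # The glob also matches runs whose ID merely starts with run_id, so check the field too.
                if (
                    record.get("mode") == "live"
                    and record.get("status") == "ok"
                    and record.get("run_id") == run_id
                    and record.get("platform") == platform
                ):
                    done.add(record["half"])
    return done
```

For example, `already_pushed_halves("e2e_logs", run_id, "douyin")` returning `{"1", "2"}` means both halves already have bound crawler plans, so a new live run would only create duplicates. Dry-run and failed records are ignored on purpose.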