push_runs_to_aigc.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. """一次性:把某个已完成 run 的入池视频(ADD_TO_CONTENT_POOL)推进 AIGC 生成计划。
  2. 支持抖音/快手(--platform,默认 douyin);对半分流:前一半 → 生成计划 _1,后一半 → _2(env 配置)。
  3. 四条保险:
  4. 1. 默认 dry-run:不加 --live 只打印分流,不调任何接口。
  5. 2. 安全闸:--live 时若 CAN_NOT_CREATE_PLAN 仍为真值则拒绝(除非 --force)。
  6. 3. 不去重:真发只跑一次(别跑两次,会建重复爬取计划);用日志当"推过没"的依据。
  7. 4. 记账:每份结果打屏幕 + 落 e2e_logs/aigc_push_<run>_<ts>.jsonl。
  8. 用法:
  9. dry-run: python -m scripts.push_runs_to_aigc --run <run_id> --platform douyin
  10. 真发: CAN_NOT_CREATE_PLAN=false python -m scripts.push_runs_to_aigc --run <run_id> --platform kuaishou --live
  11. """
  12. from __future__ import annotations
  13. import argparse
  14. import json
  15. import sys
  16. from datetime import datetime, timezone
  17. from pathlib import Path
  18. from content_agent.integrations.aigc_platform import AigcPlatformClient
  19. from content_agent.integrations.database_runtime import ContentSupplyDbConfig
  20. from content_agent.run_service import _merged_project_env
# Maps a platform key (as accepted by --platform) to the Chinese display name
# that is embedded in crawler-plan names and bind labels.
_PLATFORM_LABEL = {"douyin": "抖音", "kuaishou": "快手"}
# Selects the platform content IDs of one run's discovered items whose rule
# decision is ADD_TO_CONTENT_POOL, restricted to a single platform and ordered
# by the discovered-item row id. Takes two positional parameters, in this
# order: (run_id, platform).
_POOL_SQL = """
SELECT di.platform_content_id AS content_id
FROM content_agent_rule_decisions rd
JOIN content_agent_discovered_content_items di
ON rd.run_id = di.run_id AND rd.decision_target_id = di.platform_content_id
WHERE rd.run_id = %s AND di.platform = %s AND rd.decision_action = 'ADD_TO_CONTENT_POOL'
ORDER BY di.id
"""
  31. def _pooled_content_ids(run_id: str, platform: str) -> list[str]:
  32. conn = ContentSupplyDbConfig.from_env().connect()
  33. with conn:
  34. with conn.cursor() as cur:
  35. cur.execute(_POOL_SQL, (run_id, platform))
  36. return [str(row["content_id"]) for row in cur.fetchall()]
  37. def _split_half(items: list[str]) -> tuple[list[str], list[str]]:
  38. mid = (len(items) + 1) // 2 # 前一半向上取整(奇数时前半多 1)
  39. return items[:mid], items[mid:]
  40. def main() -> None:
  41. parser = argparse.ArgumentParser(description="把某 run 入池视频对半推进 AIGC 生成计划")
  42. parser.add_argument("--run", required=True, help="run_id")
  43. parser.add_argument("--platform", choices=("douyin", "kuaishou"), default="douyin", help="平台(默认 douyin)")
  44. parser.add_argument("--live", action="store_true", help="真发(否则 dry-run 只打印)")
  45. parser.add_argument("--force", action="store_true", help="越过 CAN_NOT_CREATE_PLAN 安全闸")
  46. args = parser.parse_args()
  47. platform = args.platform
  48. plat_cn = _PLATFORM_LABEL[platform]
  49. env = _merged_project_env()
  50. key_1 = f"CONTENTFIND_AIGC_PRODUCE_PLAN_ID_{platform.upper()}_1"
  51. key_2 = f"CONTENTFIND_AIGC_PRODUCE_PLAN_ID_{platform.upper()}_2"
  52. plan_id_1 = env.get(key_1)
  53. plan_id_2 = env.get(key_2)
  54. if not (plan_id_1 and plan_id_2):
  55. sys.exit(f"缺 {key_1} / {key_2}(看 .env)")
  56. content_ids = _pooled_content_ids(args.run, platform)
  57. if not content_ids:
  58. sys.exit(f"{args.run} 没有{plat_cn}入池(ADD_TO_CONTENT_POOL)视频")
  59. half_1, half_2 = _split_half(content_ids)
  60. parts = [
  61. {"half": "1", "produce_plan_id": plan_id_1, "content_ids": half_1},
  62. {"half": "2", "produce_plan_id": plan_id_2, "content_ids": half_2},
  63. ]
  64. ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
  65. log_path = Path("e2e_logs") / f"aigc_push_{args.run}_{ts}.jsonl"
  66. log_path.parent.mkdir(parents=True, exist_ok=True)
  67. def log(record: dict) -> None:
  68. record["ts"] = datetime.now(timezone.utc).isoformat()
  69. line = json.dumps(record, ensure_ascii=False)
  70. print(line)
  71. with log_path.open("a", encoding="utf-8") as handle:
  72. handle.write(line + "\n")
  73. print(
  74. f"run={args.run} {plat_cn}入池 {len(content_ids)} 个 → "
  75. f"前 {len(half_1)} 进 {plan_id_1} / 后 {len(half_2)} 进 {plan_id_2}"
  76. )
  77. # --- 保险①:dry-run 默认,只打印不调接口 ---
  78. if not args.live:
  79. for part in parts:
  80. log(
  81. {
  82. "mode": "dry_run",
  83. "run_id": args.run,
  84. "platform": platform,
  85. "half": part["half"],
  86. "produce_plan_id": part["produce_plan_id"],
  87. "content_count": len(part["content_ids"]),
  88. "content_ids": part["content_ids"],
  89. }
  90. )
  91. print(f"[dry-run] 未调任何接口。核对无误后:设 CAN_NOT_CREATE_PLAN=false + 加 --live 真发。日志:{log_path}")
  92. return
  93. # --- 保险②:CAN_NOT_CREATE_PLAN 安全闸 ---
  94. can_not = str(env.get("CAN_NOT_CREATE_PLAN", "true")).strip().lower()
  95. if can_not in ("1", "true", "yes") and not args.force:
  96. sys.exit("安全闸 CAN_NOT_CREATE_PLAN 为真:拒绝真发。要发请设 CAN_NOT_CREATE_PLAN=false,或加 --force。")
  97. # --- 保险③:别跑两次,真发前交互确认 ---
  98. if input("⚠️ 真发模式:将建 2 个爬取计划并绑进生成计划(别跑两次!)。输入 yes 继续:").strip() != "yes":
  99. sys.exit("已取消。")
  100. # --- 保险④:逐份真调接口 + 记账 ---
  101. client = AigcPlatformClient.from_env(env)
  102. for part in parts:
  103. record = {
  104. "mode": "live",
  105. "run_id": args.run,
  106. "platform": platform,
  107. "half": part["half"],
  108. "produce_plan_id": part["produce_plan_id"],
  109. "content_count": len(part["content_ids"]),
  110. }
  111. try:
  112. name = f"【内容寻找Agent自动创建】{plat_cn}视频直接抓取-{ts}-{args.run}-{part['half']}"
  113. label = f"原始帖子-视频-{plat_cn}-内容添加计划-{args.run}-{part['half']}"
  114. crawler_plan_id = client.create_crawler_plan(part["content_ids"], platform=platform, name=name)
  115. client.bind_crawler_to_produce(
  116. crawler_plan_id=crawler_plan_id,
  117. produce_plan_id=part["produce_plan_id"],
  118. label=label,
  119. platform=platform,
  120. )
  121. record.update({"crawler_plan_id": crawler_plan_id, "status": "ok"})
  122. except Exception as exc: # noqa: BLE001 — 单份失败不连累另一份,记账后继续
  123. record.update({"status": "failed", "error": str(exc)})
  124. log(record)
  125. print(f"完成。日志:{log_path}")
# Run the CLI only when this module is executed as a script (including
# `python -m`), not when it is imported.
if __name__ == "__main__":
    main()