execute_creation_once.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. """模块 B 创意搭建子系统 — 主循环入口(P0-A / P0-C / P0-E,2026-06-09)。
  2. 数据流(三阶段,先审后挂):
  3. Phase 1 — 准备:
  4. 扫账户 → 关联点过滤(读调控当日 latest_decisions 排除 pause)
  5. → find_ads_needing_creatives
  6. → 对每条广告 prepare_one_creative_for_ad(召回 + 上传图 + xcx/save + build body)
  7. → pending_records (含完整 Phase 3 用的 _request_body)
  8. → 写 outputs/data/creation_pending_{date}_{time}.json(追溯 + 独立 apply 用)
  9. Phase 2 — 审批(若 CREATION_APPROVAL_REQUIRED=True):
  10. run_approval_workflow 生成 13 列 xlsx → 上传飞书 sheet → 发链接 → 轮询读决策列
  11. actions {row_idx: approve/reject/hold}
  12. → 写回 records["action"]
  13. Phase 3 — 执行:
  14. 对 action=approve 的 → POST /dynamic_creatives/add
  15. → 写 outputs/data/creation_run_{date}.json
  16. → 发飞书"执行汇报"消息
  17. 开关 CREATION_APPROVAL_REQUIRED=False 时跳过 Phase 2,全 records 直接 approve。
  18. """
  19. import json
  20. import logging
  21. import sys
  22. from datetime import datetime, timezone
  23. from pathlib import Path
  24. _HERE = Path(__file__).parent
  25. sys.path.insert(0, str(_HERE.parent.parent))
  26. sys.path.insert(0, str(_HERE))
  27. from dotenv import load_dotenv # noqa: E402
  28. load_dotenv(_HERE / ".env")
  29. from config import ( # noqa: E402
  30. ADS_PER_ACCOUNT,
  31. CREATION_APPROVAL_REQUIRED,
  32. CREATION_APPROVAL_TIMEOUT_MINUTES,
  33. FIXED_TARGETING_AGE,
  34. TARGET_CREATIVES_PER_AD,
  35. WHITELIST_ACCOUNTS,
  36. )
  37. # config import 副作用会改 sys.path(把 im-client / agent/tools/builtin 顶到前面),
  38. # 强占 sys.path[0],只影响本入口自己的 import resolution。
  39. while str(_HERE) in sys.path:
  40. sys.path.remove(str(_HERE))
  41. sys.path.insert(0, str(_HERE))
  42. from tools.ad_creation import ( # noqa: E402
  43. build_ad_request_body,
  44. compute_fingerprint,
  45. enumerate_new_ad_candidates,
  46. post_ad_with_prepared_body,
  47. )
  48. from tools.creative_creation import ( # noqa: E402
  49. find_ads_needing_creatives,
  50. load_excluded_ad_ids_from_adjustment,
  51. prepare_one_creative_for_ad,
  52. )
  53. from execute_creation_apply import ( # noqa: E402
  54. apply_pending_records,
  55. write_summary as write_apply_summary,
  56. _send_apply_summary_to_feishu,
  57. )
  58. logger = logging.getLogger("execute_creation_once")
  59. def _setup_logging() -> None:
  60. logging.basicConfig(
  61. level=logging.INFO,
  62. format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
  63. datefmt="%H:%M:%S",
  64. )
  65. for noisy in ("httpx", "httpcore", "config", "db.config"):
  66. logging.getLogger(noisy).setLevel(logging.WARNING)
  67. def _fetch_existing_fingerprints_for_account(account_id: int) -> set[str]:
  68. """Task 27:拉账户下所有非 DELETED 广告 → 反推 fingerprint 集合(预校验唯一性)。
  69. 腾讯文档:NORMAL + SUSPEND 状态广告均占用唯一性槽位,DENIED 也算占用。
  70. list 接口默认返回所有非 DELETED 状态 ad,故无需显式 filter status。
  71. 若腾讯 list 接口某些字段不返回(field 实测差异),降级:跳过该条 + log 警告。
  72. 返回空集 → 上层退化为"不预校验",enumerate 仍能跑(原有行为)。
  73. """
  74. from tools.ad_api import _get
  75. fingerprints: set[str] = set()
  76. skipped = 0
  77. page = 1
  78. try:
  79. while True:
  80. r = _get("/adgroups/get", {
  81. "account_id": account_id, "page": page, "page_size": 100,
  82. "fields": [
  83. "adgroup_id", "configured_status", "site_set",
  84. "targeting", "scene_spec",
  85. ],
  86. })
  87. ads = (r.get("data") or {}).get("list") or []
  88. if not ads:
  89. break
  90. for ad in ads:
  91. tgt = ad.get("targeting") or {}
  92. sc = ad.get("scene_spec") or {}
  93. wp = sc.get("wechat_position") if sc else None
  94. site_set = ad.get("site_set") or []
  95. age = tgt.get("age") or []
  96. geo_regions = (tgt.get("geo_location") or {}).get("regions") or []
  97. custom_audience = tgt.get("custom_audience")
  98. if not site_set or not age:
  99. skipped += 1
  100. continue
  101. try:
  102. fp = compute_fingerprint(
  103. account_id=account_id,
  104. site_set=site_set,
  105. custom_audience=custom_audience,
  106. age=age,
  107. geo_regions=geo_regions,
  108. wechat_position=wp,
  109. )
  110. fingerprints.add(fp)
  111. except Exception as e:
  112. skipped += 1
  113. logger.debug(
  114. "[phase0] fingerprint 算失败 ad=%s: %s",
  115. ad.get("adgroup_id"), e,
  116. )
  117. if len(ads) < 100:
  118. break
  119. page += 1
  120. except Exception as e:
  121. logger.warning(
  122. "[phase0] account=%d 拉 fingerprint 集失败(降级为不预校验): %s",
  123. account_id, e,
  124. )
  125. return set()
  126. logger.info(
  127. "[phase0] 已存在 fingerprint=%d 个(skip 缺字段广告 %d 条)",
  128. len(fingerprints), skipped,
  129. )
  130. return fingerprints
  131. def phase0_create_ads(target_ads: int = ADS_PER_ACCOUNT) -> list[dict]:
  132. """Phase 0:对每账户检查广告数,不足则建到 target_ads 条(模块 A,2026-06-09 P1-G)。
  133. 流程:
  134. account → 查当前广告数(NORMAL+SUSPEND 算占用)→ 不足 target_ads →
  135. → 反查 fingerprint 集 → enumerate candidates(排除已有 fp)→
  136. → 飞书审批 → approve 项 → 真 POST /adgroups/add → 返回新建的 adgroup_id 列表
  137. Returns: 本次新建成功的广告 list[dict],每项含 {account_id, adgroup_id, adgroup_name, wechat_position}
  138. """
  139. from datetime import datetime, timezone
  140. from tools.ad_api import _get
  141. from tools.im_approval_ad_creation import run_ad_approval_workflow
  142. if not WHITELIST_ACCOUNTS:
  143. logger.error("[phase0] WHITELIST_ACCOUNTS 为空,退出")
  144. return []
  145. # Task 26:NORMAL + SUSPEND 都算占用唯一性槽位(腾讯文档:删除前历史广告占槽位)
  146. OCCUPIED_STATUSES = {"AD_STATUS_NORMAL", "AD_STATUS_SUSPEND"}
  147. pending_ad_records: list[dict] = []
  148. for account_id in WHITELIST_ACCOUNTS:
  149. logger.info("=" * 60)
  150. logger.info("[phase0] 账户 %d 处理开始", account_id)
  151. # Task 26:查所有非 DELETED 广告,NORMAL+SUSPEND 都算占用槽位
  152. try:
  153. r = _get("/adgroups/get", {
  154. "account_id": account_id, "page": 1, "page_size": 100,
  155. "fields": ["adgroup_id", "configured_status"],
  156. })
  157. all_ads = (r.get("data") or {}).get("list") or []
  158. normal_n = sum(1 for a in all_ads if a.get("configured_status") == "AD_STATUS_NORMAL")
  159. suspend_n = sum(1 for a in all_ads if a.get("configured_status") == "AD_STATUS_SUSPEND")
  160. occupied = [a for a in all_ads if a.get("configured_status") in OCCUPIED_STATUSES]
  161. logger.info(
  162. "[phase0] 广告数: 总 %d,NORMAL %d + SUSPEND %d = 占用 %d / 目标 %d",
  163. len(all_ads), normal_n, suspend_n, len(occupied), target_ads,
  164. )
  165. except Exception as e:
  166. logger.exception("[phase0] account=%d 查广告数失败: %s", account_id, e)
  167. continue
  168. to_create = max(0, target_ads - len(occupied))
  169. if to_create == 0:
  170. logger.info("[phase0] 占用槽位已满,跳过")
  171. continue
  172. # Task 27:fingerprint 预校验 — 反查现存广告的 fp 集合,enumerate 时排除
  173. existing_fps = _fetch_existing_fingerprints_for_account(account_id)
  174. # enumerate 候选(模块 A,差异化 wechat_position 有/无)
  175. try:
  176. candidates = enumerate_new_ad_candidates(
  177. account_id, count=to_create,
  178. existing_fingerprints=existing_fps,
  179. )
  180. except Exception as e:
  181. logger.exception("[phase0] account=%d enumerate 失败: %s", account_id, e)
  182. continue
  183. if not candidates:
  184. logger.warning("[phase0] enumerate 返回 0 条候选,跳过")
  185. continue
  186. today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
  187. age_range = ",".join(
  188. f"{a.get('min')}-{a.get('max')}" for a in FIXED_TARGETING_AGE
  189. )
  190. for c in candidates:
  191. # 2026-06-10 修复:Phase 0 经过飞书审批后,广告应立即 NORMAL 生效
  192. # build_ad_request_body 的 default SUSPEND 是模块 A dry-run 时代的旧 SOP
  193. body = build_ad_request_body(c, configured_status="AD_STATUS_NORMAL")
  194. rec = {
  195. "approval_date": today,
  196. "account_id": c.account_id,
  197. "audience_tier_label": c.audience_tier_label,
  198. "adgroup_name": c.adgroup_name,
  199. "site_set": c.site_set,
  200. "wechat_position": c.wechat_position,
  201. "bid_amount_fen": c.bid_amount_fen,
  202. "age_range": age_range,
  203. "fingerprint": c.fingerprint,
  204. "_request_body": body,
  205. }
  206. pending_ad_records.append(rec)
  207. if not pending_ad_records:
  208. logger.info("[phase0] 无广告需新建,Phase 0 结束")
  209. return []
  210. # 飞书审批(如需要)
  211. data_dir = _HERE / "outputs" / "data"
  212. if CREATION_APPROVAL_REQUIRED:
  213. logger.info(
  214. "[phase0] CREATION_APPROVAL_REQUIRED=True → 飞书审批 %d 条广告候选",
  215. len(pending_ad_records),
  216. )
  217. sheet_meta, actions = run_ad_approval_workflow(
  218. records=pending_ad_records,
  219. xlsx_output_dir=data_dir,
  220. timeout_minutes=CREATION_APPROVAL_TIMEOUT_MINUTES,
  221. )
  222. for i, rec in enumerate(pending_ad_records, start=1):
  223. rec["action"] = actions.get(i, "skip")
  224. else:
  225. for rec in pending_ad_records:
  226. rec["action"] = "approve"
  227. # POST 真建 approve 项
  228. created: list[dict] = []
  229. for rec in pending_ad_records:
  230. if rec.get("action") != "approve":
  231. continue
  232. adgroup_id = post_ad_with_prepared_body(
  233. account_id=int(rec["account_id"]),
  234. body=rec["_request_body"],
  235. )
  236. if adgroup_id:
  237. created.append({
  238. "account_id": rec["account_id"],
  239. "adgroup_id": adgroup_id,
  240. "adgroup_name": rec["adgroup_name"],
  241. "wechat_position": rec.get("wechat_position"),
  242. })
  243. logger.info(
  244. "[phase0] 完成:候选 %d 条,approve %d 条,实际建 %d 条",
  245. len(pending_ad_records),
  246. sum(1 for r in pending_ad_records if r.get("action") == "approve"),
  247. len(created),
  248. )
  249. return created
  250. def phase1_prepare(target_creatives: int = TARGET_CREATIVES_PER_AD) -> list[dict]:
  251. """Phase 1:扫账户 → 关联点过滤 → 准备 pending records(不 POST 腾讯)。
  252. Returns:
  253. pending_records — 每个元素是 prepare_one_creative_for_ad 返回的 dict
  254. """
  255. if not WHITELIST_ACCOUNTS:
  256. logger.error("[phase1] WHITELIST_ACCOUNTS 为空,Phase 1 退出")
  257. return []
  258. excluded_ad_ids = load_excluded_ad_ids_from_adjustment()
  259. logger.info("[phase1] 关联点过滤集合 size=%d", len(excluded_ad_ids))
  260. pending_records: list[dict] = []
  261. # 全局 used 集合(2026-06-10 P0-NEW-3 修复:跨账户 landing/material 也互斥)
  262. # 起因:实测 70073686 跨"泛人群"+"回流330以上人群"两个 crowd_package 池子重复,
  263. # account 级 set 漏 — 提到 run_once 一份。
  264. used_material_ids_global: set = set()
  265. used_landing_ids_global: set = set()
  266. for account_id in WHITELIST_ACCOUNTS:
  267. logger.info("=" * 60)
  268. logger.info("[phase1] 账户 %d 处理开始", account_id)
  269. try:
  270. ads = find_ads_needing_creatives(account_id, min_creatives=target_creatives)
  271. except Exception as e:
  272. logger.exception("[phase1] account=%d find_ads 失败,跳过:%s", account_id, e)
  273. continue
  274. ads_after_filter = [a for a in ads if a["adgroup_id"] not in excluded_ad_ids]
  275. excluded_count = len(ads) - len(ads_after_filter)
  276. if excluded_count:
  277. logger.info(
  278. "[phase1] account=%d 关联点过滤排除 %d 条",
  279. account_id, excluded_count,
  280. )
  281. if not ads_after_filter:
  282. logger.info("[phase1] account=%d 无广告需补创意", account_id)
  283. continue
  284. for ad in ads_after_filter:
  285. adgroup_id = ad["adgroup_id"]
  286. already_have = ad["creative_count"]
  287. to_add = max(0, target_creatives - already_have)
  288. logger.info(
  289. "[phase1] adgroup=%d(have=%d need=%d)",
  290. adgroup_id, already_have, to_add,
  291. )
  292. for _ in range(to_add):
  293. try:
  294. rec = prepare_one_creative_for_ad(
  295. account_id, adgroup_id,
  296. excluded_material_ids=used_material_ids_global,
  297. excluded_landing_ids=used_landing_ids_global,
  298. )
  299. except Exception as e:
  300. logger.exception(
  301. "[phase1] adgroup=%d 准备失败:%s", adgroup_id, e,
  302. )
  303. rec = None
  304. if rec:
  305. pending_records.append(rec)
  306. used_material_ids_global.add(rec["_material_id"])
  307. used_landing_ids_global.add(rec["landing_video_id"])
  308. else:
  309. # 2026-06-10 用户要求:单条 prepare 失败 → continue 不 break
  310. # 同广告剩余 to_add 创意还能继续试,不被一次失败拖累
  311. logger.info(
  312. "[phase1] adgroup=%d 本条创意 prepare 失败,试下一条",
  313. adgroup_id,
  314. )
  315. logger.info("=" * 60)
  316. logger.info("[phase1] 准备完成,共 %d 条 pending records", len(pending_records))
  317. return pending_records
  318. def _write_pending_records(records: list[dict], output_dir: Path) -> Path:
  319. """落 Phase 1 产物 JSON(供 Phase 3 独立 apply 用 / 追溯)。"""
  320. output_dir.mkdir(parents=True, exist_ok=True)
  321. date_str = datetime.now(timezone.utc).strftime("%Y%m%d")
  322. ts = datetime.now(timezone.utc).strftime("%H%M%S")
  323. out_path = output_dir / f"creation_pending_{date_str}_{ts}.json"
  324. with open(out_path, "w", encoding="utf-8") as f:
  325. json.dump(records, f, ensure_ascii=False, indent=2)
  326. return out_path
  327. def phase2_approval(records: list[dict], xlsx_output_dir: Path) -> dict[int, str]:
  328. """Phase 2:飞书审批(若需要)。
  329. Returns: actions {row_idx: action}, 1-based row_idx
  330. """
  331. from tools.im_approval_creation import run_approval_workflow
  332. sheet_meta, actions = run_approval_workflow(
  333. records=records,
  334. xlsx_output_dir=xlsx_output_dir,
  335. timeout_minutes=CREATION_APPROVAL_TIMEOUT_MINUTES,
  336. )
  337. logger.info(
  338. "[phase2] 审批完成 sheet_url=%s actions=%d/%d",
  339. sheet_meta.get("url"), len(actions), len(records),
  340. )
  341. return actions
  342. def run_once() -> dict:
  343. """完整主循环:Phase 0 → Phase 1 → Phase 2(若开关 True)→ Phase 3。"""
  344. run_started = datetime.now(timezone.utc).isoformat()
  345. # Phase 0:模块 A 建广告(满足每账户 ADS_PER_ACCOUNT 条)
  346. logger.info("=" * 60)
  347. logger.info("[main] Phase 0 启动 — 模块 A 检查 + 建广告")
  348. created_ads = phase0_create_ads()
  349. logger.info("[main] Phase 0 完成:本轮新建广告 %d 条", len(created_ads))
  350. # Phase 1:模块 B 给所有广告(新+旧)补创意
  351. logger.info("=" * 60)
  352. logger.info("[main] Phase 1 启动 — 模块 B 给广告补创意")
  353. pending_records = phase1_prepare()
  354. if not pending_records:
  355. logger.info("[main] Phase 1 无 pending records,主循环退出")
  356. return {
  357. "run_started": run_started,
  358. "run_finished": datetime.now(timezone.utc).isoformat(),
  359. "approval_required": CREATION_APPROVAL_REQUIRED,
  360. "phase0_created_ads": len(created_ads),
  361. "total": {"phase1_prepared": 0},
  362. }
  363. data_dir = _HERE / "outputs" / "data"
  364. pending_path = _write_pending_records(pending_records, data_dir)
  365. logger.info("[main] pending records 已写入: %s", pending_path)
  366. # Phase 2:审批(或 skip)
  367. if CREATION_APPROVAL_REQUIRED:
  368. logger.info("[main] CREATION_APPROVAL_REQUIRED=True → 进 Phase 2 飞书审批")
  369. actions = phase2_approval(pending_records, data_dir)
  370. for i, rec in enumerate(pending_records, start=1):
  371. rec["action"] = actions.get(i, "skip")
  372. else:
  373. logger.info("[main] CREATION_APPROVAL_REQUIRED=False → 全 records approve")
  374. for rec in pending_records:
  375. rec["action"] = "approve"
  376. # Phase 3:执行
  377. logger.info("=" * 60)
  378. logger.info("[main] Phase 3 执行启动")
  379. summary = apply_pending_records(pending_records)
  380. summary["approval_required"] = CREATION_APPROVAL_REQUIRED
  381. summary["pending_records_path"] = str(pending_path)
  382. summary_path = write_apply_summary(summary, data_dir)
  383. logger.info("[main] summary 已写入: %s", summary_path)
  384. # 发执行汇报
  385. _send_apply_summary_to_feishu(summary)
  386. summary["run_started"] = run_started
  387. summary["run_finished"] = datetime.now(timezone.utc).isoformat()
  388. summary["phase0_created_ads"] = len(created_ads)
  389. summary["phase0_ads"] = created_ads
  390. return summary
  391. def main() -> int:
  392. _setup_logging()
  393. logger.info("=" * 60)
  394. logger.info("模块 B 创意搭建子系统 — 主循环启动")
  395. logger.info("TARGET_CREATIVES_PER_AD = %d", TARGET_CREATIVES_PER_AD)
  396. logger.info("CREATION_APPROVAL_REQUIRED = %s", CREATION_APPROVAL_REQUIRED)
  397. logger.info("WHITELIST_ACCOUNTS = %s", WHITELIST_ACCOUNTS)
  398. logger.info("=" * 60)
  399. summary = run_once()
  400. t = summary.get("total") or {}
  401. logger.info("=" * 60)
  402. logger.info("[main] 主循环结束")
  403. if "phase1_prepared" in t:
  404. logger.info(" phase1_prepared = %d", t["phase1_prepared"])
  405. else:
  406. logger.info(" records = %d", t.get("records", 0))
  407. logger.info(" approved = %d", t.get("approved", 0))
  408. logger.info(" posted_ok = %d", t.get("posted_ok", 0))
  409. logger.info(" posted_failed = %d", t.get("posted_failed", 0))
  410. logger.info("=" * 60)
  411. return 0 if t.get("posted_failed", 0) == 0 else 1
  412. if __name__ == "__main__":
  413. sys.exit(main())