| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485 |
- """模块 B 创意搭建子系统 — 主循环入口(P0-A / P0-C / P0-E,2026-06-09)。
- 数据流(三阶段,先审后挂):
- Phase 1 — 准备:
- 扫账户 → 关联点过滤(读调控当日 latest_decisions 排除 pause)
- → find_ads_needing_creatives
- → 对每条广告 prepare_one_creative_for_ad(召回 + 上传图 + xcx/save + build body)
- → pending_records (含完整 Phase 3 用的 _request_body)
- → 写 outputs/data/creation_pending_{date}_{time}.json(追溯 + 独立 apply 用)
- Phase 2 — 审批(若 CREATION_APPROVAL_REQUIRED=True):
- run_approval_workflow 生成 13 列 xlsx → 上传飞书 sheet → 发链接 → 轮询读决策列
- actions {row_idx: approve/reject/hold}
- → 写回 records["action"]
- Phase 3 — 执行:
- 对 action=approve 的 → POST /dynamic_creatives/add
- → 写 outputs/data/creation_run_{date}.json
- → 发飞书"执行汇报"消息
- 开关 CREATION_APPROVAL_REQUIRED=False 时跳过 Phase 2,全 records 直接 approve。
- """
- import json
- import logging
- import sys
- from datetime import datetime, timezone
- from pathlib import Path
- _HERE = Path(__file__).parent
- sys.path.insert(0, str(_HERE.parent.parent))
- sys.path.insert(0, str(_HERE))
- from dotenv import load_dotenv # noqa: E402
- load_dotenv(_HERE / ".env")
- from config import ( # noqa: E402
- ADS_PER_ACCOUNT,
- CREATION_APPROVAL_REQUIRED,
- CREATION_APPROVAL_TIMEOUT_MINUTES,
- FIXED_TARGETING_AGE,
- TARGET_CREATIVES_PER_AD,
- WHITELIST_ACCOUNTS,
- )
- # config import 副作用会改 sys.path(把 im-client / agent/tools/builtin 顶到前面),
- # 强占 sys.path[0],只影响本入口自己的 import resolution。
- while str(_HERE) in sys.path:
- sys.path.remove(str(_HERE))
- sys.path.insert(0, str(_HERE))
- from tools.ad_creation import ( # noqa: E402
- build_ad_request_body,
- compute_fingerprint,
- enumerate_new_ad_candidates,
- post_ad_with_prepared_body,
- )
- from tools.creative_creation import ( # noqa: E402
- find_ads_needing_creatives,
- load_excluded_ad_ids_from_adjustment,
- prepare_one_creative_for_ad,
- )
- from execute_creation_apply import ( # noqa: E402
- apply_pending_records,
- write_summary as write_apply_summary,
- _send_apply_summary_to_feishu,
- )
- logger = logging.getLogger("execute_creation_once")
def _setup_logging() -> None:
    """Configure logging for this entry point.

    Requests an INFO-level root configuration whose lines look like
    ``HH:MM:SS | LEVEL | logger-name | message``, then raises the threshold
    of a handful of chatty loggers to WARNING.
    """
    line_format = "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
    logging.basicConfig(
        format=line_format,
        datefmt="%H:%M:%S",
        level=logging.INFO,
    )

    # Loggers whose INFO/DEBUG output is not wanted in the run log.
    quiet_loggers = ("httpx", "httpcore", "config", "db.config")
    for logger_name in quiet_loggers:
        logging.getLogger(logger_name).setLevel(logging.WARNING)
def _fetch_existing_fingerprints_for_account(account_id: int) -> set[str]:
    """Collect the fingerprints of ads already listed under *account_id*.

    Pages through ``/adgroups/get`` (100 rows per page) and passes each ad's
    ``site_set``, targeting (age, geo regions, custom audience) and
    ``scene_spec.wechat_position`` to ``compute_fingerprint``.  The result is
    used by the caller to exclude already-taken fingerprints when enumerating
    new ad candidates (Task 27 pre-check).

    Notes carried over from the original author (not verifiable from this
    file alone): the Tencent documentation reportedly treats NORMAL, SUSPEND
    and DENIED ads as occupying a uniqueness slot, and the list endpoint
    reportedly returns every non-DELETED ad by default, which is why no
    status filter is sent.

    Rows without ``site_set`` or ``age``, and rows for which
    ``compute_fingerprint`` raises, are skipped and counted.

    Returns:
        The set of fingerprints found.  If anything else in the scan raises
        (request failure, unexpected row shape), an empty set is returned so
        the caller simply runs without a pre-check.
    """
    from tools.ad_api import _get

    known: set[str] = set()
    skipped = 0
    page = 1
    try:
        while True:
            response = _get("/adgroups/get", {
                "account_id": account_id, "page": page, "page_size": 100,
                "fields": [
                    "adgroup_id", "configured_status", "site_set",
                    "targeting", "scene_spec",
                ],
            })
            batch = (response.get("data") or {}).get("list") or []
            if not batch:
                break

            for ad in batch:
                # Every field is read before the skip check so that a
                # malformed row still aborts the whole scan via the outer
                # handler, exactly like the row-by-row extraction it replaces.
                targeting = ad.get("targeting") or {}
                scene = ad.get("scene_spec") or {}
                position = scene.get("wechat_position")
                site_set = ad.get("site_set") or []
                age = targeting.get("age") or []
                geo_regions = (targeting.get("geo_location") or {}).get("regions") or []
                custom_audience = targeting.get("custom_audience")

                if not (site_set and age):
                    skipped += 1
                    continue

                try:
                    known.add(compute_fingerprint(
                        account_id=account_id,
                        site_set=site_set,
                        custom_audience=custom_audience,
                        age=age,
                        geo_regions=geo_regions,
                        wechat_position=position,
                    ))
                except Exception as exc:
                    skipped += 1
                    logger.debug(
                        "[phase0] fingerprint 算失败 ad=%s: %s",
                        ad.get("adgroup_id"), exc,
                    )

            # A short page means this was the last one.
            if len(batch) < 100:
                break
            page += 1
    except Exception as exc:
        logger.warning(
            "[phase0] account=%d 拉 fingerprint 集失败(降级为不预校验): %s",
            account_id, exc,
        )
        return set()

    logger.info(
        "[phase0] 已存在 fingerprint=%d 个(skip 缺字段广告 %d 条)",
        len(known), skipped,
    )
    return known
def _fetch_all_adgroup_statuses(account_id: int, page_size: int = 100) -> list[dict]:
    """Return every row ``/adgroups/get`` lists for *account_id*.

    Requests only ``adgroup_id`` and ``configured_status`` and keeps paging
    until a page comes back shorter than *page_size* (same stop rule as
    ``_fetch_existing_fingerprints_for_account``).  Any exception raised by
    the API helper propagates to the caller.
    """
    from tools.ad_api import _get

    rows: list[dict] = []
    page = 1
    while True:
        response = _get("/adgroups/get", {
            "account_id": account_id, "page": page, "page_size": page_size,
            "fields": ["adgroup_id", "configured_status"],
        })
        batch = (response.get("data") or {}).get("list") or []
        rows.extend(batch)
        if len(batch) < page_size:
            return rows
        page += 1


def phase0_create_ads(target_ads: int = ADS_PER_ACCOUNT) -> list[dict]:
    """Phase 0: top up every whitelisted account to *target_ads* ads.

    Per account:
      1. List all ads (every page) and count those whose configured status
         is NORMAL or SUSPEND; both count as occupied slots (Task 26, per
         the original author's reading of the Tencent docs).
      2. If fewer than *target_ads* are occupied, fetch the fingerprints of
         existing ads and enumerate that many new candidates, excluding the
         existing fingerprints (Task 27).
      3. Build a request body for each candidate with status NORMAL, so an
         approved ad goes live immediately, and queue it as a pending record.

    After all accounts: when ``CREATION_APPROVAL_REQUIRED`` is true the
    pending records go through the Feishu approval workflow and each record
    gets the action returned for its 1-based row (``"skip"`` when absent);
    otherwise every record is marked ``"approve"``.  Approved records are
    then posted via ``post_ad_with_prepared_body``.

    Accounts whose listing or enumeration raises are logged and skipped.

    Args:
        target_ads: Desired number of occupied ad slots per account.

    Returns:
        One dict per ad actually created, with keys ``account_id``,
        ``adgroup_id``, ``adgroup_name`` and ``wechat_position``.
    """
    from tools.im_approval_ad_creation import run_ad_approval_workflow

    if not WHITELIST_ACCOUNTS:
        logger.error("[phase0] WHITELIST_ACCOUNTS 为空,退出")
        return []

    # Computed once so every record in this batch carries the same values,
    # even if the run happens to cross a UTC date boundary.
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    age_range = ",".join(
        f"{a.get('min')}-{a.get('max')}" for a in FIXED_TARGETING_AGE
    )

    pending_ad_records: list[dict] = []
    for account_id in WHITELIST_ACCOUNTS:
        logger.info("=" * 60)
        logger.info("[phase0] 账户 %d 处理开始", account_id)

        # Count occupied slots over ALL pages: reading only the first page
        # would under-count accounts with more than one page of ads and
        # trigger the creation of surplus ads.
        try:
            all_ads = _fetch_all_adgroup_statuses(account_id)
            statuses = [a.get("configured_status") for a in all_ads]
            normal_n = statuses.count("AD_STATUS_NORMAL")
            suspend_n = statuses.count("AD_STATUS_SUSPEND")
            occupied_n = normal_n + suspend_n
            logger.info(
                "[phase0] 广告数: 总 %d,NORMAL %d + SUSPEND %d = 占用 %d / 目标 %d",
                len(all_ads), normal_n, suspend_n, occupied_n, target_ads,
            )
        except Exception as e:
            logger.exception("[phase0] account=%d 查广告数失败: %s", account_id, e)
            continue

        to_create = max(0, target_ads - occupied_n)
        if to_create == 0:
            logger.info("[phase0] 占用槽位已满,跳过")
            continue

        # Task 27: fingerprints of existing ads are excluded during enumeration.
        existing_fps = _fetch_existing_fingerprints_for_account(account_id)

        try:
            candidates = enumerate_new_ad_candidates(
                account_id, count=to_create,
                existing_fingerprints=existing_fps,
            )
        except Exception as e:
            logger.exception("[phase0] account=%d enumerate 失败: %s", account_id, e)
            continue

        if not candidates:
            logger.warning("[phase0] enumerate 返回 0 条候选,跳过")
            continue

        for c in candidates:
            # 2026-06-10 fix: ads created here have passed approval, so they
            # are created as NORMAL rather than the builder's SUSPEND default
            # (a leftover from the dry-run era).
            pending_ad_records.append({
                "approval_date": today,
                "account_id": c.account_id,
                "audience_tier_label": c.audience_tier_label,
                "adgroup_name": c.adgroup_name,
                "site_set": c.site_set,
                "wechat_position": c.wechat_position,
                "bid_amount_fen": c.bid_amount_fen,
                "age_range": age_range,
                "fingerprint": c.fingerprint,
                "_request_body": build_ad_request_body(
                    c, configured_status="AD_STATUS_NORMAL",
                ),
            })

    if not pending_ad_records:
        logger.info("[phase0] 无广告需新建,Phase 0 结束")
        return []

    # Approval step (optional).
    data_dir = _HERE / "outputs" / "data"
    if CREATION_APPROVAL_REQUIRED:
        logger.info(
            "[phase0] CREATION_APPROVAL_REQUIRED=True → 飞书审批 %d 条广告候选",
            len(pending_ad_records),
        )
        _sheet_meta, actions = run_ad_approval_workflow(
            records=pending_ad_records,
            xlsx_output_dir=data_dir,
            timeout_minutes=CREATION_APPROVAL_TIMEOUT_MINUTES,
        )
        # Rows are 1-based; a row without a decision is treated as "skip".
        for row_idx, rec in enumerate(pending_ad_records, start=1):
            rec["action"] = actions.get(row_idx, "skip")
    else:
        for rec in pending_ad_records:
            rec["action"] = "approve"

    # Create the approved ads for real.
    # NOTE(review): an exception raised by post_ad_with_prepared_body still
    # propagates and discards `created`; confirm whether that is intended.
    approved = [rec for rec in pending_ad_records if rec.get("action") == "approve"]
    created: list[dict] = []
    for rec in approved:
        adgroup_id = post_ad_with_prepared_body(
            account_id=int(rec["account_id"]),
            body=rec["_request_body"],
        )
        if adgroup_id:
            created.append({
                "account_id": rec["account_id"],
                "adgroup_id": adgroup_id,
                "adgroup_name": rec["adgroup_name"],
                "wechat_position": rec.get("wechat_position"),
            })

    logger.info(
        "[phase0] 完成:候选 %d 条,approve %d 条,实际建 %d 条",
        len(pending_ad_records), len(approved), len(created),
    )
    return created
def phase1_prepare(target_creatives: int = TARGET_CREATIVES_PER_AD) -> list[dict]:
    """Phase 1: scan accounts and prepare pending creative records.

    For every whitelisted account, finds the ads that have fewer than
    *target_creatives* creatives, drops the ads excluded by the adjustment
    module (``load_excluded_ad_ids_from_adjustment``), and calls
    ``prepare_one_creative_for_ad`` once per missing creative.  Nothing is
    posted to the ad platform by this function itself.

    Args:
        target_creatives: Desired number of creatives per ad.

    Returns:
        The list of dicts returned by ``prepare_one_creative_for_ad``, in
        preparation order.  Empty when the whitelist is empty or nothing
        could be prepared.
    """
    if not WHITELIST_ACCOUNTS:
        logger.error("[phase1] WHITELIST_ACCOUNTS 为空,Phase 1 退出")
        return []

    excluded_ad_ids = load_excluded_ad_ids_from_adjustment()
    logger.info("[phase1] 关联点过滤集合 size=%d", len(excluded_ad_ids))

    prepared: list[dict] = []
    # 2026-06-10 P0-NEW-3 fix: the "already used" sets live for the whole
    # call, so a material / landing id picked for one account is also
    # excluded for every later account (per-account sets let duplicates
    # slip through across crowd-package pools).
    used_materials: set = set()
    used_landings: set = set()

    for account_id in WHITELIST_ACCOUNTS:
        logger.info("=" * 60)
        logger.info("[phase1] 账户 %d 处理开始", account_id)

        try:
            needy_ads = find_ads_needing_creatives(
                account_id, min_creatives=target_creatives,
            )
        except Exception as exc:
            logger.exception(
                "[phase1] account=%d find_ads 失败,跳过:%s", account_id, exc,
            )
            continue

        eligible_ads = [
            ad for ad in needy_ads if ad["adgroup_id"] not in excluded_ad_ids
        ]
        dropped = len(needy_ads) - len(eligible_ads)
        if dropped:
            logger.info(
                "[phase1] account=%d 关联点过滤排除 %d 条",
                account_id, dropped,
            )
        if not eligible_ads:
            logger.info("[phase1] account=%d 无广告需补创意", account_id)
            continue

        for ad in eligible_ads:
            adgroup_id = ad["adgroup_id"]
            have = ad["creative_count"]
            missing = max(0, target_creatives - have)
            logger.info(
                "[phase1] adgroup=%d(have=%d need=%d)",
                adgroup_id, have, missing,
            )

            for _attempt in range(missing):
                record = None
                try:
                    record = prepare_one_creative_for_ad(
                        account_id, adgroup_id,
                        excluded_material_ids=used_materials,
                        excluded_landing_ids=used_landings,
                    )
                except Exception as exc:
                    logger.exception(
                        "[phase1] adgroup=%d 准备失败:%s", adgroup_id, exc,
                    )

                if not record:
                    # 2026-06-10 (user request): one failed preparation must
                    # not abort the remaining attempts for the same ad.
                    logger.info(
                        "[phase1] adgroup=%d 本条创意 prepare 失败,试下一条",
                        adgroup_id,
                    )
                    continue

                prepared.append(record)
                used_materials.add(record["_material_id"])
                used_landings.add(record["landing_video_id"])

    logger.info("=" * 60)
    logger.info("[phase1] 准备完成,共 %d 条 pending records", len(prepared))
    return prepared
def _write_pending_records(records: list[dict], output_dir: Path) -> Path:
    """Persist the Phase 1 records as JSON for a later apply step and for audit.

    The file is named ``creation_pending_<YYYYMMDD>_<HHMMSS>.json`` using the
    current UTC time.  *output_dir* is created (with parents) if missing.

    Args:
        records: JSON-serialisable pending records.
        output_dir: Directory that receives the file.

    Returns:
        Path of the file that was written.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    # Read the clock once: building the date and the time from two separate
    # now() calls can pair yesterday's date with today's time at midnight.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    out_path = output_dir / f"creation_pending_{stamp}.json"
    with out_path.open("w", encoding="utf-8") as fh:
        json.dump(records, fh, ensure_ascii=False, indent=2)
    return out_path
def phase2_approval(records: list[dict], xlsx_output_dir: Path) -> dict[int, str]:
    """Phase 2: run the Feishu approval workflow for the pending records.

    Delegates to ``run_approval_workflow`` with the configured timeout and
    logs the sheet URL together with how many decisions came back.

    Args:
        records: Pending records to submit for approval.
        xlsx_output_dir: Directory handed to the workflow for its xlsx output.

    Returns:
        Mapping of 1-based row index to the action decided for that row.
    """
    from tools.im_approval_creation import run_approval_workflow

    workflow_result = run_approval_workflow(
        records=records,
        xlsx_output_dir=xlsx_output_dir,
        timeout_minutes=CREATION_APPROVAL_TIMEOUT_MINUTES,
    )
    sheet_meta, decisions = workflow_result

    sheet_url = sheet_meta.get("url")
    logger.info(
        "[phase2] 审批完成 sheet_url=%s actions=%d/%d",
        sheet_url, len(decisions), len(records),
    )
    return decisions
def run_once() -> dict:
    """Run one full cycle: Phase 0, Phase 1, Phase 2 (if enabled), Phase 3.

    Phase 0 creates missing ads, Phase 1 prepares pending creative records,
    Phase 2 collects approval decisions when ``CREATION_APPROVAL_REQUIRED``
    is true (otherwise every record is approved), and Phase 3 applies the
    records, writes the summary file and sends the Feishu report.

    Returns:
        The summary dict.  When Phase 1 yields nothing, a short dict whose
        ``total`` is ``{"phase1_prepared": 0}``; otherwise the summary from
        ``apply_pending_records`` extended with approval, path, timing and
        Phase 0 information.
    """
    divider = "=" * 60
    started_at = datetime.now(timezone.utc).isoformat()

    # Phase 0: module A, make sure each account has ADS_PER_ACCOUNT ads.
    logger.info(divider)
    logger.info("[main] Phase 0 启动 — 模块 A 检查 + 建广告")
    new_ads = phase0_create_ads()
    logger.info("[main] Phase 0 完成:本轮新建广告 %d 条", len(new_ads))

    # Phase 1: module B, prepare creatives for all ads (new and existing).
    logger.info(divider)
    logger.info("[main] Phase 1 启动 — 模块 B 给广告补创意")
    pending = phase1_prepare()
    if not pending:
        logger.info("[main] Phase 1 无 pending records,主循环退出")
        return {
            "run_started": started_at,
            "run_finished": datetime.now(timezone.utc).isoformat(),
            "approval_required": CREATION_APPROVAL_REQUIRED,
            "phase0_created_ads": len(new_ads),
            "total": {"phase1_prepared": 0},
        }

    out_dir = _HERE / "outputs" / "data"
    pending_file = _write_pending_records(pending, out_dir)
    logger.info("[main] pending records 已写入: %s", pending_file)

    # Phase 2: approval, or blanket approve when the switch is off.
    if not CREATION_APPROVAL_REQUIRED:
        logger.info("[main] CREATION_APPROVAL_REQUIRED=False → 全 records approve")
        for record in pending:
            record["action"] = "approve"
    else:
        logger.info("[main] CREATION_APPROVAL_REQUIRED=True → 进 Phase 2 飞书审批")
        decisions = phase2_approval(pending, out_dir)
        for row_idx, record in enumerate(pending, start=1):
            record["action"] = decisions.get(row_idx, "skip")

    # Phase 3: apply.
    logger.info(divider)
    logger.info("[main] Phase 3 执行启动")
    summary = apply_pending_records(pending)
    summary["approval_required"] = CREATION_APPROVAL_REQUIRED
    summary["pending_records_path"] = str(pending_file)
    summary_file = write_apply_summary(summary, out_dir)
    logger.info("[main] summary 已写入: %s", summary_file)

    # Send the execution report.
    _send_apply_summary_to_feishu(summary)

    # These fields are added after the summary file and the report went out,
    # so they appear only in the returned dict.
    summary["run_started"] = started_at
    summary["run_finished"] = datetime.now(timezone.utc).isoformat()
    summary["phase0_created_ads"] = len(new_ads)
    summary["phase0_ads"] = new_ads
    return summary
def main() -> int:
    """Script entry point: configure logging, run one cycle, report totals.

    Returns:
        Process exit code: 0 when the summary reports no failed posts,
        1 otherwise.
    """
    _setup_logging()
    divider = "=" * 60

    logger.info(divider)
    logger.info("模块 B 创意搭建子系统 — 主循环启动")
    logger.info("TARGET_CREATIVES_PER_AD = %d", TARGET_CREATIVES_PER_AD)
    logger.info("CREATION_APPROVAL_REQUIRED = %s", CREATION_APPROVAL_REQUIRED)
    logger.info("WHITELIST_ACCOUNTS = %s", WHITELIST_ACCOUNTS)
    logger.info(divider)

    result = run_once()
    totals = result.get("total") or {}

    logger.info(divider)
    logger.info("[main] 主循环结束")
    if "phase1_prepared" not in totals:
        # Full run: report the apply counters.
        logger.info(" records = %d", totals.get("records", 0))
        logger.info(" approved = %d", totals.get("approved", 0))
        logger.info(" posted_ok = %d", totals.get("posted_ok", 0))
        logger.info(" posted_failed = %d", totals.get("posted_failed", 0))
    else:
        # Early exit: Phase 1 produced nothing.
        logger.info(" phase1_prepared = %d", totals["phase1_prepared"])
    logger.info(divider)

    failed_posts = totals.get("posted_failed", 0)
    return 1 if failed_posts != 0 else 0


if __name__ == "__main__":
    sys.exit(main())
|