| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- """
- 同步腾讯广告"删除"状态到本地 ad_status CSV。
- 背景:
- outputs/ad_status/ad_status_YYYYMMDD.csv 来自 ODPS 镜像表
- `loghubods.ad_put_tencent_ad`,只能可靠反映 SUSPEND,无法感知"腾讯侧被删除"。
- 本脚本每天调用 `/v3.0/adgroups/get` 拉全量广告清单,做差集:
- 本地 CSV 有、但 API 未返回的 → 标记 is_deleted=True,写回原文件。
- 用法:
- .venv/bin/python3 sync_ad_status.py [--date YYYYMMDD] [--dry-run] [--page-size 100] [--force]
- 退出码:
- 0 = 成功(含 partial:部分账号成功)
- 2 = 全部账号 API 调用失败
- """
- import argparse
- import logging
- import sys
- import time
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Set
- import pandas as pd
- _MINI_DIR = Path(__file__).resolve().parent
- _AGENT_ROOT = _MINI_DIR.parent.parent # /Users/.../Agent
- # 与 execute_once.py 保持一致:把 Agent 根目录加进来以便 `from agent....` 可用
- if str(_AGENT_ROOT) not in sys.path:
- sys.path.insert(0, str(_AGENT_ROOT))
- if str(_MINI_DIR) not in sys.path:
- sys.path.insert(0, str(_MINI_DIR))
- # 顺序敏感:必须先 import `tools.ad_api`,因为 `config` 间接加载 agent 框架,
- # 会把 `Agent/im-client/` 加入 sys.path,里面的 im-client/tools.py 会覆盖本地
- # auto_put_ad_mini/tools/ 包的绑定。先 import 本地 tools 包可锁定正确目标。
- from tools.ad_api import _check, _get # 复用底层 HTTP + 公共参数构造
- from config import AD_STATUS_DIR
- logger = logging.getLogger("sync_ad_status")
def _setup_logging() -> None:
    """Configure root logging: INFO level, timestamped records, to stdout."""
    log_config = {
        "level": logging.INFO,
        "format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        "stream": sys.stdout,
    }
    logging.basicConfig(**log_config)
def _fetch_api_ad_ids(account_id: int, page_size: int) -> Set[int]:
    """Page through /adgroups/get and collect every ad ID under one account.

    Args:
        account_id: Tencent Ads advertiser account ID.
        page_size: page size forwarded to the API.

    Returns:
        The set of all adgroup_ids the API returned for this account.
    """
    collected: Set[int] = set()
    current_page = 1
    while True:
        params = {
            "account_id": account_id,
            "page": current_page,
            "page_size": page_size,
        }
        data = _check(_get("/adgroups/get", params), "sync_ad_status")
        batch = data.get("list", []) or []
        for entry in batch:
            adgroup_id = entry.get("adgroup_id")
            if adgroup_id is None:
                continue
            try:
                collected.add(int(adgroup_id))
            except (TypeError, ValueError):
                logger.warning("无法解析 adgroup_id: %r", adgroup_id)
        paging = data.get("page_info", {}) or {}
        last_page = int(paging.get("total_page", 1) or 1)
        logger.info(
            "[account=%s] page %d/%d (items=%d, 累计 %d)",
            account_id, current_page, last_page, len(batch), len(collected),
        )
        # Stop on the last page, or defensively on an empty page.
        if current_page >= last_page or not batch:
            return collected
        current_page += 1
        time.sleep(0.15)  # stay under the QPS=10 limit
def _load_status_df(csv_path: Path) -> pd.DataFrame:
    """Read the day's ad_status CSV and coerce id columns to nullable Int64."""
    df = pd.read_csv(csv_path, encoding="utf-8-sig")
    df["ad_id"] = pd.to_numeric(df["ad_id"], errors="coerce").astype("Int64")
    df["account_id"] = pd.to_numeric(df["account_id"], errors="coerce").astype("Int64")
    return df


def _collect_deletions(df: pd.DataFrame, account_ids, page_size: int):
    """Diff local ad_ids against the API, per account.

    Args:
        df: the loaded ad_status frame (needs `account_id` / `ad_id` columns).
        account_ids: accounts to process.
        page_size: forwarded to _fetch_api_ad_ids.

    Returns:
        (all_deleted_ids, success_accounts, failed_accounts) — accounts whose
        API pull raised are recorded in failed_accounts and skipped.
    """
    all_deleted_ids: Set[int] = set()
    success_accounts: list = []
    failed_accounts: list = []
    for acct in account_ids:
        local_ids = set(
            int(x) for x in df.loc[df["account_id"] == acct, "ad_id"].dropna().tolist()
        )
        try:
            api_ids = _fetch_api_ad_ids(acct, page_size)
        except Exception as e:
            logger.error("[account=%s] 拉取 API 失败:%s(跳过该账号)", acct, e)
            failed_accounts.append(acct)
            continue
        # NOTE: if the API answers code=0 with items=[] (no exception raised),
        # every ad under the account was manually deleted — a business-confirmed
        # common scenario — so "mark everything deleted" is intended. Only real
        # HTTP/API errors (the except branch above) skip an account.
        deleted = local_ids - api_ids
        ratio = len(deleted) / max(len(local_ids), 1)
        # Guardrail: a deletion ratio above 50% only warns; it is still applied.
        if ratio > 0.5:
            logger.warning(
                "[account=%s] 删除比例 %.1f%% 过高(%d/%d),请人工确认是否正常",
                acct, ratio * 100, len(deleted), len(local_ids),
            )
        logger.info(
            "[account=%s] 本地 %d / API %d / 新标记删除 %d",
            acct, len(local_ids), len(api_ids), len(deleted),
        )
        all_deleted_ids.update(deleted)
        success_accounts.append(acct)
    return all_deleted_ids, success_accounts, failed_accounts


def _apply_deletions(df: pd.DataFrame, success_accounts, all_deleted_ids) -> None:
    """Write is_deleted (and ad_status) back into df, in place.

    Rows of successful accounts are fully recomputed (reset to False, then the
    diff set flipped to True); rows of failed accounts keep their prior value,
    defaulting to False only when the column did not exist at all.
    """
    if "is_deleted" not in df.columns:
        df["is_deleted"] = False
    else:
        df["is_deleted"] = df["is_deleted"].fillna(False).astype(bool)
    success_mask = df["account_id"].isin(success_accounts)
    df.loc[success_mask, "is_deleted"] = False
    deleted_mask = success_mask & df["ad_id"].isin(all_deleted_ids)
    df.loc[deleted_mask, "is_deleted"] = True
    # Also rewrite ad_status to AD_STATUS_DELETED so downstream can filter on
    # ad_status alone. (A SUSPEND row in the deleted set gets overwritten to
    # DELETED; harmless downstream, since SUSPEND and DELETED are both excluded
    # from the decision table.)
    if "ad_status" in df.columns:
        df.loc[deleted_mask, "ad_status"] = "AD_STATUS_DELETED"


def main() -> int:
    """CLI entry point.

    Returns:
        0 on success (including partial success), 2 when the CSV is missing,
        no account_ids are present, or every account's API pull failed.
    """
    _setup_logging()
    parser = argparse.ArgumentParser(description="同步腾讯广告删除状态到本地 ad_status CSV")
    default_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
    parser.add_argument("--date", default=default_date, help="目标 bizdate,默认=昨天")
    parser.add_argument("--dry-run", action="store_true", help="只打印差集,不回写 CSV")
    parser.add_argument("--page-size", type=int, default=100, help="分页大小,默认100(API上限)")
    parser.add_argument("--force", action="store_true", help="忽略当日已同步短路判断,强制重跑")
    args = parser.parse_args()

    csv_path = AD_STATUS_DIR / f"ad_status_{args.date}.csv"
    if not csv_path.exists():
        logger.error(
            "ad_status CSV 不存在: %s(请先运行 fetch_data.py 拉取当日数据)",
            csv_path,
        )
        return 2
    df = _load_status_df(csv_path)

    # Re-run guard: CSV already carries is_deleted and --force absent → exit early.
    if "is_deleted" in df.columns and not args.force:
        n_marked = int(df["is_deleted"].fillna(False).astype(bool).sum())
        logger.info(
            "当日已同步过(%s 已含 is_deleted 列,共标记 %d 个删除);如需重跑,加 --force",
            csv_path.name, n_marked,
        )
        return 0

    account_ids = [int(a) for a in df["account_id"].dropna().unique().tolist()]
    if not account_ids:
        logger.error("CSV 中未发现任何 account_id,中止")
        return 2
    logger.info("开始同步 date=%s, 账号数=%d, 总行数=%d", args.date, len(account_ids), len(df))

    all_deleted_ids, success_accounts, failed_accounts = _collect_deletions(
        df, account_ids, args.page_size
    )

    # Summary
    logger.info(
        "=== 汇总 === 账号 成功=%d / 失败=%d,总计标记删除 %d 个广告",
        len(success_accounts), len(failed_accounts), len(all_deleted_ids),
    )
    if args.dry_run:
        logger.info("[dry-run] 不回写 CSV。示例 ad_id(前 20 个):%s",
                    sorted(all_deleted_ids)[:20])
        # Exit 2 only when every account failed; otherwise 0.
        return 2 if not success_accounts else 0
    if not success_accounts:
        logger.error("所有账号均拉取失败,不回写 CSV")
        return 2

    _apply_deletions(df, success_accounts, all_deleted_ids)
    df.to_csv(csv_path, index=False, encoding="utf-8-sig")
    logger.info(
        "已写回 %s(is_deleted=True 共 %d 行 / 总 %d 行,ad_status 同步改为 AD_STATUS_DELETED)",
        csv_path, int(df["is_deleted"].sum()), len(df),
    )
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code
    # (0 = success/partial, 2 = total failure).
    sys.exit(main())
|