""" 同步腾讯广告"删除"状态到本地 ad_status CSV。 背景: outputs/ad_status/ad_status_YYYYMMDD.csv 来自 ODPS 镜像表 `loghubods.ad_put_tencent_ad`,只能可靠反映 SUSPEND,无法感知"腾讯侧被删除"。 本脚本每天调用 `/v3.0/adgroups/get` 拉全量广告清单,做差集: 本地 CSV 有、但 API 未返回的 → 标记 is_deleted=True,写回原文件。 用法: .venv/bin/python3 sync_ad_status.py [--date YYYYMMDD] [--dry-run] [--page-size 100] [--force] 退出码: 0 = 成功(含 partial:部分账号成功) 2 = 全部账号 API 调用失败 """ import argparse import logging import sys import time from datetime import datetime, timedelta from pathlib import Path from typing import Set import pandas as pd _MINI_DIR = Path(__file__).resolve().parent _AGENT_ROOT = _MINI_DIR.parent.parent # /Users/.../Agent # 与 execute_once.py 保持一致:把 Agent 根目录加进来以便 `from agent....` 可用 if str(_AGENT_ROOT) not in sys.path: sys.path.insert(0, str(_AGENT_ROOT)) if str(_MINI_DIR) not in sys.path: sys.path.insert(0, str(_MINI_DIR)) # 顺序敏感:必须先 import `tools.ad_api`,因为 `config` 间接加载 agent 框架, # 会把 `Agent/im-client/` 加入 sys.path,里面的 im-client/tools.py 会覆盖本地 # auto_put_ad_mini/tools/ 包的绑定。先 import 本地 tools 包可锁定正确目标。 from tools.ad_api import _check, _get # 复用底层 HTTP + 公共参数构造 from config import AD_STATUS_DIR logger = logging.getLogger("sync_ad_status") def _setup_logging() -> None: logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", stream=sys.stdout, ) def _fetch_api_ad_ids(account_id: int, page_size: int) -> Set[int]: """分页拉取某账号下全部广告 ID。""" ad_ids: Set[int] = set() page = 1 while True: resp = _get( "/adgroups/get", { "account_id": account_id, "page": page, "page_size": page_size, }, ) data = _check(resp, "sync_ad_status") items = data.get("list", []) or [] for it in items: raw = it.get("adgroup_id") if raw is None: continue try: ad_ids.add(int(raw)) except (TypeError, ValueError): logger.warning("无法解析 adgroup_id: %r", raw) page_info = data.get("page_info", {}) or {} total_page = int(page_info.get("total_page", 1) or 1) logger.info( "[account=%s] page %d/%d (items=%d, 累计 %d)", account_id, page, total_page, len(items), len(ad_ids), ) if page >= total_page or not items: break page += 1 time.sleep(0.15) # 避开 QPS=10 return ad_ids def main() -> int: _setup_logging() parser = argparse.ArgumentParser(description="同步腾讯广告删除状态到本地 ad_status CSV") default_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d") parser.add_argument("--date", default=default_date, help="目标 bizdate,默认=昨天") parser.add_argument("--dry-run", action="store_true", help="只打印差集,不回写 CSV") parser.add_argument("--page-size", type=int, default=100, help="分页大小,默认100(API上限)") parser.add_argument("--force", action="store_true", help="忽略当日已同步短路判断,强制重跑") args = parser.parse_args() csv_path = AD_STATUS_DIR / f"ad_status_{args.date}.csv" if not csv_path.exists(): logger.error( "ad_status CSV 不存在: %s(请先运行 fetch_data.py 拉取当日数据)", csv_path, ) return 2 df = pd.read_csv(csv_path, encoding="utf-8-sig") df["ad_id"] = pd.to_numeric(df["ad_id"], errors="coerce").astype("Int64") df["account_id"] = pd.to_numeric(df["account_id"], errors="coerce").astype("Int64") # 防重复调用短路:CSV 已含 is_deleted 且未 --force → 直接退出 if "is_deleted" in df.columns and not args.force: n_marked = int(df["is_deleted"].fillna(False).astype(bool).sum()) logger.info( "当日已同步过(%s 已含 is_deleted 列,共标记 %d 个删除);如需重跑,加 --force", csv_path.name, n_marked, ) return 0 account_ids = [int(a) for a in df["account_id"].dropna().unique().tolist()] if not account_ids: logger.error("CSV 中未发现任何 account_id,中止") return 2 logger.info("开始同步 date=%s, 账号数=%d, 总行数=%d", args.date, len(account_ids), len(df)) all_deleted_ids: Set[int] = set() success_accounts: list = [] failed_accounts: list = [] for acct in account_ids: local_ids = set( int(x) for x in df.loc[df["account_id"] == acct, "ad_id"].dropna().tolist() ) try: api_ids = _fetch_api_ad_ids(acct, args.page_size) except Exception as e: logger.error("[account=%s] 拉取 API 失败:%s(跳过该账号)", acct, e) failed_accounts.append(acct) continue # 注:API 层面若 code=0 且返回 items=[](不抛异常),说明该账号下所有广告 # 已被人工全部删除(经业务确认的常见场景),按"全部标记删除"处理。 # 只有真正的 HTTP/API 异常(见上面 except 分支)才会跳过账号。 deleted = local_ids - api_ids ratio = len(deleted) / max(len(local_ids), 1) # 护栏:删除比例 > 50% 打 warning 但放行 if ratio > 0.5: logger.warning( "[account=%s] 删除比例 %.1f%% 过高(%d/%d),请人工确认是否正常", acct, ratio * 100, len(deleted), len(local_ids), ) logger.info( "[account=%s] 本地 %d / API %d / 新标记删除 %d", acct, len(local_ids), len(api_ids), len(deleted), ) all_deleted_ids.update(deleted) success_accounts.append(acct) # 汇总 logger.info( "=== 汇总 === 账号 成功=%d / 失败=%d,总计标记删除 %d 个广告", len(success_accounts), len(failed_accounts), len(all_deleted_ids), ) if args.dry_run: logger.info("[dry-run] 不回写 CSV。示例 ad_id(前 20 个):%s", sorted(all_deleted_ids)[:20]) # 全部失败退出码 2;否则 0 return 2 if success_accounts == [] else 0 if not success_accounts: logger.error("所有账号均拉取失败,不回写 CSV") return 2 # 回写 is_deleted 列:对成功账号的本地 ad_id 全部重算 False/True; # 对失败账号的行保持原值(如果有),没有原值则先置 False if "is_deleted" not in df.columns: df["is_deleted"] = False else: df["is_deleted"] = df["is_deleted"].fillna(False).astype(bool) success_mask = df["account_id"].isin(success_accounts) # 成功账号:先全部置 False,再对在删除集合里的置 True df.loc[success_mask, "is_deleted"] = False deleted_mask = success_mask & df["ad_id"].isin(all_deleted_ids) df.loc[deleted_mask, "is_deleted"] = True # 同步将 ad_status 改为 AD_STATUS_DELETED,便于下游直接按 ad_status 识别 # (注意:SUSPEND 也在 deleted 集合里时会被覆盖为 DELETED;对下游过滤无影响, # 因为 SUSPEND 和 DELETED 都会被排除在决策表外) if "ad_status" in df.columns: df.loc[deleted_mask, "ad_status"] = "AD_STATUS_DELETED" df.to_csv(csv_path, index=False, encoding="utf-8-sig") logger.info( "已写回 %s(is_deleted=True 共 %d 行 / 总 %d 行,ad_status 同步改为 AD_STATUS_DELETED)", csv_path, int(df["is_deleted"].sum()), len(df), ) return 0 if __name__ == "__main__": sys.exit(main())