sync_ad_status.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. """
  2. 同步腾讯广告"删除"状态到本地 ad_status CSV。
  3. 背景:
  4. outputs/ad_status/ad_status_YYYYMMDD.csv 来自 ODPS 镜像表
  5. `loghubods.ad_put_tencent_ad`,只能可靠反映 SUSPEND,无法感知"腾讯侧被删除"。
  6. 本脚本每天调用 `/v3.0/adgroups/get` 拉全量广告清单,做差集:
  7. 本地 CSV 有、但 API 未返回的 → 标记 is_deleted=True,写回原文件。
  8. 用法:
  9. .venv/bin/python3 sync_ad_status.py [--date YYYYMMDD] [--dry-run] [--page-size 100] [--force]
  10. 退出码:
  11. 0 = 成功(含 partial:部分账号成功)
  12. 2 = 全部账号 API 调用失败
  13. """
  14. import argparse
  15. import logging
  16. import sys
  17. import time
  18. from datetime import datetime, timedelta
  19. from pathlib import Path
  20. from typing import Set
  21. import pandas as pd
# Resolve package roots relative to this file so the script runs from any CWD.
_MINI_DIR = Path(__file__).resolve().parent
_AGENT_ROOT = _MINI_DIR.parent.parent  # /Users/.../Agent
# Keep in sync with execute_once.py: add the Agent root so `from agent....` imports resolve.
if str(_AGENT_ROOT) not in sys.path:
    sys.path.insert(0, str(_AGENT_ROOT))
if str(_MINI_DIR) not in sys.path:
    sys.path.insert(0, str(_MINI_DIR))
# Order-sensitive: `tools.ad_api` MUST be imported before `config`, because
# `config` indirectly loads the agent framework, which appends `Agent/im-client/`
# to sys.path; the im-client/tools.py there would then shadow the local
# auto_put_ad_mini/tools/ package. Importing the local tools package first
# pins the binding to the correct target.
from tools.ad_api import _check, _get  # reuse the low-level HTTP client + common request-param builder
from config import AD_STATUS_DIR
logger = logging.getLogger("sync_ad_status")
  35. def _setup_logging() -> None:
  36. logging.basicConfig(
  37. level=logging.INFO,
  38. format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
  39. stream=sys.stdout,
  40. )
  41. def _fetch_api_ad_ids(account_id: int, page_size: int) -> Set[int]:
  42. """分页拉取某账号下全部广告 ID。"""
  43. ad_ids: Set[int] = set()
  44. page = 1
  45. while True:
  46. resp = _get(
  47. "/adgroups/get",
  48. {
  49. "account_id": account_id,
  50. "page": page,
  51. "page_size": page_size,
  52. },
  53. )
  54. data = _check(resp, "sync_ad_status")
  55. items = data.get("list", []) or []
  56. for it in items:
  57. raw = it.get("adgroup_id")
  58. if raw is None:
  59. continue
  60. try:
  61. ad_ids.add(int(raw))
  62. except (TypeError, ValueError):
  63. logger.warning("无法解析 adgroup_id: %r", raw)
  64. page_info = data.get("page_info", {}) or {}
  65. total_page = int(page_info.get("total_page", 1) or 1)
  66. logger.info(
  67. "[account=%s] page %d/%d (items=%d, 累计 %d)",
  68. account_id, page, total_page, len(items), len(ad_ids),
  69. )
  70. if page >= total_page or not items:
  71. break
  72. page += 1
  73. time.sleep(0.15) # 避开 QPS=10
  74. return ad_ids
  75. def main() -> int:
  76. _setup_logging()
  77. parser = argparse.ArgumentParser(description="同步腾讯广告删除状态到本地 ad_status CSV")
  78. default_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
  79. parser.add_argument("--date", default=default_date, help="目标 bizdate,默认=昨天")
  80. parser.add_argument("--dry-run", action="store_true", help="只打印差集,不回写 CSV")
  81. parser.add_argument("--page-size", type=int, default=100, help="分页大小,默认100(API上限)")
  82. parser.add_argument("--force", action="store_true", help="忽略当日已同步短路判断,强制重跑")
  83. args = parser.parse_args()
  84. csv_path = AD_STATUS_DIR / f"ad_status_{args.date}.csv"
  85. if not csv_path.exists():
  86. logger.error(
  87. "ad_status CSV 不存在: %s(请先运行 fetch_data.py 拉取当日数据)",
  88. csv_path,
  89. )
  90. return 2
  91. df = pd.read_csv(csv_path, encoding="utf-8-sig")
  92. df["ad_id"] = pd.to_numeric(df["ad_id"], errors="coerce").astype("Int64")
  93. df["account_id"] = pd.to_numeric(df["account_id"], errors="coerce").astype("Int64")
  94. # 防重复调用短路:CSV 已含 is_deleted 且未 --force → 直接退出
  95. if "is_deleted" in df.columns and not args.force:
  96. n_marked = int(df["is_deleted"].fillna(False).astype(bool).sum())
  97. logger.info(
  98. "当日已同步过(%s 已含 is_deleted 列,共标记 %d 个删除);如需重跑,加 --force",
  99. csv_path.name, n_marked,
  100. )
  101. return 0
  102. account_ids = [int(a) for a in df["account_id"].dropna().unique().tolist()]
  103. if not account_ids:
  104. logger.error("CSV 中未发现任何 account_id,中止")
  105. return 2
  106. logger.info("开始同步 date=%s, 账号数=%d, 总行数=%d", args.date, len(account_ids), len(df))
  107. all_deleted_ids: Set[int] = set()
  108. success_accounts: list = []
  109. failed_accounts: list = []
  110. for acct in account_ids:
  111. local_ids = set(
  112. int(x) for x in df.loc[df["account_id"] == acct, "ad_id"].dropna().tolist()
  113. )
  114. try:
  115. api_ids = _fetch_api_ad_ids(acct, args.page_size)
  116. except Exception as e:
  117. logger.error("[account=%s] 拉取 API 失败:%s(跳过该账号)", acct, e)
  118. failed_accounts.append(acct)
  119. continue
  120. # 注:API 层面若 code=0 且返回 items=[](不抛异常),说明该账号下所有广告
  121. # 已被人工全部删除(经业务确认的常见场景),按"全部标记删除"处理。
  122. # 只有真正的 HTTP/API 异常(见上面 except 分支)才会跳过账号。
  123. deleted = local_ids - api_ids
  124. ratio = len(deleted) / max(len(local_ids), 1)
  125. # 护栏:删除比例 > 50% 打 warning 但放行
  126. if ratio > 0.5:
  127. logger.warning(
  128. "[account=%s] 删除比例 %.1f%% 过高(%d/%d),请人工确认是否正常",
  129. acct, ratio * 100, len(deleted), len(local_ids),
  130. )
  131. logger.info(
  132. "[account=%s] 本地 %d / API %d / 新标记删除 %d",
  133. acct, len(local_ids), len(api_ids), len(deleted),
  134. )
  135. all_deleted_ids.update(deleted)
  136. success_accounts.append(acct)
  137. # 汇总
  138. logger.info(
  139. "=== 汇总 === 账号 成功=%d / 失败=%d,总计标记删除 %d 个广告",
  140. len(success_accounts), len(failed_accounts), len(all_deleted_ids),
  141. )
  142. if args.dry_run:
  143. logger.info("[dry-run] 不回写 CSV。示例 ad_id(前 20 个):%s",
  144. sorted(all_deleted_ids)[:20])
  145. # 全部失败退出码 2;否则 0
  146. return 2 if success_accounts == [] else 0
  147. if not success_accounts:
  148. logger.error("所有账号均拉取失败,不回写 CSV")
  149. return 2
  150. # 回写 is_deleted 列:对成功账号的本地 ad_id 全部重算 False/True;
  151. # 对失败账号的行保持原值(如果有),没有原值则先置 False
  152. if "is_deleted" not in df.columns:
  153. df["is_deleted"] = False
  154. else:
  155. df["is_deleted"] = df["is_deleted"].fillna(False).astype(bool)
  156. success_mask = df["account_id"].isin(success_accounts)
  157. # 成功账号:先全部置 False,再对在删除集合里的置 True
  158. df.loc[success_mask, "is_deleted"] = False
  159. deleted_mask = success_mask & df["ad_id"].isin(all_deleted_ids)
  160. df.loc[deleted_mask, "is_deleted"] = True
  161. # 同步将 ad_status 改为 AD_STATUS_DELETED,便于下游直接按 ad_status 识别
  162. # (注意:SUSPEND 也在 deleted 集合里时会被覆盖为 DELETED;对下游过滤无影响,
  163. # 因为 SUSPEND 和 DELETED 都会被排除在决策表外)
  164. if "ad_status" in df.columns:
  165. df.loc[deleted_mask, "ad_status"] = "AD_STATUS_DELETED"
  166. df.to_csv(csv_path, index=False, encoding="utf-8-sig")
  167. logger.info(
  168. "已写回 %s(is_deleted=True 共 %d 行 / 总 %d 行,ad_status 同步改为 AD_STATUS_DELETED)",
  169. csv_path, int(df["is_deleted"].sum()), len(df),
  170. )
  171. return 0
if __name__ == "__main__":
    # Script entry point: propagate main()'s return value as the process exit code
    # (0 = success/partial, 2 = total failure; see module docstring).
    sys.exit(main())