""" 数据拉取脚本 — auto_put_ad_mini V3 V3 职责: - 拉取 30 天创意级别数据(增量,已有 CSV 的日期跳过) - 拉取 30 天广告状态快照 - 输出到 outputs/raw/ 和 outputs/ad_status/ 用法: # 拉取最近 30 天 .venv/bin/python3 examples/auto_put_ad_mini/fetch_data.py --days 30 # 拉取单日(验证 SQL) .venv/bin/python3 examples/auto_put_ad_mini/fetch_data.py --bizdate 20260409 # 拉取指定天数 .venv/bin/python3 examples/auto_put_ad_mini/fetch_data.py --days 7 """ import argparse import asyncio import logging import sys from datetime import datetime, timedelta from pathlib import Path # 把项目根目录加入 path ROOT = Path(__file__).resolve().parent.parent.parent sys.path.insert(0, str(ROOT)) sys.path.insert(0, str(ROOT / "examples" / "auto_put_ad_mini")) from tools.data_query import _fetch_creative_data, _fetch_ad_status, _parse_bizdate, _merge_single_day logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) logger = logging.getLogger(__name__) _MINI_DIR = Path(__file__).resolve().parent _RAW_DIR = _MINI_DIR / "outputs" / "raw" _AD_STATUS_DIR = _MINI_DIR / "outputs" / "ad_status" def fetch_single_day(bizdate: str) -> bool: """ 拉取单日数据。 Returns: True: 成功拉取或已存在 False: 拉取失败 """ biz, biz_dash = _parse_bizdate(bizdate) # 检查是否已存在 creative_csv = _RAW_DIR / f"creative_{biz}.csv" ad_status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv" if creative_csv.exists() and ad_status_csv.exists(): logger.info("✓ %s 数据已存在,跳过", biz) return True logger.info("→ 开始拉取 %s", biz) # 拉取创意数据 if not creative_csv.exists(): df_creative = _fetch_creative_data(biz) if df_creative is None: logger.error("✗ %s 创意数据拉取失败", biz) return False _RAW_DIR.mkdir(parents=True, exist_ok=True) df_creative.to_csv(creative_csv, index=False, encoding="utf-8-sig") logger.info(" ✓ 创意数据: %d 行 → %s", len(df_creative), creative_csv.name) else: logger.info(" ✓ 创意数据已存在") # 拉取广告状态 if not ad_status_csv.exists(): df_status = _fetch_ad_status(biz) if df_status is None: logger.error("✗ %s 广告状态拉取失败", biz) return False _AD_STATUS_DIR.mkdir(parents=True, exist_ok=True) df_status.to_csv(ad_status_csv, index=False, encoding="utf-8-sig") logger.info(" ✓ 广告状态: %d 行 → %s", len(df_status), ad_status_csv.name) else: logger.info(" ✓ 广告状态已存在") # 合并创意数据与广告状态(可选,自动执行) df_merged = _merge_single_day(biz) if df_merged is not None: logger.info(" ✓ 合并完成: %d 行, %d 列", len(df_merged), len(df_merged.columns)) return True def fetch_multiple_days(days: int, end_date: str = "yesterday") -> None: """ 拉取多日数据(增量)。 Args: days: 拉取天数 end_date: 结束日期(yesterday 或 YYYYMMDD) """ if end_date == "yesterday": end_dt = datetime.now() - timedelta(days=1) else: end_dt = datetime.strptime(end_date, "%Y%m%d") logger.info("=" * 60) logger.info("开始拉取 %d 天数据(结束日期: %s)", days, end_dt.strftime("%Y%m%d")) logger.info("=" * 60) success_count = 0 fail_count = 0 skip_count = 0 for i in range(days): date_dt = end_dt - timedelta(days=i) bizdate = date_dt.strftime("%Y%m%d") creative_csv = _RAW_DIR / f"creative_{bizdate}.csv" ad_status_csv = _AD_STATUS_DIR / f"ad_status_{bizdate}.csv" if creative_csv.exists() and ad_status_csv.exists(): skip_count += 1 continue if fetch_single_day(bizdate): success_count += 1 else: fail_count += 1 logger.info("=" * 60) logger.info("拉取完成: 成功 %d, 失败 %d, 跳过 %d", success_count, fail_count, skip_count) logger.info("=" * 60) # 列出已有文件 creative_files = sorted(_RAW_DIR.glob("creative_*.csv")) status_files = sorted(_AD_STATUS_DIR.glob("ad_status_*.csv")) logger.info("创意数据文件 (%d 个):", len(creative_files)) for f in creative_files[-5:]: # 只显示最近 5 个 size_kb = f.stat().st_size / 1024 logger.info(" %s (%.1f KB)", f.name, size_kb) logger.info("广告状态文件 (%d 个):", len(status_files)) for f in status_files[-5:]: size_kb = f.stat().st_size / 1024 logger.info(" %s (%.1f KB)", f.name, size_kb) if __name__ == "__main__": parser = argparse.ArgumentParser(description="拉取创意级别数据(V3)") parser.add_argument("--bizdate", default="", help="单日拉取: YYYYMMDD") parser.add_argument("--days", type=int, default=0, help="多日拉取: 天数") parser.add_argument("--end_date", default="yesterday", help="结束日期: yesterday 或 YYYYMMDD") args = parser.parse_args() if args.bizdate: # 单日模式 success = fetch_single_day(args.bizdate) sys.exit(0 if success else 1) elif args.days > 0: # 多日模式 fetch_multiple_days(args.days, args.end_date) else: # 默认:拉取 30 天 fetch_multiple_days(30, "yesterday")