fetch_data.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. """
Data fetch script — auto_put_ad_mini V3

V3 responsibilities:
- Fetch 30 days of creative-level data (incremental; dates that already have a CSV are skipped)
- Fetch 30 days of ad-status snapshots
- Write output to outputs/raw/ and outputs/ad_status/

Usage:
    # Fetch the most recent 30 days
    .venv/bin/python3 examples/auto_put_ad_mini/fetch_data.py --days 30
    # Fetch a single day (to validate the SQL)
    .venv/bin/python3 examples/auto_put_ad_mini/fetch_data.py --bizdate 20260409
    # Fetch a specific number of days
    .venv/bin/python3 examples/auto_put_ad_mini/fetch_data.py --days 7
  14. """
  15. import argparse
  16. import asyncio
  17. import logging
  18. import sys
  19. from datetime import datetime, timedelta
  20. from pathlib import Path
  21. # 把项目根目录加入 path
  22. ROOT = Path(__file__).resolve().parent.parent.parent
  23. sys.path.insert(0, str(ROOT))
  24. sys.path.insert(0, str(ROOT / "examples" / "auto_put_ad_mini"))
  25. from tools.data_query import _fetch_creative_data, _fetch_ad_status, _parse_bizdate, _merge_single_day
  26. logging.basicConfig(
  27. level=logging.INFO,
  28. format="%(asctime)s [%(levelname)s] %(message)s",
  29. datefmt="%H:%M:%S",
  30. )
  31. logger = logging.getLogger(__name__)
  32. _MINI_DIR = Path(__file__).resolve().parent
  33. _RAW_DIR = _MINI_DIR / "outputs" / "raw"
  34. _AD_STATUS_DIR = _MINI_DIR / "outputs" / "ad_status"
  35. def fetch_single_day(bizdate: str) -> bool:
  36. """
  37. 拉取单日数据。
  38. Returns:
  39. True: 成功拉取或已存在
  40. False: 拉取失败
  41. """
  42. biz, biz_dash = _parse_bizdate(bizdate)
  43. # 检查是否已存在
  44. creative_csv = _RAW_DIR / f"creative_{biz}.csv"
  45. ad_status_csv = _AD_STATUS_DIR / f"ad_status_{biz}.csv"
  46. if creative_csv.exists() and ad_status_csv.exists():
  47. logger.info("✓ %s 数据已存在,跳过", biz)
  48. return True
  49. logger.info("→ 开始拉取 %s", biz)
  50. # 拉取创意数据
  51. if not creative_csv.exists():
  52. df_creative = _fetch_creative_data(biz)
  53. if df_creative is None:
  54. logger.error("✗ %s 创意数据拉取失败", biz)
  55. return False
  56. _RAW_DIR.mkdir(parents=True, exist_ok=True)
  57. df_creative.to_csv(creative_csv, index=False, encoding="utf-8-sig")
  58. logger.info(" ✓ 创意数据: %d 行 → %s", len(df_creative), creative_csv.name)
  59. else:
  60. logger.info(" ✓ 创意数据已存在")
  61. # 拉取广告状态
  62. if not ad_status_csv.exists():
  63. df_status = _fetch_ad_status(biz)
  64. if df_status is None:
  65. logger.error("✗ %s 广告状态拉取失败", biz)
  66. return False
  67. _AD_STATUS_DIR.mkdir(parents=True, exist_ok=True)
  68. df_status.to_csv(ad_status_csv, index=False, encoding="utf-8-sig")
  69. logger.info(" ✓ 广告状态: %d 行 → %s", len(df_status), ad_status_csv.name)
  70. else:
  71. logger.info(" ✓ 广告状态已存在")
  72. # 合并创意数据与广告状态(可选,自动执行)
  73. df_merged = _merge_single_day(biz)
  74. if df_merged is not None:
  75. logger.info(" ✓ 合并完成: %d 行, %d 列", len(df_merged), len(df_merged.columns))
  76. return True
  77. def fetch_multiple_days(days: int, end_date: str = "yesterday") -> None:
  78. """
  79. 拉取多日数据(增量)。
  80. Args:
  81. days: 拉取天数
  82. end_date: 结束日期(yesterday 或 YYYYMMDD)
  83. """
  84. if end_date == "yesterday":
  85. end_dt = datetime.now() - timedelta(days=1)
  86. else:
  87. end_dt = datetime.strptime(end_date, "%Y%m%d")
  88. logger.info("=" * 60)
  89. logger.info("开始拉取 %d 天数据(结束日期: %s)", days, end_dt.strftime("%Y%m%d"))
  90. logger.info("=" * 60)
  91. success_count = 0
  92. fail_count = 0
  93. skip_count = 0
  94. for i in range(days):
  95. date_dt = end_dt - timedelta(days=i)
  96. bizdate = date_dt.strftime("%Y%m%d")
  97. creative_csv = _RAW_DIR / f"creative_{bizdate}.csv"
  98. ad_status_csv = _AD_STATUS_DIR / f"ad_status_{bizdate}.csv"
  99. if creative_csv.exists() and ad_status_csv.exists():
  100. skip_count += 1
  101. continue
  102. if fetch_single_day(bizdate):
  103. success_count += 1
  104. else:
  105. fail_count += 1
  106. logger.info("=" * 60)
  107. logger.info("拉取完成: 成功 %d, 失败 %d, 跳过 %d", success_count, fail_count, skip_count)
  108. logger.info("=" * 60)
  109. # 列出已有文件
  110. creative_files = sorted(_RAW_DIR.glob("creative_*.csv"))
  111. status_files = sorted(_AD_STATUS_DIR.glob("ad_status_*.csv"))
  112. logger.info("创意数据文件 (%d 个):", len(creative_files))
  113. for f in creative_files[-5:]: # 只显示最近 5 个
  114. size_kb = f.stat().st_size / 1024
  115. logger.info(" %s (%.1f KB)", f.name, size_kb)
  116. logger.info("广告状态文件 (%d 个):", len(status_files))
  117. for f in status_files[-5:]:
  118. size_kb = f.stat().st_size / 1024
  119. logger.info(" %s (%.1f KB)", f.name, size_kb)
  120. if __name__ == "__main__":
  121. parser = argparse.ArgumentParser(description="拉取创意级别数据(V3)")
  122. parser.add_argument("--bizdate", default="", help="单日拉取: YYYYMMDD")
  123. parser.add_argument("--days", type=int, default=0, help="多日拉取: 天数")
  124. parser.add_argument("--end_date", default="yesterday", help="结束日期: yesterday 或 YYYYMMDD")
  125. args = parser.parse_args()
  126. if args.bizdate:
  127. # 单日模式
  128. success = fetch_single_day(args.bizdate)
  129. sys.exit(0 if success else 1)
  130. elif args.days > 0:
  131. # 多日模式
  132. fetch_multiple_days(args.days, args.end_date)
  133. else:
  134. # 默认:拉取 30 天
  135. fetch_multiple_days(30, "yesterday")