merge_data.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. """
  2. 数据合并脚本 — auto_put_ad_mini V3
  3. 独立工具:合并创意数据与广告状态数据
  4. 用法:
  5. # 合并最近 30 天
  6. .venv/bin/python3 examples/auto_put_ad_mini/merge_data.py --days 30
  7. # 合并单日
  8. .venv/bin/python3 examples/auto_put_ad_mini/merge_data.py --bizdate 20260411
  9. # 强制重新合并(覆盖已存在的文件)
  10. .venv/bin/python3 examples/auto_put_ad_mini/merge_data.py --days 30 --force
  11. """
  12. import argparse
  13. import logging
  14. import sys
  15. from datetime import datetime, timedelta
  16. from pathlib import Path
# Add the project root (three `.parent` hops up from this file) and this
# example's own directory to the front of sys.path. These inserts must run
# before the `tools.data_query` import below — presumably that is how the
# `tools` package is resolved; confirm against the repository layout.
ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(ROOT))
sys.path.insert(0, str(ROOT / "examples" / "auto_put_ad_mini"))

from tools.data_query import _merge_single_day, _parse_bizdate

# Configure the root logger at import time: INFO and above, each record
# prefixed with an HH:MM:SS timestamp and the level name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)

# Directory containing this script.
_MINI_DIR = Path(__file__).resolve().parent
# Directory where this script looks for `merged_<date>.csv` files.
_MERGED_DIR = _MINI_DIR / "outputs" / "merged"
  30. def merge_single_day(bizdate: str, force: bool = False) -> bool:
  31. """
  32. 合并单日数据。
  33. Returns:
  34. True: 成功合并
  35. False: 合并失败
  36. """
  37. biz, _ = _parse_bizdate(bizdate)
  38. # 检查是否已存在
  39. merged_csv = _MERGED_DIR / f"merged_{biz}.csv"
  40. if merged_csv.exists() and not force:
  41. logger.info("✓ %s 合并文件已存在,跳过(使用 --force 强制重新合并)", biz)
  42. return True
  43. logger.info("→ 开始合并 %s", biz)
  44. df = _merge_single_day(biz)
  45. if df is not None:
  46. logger.info("✓ 合并成功: %d 行, %d 列", len(df), len(df.columns))
  47. return True
  48. else:
  49. logger.error("✗ 合并失败(源文件缺失)")
  50. return False
  51. def merge_multiple_days(days: int, end_date: str = "yesterday", force: bool = False) -> None:
  52. """
  53. 合并多日数据。
  54. Args:
  55. days: 合并天数
  56. end_date: 结束日期(yesterday 或 YYYYMMDD)
  57. force: 是否强制重新合并
  58. """
  59. if end_date == "yesterday":
  60. end_dt = datetime.now() - timedelta(days=1)
  61. else:
  62. end_dt = datetime.strptime(end_date, "%Y%m%d")
  63. logger.info("=" * 60)
  64. logger.info("开始合并 %d 天数据(结束日期: %s)", days, end_dt.strftime("%Y%m%d"))
  65. logger.info("=" * 60)
  66. success_count = 0
  67. fail_count = 0
  68. skip_count = 0
  69. for i in range(days):
  70. date_dt = end_dt - timedelta(days=i)
  71. bizdate = date_dt.strftime("%Y%m%d")
  72. merged_csv = _MERGED_DIR / f"merged_{bizdate}.csv"
  73. if merged_csv.exists() and not force:
  74. skip_count += 1
  75. continue
  76. if merge_single_day(bizdate, force):
  77. success_count += 1
  78. else:
  79. fail_count += 1
  80. logger.info("=" * 60)
  81. logger.info("合并完成: 成功 %d, 失败 %d, 跳过 %d", success_count, fail_count, skip_count)
  82. logger.info("=" * 60)
  83. # 列出已有文件
  84. merged_files = sorted(_MERGED_DIR.glob("merged_*.csv"))
  85. logger.info("合并文件 (%d 个):", len(merged_files))
  86. for f in merged_files[-5:]: # 只显示最近 5 个
  87. size_kb = f.stat().st_size / 1024
  88. logger.info(" %s (%.1f KB)", f.name, size_kb)
  89. if __name__ == "__main__":
  90. parser = argparse.ArgumentParser(description="合并创意数据与广告状态(V3)")
  91. parser.add_argument("--bizdate", default="", help="单日合并: YYYYMMDD")
  92. parser.add_argument("--days", type=int, default=0, help="多日合并: 天数")
  93. parser.add_argument("--end_date", default="yesterday", help="结束日期: yesterday 或 YYYYMMDD")
  94. parser.add_argument("--force", action="store_true", help="强制重新合并(覆盖已存在的文件)")
  95. args = parser.parse_args()
  96. if args.bizdate:
  97. # 单日模式
  98. success = merge_single_day(args.bizdate, args.force)
  99. sys.exit(0 if success else 1)
  100. elif args.days > 0:
  101. # 多日模式
  102. merge_multiple_days(args.days, args.end_date, args.force)
  103. else:
  104. # 默认:合并 30 天
  105. merge_multiple_days(30, "yesterday", args.force)