fetch_daily.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. #!/usr/bin/env python
  2. # coding=utf-8
  3. """
  4. 按天增量获取数据 - 通用版本
  5. 支持并发获取,自动跳过已有数据
  6. 用法:
  7. python fetch_daily.py tasks/xxx/query.sql # 获取最近7天
  8. python fetch_daily.py tasks/xxx/query.sql --days 30 # 获取最近30天
  9. python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
  10. python fetch_daily.py tasks/xxx/query.sql --date 20260105 # 单天
  11. python fetch_daily.py tasks/xxx/query.sql --force # 强制重新获取
  12. python fetch_daily.py tasks/xxx/query.sql --workers 10 # 设置并发数
  13. """
  14. import argparse
  15. import sys
  16. from datetime import datetime, timedelta
  17. from pathlib import Path
  18. from concurrent.futures import ThreadPoolExecutor, as_completed
  19. import threading
  20. sys.path.insert(0, str(Path(__file__).parent / "lib"))
  21. from odps_module import ODPSClient
  22. # 线程安全的计数器
  23. counter_lock = threading.Lock()
  24. success_count = 0
  25. fail_count = 0
  26. def get_existing_dates(daily_dir):
  27. """获取已下载的日期列表"""
  28. existing = set()
  29. if not daily_dir.exists():
  30. return existing
  31. for f in daily_dir.glob("*.csv"):
  32. try:
  33. dt = f.stem
  34. if len(dt) == 8 and dt.isdigit():
  35. existing.add(dt)
  36. except:
  37. pass
  38. return existing
  39. def get_date_range(start_str, end_str):
  40. """生成日期范围列表"""
  41. start = datetime.strptime(start_str, "%Y%m%d")
  42. end = datetime.strptime(end_str, "%Y%m%d")
  43. dates = []
  44. current = start
  45. while current <= end:
  46. dates.append(current.strftime("%Y%m%d"))
  47. current += timedelta(days=1)
  48. return dates
  49. def fetch_single_day(dt, sql_template, daily_dir):
  50. """获取单天数据"""
  51. global success_count, fail_count
  52. try:
  53. client = ODPSClient()
  54. sql = sql_template.replace("${dt}", dt)
  55. df = client.execute_sql(sql)
  56. output_file = daily_dir / f"{dt}.csv"
  57. if df is not None and len(df) > 0:
  58. df.to_csv(output_file, index=False)
  59. with counter_lock:
  60. success_count += 1
  61. return (dt, "success", len(df))
  62. elif df is not None:
  63. df.to_csv(output_file, index=False)
  64. with counter_lock:
  65. success_count += 1
  66. return (dt, "empty", 0)
  67. else:
  68. with counter_lock:
  69. fail_count += 1
  70. return (dt, "fail", 0)
  71. except Exception as e:
  72. with counter_lock:
  73. fail_count += 1
  74. return (dt, "error", str(e))
  75. def main():
  76. global success_count, fail_count
  77. parser = argparse.ArgumentParser(description="按天增量获取数据")
  78. parser.add_argument("sql_file", type=str, help="SQL文件路径")
  79. parser.add_argument("--days", type=int, default=7, help="获取最近N天 (默认7)")
  80. parser.add_argument("--start", type=str, help="开始日期 YYYYMMDD")
  81. parser.add_argument("--end", type=str, help="结束日期 YYYYMMDD")
  82. parser.add_argument("--date", type=str, help="单天日期 YYYYMMDD")
  83. parser.add_argument("--force", action="store_true", help="强制重新获取")
  84. parser.add_argument("--workers", type=int, default=5, help="并发数 (默认5)")
  85. args = parser.parse_args()
  86. # 解析 SQL 文件路径
  87. sql_file = Path(args.sql_file).resolve()
  88. if not sql_file.exists():
  89. print(f"错误: 找不到 {sql_file}")
  90. return
  91. # 输出目录:SQL 同目录下的 output/SQL文件名/
  92. output_dir = sql_file.parent / "output"
  93. daily_dir = output_dir / sql_file.stem
  94. daily_dir.mkdir(parents=True, exist_ok=True)
  95. print(f"SQL文件: {sql_file}")
  96. print(f"数据目录: {daily_dir}")
  97. # 确定日期范围
  98. if args.date:
  99. target_dates = [args.date]
  100. elif args.start and args.end:
  101. target_dates = get_date_range(args.start, args.end)
  102. else:
  103. today = datetime.now()
  104. end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
  105. start_date = (today - timedelta(days=args.days)).strftime("%Y%m%d")
  106. target_dates = get_date_range(start_date, end_date)
  107. print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")
  108. # 检查已有数据
  109. existing_dates = get_existing_dates(daily_dir)
  110. print(f"已有数据: {len(existing_dates)}天")
  111. # 确定需要获取的日期
  112. if args.force:
  113. missing_dates = target_dates
  114. print(f"强制模式: 重新获取所有 {len(missing_dates)} 天")
  115. else:
  116. missing_dates = [d for d in target_dates if d not in existing_dates]
  117. print(f"需要获取: {len(missing_dates)}天")
  118. if not missing_dates:
  119. print("没有需要获取的数据,退出")
  120. return
  121. # 读取 SQL 模板
  122. sql_template = sql_file.read_text(encoding="utf-8")
  123. # 重置计数器
  124. success_count = 0
  125. fail_count = 0
  126. # 并发获取
  127. workers = min(args.workers, len(missing_dates))
  128. print(f"\n开始获取 (并发数: {workers})...")
  129. with ThreadPoolExecutor(max_workers=workers) as executor:
  130. futures = {
  131. executor.submit(fetch_single_day, dt, sql_template, daily_dir): dt
  132. for dt in missing_dates
  133. }
  134. completed = 0
  135. for future in as_completed(futures):
  136. completed += 1
  137. dt, status, info = future.result()
  138. if status == "success":
  139. print(f" [{completed}/{len(missing_dates)}] ✓ {dt}: {info} 行")
  140. elif status == "empty":
  141. print(f" [{completed}/{len(missing_dates)}] ⚠ {dt}: 无数据")
  142. elif status == "error":
  143. print(f" [{completed}/{len(missing_dates)}] ✗ {dt}: {info}")
  144. else:
  145. print(f" [{completed}/{len(missing_dates)}] ✗ {dt}: 失败")
  146. print(f"\n完成! 成功: {success_count}, 失败: {fail_count}")
  147. print(f"数据目录: {daily_dir}")
  148. if __name__ == "__main__":
  149. main()