fetch_daily.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. #!/usr/bin/env python
  2. # coding=utf-8
  3. """
  4. 按天增量获取数据 - 通用版本
  5. 支持并发获取,自动跳过已有数据
  6. 用法:
  7. python fetch_daily.py tasks/xxx/query.sql # 获取最近7天
  8. python fetch_daily.py tasks/xxx/query.sql --days 30 # 获取最近30天
  9. python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
  10. python fetch_daily.py tasks/xxx/query.sql --date 20260105 # 单天
  11. python fetch_daily.py tasks/xxx/query.sql --force # 强制重新获取
  12. python fetch_daily.py tasks/xxx/query.sql --workers 10 # 设置天级并发数
  13. python fetch_daily.py tasks/xxx/query.sql --parallel 50 # 单天多线程下载(默认50,大数据量推荐)
  14. python fetch_daily.py tasks/xxx/query.sql --parallel 0 # 关闭多线程,使用单线程下载
  15. """
  16. import argparse
  17. import sys
  18. from datetime import datetime, timedelta
  19. from pathlib import Path
  20. from concurrent.futures import ThreadPoolExecutor, as_completed
  21. import threading
  22. sys.path.insert(0, str(Path(__file__).parent / "lib"))
  23. from odps_module import ODPSClient
  24. # Counters shared by the worker threads; fetch_single_day updates them while holding counter_lock
  25. counter_lock = threading.Lock()
  26. success_count = 0
  27. fail_count = 0
  28. def get_existing_dates(daily_dir):
  29. """获取已下载的日期列表"""
  30. existing = set()
  31. if not daily_dir.exists():
  32. return existing
  33. for f in daily_dir.glob("*.csv"):
  34. try:
  35. dt = f.stem
  36. if len(dt) == 8 and dt.isdigit():
  37. existing.add(dt)
  38. except:
  39. pass
  40. return existing
  41. def merge_csv_files(daily_dir, output_file=None):
  42. """合并目录下所有日期 CSV 文件,只保留一个表头"""
  43. csv_files = sorted(daily_dir.glob("*.csv"))
  44. if not csv_files:
  45. print("没有找到 CSV 文件")
  46. return None
  47. if output_file is None:
  48. output_file = daily_dir.parent / f"{daily_dir.name}_merged.csv"
  49. with open(output_file, "w", encoding="utf-8") as out:
  50. header_written = False
  51. total_rows = 0
  52. for csv_file in csv_files:
  53. with open(csv_file, "r", encoding="utf-8") as f:
  54. lines = f.readlines()
  55. if not lines:
  56. continue
  57. if not header_written:
  58. out.write(lines[0])
  59. header_written = True
  60. for line in lines[1:]:
  61. out.write(line)
  62. total_rows += 1
  63. print(f"合并完成: {len(csv_files)} 个文件, {total_rows} 行数据")
  64. print(f"输出文件: {output_file}")
  65. return output_file
  66. def get_date_range(start_str, end_str):
  67. """生成日期范围列表"""
  68. start = datetime.strptime(start_str, "%Y%m%d")
  69. end = datetime.strptime(end_str, "%Y%m%d")
  70. dates = []
  71. current = start
  72. while current <= end:
  73. dates.append(current.strftime("%Y%m%d"))
  74. current += timedelta(days=1)
  75. return dates
  76. def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0):
  77. """获取单天数据"""
  78. global success_count, fail_count
  79. try:
  80. client = ODPSClient()
  81. sql = sql_template.replace("${dt}", dt)
  82. output_file = daily_dir / f"{dt}.csv"
  83. # 下载到文件
  84. if parallel_threads > 0:
  85. # 多线程并行下载(适合大数据量)
  86. client.execute_sql_result_save_file_parallel(sql, str(output_file), workers=parallel_threads)
  87. else:
  88. # 单线程下载
  89. client.execute_sql_result_save_file(sql, str(output_file))
  90. # 检查结果
  91. if output_file.exists():
  92. row_count = sum(1 for _ in open(output_file)) - 1 # 减去表头
  93. with counter_lock:
  94. success_count += 1
  95. if row_count > 0:
  96. return (dt, "success", row_count)
  97. else:
  98. return (dt, "empty", 0)
  99. else:
  100. with counter_lock:
  101. fail_count += 1
  102. return (dt, "fail", 0)
  103. except Exception as e:
  104. with counter_lock:
  105. fail_count += 1
  106. return (dt, "error", str(e))
  107. def main():
  108. global success_count, fail_count
  109. parser = argparse.ArgumentParser(description="按天增量获取数据")
  110. parser.add_argument("sql_file", type=str, help="SQL文件路径")
  111. parser.add_argument("--days", type=int, default=7, help="获取最近N天 (默认7)")
  112. parser.add_argument("--start", type=str, help="开始日期 YYYYMMDD")
  113. parser.add_argument("--end", type=str, help="结束日期 YYYYMMDD")
  114. parser.add_argument("--date", type=str, help="单天日期 YYYYMMDD")
  115. parser.add_argument("--force", action="store_true", help="强制重新获取")
  116. parser.add_argument("--workers", type=int, default=5, help="天级并发数 (默认5)")
  117. parser.add_argument("--parallel", type=int, default=50, help="单天多线程下载 (默认50, 大数据量推荐)")
  118. parser.add_argument("--merge", action="store_true", help="合并所有日期数据到一个文件")
  119. args = parser.parse_args()
  120. # 解析 SQL 文件路径
  121. sql_file = Path(args.sql_file).resolve()
  122. if not sql_file.exists():
  123. print(f"错误: 找不到 {sql_file}")
  124. return
  125. # 输出目录:SQL 同目录下的 output/SQL文件名/
  126. output_dir = sql_file.parent / "output"
  127. daily_dir = output_dir / sql_file.stem
  128. daily_dir.mkdir(parents=True, exist_ok=True)
  129. print(f"SQL文件: {sql_file}")
  130. print(f"数据目录: {daily_dir}")
  131. # 仅合并模式:不获取数据,直接合并已有文件
  132. if args.merge:
  133. existing_dates = get_existing_dates(daily_dir)
  134. print(f"已有数据: {len(existing_dates)}天")
  135. if existing_dates:
  136. merge_csv_files(daily_dir)
  137. else:
  138. print("没有可合并的数据")
  139. return
  140. # 确定日期范围
  141. if args.date:
  142. target_dates = [args.date]
  143. elif args.start and args.end:
  144. target_dates = get_date_range(args.start, args.end)
  145. else:
  146. today = datetime.now()
  147. end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
  148. start_date = (today - timedelta(days=args.days)).strftime("%Y%m%d")
  149. target_dates = get_date_range(start_date, end_date)
  150. print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")
  151. # 检查已有数据
  152. existing_dates = get_existing_dates(daily_dir)
  153. print(f"已有数据: {len(existing_dates)}天")
  154. # 确定需要获取的日期
  155. if args.force:
  156. missing_dates = target_dates
  157. print(f"强制模式: 重新获取所有 {len(missing_dates)} 天")
  158. else:
  159. missing_dates = [d for d in target_dates if d not in existing_dates]
  160. print(f"需要获取: {len(missing_dates)}天")
  161. if not missing_dates:
  162. print("没有需要获取的数据,退出")
  163. return
  164. # 读取 SQL 模板
  165. sql_template = sql_file.read_text(encoding="utf-8")
  166. # 检测 SQL 中是否包含 ${dt} 变量
  167. has_dt_var = "${dt}" in sql_template
  168. # 重置计数器
  169. success_count = 0
  170. fail_count = 0
  171. # 如果 SQL 中没有 ${dt},只需执行一次
  172. if not has_dt_var:
  173. print("\n检测到 SQL 中不含 ${dt} 变量,只执行一次...")
  174. target_dates = ["20000101"] # 用虚拟日期
  175. missing_dates = target_dates
  176. output_file = output_dir / f"{sql_file.stem}.csv"
  177. output_file.parent.mkdir(parents=True, exist_ok=True)
  178. try:
  179. client = ODPSClient()
  180. if args.parallel > 0:
  181. client.execute_sql_result_save_file_parallel(sql_template, str(output_file), workers=args.parallel)
  182. else:
  183. client.execute_sql_result_save_file(sql_template, str(output_file))
  184. print(f"数据目录: {output_file}")
  185. except Exception as e:
  186. print(f"✗ 执行失败: {e}")
  187. return
  188. # 并发获取
  189. print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")
  190. workers = min(args.workers, len(missing_dates))
  191. if args.parallel > 0:
  192. print(f"\n开始获取 (天级并发: {workers}, 单天多线程: {args.parallel})...")
  193. else:
  194. print(f"\n开始获取 (并发数: {workers})...")
  195. with ThreadPoolExecutor(max_workers=workers) as executor:
  196. futures = {
  197. executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel): dt
  198. for dt in missing_dates
  199. }
  200. completed = 0
  201. for future in as_completed(futures):
  202. completed += 1
  203. dt, status, info = future.result()
  204. if status == "success":
  205. print(f" [{completed}/{len(missing_dates)}] ✓ {dt}: {info} 行")
  206. elif status == "empty":
  207. print(f" [{completed}/{len(missing_dates)}] ⚠ {dt}: 无数据")
  208. elif status == "error":
  209. print(f" [{completed}/{len(missing_dates)}] ✗ {dt}: {info}")
  210. else:
  211. print(f" [{completed}/{len(missing_dates)}] ✗ {dt}: 失败")
  212. print(f"\n完成! 成功: {success_count}, 失败: {fail_count}")
  213. print(f"数据目录: {daily_dir}")
  214. if __name__ == "__main__":  # run main() only when executed as a script, not on import
  215. main()