#!/usr/bin/env python
# coding=utf-8
"""
Fetch data incrementally by day - generic version.

Supports concurrent per-day fetching and automatically skips dates whose
CSV files already exist on disk.

Usage:
    python fetch_daily.py tasks/xxx/query.sql                        # last 7 days
    python fetch_daily.py tasks/xxx/query.sql --days 30              # last 30 days
    python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
    python fetch_daily.py tasks/xxx/query.sql --date 20260105        # single day
    python fetch_daily.py tasks/xxx/query.sql --force                # force re-fetch
    python fetch_daily.py tasks/xxx/query.sql --workers 10           # day-level concurrency
    python fetch_daily.py tasks/xxx/query.sql --parallel 50          # per-day multi-threaded download (default 50)
    python fetch_daily.py tasks/xxx/query.sql --parallel 0           # disable multi-threading, single-threaded download
"""
import argparse
import sys
from datetime import datetime, timedelta
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

sys.path.insert(0, str(Path(__file__).parent / "lib"))
from odps_module import ODPSClient

# Thread-safe counters shared by the worker threads; guarded by counter_lock.
counter_lock = threading.Lock()
success_count = 0
fail_count = 0


def get_existing_dates(daily_dir):
    """Return the set of YYYYMMDD date strings already downloaded under *daily_dir*.

    A file counts as an existing day when its stem is exactly 8 digits
    (e.g. ``20260105.csv``). Returns an empty set when the directory is missing.
    """
    existing = set()
    if not daily_dir.exists():
        return existing
    for f in daily_dir.glob("*.csv"):
        dt = f.stem
        # Only accept file names that look like an 8-digit date.
        if len(dt) == 8 and dt.isdigit():
            existing.add(dt)
    return existing


def merge_csv_files(daily_dir, output_file=None):
    """Merge every per-day CSV under *daily_dir* into one file, keeping a single header.

    The header is taken from the first non-empty file; data rows from all
    files are appended in sorted file-name (i.e. date) order.

    Returns the merged file path, or None when there are no CSV files.
    """
    csv_files = sorted(daily_dir.glob("*.csv"))
    if not csv_files:
        print("没有找到 CSV 文件")
        return None
    if output_file is None:
        output_file = daily_dir.parent / f"{daily_dir.name}_merged.csv"
    total_rows = 0
    with open(output_file, "w", encoding="utf-8") as out:
        header_written = False
        for csv_file in csv_files:
            with open(csv_file, "r", encoding="utf-8") as f:
                lines = f.readlines()
            if not lines:
                continue
            if not header_written:
                out.write(lines[0])
                header_written = True
            for line in lines[1:]:
                out.write(line)
                total_rows += 1
    print(f"合并完成: {len(csv_files)} 个文件, {total_rows} 行数据")
    print(f"输出文件: {output_file}")
    return output_file


def get_date_range(start_str, end_str):
    """Return the inclusive list of YYYYMMDD strings from *start_str* to *end_str*."""
    start = datetime.strptime(start_str, "%Y%m%d")
    end = datetime.strptime(end_str, "%Y%m%d")
    dates = []
    current = start
    while current <= end:
        dates.append(current.strftime("%Y%m%d"))
        current += timedelta(days=1)
    return dates


def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0):
    """Fetch one day of data and save it as ``daily_dir/<dt>.csv``.

    ``${dt}`` in *sql_template* is substituted with the date string *dt*.
    When *parallel_threads* > 0 the multi-threaded downloader is used
    (recommended for large result sets), otherwise a single-threaded one.

    Returns a tuple ``(dt, status, info)`` where status is one of
    "success" (info = row count), "empty", "fail", or "error" (info = message).
    Updates the module-level success/fail counters under counter_lock.
    """
    global success_count, fail_count
    try:
        client = ODPSClient()
        sql = sql_template.replace("${dt}", dt)
        output_file = daily_dir / f"{dt}.csv"
        # Download the result set to a file.
        if parallel_threads > 0:
            # Multi-threaded parallel download (suited for large data volumes).
            client.execute_sql_result_save_file_parallel(sql, str(output_file), workers=parallel_threads)
        else:
            # Single-threaded download.
            client.execute_sql_result_save_file(sql, str(output_file))
        # Inspect the result.
        if output_file.exists():
            # Count data rows; the context manager ensures the handle is
            # closed (the original leaked it via a bare open()).
            with open(output_file, "r", encoding="utf-8") as f:
                row_count = sum(1 for _ in f) - 1  # minus header row
            with counter_lock:
                success_count += 1
            if row_count > 0:
                return (dt, "success", row_count)
            return (dt, "empty", 0)
        with counter_lock:
            fail_count += 1
        return (dt, "fail", 0)
    except Exception as e:
        with counter_lock:
            fail_count += 1
        return (dt, "error", str(e))


def main():
    """CLI entry point: resolve the date range, then fetch missing days concurrently."""
    global success_count, fail_count
    parser = argparse.ArgumentParser(description="按天增量获取数据")
    parser.add_argument("sql_file", type=str, help="SQL文件路径")
    parser.add_argument("--days", type=int, default=7, help="获取最近N天 (默认7)")
    parser.add_argument("--start", type=str, help="开始日期 YYYYMMDD")
    parser.add_argument("--end", type=str, help="结束日期 YYYYMMDD")
    parser.add_argument("--date", type=str, help="单天日期 YYYYMMDD")
    parser.add_argument("--force", action="store_true", help="强制重新获取")
    parser.add_argument("--workers", type=int, default=5, help="天级并发数 (默认5)")
    parser.add_argument("--parallel", type=int, default=50, help="单天多线程下载 (默认50, 大数据量推荐)")
    parser.add_argument("--merge", action="store_true", help="合并所有日期数据到一个文件")
    args = parser.parse_args()

    # Resolve the SQL file path.
    sql_file = Path(args.sql_file).resolve()
    if not sql_file.exists():
        print(f"错误: 找不到 {sql_file}")
        return

    # Output layout: <sql dir>/output/<sql stem>/<YYYYMMDD>.csv
    output_dir = sql_file.parent / "output"
    daily_dir = output_dir / sql_file.stem
    daily_dir.mkdir(parents=True, exist_ok=True)
    print(f"SQL文件: {sql_file}")
    print(f"数据目录: {daily_dir}")

    # Merge-only mode: do not fetch anything, just merge existing files.
    if args.merge:
        existing_dates = get_existing_dates(daily_dir)
        print(f"已有数据: {len(existing_dates)}天")
        if existing_dates:
            merge_csv_files(daily_dir)
        else:
            print("没有可合并的数据")
        return

    # Determine the target date range (single day > explicit range > last N days).
    if args.date:
        target_dates = [args.date]
    elif args.start and args.end:
        target_dates = get_date_range(args.start, args.end)
    else:
        today = datetime.now()
        end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
        start_date = (today - timedelta(days=args.days)).strftime("%Y%m%d")
        target_dates = get_date_range(start_date, end_date)
    print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")

    # Check which days already have data.
    existing_dates = get_existing_dates(daily_dir)
    print(f"已有数据: {len(existing_dates)}天")

    # Determine the days that still need fetching.
    if args.force:
        missing_dates = target_dates
        print(f"强制模式: 重新获取所有 {len(missing_dates)} 天")
    else:
        missing_dates = [d for d in target_dates if d not in existing_dates]
        print(f"需要获取: {len(missing_dates)}天")

    if not missing_dates:
        print("没有需要获取的数据,退出")
        return

    # Read the SQL template.
    sql_template = sql_file.read_text(encoding="utf-8")

    # Reset counters (in case main() is invoked more than once per process).
    success_count = 0
    fail_count = 0

    # A template without ${dt} is date-independent: execute it exactly once.
    if "${dt}" not in sql_template:
        print("\n检测到 SQL 中不含 ${dt} 变量,只执行一次...")
        output_file = output_dir / f"{sql_file.stem}.csv"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            client = ODPSClient()
            if args.parallel > 0:
                client.execute_sql_result_save_file_parallel(sql_template, str(output_file), workers=args.parallel)
            else:
                client.execute_sql_result_save_file(sql_template, str(output_file))
            print(f"数据目录: {output_file}")
        except Exception as e:
            print(f"✗ 执行失败: {e}")
        # BUG FIX: the original fell through after a successful one-shot run
        # and re-executed the same SQL via the thread pool under the virtual
        # date "20000101", producing a duplicate download and file.
        return

    # Fetch the missing days concurrently.
    workers = min(args.workers, len(missing_dates))
    if args.parallel > 0:
        print(f"\n开始获取 (天级并发: {workers}, 单天多线程: {args.parallel})...")
    else:
        print(f"\n开始获取 (并发数: {workers})...")

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel): dt
            for dt in missing_dates
        }
        completed = 0
        for future in as_completed(futures):
            completed += 1
            dt, status, info = future.result()
            if status == "success":
                print(f" [{completed}/{len(missing_dates)}] ✓ {dt}: {info} 行")
            elif status == "empty":
                print(f" [{completed}/{len(missing_dates)}] ⚠ {dt}: 无数据")
            elif status == "error":
                print(f" [{completed}/{len(missing_dates)}] ✗ {dt}: {info}")
            else:
                print(f" [{completed}/{len(missing_dates)}] ✗ {dt}: 失败")

    print(f"\n完成! 成功: {success_count}, 失败: {fail_count}")
    print(f"数据目录: {daily_dir}")


if __name__ == "__main__":
    main()