#!/usr/bin/env python
# coding=utf-8
"""
Fetch data incrementally, one day at a time (generic version).

Days are fetched concurrently, and days whose CSV already exists are skipped.

Usage:
    python fetch_daily.py tasks/xxx/query.sql                  # last 7 days
    python fetch_daily.py tasks/xxx/query.sql --days 30        # last 30 days
    python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
    python fetch_daily.py tasks/xxx/query.sql --date 20260105  # a single day
    python fetch_daily.py tasks/xxx/query.sql --force          # re-fetch existing days
    python fetch_daily.py tasks/xxx/query.sql --workers 10     # set the concurrency
"""

import argparse
import sys
from datetime import datetime, timedelta
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Make the bundled "lib" directory importable before loading the ODPS client.
sys.path.insert(0, str(Path(__file__).parent / "lib"))
from odps_module import ODPSClient

# Counters shared by all worker threads; every update happens under counter_lock.
counter_lock = threading.Lock()
success_count = 0
fail_count = 0


def get_existing_dates(daily_dir):
    """Return the set of dates that already have a CSV file in *daily_dir*.

    A file counts as a downloaded day when its stem is exactly eight digits
    (``YYYYMMDD.csv``). A missing directory yields an empty set.
    """
    if not daily_dir.exists():
        return set()
    return {
        csv_path.stem
        for csv_path in daily_dir.glob("*.csv")
        if len(csv_path.stem) == 8 and csv_path.stem.isdigit()
    }


def get_date_range(start_str, end_str):
    """Return every date from *start_str* to *end_str*, inclusive.

    Both arguments and all returned items are ``YYYYMMDD`` strings. The result
    is empty when the start date is after the end date.

    Raises:
        ValueError: if either argument does not match ``%Y%m%d``.
    """
    start = datetime.strptime(start_str, "%Y%m%d")
    end = datetime.strptime(end_str, "%Y%m%d")

    dates = []
    current = start
    while current <= end:
        dates.append(current.strftime("%Y%m%d"))
        current += timedelta(days=1)
    return dates


def fetch_single_day(dt, sql_template, daily_dir):
    """Run the query for one day and store the result as ``<dt>.csv``.

    Every ``${dt}`` placeholder in *sql_template* is replaced with *dt* before
    the SQL is executed through a fresh ``ODPSClient``. The module-level
    success/failure counters are updated under ``counter_lock``.

    Returns:
        A ``(dt, status, info)`` tuple where *status* is one of:
        ``"success"`` (info = row count), ``"empty"`` (zero rows, an empty CSV
        is still written so the day is not re-fetched), ``"fail"`` (the client
        returned ``None``, info = 0) or ``"error"`` (info = exception text).
        Exceptions are never propagated to the caller.
    """
    global success_count, fail_count

    try:
        client = ODPSClient()
        sql = sql_template.replace("${dt}", dt)
        df = client.execute_sql(sql)

        if df is None:
            with counter_lock:
                fail_count += 1
            return (dt, "fail", 0)

        output_file = daily_dir / f"{dt}.csv"
        # Write to a temporary name first and rename afterwards: a write that
        # is interrupted half-way must not leave a truncated "<dt>.csv" behind,
        # because later runs treat any such file as an already fetched day.
        # The ".tmp" suffix keeps it out of the "*.csv" glob.
        tmp_file = daily_dir / f"{dt}.csv.tmp"
        df.to_csv(tmp_file, index=False)
        tmp_file.replace(output_file)

        row_count = len(df)
        with counter_lock:
            success_count += 1
        if row_count > 0:
            return (dt, "success", row_count)
        return (dt, "empty", 0)
    except Exception as e:
        # Per-day failures are reported to the caller instead of aborting the
        # whole run.
        with counter_lock:
            fail_count += 1
        return (dt, "error", str(e))


def _resolve_target_dates(args):
    """Translate the parsed CLI arguments into the list of dates to fetch.

    Precedence: ``--date`` (used verbatim, not validated), then ``--start``
    together with ``--end``, otherwise the last ``--days`` days ending
    yesterday. NOTE: when only one of ``--start`` / ``--end`` is supplied it is
    ignored and the ``--days`` window is used.

    Raises:
        ValueError: if ``--start`` or ``--end`` is not a valid ``YYYYMMDD``.
    """
    if args.date:
        return [args.date]
    if args.start and args.end:
        return get_date_range(args.start, args.end)

    today = datetime.now()
    end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
    start_date = (today - timedelta(days=args.days)).strftime("%Y%m%d")
    return get_date_range(start_date, end_date)


def main():
    """Command-line entry point: fetch every missing day for one SQL file.

    Output goes to ``output/<sql file stem>/<YYYYMMDD>.csv`` next to the SQL
    file. Problems with the arguments are printed and the function returns
    without raising.
    """
    global success_count, fail_count

    parser = argparse.ArgumentParser(description="按天增量获取数据")
    parser.add_argument("sql_file", type=str, help="SQL文件路径")
    parser.add_argument("--days", type=int, default=7, help="获取最近N天 (默认7)")
    parser.add_argument("--start", type=str, help="开始日期 YYYYMMDD")
    parser.add_argument("--end", type=str, help="结束日期 YYYYMMDD")
    parser.add_argument("--date", type=str, help="单天日期 YYYYMMDD")
    parser.add_argument("--force", action="store_true", help="强制重新获取")
    parser.add_argument("--workers", type=int, default=5, help="并发数 (默认5)")
    args = parser.parse_args()

    # Resolve the SQL file path.
    sql_file = Path(args.sql_file).resolve()
    if not sql_file.exists():
        print(f"错误: 找不到 {sql_file}")
        return

    # Output directory: output/<sql file stem>/ next to the SQL file.
    output_dir = sql_file.parent / "output"
    daily_dir = output_dir / sql_file.stem
    daily_dir.mkdir(parents=True, exist_ok=True)

    print(f"SQL文件: {sql_file}")
    print(f"数据目录: {daily_dir}")

    # Work out the requested date range.
    try:
        target_dates = _resolve_target_dates(args)
    except ValueError as e:
        # Malformed --start / --end: report it instead of dumping a traceback.
        print(f"错误: 日期格式无效 (应为 YYYYMMDD): {e}")
        return

    if not target_dates:
        # E.g. --days 0 or --start later than --end; indexing below would fail.
        print("错误: 目标日期范围为空")
        return

    print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")

    # Look up which days are already on disk.
    existing_dates = get_existing_dates(daily_dir)
    print(f"已有数据: {len(existing_dates)}天")

    # Decide which days actually need fetching.
    if args.force:
        missing_dates = target_dates
        print(f"强制模式: 重新获取所有 {len(missing_dates)} 天")
    else:
        missing_dates = [d for d in target_dates if d not in existing_dates]
        print(f"需要获取: {len(missing_dates)}天")

    if not missing_dates:
        print("没有需要获取的数据,退出")
        return

    # Load the SQL template.
    sql_template = sql_file.read_text(encoding="utf-8")

    # Reset the shared counters for this run.
    success_count = 0
    fail_count = 0

    # Never start more threads than there are days, and never fewer than one:
    # ThreadPoolExecutor rejects max_workers <= 0 with ValueError.
    workers = max(1, min(args.workers, len(missing_dates)))
    total = len(missing_dates)
    print(f"\n开始获取 (并发数: {workers})...")

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(fetch_single_day, dt, sql_template, daily_dir): dt
            for dt in missing_dates
        }

        # Report each day as soon as it finishes (completion order, not date order).
        for completed, future in enumerate(as_completed(futures), 1):
            dt, status, info = future.result()

            if status == "success":
                print(f" [{completed}/{total}] ✓ {dt}: {info} 行")
            elif status == "empty":
                print(f" [{completed}/{total}] ⚠ {dt}: 无数据")
            elif status == "error":
                print(f" [{completed}/{total}] ✗ {dt}: {info}")
            else:
                print(f" [{completed}/{total}] ✗ {dt}: 失败")

    print(f"\n完成! 成功: {success_count}, 失败: {fail_count}")
    print(f"数据目录: {daily_dir}")


if __name__ == "__main__":
    main()