Просмотр исходного кода

feat(fetch_daily): 支持 --hh 参数按小时粒度获取数据,优化飞书上传清理逻辑

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
yangxiaohui 4 недели назад
Родитель
Commit
5241a82f64
1 изменённый файл: 45 добавлений и 22 удаления
  1. 45 22
      fetch_daily.py

+ 45 - 22
fetch_daily.py

@@ -9,6 +9,7 @@
     python fetch_daily.py tasks/xxx/query.sql --days 30          # 获取最近30天
     python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
     python fetch_daily.py tasks/xxx/query.sql --date 20260105    # 单天
+    python fetch_daily.py tasks/xxx/query.sql --date 20260105 --hh 08  # 单天单小时
     python fetch_daily.py tasks/xxx/query.sql --force            # 强制重新获取
     python fetch_daily.py tasks/xxx/query.sql --workers 10       # 设置天级并发数
     python fetch_daily.py tasks/xxx/query.sql --parallel 50      # 单天多线程下载(默认50,大数据量推荐)
@@ -35,16 +36,25 @@ success_count = 0
 fail_count = 0
 
 
-def get_existing_dates(daily_dir):
-    """获取已下载的日期列表"""
+def get_existing_dates(daily_dir, hh=None):
+    """获取已下载的日期列表(可选指定小时)"""
     existing = set()
     if not daily_dir.exists():
         return existing
     for f in daily_dir.glob("*.csv"):
         try:
-            dt = f.stem
-            if len(dt) == 8 and dt.isdigit():
-                existing.add(dt)
+            stem = f.stem
+            if hh is not None:
+                # 带小时格式:20250101_08
+                if len(stem) == 11 and stem[8] == '_':
+                    dt = stem[:8]
+                    file_hh = stem[9:11]
+                    if dt.isdigit() and file_hh == hh:
+                        existing.add(dt)
+            else:
+                # 仅日期格式:20250101
+                if len(stem) == 8 and stem.isdigit():
+                    existing.add(stem)
         except:
             pass
     return existing
@@ -480,21 +490,26 @@ def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc",
         print(f"  处理: {processed}/{total_rows}")
 
     # 第4步:删除末尾多余的空行(扩展容量时添加的)
-    final_row_count = 2 + total_rows  # 表头 + 模板 + 数据
-    current_row_count = 2 + total_rows * 2  # 扩展 + 插入
-    if current_row_count > final_row_count:
-        print(f"清理多余空行...")
+    # 重新查询实际行数,避免因初始行数不同导致计算偏差
+    sheet_props_after = client.get_sheet_properties(access_token, sheet_token, sheet_id)
+    actual_row_count = sheet_props_after['row_count'] if sheet_props_after else 0
+    # 期望行数:初始保留行数 + 插入的数据行数
+    rows_after_cleanup = 2 if current_rows > 2 else current_rows
+    final_row_count = rows_after_cleanup + total_rows
+    if actual_row_count > final_row_count:
+        print(f"清理多余空行({actual_row_count - final_row_count}行)...")
         try:
-            client.delete_rows(access_token, sheet_token, sheet_id, final_row_count + 1, current_row_count)
+            client.delete_rows(access_token, sheet_token, sheet_id, final_row_count + 1, actual_row_count)
         except Exception as e:
             print(f"  清理失败: {e}")
 
-    # 第5步:删除模板行(第2行)
-    print(f"删除模板行...")
-    try:
-        client.delete_rows(access_token, sheet_token, sheet_id, 2, 2)
-    except Exception as e:
-        print(f"  删除模板行失败: {e}")
+    # 第5步:删除模板行(第2行),仅当初始存在模板行时
+    if current_rows >= 2:
+        print(f"删除模板行...")
+        try:
+            client.delete_rows(access_token, sheet_token, sheet_id, 2, 2)
+        except Exception as e:
+            print(f"  删除模板行失败: {e}")
 
     print(f"飞书上传完成: {sheet_token}")
 
@@ -511,14 +526,18 @@ def get_date_range(start_str, end_str):
     return dates
 
 
-def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0, config="default"):
-    """获取单天数据"""
+def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0, config="default", hh=None):
+    """获取单天数据(可选指定小时)"""
     global success_count, fail_count
 
     try:
         client = ODPSClient(config=config)
         sql = sql_template.replace("${dt}", dt)
-        output_file = daily_dir / f"{dt}.csv"
+        if hh is not None:
+            sql = sql.replace("${hh}", hh)
+            output_file = daily_dir / f"{dt}_{hh}.csv"
+        else:
+            output_file = daily_dir / f"{dt}.csv"
 
         # 下载到文件
         if parallel_threads > 0:
@@ -557,6 +576,7 @@ def main():
     parser.add_argument("--start", type=str, help="开始日期 YYYYMMDD")
     parser.add_argument("--end", type=str, help="结束日期 YYYYMMDD")
     parser.add_argument("--date", type=str, help="单天日期 YYYYMMDD")
+    parser.add_argument("--hh", type=str, default=None, help="小时 HH (00-23),需配合 --date 使用")
     parser.add_argument("--force", action="store_true", help="强制重新获取")
     parser.add_argument("--workers", type=int, default=5, help="天级并发数 (默认5)")
     parser.add_argument("--parallel", type=int, default=50, help="单天多线程下载 (默认50, 大数据量推荐)")
@@ -628,8 +648,11 @@ def main():
     print(f"目标日期: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)}天)")
 
     # 检查已有数据
-    existing_dates = get_existing_dates(daily_dir)
-    print(f"已有数据: {len(existing_dates)}天")
+    existing_dates = get_existing_dates(daily_dir, args.hh)
+    if args.hh:
+        print(f"已有数据: {len(existing_dates)}天 (hh={args.hh})")
+    else:
+        print(f"已有数据: {len(existing_dates)}天")
 
     # 确定需要获取的日期
     if args.force:
@@ -685,7 +708,7 @@ def main():
 
     with ThreadPoolExecutor(max_workers=workers) as executor:
         futures = {
-            executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel, args.config): dt
+            executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel, args.config, args.hh): dt
             for dt in missing_dates
         }