#!/usr/bin/env python
# coding=utf-8
"""
Incremental day-by-day data fetcher - generic version.
Supports concurrent fetching and automatically skips dates that already have data.

Usage:
    python fetch_daily.py tasks/xxx/query.sql                         # fetch the last 7 days
    python fetch_daily.py tasks/xxx/query.sql --days 30               # fetch the last 30 days
    python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
    python fetch_daily.py tasks/xxx/query.sql --date 20260105         # a single day
    python fetch_daily.py tasks/xxx/query.sql --date 20260105 --hh 08 # a single day, single hour
    python fetch_daily.py tasks/xxx/query.sql --force                 # force a re-fetch
    python fetch_daily.py tasks/xxx/query.sql --workers 10            # set day-level concurrency
    python fetch_daily.py tasks/xxx/query.sql --parallel 50           # multi-threaded per-day download (default 50, recommended for large data)
    python fetch_daily.py tasks/xxx/query.sql --parallel 0            # disable multi-threading, single-threaded download
    python fetch_daily.py tasks/xxx/query.sql --feishu                # upload to a Feishu sheet after fetching
    python fetch_daily.py tasks/xxx/query.sql --feishu TOKEN          # specify the Feishu sheet token
    python fetch_daily.py tasks/xxx/query.sql --merge --feishu        # merge only, then upload to Feishu
    python fetch_daily.py tasks/xxx/query.sql --config piaoquan_api   # switch the ODPS config
"""
import argparse
import csv
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent / "lib"))
from odps_module import ODPSClient

# Thread-safe counters
counter_lock = threading.Lock()
success_count = 0
fail_count = 0


def get_existing_dates(daily_dir, hh=None):
    """Return the set of already-downloaded dates (optionally for a specific hour)."""
    existing = set()
    if not daily_dir.exists():
        return existing
    for f in daily_dir.glob("*.csv"):
        stem = f.stem
        if hh is not None:
            # Hourly format: 20250101_08
            if len(stem) == 11 and stem[8] == "_":
                dt = stem[:8]
                file_hh = stem[9:11]
                if dt.isdigit() and file_hh == hh:
                    existing.add(dt)
        else:
            # Date-only format: 20250101
            if len(stem) == 8 and stem.isdigit():
                existing.add(stem)
    return existing


def merge_csv_files(daily_dir, output_file=None):
    """Merge all per-date CSV files in a directory, keeping a single header row."""
    csv_files = sorted(daily_dir.glob("*.csv"))
    if not csv_files:
        print("No CSV files found")
        return None
    if output_file is None:
        output_file = daily_dir.parent / f"{daily_dir.name}_merged.csv"
    with open(output_file, "w", encoding="utf-8") as out:
        header_written = False
        total_rows = 0
        for csv_file in csv_files:
            with open(csv_file, "r", encoding="utf-8") as f:
                header = f.readline()
                if not header:
                    continue
                if not header_written:
                    out.write(header)
                    header_written = True
                for line in f:
                    out.write(line)
                    total_rows += 1
    print(f"Merge complete: {len(csv_files)} files, {total_rows} data rows")
    print(f"Output file: {output_file}")
    return output_file
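
# A quick illustration of the incremental-skip convention (illustrative file
# names, not produced by this comment): a daily_dir containing 20260101.csv,
# 20260102.csv and 20260103_08.csv yields
#   get_existing_dates(daily_dir)           -> {"20260101", "20260102"}
#   get_existing_dates(daily_dir, hh="08")  -> {"20260103"}
# so only dates missing from the returned set are fetched again.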

def infer_column_types(rows):
    """Infer each column's type: 'int', 'float', or 'str'."""
    if not rows:
        return []
    num_cols = len(rows[0])
    col_types = []
    for col_idx in range(num_cols):
        has_float = False
        all_numeric = True
        for row in rows:
            if col_idx >= len(row):
                continue
            v = row[col_idx].strip() if row[col_idx] else ""
            if not v:
                # Empty values do not affect type inference
                continue
            try:
                if "." in v or "e" in v.lower():
                    float(v)
                    has_float = True
                else:
                    int(v)
            except ValueError:
                all_numeric = False
                break
        if all_numeric:
            col_types.append("float" if has_float else "int")
        else:
            col_types.append("str")
    return col_types


def convert_row_by_types(row, col_types):
    """Convert one row of strings according to the inferred column types."""
    result = []
    for i, cell in enumerate(row):
        if i >= len(col_types):
            result.append(cell)
            continue
        v = cell.strip() if cell else ""
        if not v:
            result.append("")
            continue
        col_type = col_types[i]
        if col_type == "int":
            result.append(int(v))
        elif col_type == "float":
            result.append(float(v))
        else:
            result.append(cell)
    return result


def load_feishu_config(sql_file):
    """Load the Feishu config. Priority: {sql name}.json > sql dir/default.json > root dir/default.json > built-in defaults."""
    import json

    defaults = {
        "token": "ONZqsxB9BhGH8tt90EScSJT5nHh",
        "sheet_id": None,
        "sort": "dt:desc",
        "cols": None,
        "filter": None,
        "limit": None,
    }
    root_dir = Path(__file__).parent
    sql_dir = sql_file.parent
    sql_name = sql_file.stem

    def load_json(path, name):
        if path.exists():
            try:
                with open(path, "r", encoding="utf-8") as f:
                    defaults.update(json.load(f))
            except Exception as e:
                print(f"Warning: failed to read {name}: {e}")

    # Load from lowest to highest priority (later loads override earlier ones)
    load_json(root_dir / "default.json", "root dir/default.json")
    load_json(sql_dir / "default.json", "sql dir/default.json")
    load_json(sql_dir / f"{sql_name}.json", f"{sql_name}.json")
    return defaults


def parse_sort_spec(sort_spec):
    """Parse a sort spec such as 'dt:desc,name:asc' -> [('dt', True), ('name', False)]."""
    if not sort_spec:
        return []
    result = []
    for part in sort_spec.split(","):
        part = part.strip()
        if not part:
            continue
        if ":" in part:
            field, order = part.rsplit(":", 1)
            desc = order.lower() != "asc"
        else:
            field, desc = part, True  # descending by default
        result.append((field.strip(), desc))
    return result


def parse_cols_spec(cols_spec):
    """Parse a column-mapping spec such as 'dt:日期,name,value:数值' -> [('dt', '日期'), ('name', 'name'), ('value', '数值')]."""
    if not cols_spec:
        return []
    result = []
    for part in cols_spec.split(","):
        part = part.strip()
        if not part:
            continue
        if ":" in part:
            old_name, new_name = part.split(":", 1)
            result.append((old_name.strip(), new_name.strip()))
        else:
            result.append((part, part))
    return result


def apply_cols_mapping(header, data_rows, cols_spec):
    """Apply the column mapping: select, reorder, and rename columns."""
    col_mapping = parse_cols_spec(cols_spec)
    if not col_mapping:
        return header, data_rows
    # Build the index mapping
    header_index = {name: i for i, name in enumerate(header)}
    new_header = []
    col_indices = []
    for old_name, new_name in col_mapping:
        if old_name in header_index:
            col_indices.append(header_index[old_name])
            new_header.append(new_name)
        else:
            print(f"Warning: field '{old_name}' does not exist, skipped")
    if not col_indices:
        print("Warning: no valid column mapping, keeping data as-is")
        return header, data_rows
    # Apply the mapping
    new_rows = []
    for row in data_rows:
        new_row = [row[i] if i < len(row) else "" for i in col_indices]
        new_rows.append(new_row)
    print(f"Column mapping: {len(col_indices)} columns")
    return new_header, new_rows


def column_index_to_letter(col_idx):
    """Convert a 1-based column index to a letter, e.g. 1->A, 26->Z, 27->AA."""
    result = ""
    while col_idx > 0:
        col_idx, remainder = divmod(col_idx - 1, 26)
        result = chr(65 + remainder) + result
    return result
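
# Illustrative self-check of the spec parsers and column-letter conversion.
# This helper is never called by the script; it only documents the expected
# shapes under the formats described in the docstrings above.
def _demo_spec_parsing():
    assert parse_sort_spec("dt:desc,name:asc") == [("dt", True), ("name", False)]
    assert parse_cols_spec("dt:日期,name") == [("dt", "日期"), ("name", "name")]
    assert column_index_to_letter(26) == "Z"
    assert column_index_to_letter(27) == "AA"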

def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc",
                     cols_spec=None, filter_spec=None, limit=None):
    """Upload a CSV file to a Feishu sheet (inheriting styles through a template row).

    Row 1: header
    Row 2: style template (used for inheritance, deleted at the end)
    Row 3+: data

    Args:
        csv_file: path to the CSV file
        sheet_token: Feishu sheet token
        sheet_id: worksheet ID (the first worksheet is used when None)
        sort_spec: sort spec, e.g. "dt:desc,name:asc"
        cols_spec: column-mapping spec, e.g. "dt:日期,name,value:数值"
        filter_spec: filter condition, a dict {"field": "value"} or a str "field=value,field=value"
        limit: maximum number of rows to upload
    """
    from feishu import Client, LARK_HOST, APP_ID, APP_SECRET, request

    # Read the CSV
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        rows = list(reader)
    if len(rows) < 2:
        print("CSV has no data rows, skipping upload")
        return
    header = rows[0]
    data_rows = rows[1:]

    # Sort (before the column mapping, using the original column names).
    # Relies on sort stability: applying the keys from last to first composes
    # into a multi-key sort.
    sort_fields = parse_sort_spec(sort_spec)
    if sort_fields:
        applied = []
        for field, desc in reversed(sort_fields):
            if field in header:
                idx = header.index(field)
                data_rows.sort(key=lambda row: row[idx] if idx < len(row) else "", reverse=desc)
                applied.append(f"{field}:{'desc' if desc else 'asc'}")
        if applied:
            print(f"Sort: {', '.join(reversed(applied))}")

    # Filter (after sorting)
    if filter_spec:
        # Accept a dict (from a JSON config) or a str (from the command line, "field=value,field!=value")
        filters = []
        if isinstance(filter_spec, str):
            for part in filter_spec.split(","):
                if "!=" in part:
                    k, v = part.split("!=", 1)
                    filters.append((k.strip(), v.strip(), "!="))
                elif "=" in part:
                    k, v = part.split("=", 1)
                    filters.append((k.strip(), v.strip(), "=="))
        elif isinstance(filter_spec, dict):
            filters = [(k, v, "==") for k, v in filter_spec.items()]
        before_count = len(data_rows)
        for field, value, op in filters:
            if field in header:
                idx = header.index(field)
                if op == "!=":
                    data_rows = [row for row in data_rows if idx < len(row) and row[idx] != str(value)]
                else:
                    data_rows = [row for row in data_rows if idx < len(row) and row[idx] == str(value)]
        print(f"Filter: {filters} → {before_count} → {len(data_rows)} rows")

    # Limit (after filtering)
    if limit and len(data_rows) > limit:
        print(f"Row limit: {len(data_rows)} → {limit}")
        data_rows = data_rows[:limit]

    # Column mapping (after sorting and filtering)
    header, data_rows = apply_cols_mapping(header, data_rows, cols_spec)

    # Infer the column types and convert the values
    col_types = infer_column_types(data_rows)
    converted_rows = [convert_row_by_types(row, col_types) for row in data_rows]
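    # Worked example of the pipeline order above (illustrative values): with
    # sort_spec="dt:desc", filter_spec="app=main", limit=1, cols_spec="dt:日期"
    # and rows [("20260101","main"), ("20260102","main"), ("20260102","lite")],
    # the rows are first sorted by dt descending, then filtered to app=main
    # (2 rows), then truncated to 1 row, and finally reduced and renamed to the
    # single 日期 column before type conversion.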
    # Initialise the Feishu client
    client = Client(LARK_HOST)
    access_token = client.get_tenant_access_token(APP_ID, APP_SECRET)

    # Resolve the sheet_id
    if sheet_id is None:
        sheet_id = client.get_sheetid(access_token, sheet_token)
        print(f"Sheet ID: {sheet_id}")

    # Fetch the sheet properties
    sheet_props = client.get_sheet_properties(access_token, sheet_token, sheet_id)
    current_cols = sheet_props["column_count"] if sheet_props else 26
    header_end_col = column_index_to_letter(current_cols)

    # Add columns when the CSV has more columns than the sheet
    num_csv_cols = len(header)
    if num_csv_cols > current_cols:
        add_cols = num_csv_cols - current_cols
        expand_headers = {
            "Content-Type": "application/json; charset=utf-8",
            "Authorization": f"Bearer {access_token}",
        }
        expand_payload = {
            "dimension": {
                "sheetId": sheet_id,
                "majorDimension": "COLUMNS",
                "length": add_cols,
            }
        }
        try:
            request("POST", f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/dimension_range",
                    expand_headers, expand_payload)
            print(f"Expanded columns: {current_cols} -> {num_csv_cols} (+{add_cols} columns)")
            current_cols = num_csv_cols
            header_end_col = column_index_to_letter(current_cols)
        except Exception as e:
            print(f"  Failed to expand columns: {e}")

    # Read the Feishu header row (all columns)
    feishu_header = client.read_range_values(access_token, sheet_token, f"{sheet_id}!A1:{header_end_col}1")
    feishu_cols = []
    if feishu_header and feishu_header[0]:
        feishu_cols = [c for c in feishu_header[0] if c]  # drop None and empty strings

    # Convert rich-text cells to plain text (Feishu headers may contain list structures with links)
    def _col_to_str(col):
        if isinstance(col, list):
            return "".join(item.get("text", "") for item in col if isinstance(item, dict))
        return col

    if feishu_cols:
        feishu_cols_str = [_col_to_str(c) for c in feishu_cols]
        print(f"Feishu header: {feishu_cols_str}")
        print(f"CSV header: {header}")
        # Validate field consistency (warn but continue; the Feishu header wins)
        feishu_set = set(feishu_cols_str)
        csv_set = set(header)
        missing_in_csv = feishu_set - csv_set
        missing_in_feishu = csv_set - feishu_set
        if missing_in_csv:
            print(f"Warning: fields missing from the CSV (filled with empty values): {missing_in_csv}")
        if missing_in_feishu:
            print(f"Warning: fields missing from the Feishu sheet (ignored): {missing_in_feishu}")
        # Reorder the data to match the Feishu header (matching on the plain-text names)
        csv_col_index = {name: i for i, name in enumerate(header)}
        new_converted_rows = []
        for row in converted_rows:
            new_row = []
            for col_name in feishu_cols_str:
                if col_name in csv_col_index:
                    new_row.append(row[csv_col_index[col_name]])
                else:
                    new_row.append("")  # fill fields missing from the CSV with empty values
            new_converted_rows.append(new_row)
        converted_rows = new_converted_rows
        header = feishu_cols
        print("Data reordered to match the Feishu header")
    else:
        # The Feishu header is empty: write the CSV header
        # (Feishu writes at most 100 columns per call, so batch by column)
        print("Feishu header is empty, writing the CSV header")
        col_batch = 100
        for start in range(0, len(header), col_batch):
            end = min(start + col_batch, len(header))
            start_col = column_index_to_letter(start + 1)
            end_col = column_index_to_letter(end)
            batch_range = f"{sheet_id}!{start_col}1:{end_col}1"
            client.batch_update_values(access_token, sheet_token, {
                "valueRanges": [{"range": batch_range, "values": [header[start:end]]}]
            })

    total_rows = len(converted_rows)
    num_cols = len(header)
    end_col = column_index_to_letter(num_cols)

    # Feishu caps a single sheet at 5,000,000 cells; reserve the header and template rows
    CELL_LIMIT = 5_000_000
    max_data_rows = (CELL_LIMIT // num_cols) - 2
    if total_rows > max_data_rows:
        print(f"⚠ Feishu cell limit {CELL_LIMIT:,} ({num_cols} columns × {max_data_rows} rows), truncating {total_rows} → {max_data_rows} rows")
        converted_rows = converted_rows[:max_data_rows]
        total_rows = max_data_rows
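    # Worked example of the cap (illustrative numbers): with 10 columns the
    # sheet holds 5,000,000 / 10 = 500,000 rows; minus the header and template
    # rows that leaves max_data_rows = 499,998, so a 600,000-row CSV would be
    # truncated to 499,998 rows.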
    print(f"Uploading to Feishu: {total_rows} data rows")
    batch_size = 500

    # Current row count (reuses the sheet_props fetched above)
    current_rows = sheet_props["row_count"] if sheet_props else 2
    print(f"Current rows: {current_rows}, data rows needed: {total_rows}")

    headers = {
        "Content-Type": "application/json; charset=utf-8",
        "Authorization": f"Bearer {access_token}",
    }

    # Step 1: delete the old data rows in batches
    # (keep row 1, the header, and row 2, the style template)
    if current_rows > 2:
        print(f"Clearing old data ({current_rows - 2} rows)...")
        rows_to_delete = current_rows - 2
        delete_batch = 5000
        while rows_to_delete > 0:
            # Always delete starting at row 3; row numbers shift up after each deletion
            batch = min(rows_to_delete, delete_batch)
            try:
                client.delete_rows(access_token, sheet_token, sheet_id, 3, 2 + batch)
                rows_to_delete -= batch
                if rows_to_delete > 0:
                    print(f"  Deleted {current_rows - 2 - rows_to_delete}/{current_rows - 2}")
            except Exception as e:
                print(f"  Failed to clear old data: {e}")
                break

    # Step 2: expand the sheet capacity (writes do not grow the sheet automatically).
    # After the deletion only 2 rows remain (header + template), so grow to 2 + total_rows rows.
    add_url = f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/dimension_range"
    expand_batch = 5000
    remaining = total_rows
    expanded = 0
    while remaining > 0:
        chunk = min(remaining, expand_batch)
        add_payload = {
            "dimension": {
                "sheetId": sheet_id,
                "majorDimension": "ROWS",
                "length": chunk,
            }
        }
        try:
            request("POST", add_url, headers, add_payload)
            expanded += chunk
            remaining -= chunk
        except Exception as e:
            print(f"  Failed to expand capacity (expanded {expanded} so far): {e}")
            break
    if expanded > 0:
        print(f"Capacity expanded: +{expanded} rows")

    # Step 3: write the data into the pre-expanded empty rows in batches
    # (no inserts here; expand + insert would double-add rows and overflow the cell limit)
    print(f"Writing {total_rows} rows...")
    batches = [converted_rows[i:i + batch_size] for i in range(0, total_rows, batch_size)]
    processed = 0
    for i, batch in enumerate(batches):
        batch_count = len(batch)
        start_row = 3 + i * batch_size  # write sequentially, starting at row 3
        # Write the values (Feishu writes at most 100 columns per call, so batch by column)
        col_batch = 100
        value_ranges = []
        for col_start in range(0, num_cols, col_batch):
            col_end = min(col_start + col_batch, num_cols)
            sc = column_index_to_letter(col_start + 1)
            ec = column_index_to_letter(col_end)
            col_range = f"{sheet_id}!{sc}{start_row}:{ec}{start_row + batch_count - 1}"
            col_values = [row[col_start:col_end] for row in batch]
            value_ranges.append({"range": col_range, "values": col_values})
        client.batch_update_values(access_token, sheet_token, {
            "valueRanges": value_ranges
        })
        processed += batch_count
        print(f"  Progress: {processed}/{total_rows}")

    # Step 4: delete the template row (row 2), only if a template row existed initially
    if current_rows >= 2:
        print("Deleting the template row...")
        try:
            client.delete_rows(access_token, sheet_token, sheet_id, 2, 2)
        except Exception as e:
            print(f"  Failed to delete the template row: {e}")

    print(f"Feishu upload complete: {sheet_token}")


def get_date_range(start_str, end_str):
    """Generate the list of dates between start_str and end_str, inclusive."""
    start = datetime.strptime(start_str, "%Y%m%d")
    end = datetime.strptime(end_str, "%Y%m%d")
    dates = []
    current = start
    while current <= end:
        dates.append(current.strftime("%Y%m%d"))
        current += timedelta(days=1)
    return dates
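
# Illustrative self-check for get_date_range (never called by the script):
# the range is inclusive on both ends and rolls over month boundaries.
def _demo_date_range():
    assert get_date_range("20260130", "20260202") == [
        "20260130", "20260131", "20260201", "20260202",
    ]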

def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0, config="default", hh=None):
    """Fetch one day of data (optionally for a specific hour)."""
    global success_count, fail_count
    try:
        client = ODPSClient(config=config)
        sql = sql_template.replace("${dt}", dt)
        if hh is not None:
            sql = sql.replace("${hh}", hh)
            output_file = daily_dir / f"{dt}_{hh}.csv"
        else:
            output_file = daily_dir / f"{dt}.csv"

        # Download to a file
        if parallel_threads > 0:
            # Multi-threaded parallel download (suited to large result sets)
            client.execute_sql_result_save_file_parallel(sql, str(output_file), workers=parallel_threads)
        else:
            # Single-threaded download
            client.execute_sql_result_save_file(sql, str(output_file))

        # Check the result
        if output_file.exists():
            with open(output_file) as f:
                row_count = sum(1 for _ in f) - 1  # minus the header
            with counter_lock:
                success_count += 1
            if row_count > 0:
                return (dt, "success", row_count)
            else:
                return (dt, "empty", 0)
        else:
            with counter_lock:
                fail_count += 1
            return (dt, "fail", 0)
    except Exception as e:
        with counter_lock:
            fail_count += 1
        return (dt, "error", str(e))


def main():
    global success_count, fail_count

    parser = argparse.ArgumentParser(description="Fetch data incrementally, day by day")
    parser.add_argument("sql_file", type=str, help="path to the SQL file")
    parser.add_argument("--days", type=int, default=7, help="fetch the last N days (default 7)")
    parser.add_argument("--start", type=str, help="start date YYYYMMDD")
    parser.add_argument("--end", type=str, help="end date YYYYMMDD")
    parser.add_argument("--date", type=str, help="single date YYYYMMDD")
    parser.add_argument("--hh", type=str, default=None, help="hour HH (00-23), requires --date")
    parser.add_argument("--force", action="store_true", help="force a re-fetch")
    parser.add_argument("--workers", type=int, default=5, help="day-level concurrency (default 5)")
    parser.add_argument("--parallel", type=int, default=50, help="per-day download threads (default 50, recommended for large data)")
    parser.add_argument("--merge", action="store_true", help="merge all per-date data into one file")
    parser.add_argument("--feishu", nargs="?", const="__USE_CONFIG__", help="upload to a Feishu sheet")
    parser.add_argument("--sheet-id", type=str, default=None, help="Feishu worksheet ID")
    parser.add_argument("--sort", type=str, default=None, help="sort: field:asc/desc")
    parser.add_argument("--cols", type=str, default=None, help="column mapping: old:new,...")
    parser.add_argument("--filter", type=str, default=None, help="filter: field=value,field=value")
    parser.add_argument("--limit", type=int, default=None, help="maximum number of rows to upload")
    parser.add_argument("--config", type=str, default="default", help="ODPS config: default or piaoquan_api")
    args = parser.parse_args()

    # Resolve the SQL file path
    sql_file = Path(args.sql_file).resolve()
    if not sql_file.exists():
        print(f"Error: {sql_file} not found")
        return

    # Load the Feishu config
    # (priority: command line > {sql name}.json > sql dir/default.json > root dir/default.json > built-in defaults)
    feishu_config = load_feishu_config(sql_file)
    if args.feishu == "__USE_CONFIG__":
        args.feishu = feishu_config["token"]
    # args.feishu stays None when the Feishu upload is not enabled

    # Command-line arguments override the config files
    if args.sheet_id is None:
        args.sheet_id = feishu_config["sheet_id"]
    if args.sort is None:
        args.sort = feishu_config["sort"]
    if args.cols is None:
        args.cols = feishu_config["cols"]
    if args.filter is None:
        args.filter = feishu_config["filter"]
    if args.limit is None:
        args.limit = feishu_config["limit"]

    # Print the Feishu config
    if args.feishu:
        print(f"Feishu config: token={args.feishu}, sheet_id={args.sheet_id}, sort={args.sort}, cols={args.cols}")

    # Output directory: output/<SQL file name>/ next to the SQL file
    output_dir = sql_file.parent / "output"
    daily_dir = output_dir / sql_file.stem
    daily_dir.mkdir(parents=True, exist_ok=True)

    print(f"SQL file: {sql_file}")
    print(f"Data directory: {daily_dir}")

    # Merge-only mode: merge the existing files without fetching anything
    if args.merge:
        existing_dates = get_existing_dates(daily_dir)
        print(f"Existing data: {len(existing_dates)} days")
        if existing_dates:
            merged_file = merge_csv_files(daily_dir)
            # Upload to Feishu when requested
            if args.feishu and merged_file:
                upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit)
        else:
            print("Nothing to merge")
        return

    # Determine the date range
    if args.date:
        target_dates = [args.date]
    elif args.start and args.end:
        target_dates = get_date_range(args.start, args.end)
    else:
        today = datetime.now()
        end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
        start_date = (today - timedelta(days=args.days)).strftime("%Y%m%d")
        target_dates = get_date_range(start_date, end_date)

    print(f"Target dates: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)} days)")

    # Check the existing data
    existing_dates = get_existing_dates(daily_dir, args.hh)
    if args.hh:
        print(f"Existing data: {len(existing_dates)} days (hh={args.hh})")
    else:
        print(f"Existing data: {len(existing_dates)} days")

    # Determine which dates still need fetching
    if args.force:
        missing_dates = target_dates
        print(f"Force mode: re-fetching all {len(missing_dates)} days")
    else:
        missing_dates = [d for d in target_dates if d not in existing_dates]
        print(f"To fetch: {len(missing_dates)} days")

    if not missing_dates:
        print("Nothing to fetch, exiting")
        return

    # Read the SQL template
    sql_template = sql_file.read_text(encoding="utf-8")

    # Check whether the SQL contains the ${dt} variable
    has_dt_var = "${dt}" in sql_template

    # Reset the counters
    success_count = 0
    fail_count = 0

    # Without ${dt} in the SQL, a single execution is enough
    if not has_dt_var:
        print("\nThe SQL does not contain the ${dt} variable, executing once...")
        output_file = output_dir / f"{sql_file.stem}.csv"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            client = ODPSClient(config=args.config)
            if args.parallel > 0:
                client.execute_sql_result_save_file_parallel(sql_template, str(output_file), workers=args.parallel)
            else:
                client.execute_sql_result_save_file(sql_template, str(output_file))
            print(f"Output file: {output_file}")
            # Upload to Feishu when requested
            if args.feishu and output_file.exists():
                upload_to_feishu(output_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit)
        except Exception as e:
            print(f"✗ Execution failed: {e}")
        return

    # Fetch concurrently
    workers = min(args.workers, len(missing_dates))
    if args.parallel > 0:
        print(f"\nFetching (day-level workers: {workers}, per-day threads: {args.parallel})...")
    else:
        print(f"\nFetching (workers: {workers})...")
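    # Concurrency model (illustrative numbers, assuming each day-level worker
    # downloads independently): --workers 5 --parallel 50 runs up to 5 days at
    # once, each downloading its result with up to 50 threads, so up to roughly
    # 250 download threads may be active at the same time.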
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(fetch_single_day, dt, sql_template, daily_dir,
                            args.parallel, args.config, args.hh): dt
            for dt in missing_dates
        }
        completed = 0
        for future in as_completed(futures):
            completed += 1
            dt, status, info = future.result()
            if status == "success":
                print(f"  [{completed}/{len(missing_dates)}] ✓ {dt}: {info} rows")
            elif status == "empty":
                print(f"  [{completed}/{len(missing_dates)}] ⚠ {dt}: no data")
            elif status == "error":
                print(f"  [{completed}/{len(missing_dates)}] ✗ {dt}: {info}")
            else:
                print(f"  [{completed}/{len(missing_dates)}] ✗ {dt}: failed")

    print(f"\nDone! Succeeded: {success_count}, failed: {fail_count}")
    print(f"Data directory: {daily_dir}")

    # When a Feishu upload is requested, merge first and then upload
    if args.feishu:
        merged_file = merge_csv_files(daily_dir)
        if merged_file:
            upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit)


if __name__ == "__main__":
    main()