Просмотр исходного кода

feat(fetch_daily): 新增飞书表格上传功能

- 支持 --feishu 参数上传数据到飞书表格
- 支持 --sort 多字段排序(如 dt:desc,name:asc)
- 支持 --cols 列映射(筛选/排序/重命名)
- 自动推断列类型(int/float/str)
- 多级配置文件: {sql名}.json > sql目录/default.json > 根目录/default.json

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
yangxiaohui 1 месяц назад
Родитель
Commit
ac80773c8b
2 измененных файлов с 256 добавлено и 1 удалено
  1. 6 0
      default.json
  2. 250 1
      fetch_daily.py

+ 6 - 0
default.json

@@ -0,0 +1,6 @@
+{
+  "token": "ONZqsxB9BhGH8tt90EScSJT5nHh",
+  "sheet_id": null,
+  "sort": "dt:desc",
+  "cols": null
+}

+ 250 - 1
fetch_daily.py

@@ -13,6 +13,9 @@
     python fetch_daily.py tasks/xxx/query.sql --workers 10       # 设置天级并发数
     python fetch_daily.py tasks/xxx/query.sql --parallel 50      # 单天多线程下载(默认50,大数据量推荐)
     python fetch_daily.py tasks/xxx/query.sql --parallel 0       # 关闭多线程,使用单线程下载
+    python fetch_daily.py tasks/xxx/query.sql --feishu           # 获取后上传到飞书表格
+    python fetch_daily.py tasks/xxx/query.sql --feishu TOKEN     # 指定飞书表格token
+    python fetch_daily.py tasks/xxx/query.sql --merge --feishu   # 仅合并并上传飞书
 """
 import argparse
 import sys
@@ -23,6 +26,7 @@ import threading
 
 sys.path.insert(0, str(Path(__file__).parent / "lib"))
 from odps_module import ODPSClient
+import csv
 
 # 线程安全的计数器
 counter_lock = threading.Lock()
@@ -78,6 +82,216 @@ def merge_csv_files(daily_dir, output_file=None):
     return output_file
 
 
def infer_column_types(rows):
    """Infer the type of every column: 'int', 'float', or 'str'.

    A column is numeric only if every non-blank cell parses as a number;
    a single '.'/'e' value promotes the whole column to float. Blank or
    missing cells never influence the decision.
    """
    if not rows:
        return []

    types = []
    for idx in range(len(rows[0])):
        numeric = True
        saw_float = False

        for record in rows:
            if idx >= len(record):
                continue  # ragged row: treat missing cell like a blank
            text = record[idx].strip() if record[idx] else ""
            if not text:
                continue
            looks_float = "." in text or "e" in text.lower()
            try:
                float(text) if looks_float else int(text)
            except ValueError:
                numeric = False
                break  # one bad cell makes the whole column 'str'
            saw_float = saw_float or looks_float

        types.append(("float" if saw_float else "int") if numeric else "str")

    return types
+
+
def convert_row_by_types(row, col_types):
    """Convert one row's cells according to the per-column types.

    Blank/None cells become "". 'str' columns keep the original
    (unstripped) cell, and cells beyond the typed columns pass through
    untouched.
    """
    casts = {"int": int, "float": float}
    converted = []
    for pos, cell in enumerate(row):
        if pos >= len(col_types):
            converted.append(cell)  # extra trailing cell: leave as-is
            continue

        stripped = cell.strip() if cell else ""
        if not stripped:
            converted.append("")
            continue

        cast = casts.get(col_types[pos])
        converted.append(cast(stripped) if cast else cell)
    return converted
+
+
def load_feishu_config(sql_file):
    """Load Feishu settings, priority: {sql stem}.json > sql dir/default.json > root default.json > built-ins.

    Config files are merged in ascending priority, so keys loaded later
    overwrite keys loaded earlier; unreadable files only print a warning.
    """
    import json

    config = {
        "token": "ONZqsxB9BhGH8tt90EScSJT5nHh",
        "sheet_id": None,
        "sort": "dt:desc",
        "cols": None,
    }

    # Lowest priority first — later entries override earlier ones.
    candidates = [
        (Path(__file__).parent / "default.json", "根目录/default.json"),
        (sql_file.parent / "default.json", "sql目录/default.json"),
        (sql_file.parent / f"{sql_file.stem}.json", f"{sql_file.stem}.json"),
    ]
    for path, label in candidates:
        if not path.exists():
            continue
        try:
            config.update(json.loads(path.read_text(encoding="utf-8")))
        except Exception as e:
            print(f"警告: 读取 {label} 失败: {e}")

    return config
+
+
def parse_sort_spec(sort_spec):
    """Parse a sort spec such as 'dt:desc,name:asc' into [(field, is_desc), ...].

    A bare field defaults to descending; any order other than 'asc'
    (case-insensitive) also means descending. Empty entries are skipped.
    """
    if not sort_spec:
        return []
    parsed = []
    for token in sort_spec.split(","):
        token = token.strip()
        if not token:
            continue
        # rpartition splits on the LAST colon; with no colon the whole
        # token lands in `tail` and we fall back to descending order.
        field, sep, tail = token.rpartition(":")
        if sep:
            parsed.append((field.strip(), tail.lower() != "asc"))
        else:
            parsed.append((tail, True))
    return parsed
+
+
def parse_cols_spec(cols_spec):
    """Parse a column mapping spec like 'dt:日期,name,value:数值' into (old, new) pairs.

    An entry without ':' maps a column to itself; surrounding whitespace
    is trimmed and empty entries are dropped.
    """
    if not cols_spec:
        return []
    pairs = []
    for entry in (piece.strip() for piece in cols_spec.split(",")):
        if not entry:
            continue
        # partition splits on the FIRST colon, matching split(":", 1).
        old, sep, new = entry.partition(":")
        pairs.append((old.strip(), new.strip()) if sep else (entry, entry))
    return pairs
+
+
def apply_cols_mapping(header, data_rows, cols_spec):
    """Apply a column mapping: select, reorder and rename columns.

    Fields missing from *header* are reported and skipped; if no field
    maps at all, the input header and rows are returned unchanged.
    Short rows are padded with "" for selected columns they lack.
    """
    mapping = parse_cols_spec(cols_spec)
    if not mapping:
        return header, data_rows

    positions = {name: i for i, name in enumerate(header)}
    keep = []
    renamed = []
    for source, target in mapping:
        if source not in positions:
            print(f"警告: 字段 '{source}' 不存在,已跳过")
            continue
        keep.append(positions[source])
        renamed.append(target)

    if not keep:
        print("警告: 没有有效的列映射,保持原样")
        return header, data_rows

    remapped = [
        [row[i] if i < len(row) else "" for i in keep]
        for row in data_rows
    ]

    print(f"列映射: {len(keep)} 列")
    return renamed, remapped
+
+
def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc", cols_spec=None):
    """Upload a CSV file to a Feishu spreadsheet.

    Args:
        csv_file: path of the CSV file to upload
        sheet_token: Feishu spreadsheet token
        sheet_id: worksheet id (None -> first worksheet is auto-selected)
        sort_spec: sort spec like "dt:desc,name:asc" (uses ORIGINAL column names)
        cols_spec: column mapping spec like "dt:日期,name,value:数值"
    """
    from feishu import write_data_to_sheet

    with open(csv_file, "r", encoding="utf-8") as fh:
        rows = list(csv.reader(fh))

    # Need a header plus at least one data row.
    if len(rows) < 2:
        print("CSV 文件为空,跳过上传")
        return

    header, data_rows = rows[0], rows[1:]

    # Sort BEFORE the column mapping so the spec refers to original names.
    # Applying keys last-to-first over Python's stable sort yields the
    # usual multi-key ordering.
    # NOTE(review): keys compare raw strings, so numeric columns sort
    # lexicographically ("10" < "2"); conversion only happens below.
    applied = []
    for field, desc in reversed(parse_sort_spec(sort_spec)):
        if field not in header:
            continue
        pos = header.index(field)
        data_rows.sort(key=lambda r, p=pos: r[p] if p < len(r) else "", reverse=desc)
        applied.append(f"{field}:{'desc' if desc else 'asc'}")
    if applied:
        print(f"排序: {', '.join(reversed(applied))}")

    # Column mapping (after sorting).
    header, data_rows = apply_cols_mapping(header, data_rows, cols_spec)

    # Infer per-column types and convert cells accordingly.
    col_types = infer_column_types(data_rows)
    payload = [header] + [convert_row_by_types(r, col_types) for r in data_rows]

    print(f"上传到飞书: {len(data_rows)} 行数据")
    write_data_to_sheet(
        data=payload,
        sheet_token=sheet_token,
        sheetid=sheet_id,  # None -> first worksheet
    )
    print(f"飞书上传完成: {sheet_token}")
+
+
 def get_date_range(start_str, end_str):
     """生成日期范围列表"""
     start = datetime.strptime(start_str, "%Y%m%d")
@@ -140,6 +354,11 @@ def main():
     parser.add_argument("--workers", type=int, default=5, help="天级并发数 (默认5)")
     parser.add_argument("--parallel", type=int, default=50, help="单天多线程下载 (默认50, 大数据量推荐)")
     parser.add_argument("--merge", action="store_true", help="合并所有日期数据到一个文件")
+    parser.add_argument("--feishu", nargs="?", const="__USE_CONFIG__",
+                        help="上传到飞书表格")
+    parser.add_argument("--sheet-id", type=str, default=None, help="飞书工作表ID")
+    parser.add_argument("--sort", type=str, default=None, help="排序: 字段:asc/desc")
+    parser.add_argument("--cols", type=str, default=None, help="列映射: 原名:新名,...")
     args = parser.parse_args()
 
     # 解析 SQL 文件路径
@@ -148,6 +367,24 @@ def main():
         print(f"错误: 找不到 {sql_file}")
         return
 
+    # 加载飞书配置(优先级: 命令行 > {sql名}.json > sql目录/default.json > 根目录/default.json > 默认值)
+    feishu_config = load_feishu_config(sql_file)
+    if args.feishu == "__USE_CONFIG__":
+        args.feishu = feishu_config["token"]
+    elif args.feishu is None:
+        pass  # 未启用飞书上传
+    # 命令行参数覆盖配置文件
+    if args.sheet_id is None:
+        args.sheet_id = feishu_config["sheet_id"]
+    if args.sort is None:
+        args.sort = feishu_config["sort"]
+    if args.cols is None:
+        args.cols = feishu_config["cols"]
+
+    # 打印飞书配置
+    if args.feishu:
+        print(f"飞书配置: token={args.feishu}, sheet_id={args.sheet_id}, sort={args.sort}, cols={args.cols}")
+
     # 输出目录:SQL 同目录下的 output/SQL文件名/
     output_dir = sql_file.parent / "output"
     daily_dir = output_dir / sql_file.stem
@@ -161,7 +398,10 @@ def main():
         existing_dates = get_existing_dates(daily_dir)
         print(f"已有数据: {len(existing_dates)}天")
         if existing_dates:
-            merge_csv_files(daily_dir)
+            merged_file = merge_csv_files(daily_dir)
+            # 如果指定了飞书上传
+            if args.feishu and merged_file:
+                upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols)
         else:
             print("没有可合并的数据")
         return
@@ -220,6 +460,9 @@ def main():
             else:
                 client.execute_sql_result_save_file(sql_template, str(output_file))
             print(f"数据目录: {output_file}")
+            # 如果指定了飞书上传
+            if args.feishu and output_file.exists():
+                upload_to_feishu(output_file, args.feishu, args.sheet_id, args.sort, args.cols)
         except Exception as e:
             print(f"✗ 执行失败: {e}")
         return
@@ -255,6 +498,12 @@ def main():
     print(f"\n完成! 成功: {success_count}, 失败: {fail_count}")
     print(f"数据目录: {daily_dir}")
 
+    # 如果指定了飞书上传,先合并再上传
+    if args.feishu:
+        merged_file = merge_csv_files(daily_dir)
+        if merged_file:
+            upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols)
+
 
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()