#!/usr/bin/env python
# coding=utf-8
"""
Incremental day-by-day data fetcher - generic version.
Supports concurrent fetching and automatically skips dates that already have data.

Usage:
    python fetch_daily.py tasks/xxx/query.sql                        # fetch the last 7 days
    python fetch_daily.py tasks/xxx/query.sql --days 30              # fetch the last 30 days
    python fetch_daily.py tasks/xxx/query.sql --start 20260101 --end 20260107
    python fetch_daily.py tasks/xxx/query.sql --date 20260105        # a single day
    python fetch_daily.py tasks/xxx/query.sql --date 20260105 --hh 08  # a single hour of a single day
    python fetch_daily.py tasks/xxx/query.sql --force                # force re-fetch
    python fetch_daily.py tasks/xxx/query.sql --workers 10           # day-level concurrency
    python fetch_daily.py tasks/xxx/query.sql --parallel 50          # per-day multi-threaded download (default 50, recommended for large data)
    python fetch_daily.py tasks/xxx/query.sql --parallel 0           # disable multi-threading, download single-threaded
    python fetch_daily.py tasks/xxx/query.sql --feishu               # upload to a Feishu sheet after fetching
    python fetch_daily.py tasks/xxx/query.sql --feishu TOKEN         # specify the Feishu sheet token
    python fetch_daily.py tasks/xxx/query.sql --merge --feishu       # merge only, then upload to Feishu
    python fetch_daily.py tasks/xxx/query.sql --config piaoquan_api  # switch ODPS config
"""
import argparse
import csv
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent / "lib"))
from odps_module import ODPSClient

# Thread-safe counters
counter_lock = threading.Lock()
success_count = 0
fail_count = 0

def get_existing_dates(daily_dir, hh=None):
    """Return the set of dates already downloaded (optionally for a specific hour)."""
    existing = set()
    if not daily_dir.exists():
        return existing
    for f in daily_dir.glob("*.csv"):
        try:
            stem = f.stem
            if hh is not None:
                # Hourly format: 20250101_08
                if len(stem) == 11 and stem[8] == '_':
                    dt = stem[:8]
                    file_hh = stem[9:11]
                    if dt.isdigit() and file_hh == hh:
                        existing.add(dt)
            else:
                # Date-only format: 20250101
                if len(stem) == 8 and stem.isdigit():
                    existing.add(stem)
        except Exception:
            # Ignore files whose names do not match the expected pattern
            pass
    return existing
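
# Illustrative file layout under daily_dir: one CSV per day, e.g. 20260105.csv,
# or 20260105_08.csv when an hour is specified via --hh (see fetch_single_day below).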

def merge_csv_files(daily_dir, output_file=None):
    """Merge all per-day CSV files in a directory, keeping a single header row."""
    csv_files = sorted(daily_dir.glob("*.csv"))
    if not csv_files:
        print("No CSV files found")
        return None
    if output_file is None:
        output_file = daily_dir.parent / f"{daily_dir.name}_merged.csv"
    with open(output_file, "w", encoding="utf-8") as out:
        header_written = False
        total_rows = 0
        for csv_file in csv_files:
            with open(csv_file, "r", encoding="utf-8") as f:
                lines = f.readlines()
            if not lines:
                continue
            if not header_written:
                out.write(lines[0])
                header_written = True
            for line in lines[1:]:
                out.write(line)
                total_rows += 1
    print(f"Merge complete: {len(csv_files)} files, {total_rows} rows")
    print(f"Output file: {output_file}")
    return output_file

def infer_column_types(rows):
    """Infer the type of each column: int, float, or str."""
    if not rows:
        return []
    num_cols = len(rows[0])
    col_types = []
    for col_idx in range(num_cols):
        has_float = False
        all_numeric = True
        for row in rows:
            if col_idx >= len(row):
                continue
            v = row[col_idx].strip() if row[col_idx] else ""
            if not v:  # empty values do not affect type inference
                continue
            try:
                if '.' in v or 'e' in v.lower():
                    float(v)
                    has_float = True
                else:
                    int(v)
            except ValueError:
                all_numeric = False
                break
        if all_numeric:
            col_types.append('float' if has_float else 'int')
        else:
            col_types.append('str')
    return col_types

def convert_row_by_types(row, col_types):
    """Convert one row of cells according to the inferred column types."""
    result = []
    for i, cell in enumerate(row):
        if i >= len(col_types):
            result.append(cell)
            continue
        v = cell.strip() if cell else ""
        if not v:
            result.append("")
            continue
        col_type = col_types[i]
        if col_type == 'int':
            result.append(int(v))
        elif col_type == 'float':
            result.append(float(v))
        else:
            result.append(cell)
    return result
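
# Illustrative example: for rows == [["1", "2.5", "x"]], infer_column_types(rows)
# returns ['int', 'float', 'str'], and convert_row_by_types(rows[0], col_types)
# yields [1, 2.5, "x"].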

def load_feishu_config(sql_file):
    """Load the Feishu config.

    Priority: {sql_name}.json > <sql dir>/default.json > <root dir>/default.json > built-in defaults.
    """
    import json
    defaults = {
        "token": "ONZqsxB9BhGH8tt90EScSJT5nHh",
        "sheet_id": None,
        "sort": "dt:desc",
        "cols": None,
        "filter": None,
        "limit": None,
    }
    root_dir = Path(__file__).parent
    sql_dir = sql_file.parent
    sql_name = sql_file.stem

    def load_json(path, name):
        if path.exists():
            try:
                with open(path, "r", encoding="utf-8") as f:
                    defaults.update(json.load(f))
            except Exception as e:
                print(f"Warning: failed to read {name}: {e}")

    # Load from lowest to highest priority (later loads override earlier ones)
    load_json(root_dir / "default.json", "<root dir>/default.json")
    load_json(sql_dir / "default.json", "<sql dir>/default.json")
    load_json(sql_dir / f"{sql_name}.json", f"{sql_name}.json")
    return defaults
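
# Example per-SQL config file (hypothetical values), e.g. tasks/xxx/query.json next to query.sql:
# {
#     "token": "<feishu sheet token>",
#     "sheet_id": "<worksheet id, or null to use the first worksheet>",
#     "sort": "dt:desc",
#     "cols": "dt:日期,uv,pv",
#     "filter": {"app": "speed"},
#     "limit": 10000
# }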

def parse_sort_spec(sort_spec):
    """Parse a sort spec such as 'dt:desc,name:asc' into [('dt', True), ('name', False)]."""
    if not sort_spec:
        return []
    result = []
    for part in sort_spec.split(","):
        part = part.strip()
        if not part:
            continue
        if ":" in part:
            field, order = part.rsplit(":", 1)
            desc = order.lower() != "asc"
        else:
            field, desc = part, True  # default to descending
        result.append((field.strip(), desc))
    return result

def parse_cols_spec(cols_spec):
    """Parse a column-mapping spec such as 'dt:日期,name,value:数值' into
    [('dt', '日期'), ('name', 'name'), ('value', '数值')]."""
    if not cols_spec:
        return []
    result = []
    for part in cols_spec.split(","):
        part = part.strip()
        if not part:
            continue
        if ":" in part:
            old_name, new_name = part.split(":", 1)
            result.append((old_name.strip(), new_name.strip()))
        else:
            result.append((part, part))
    return result

def apply_cols_mapping(header, data_rows, cols_spec):
    """Apply the column mapping: select, reorder, and rename columns."""
    col_mapping = parse_cols_spec(cols_spec)
    if not col_mapping:
        return header, data_rows
    # Build an index for the original header
    header_index = {name: i for i, name in enumerate(header)}
    new_header = []
    col_indices = []
    for old_name, new_name in col_mapping:
        if old_name in header_index:
            col_indices.append(header_index[old_name])
            new_header.append(new_name)
        else:
            print(f"Warning: column '{old_name}' does not exist, skipped")
    if not col_indices:
        print("Warning: no valid column mapping, keeping data as-is")
        return header, data_rows
    # Apply the mapping
    new_rows = []
    for row in data_rows:
        new_row = [row[i] if i < len(row) else "" for i in col_indices]
        new_rows.append(new_row)
    print(f"Column mapping: {len(col_indices)} columns")
    return new_header, new_rows
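
# Illustrative example: with header ["dt", "name", "value"] and cols_spec "dt:日期,value",
# apply_cols_mapping keeps columns dt and value and returns the header ["日期", "value"].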

def column_index_to_letter(col_idx):
    """Convert a 1-based column index to a letter, e.g. 1->A, 26->Z, 27->AA."""
    result = ""
    while col_idx > 0:
        col_idx, remainder = divmod(col_idx - 1, 26)
        result = chr(65 + remainder) + result
    return result
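
# Illustrative example: column_index_to_letter(1) == "A", column_index_to_letter(28) == "AB".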

def upload_to_feishu(csv_file, sheet_token, sheet_id=None, sort_spec="dt:desc", cols_spec=None, filter_spec=None, limit=None):
    """Upload a CSV file to a Feishu sheet (row styles are inherited from a template row).

    Row 1: header
    Row 2: style template (used for inheritance, deleted at the end)
    Row 3 onward: data

    Args:
        csv_file: path to the CSV file
        sheet_token: Feishu sheet token
        sheet_id: worksheet ID (None = use the first worksheet)
        sort_spec: sort spec, e.g. "dt:desc,name:asc"
        cols_spec: column-mapping spec, e.g. "dt:日期,name,value:数值"
        filter_spec: filter conditions, dict {"field": "value"} or str "field=value,field=value"
        limit: maximum number of rows to upload
    """
    from feishu import Client, LARK_HOST, APP_ID, APP_SECRET, request

    # Read the CSV
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        rows = list(reader)
    if len(rows) < 2:
        print("CSV file is empty, skipping upload")
        return
    header = rows[0]
    data_rows = rows[1:]

    # Sort (before column mapping, using the original column names)
    sort_fields = parse_sort_spec(sort_spec)
    if sort_fields:
        applied = []
        for field, desc in reversed(sort_fields):
            if field in header:
                idx = header.index(field)
                data_rows.sort(key=lambda row: row[idx] if idx < len(row) else "", reverse=desc)
                applied.append(f"{field}:{'desc' if desc else 'asc'}")
        if applied:
            print(f"Sort: {', '.join(reversed(applied))}")

    # Filter (after sorting)
    if filter_spec:
        # Accept a dict (from a JSON config) or a str (from the command line, "field=value,field!=value")
        filters = []
        if isinstance(filter_spec, str):
            for part in filter_spec.split(","):
                if "!=" in part:
                    k, v = part.split("!=", 1)
                    filters.append((k.strip(), v.strip(), "!="))
                elif "=" in part:
                    k, v = part.split("=", 1)
                    filters.append((k.strip(), v.strip(), "=="))
        elif isinstance(filter_spec, dict):
            filters = [(k, v, "==") for k, v in filter_spec.items()]
        before_count = len(data_rows)
        for field, value, op in filters:
            if field in header:
                idx = header.index(field)
                if op == "!=":
                    data_rows = [row for row in data_rows if idx < len(row) and row[idx] != str(value)]
                else:
                    data_rows = [row for row in data_rows if idx < len(row) and row[idx] == str(value)]
        print(f"Filter: {filters} → {before_count} → {len(data_rows)} rows")

    # Limit (after filtering)
    if limit and len(data_rows) > limit:
        print(f"Row limit: {len(data_rows)} → {limit}")
        data_rows = data_rows[:limit]

    # Column mapping (after sorting)
    header, data_rows = apply_cols_mapping(header, data_rows, cols_spec)

    # Infer the column types and convert the cells
    col_types = infer_column_types(data_rows)
    converted_rows = [convert_row_by_types(row, col_types) for row in data_rows]

    # Initialize the Feishu client
    client = Client(LARK_HOST)
    access_token = client.get_tenant_access_token(APP_ID, APP_SECRET)

    # Resolve the sheet_id
    if sheet_id is None:
        sheet_id = client.get_sheetid(access_token, sheet_token)
        print(f"Sheet ID: {sheet_id}")

    # Fetch the sheet properties
    sheet_props = client.get_sheet_properties(access_token, sheet_token, sheet_id)
    current_cols = sheet_props['column_count'] if sheet_props else 26
    header_end_col = column_index_to_letter(current_cols)

    # Expand the column count when the CSV has more columns than the sheet
    num_csv_cols = len(header)
    if num_csv_cols > current_cols:
        add_cols = num_csv_cols - current_cols
        expand_headers = {
            'Content-Type': 'application/json; charset=utf-8',
            'Authorization': f'Bearer {access_token}'
        }
        expand_payload = {
            "dimension": {
                "sheetId": sheet_id,
                "majorDimension": "COLUMNS",
                "length": add_cols
            }
        }
        try:
            request("POST", f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/dimension_range",
                    expand_headers, expand_payload)
            print(f"Expanded columns: {current_cols} -> {num_csv_cols} (+{add_cols})")
            current_cols = num_csv_cols
            header_end_col = column_index_to_letter(current_cols)
        except Exception as e:
            print(f"  Failed to expand columns: {e}")

    # Read the Feishu header row (all columns)
    feishu_header = client.read_range_values(access_token, sheet_token, f"{sheet_id}!A1:{header_end_col}1")
    feishu_cols = []
    if feishu_header and feishu_header[0]:
        feishu_cols = [c for c in feishu_header[0] if c]  # drop None and empty strings

    # Convert rich-text header cells to plain text (Feishu headers may be list structures containing links)
    def _col_to_str(col):
        if isinstance(col, list):
            return "".join(item.get("text", "") for item in col if isinstance(item, dict))
        return col

    if feishu_cols:
        feishu_cols_str = [_col_to_str(c) for c in feishu_cols]
        print(f"Feishu header: {feishu_cols_str}")
        print(f"CSV header: {header}")
        # Check column consistency (warn but continue; the Feishu header wins)
        feishu_set = set(feishu_cols_str)
        csv_set = set(header)
        missing_in_csv = feishu_set - csv_set
        missing_in_feishu = csv_set - feishu_set
        if missing_in_csv:
            print(f"Warning: columns missing from the CSV (will be filled with empty values): {missing_in_csv}")
        if missing_in_feishu:
            print(f"Warning: columns missing from the Feishu sheet (will be ignored): {missing_in_feishu}")
        # Reorder the data to match the Feishu header (matching on the plain-text names)
        csv_col_index = {name: i for i, name in enumerate(header)}
        new_converted_rows = []
        for row in converted_rows:
            new_row = []
            for col_name in feishu_cols_str:
                if col_name in csv_col_index:
                    new_row.append(row[csv_col_index[col_name]])
                else:
                    new_row.append("")  # fill columns missing from the CSV with empty values
            new_converted_rows.append(new_row)
        converted_rows = new_converted_rows
        header = feishu_cols
        print("Data reordered to match the Feishu header")
    else:
        # The Feishu header is empty: write the CSV header (Feishu allows at most 100 columns per write, so batch by column)
        print("Feishu header is empty, writing the CSV header")
        col_batch = 100
        for start in range(0, len(header), col_batch):
            end = min(start + col_batch, len(header))
            start_col = column_index_to_letter(start + 1)
            end_col = column_index_to_letter(end)
            batch_range = f"{sheet_id}!{start_col}1:{end_col}1"
            client.batch_update_values(access_token, sheet_token, {
                "valueRanges": [{"range": batch_range, "values": [header[start:end]]}]
            })

    total_rows = len(converted_rows)
    num_cols = len(header)
    end_col = column_index_to_letter(num_cols)

    # Feishu caps a single sheet at 5,000,000 cells; reserve the header and template rows
    CELL_LIMIT = 5_000_000
    max_data_rows = (CELL_LIMIT // num_cols) - 2
    if total_rows > max_data_rows:
        print(f"⚠ Feishu cell limit {CELL_LIMIT:,} ({num_cols} cols × {max_data_rows} rows), truncating {total_rows} → {max_data_rows} rows")
        converted_rows = converted_rows[:max_data_rows]
        total_rows = max_data_rows
    print(f"Uploading to Feishu: {total_rows} rows")
    batch_size = 500

    # Current row count (reuse the sheet properties fetched above)
    current_rows = sheet_props['row_count'] if sheet_props else 2
    print(f"Current rows: {current_rows}, data rows needed: {total_rows}")
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': f'Bearer {access_token}'
    }

    # Step 1: delete the old data rows (keep row 1 header + row 2 style template), in batches
    if current_rows > 2:
        print(f"Clearing old data ({current_rows - 2} rows)...")
        rows_to_delete = current_rows - 2
        delete_batch = 5000
        while rows_to_delete > 0:
            # Always delete starting at row 3; row numbers shift up after each deletion
            batch = min(rows_to_delete, delete_batch)
            try:
                client.delete_rows(access_token, sheet_token, sheet_id, 3, 2 + batch)
                rows_to_delete -= batch
                if rows_to_delete > 0:
                    print(f"  Deleted {current_rows - 2 - rows_to_delete}/{current_rows - 2}")
            except Exception as e:
                print(f"  Failed to clear old data: {e}")
                break

    # Step 2: expand the sheet capacity (writes do not grow the sheet automatically)
    # After the deletion only 2 rows remain (header + template), so grow the sheet to 2 + total_rows rows
    add_url = f"{LARK_HOST}/open-apis/sheets/v2/spreadsheets/{sheet_token}/dimension_range"
    expand_batch = 5000
    remaining = total_rows
    expanded = 0
    while remaining > 0:
        chunk = min(remaining, expand_batch)
        add_payload = {
            "dimension": {
                "sheetId": sheet_id,
                "majorDimension": "ROWS",
                "length": chunk
            }
        }
        try:
            request("POST", add_url, headers, add_payload)
            expanded += chunk
            remaining -= chunk
        except Exception as e:
            print(f"  Failed to expand capacity (expanded {expanded} rows so far): {e}")
            break
    if expanded > 0:
        print(f"Expanded capacity: +{expanded} rows")

    # Step 3: write the data into the newly added empty rows in batches
    # (no insert calls, to avoid expand+insert double-adding rows past the cell limit)
    print(f"Writing {total_rows} rows...")
    batches = [converted_rows[i:i + batch_size] for i in range(0, total_rows, batch_size)]
    processed = 0
    for i, batch in enumerate(batches):
        batch_count = len(batch)
        start_row = 3 + i * batch_size  # write sequentially, starting at row 3
        # Write the values (Feishu allows at most 100 columns per request, so batch by column)
        col_batch = 100
        value_ranges = []
        for col_start in range(0, num_cols, col_batch):
            col_end = min(col_start + col_batch, num_cols)
            sc = column_index_to_letter(col_start + 1)
            ec = column_index_to_letter(col_end)
            col_range = f"{sheet_id}!{sc}{start_row}:{ec}{start_row + batch_count - 1}"
            col_values = [row[col_start:col_end] for row in batch]
            value_ranges.append({"range": col_range, "values": col_values})
        client.batch_update_values(access_token, sheet_token, {
            "valueRanges": value_ranges
        })
        processed += batch_count
        print(f"  Processed: {processed}/{total_rows}")

    # Step 4: delete the style template row (row 2), only if it existed initially
    if current_rows >= 2:
        print("Deleting the template row...")
        try:
            client.delete_rows(access_token, sheet_token, sheet_id, 2, 2)
        except Exception as e:
            print(f"  Failed to delete the template row: {e}")
    print(f"Feishu upload complete: {sheet_token}")

def get_date_range(start_str, end_str):
    """Generate the list of dates between start_str and end_str (inclusive)."""
    start = datetime.strptime(start_str, "%Y%m%d")
    end = datetime.strptime(end_str, "%Y%m%d")
    dates = []
    current = start
    while current <= end:
        dates.append(current.strftime("%Y%m%d"))
        current += timedelta(days=1)
    return dates
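
# Illustrative example: get_date_range("20260101", "20260103") == ["20260101", "20260102", "20260103"].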

def fetch_single_day(dt, sql_template, daily_dir, parallel_threads=0, config="default", hh=None):
    """Fetch the data for a single day (optionally for a specific hour)."""
    global success_count, fail_count
    try:
        client = ODPSClient(config=config)
        sql = sql_template.replace("${dt}", dt)
        if hh is not None:
            sql = sql.replace("${hh}", hh)
            output_file = daily_dir / f"{dt}_{hh}.csv"
        else:
            output_file = daily_dir / f"{dt}.csv"
        # Download to a file
        if parallel_threads > 0:
            # Multi-threaded parallel download (suited to large result sets)
            client.execute_sql_result_save_file_parallel(sql, str(output_file), workers=parallel_threads)
        else:
            # Single-threaded download
            client.execute_sql_result_save_file(sql, str(output_file))
        # Check the result
        if output_file.exists():
            with open(output_file, "r", encoding="utf-8") as f:
                row_count = sum(1 for _ in f) - 1  # exclude the header row
            with counter_lock:
                success_count += 1
            if row_count > 0:
                return (dt, "success", row_count)
            else:
                return (dt, "empty", 0)
        else:
            with counter_lock:
                fail_count += 1
            return (dt, "fail", 0)
    except Exception as e:
        with counter_lock:
            fail_count += 1
        return (dt, "error", str(e))

def main():
    global success_count, fail_count
    parser = argparse.ArgumentParser(description="Fetch data incrementally by day")
    parser.add_argument("sql_file", type=str, help="path to the SQL file")
    parser.add_argument("--days", type=int, default=7, help="fetch the last N days (default 7)")
    parser.add_argument("--start", type=str, help="start date YYYYMMDD")
    parser.add_argument("--end", type=str, help="end date YYYYMMDD")
    parser.add_argument("--date", type=str, help="single date YYYYMMDD")
    parser.add_argument("--hh", type=str, default=None, help="hour HH (00-23), use together with --date")
    parser.add_argument("--force", action="store_true", help="force re-fetch")
    parser.add_argument("--workers", type=int, default=5, help="day-level concurrency (default 5)")
    parser.add_argument("--parallel", type=int, default=50, help="per-day download threads (default 50, recommended for large data; 0 = single-threaded)")
    parser.add_argument("--merge", action="store_true", help="merge all per-day data into one file")
    parser.add_argument("--feishu", nargs="?", const="__USE_CONFIG__",
                        help="upload to a Feishu sheet (optionally pass the sheet token)")
    parser.add_argument("--sheet-id", type=str, default=None, help="Feishu worksheet ID")
    parser.add_argument("--sort", type=str, default=None, help="sort spec: field:asc/desc")
    parser.add_argument("--cols", type=str, default=None, help="column mapping: old_name:new_name,...")
    parser.add_argument("--filter", type=str, default=None, help="filter: field=value,field=value")
    parser.add_argument("--limit", type=int, default=None, help="maximum number of rows to upload")
    parser.add_argument("--config", type=str, default="default", help="ODPS config: default or piaoquan_api")
    args = parser.parse_args()

    # Resolve the SQL file path
    sql_file = Path(args.sql_file).resolve()
    if not sql_file.exists():
        print(f"Error: {sql_file} not found")
        return

    # Load the Feishu config (priority: command line > {sql_name}.json > <sql dir>/default.json > <root dir>/default.json > built-in defaults)
    feishu_config = load_feishu_config(sql_file)
    if args.feishu == "__USE_CONFIG__":
        args.feishu = feishu_config["token"]
    elif args.feishu is None:
        pass  # Feishu upload not enabled

    # Command-line arguments override the config files
    if args.sheet_id is None:
        args.sheet_id = feishu_config["sheet_id"]
    if args.sort is None:
        args.sort = feishu_config["sort"]
    if args.cols is None:
        args.cols = feishu_config["cols"]
    if args.filter is None:
        args.filter = feishu_config["filter"]
    if args.limit is None:
        args.limit = feishu_config["limit"]

    # Print the Feishu config
    if args.feishu:
        print(f"Feishu config: token={args.feishu}, sheet_id={args.sheet_id}, sort={args.sort}, cols={args.cols}")

    # Output directory: output/<SQL file name>/ next to the SQL file
    output_dir = sql_file.parent / "output"
    daily_dir = output_dir / sql_file.stem
    daily_dir.mkdir(parents=True, exist_ok=True)
    print(f"SQL file: {sql_file}")
    print(f"Data directory: {daily_dir}")

    # Merge-only mode: do not fetch, just merge the existing files
    if args.merge:
        existing_dates = get_existing_dates(daily_dir)
        print(f"Existing data: {len(existing_dates)} days")
        if existing_dates:
            merged_file = merge_csv_files(daily_dir)
            # Upload to Feishu if requested
            if args.feishu and merged_file:
                upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit)
        else:
            print("No data to merge")
        return

    # Determine the date range
    if args.date:
        target_dates = [args.date]
    elif args.start and args.end:
        target_dates = get_date_range(args.start, args.end)
    else:
        today = datetime.now()
        end_date = (today - timedelta(days=1)).strftime("%Y%m%d")
        start_date = (today - timedelta(days=args.days)).strftime("%Y%m%d")
        target_dates = get_date_range(start_date, end_date)
    print(f"Target dates: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)} days)")

    # Check the existing data
    existing_dates = get_existing_dates(daily_dir, args.hh)
    if args.hh:
        print(f"Existing data: {len(existing_dates)} days (hh={args.hh})")
    else:
        print(f"Existing data: {len(existing_dates)} days")

    # Determine which dates need fetching
    if args.force:
        missing_dates = target_dates
        print(f"Force mode: re-fetching all {len(missing_dates)} days")
    else:
        missing_dates = [d for d in target_dates if d not in existing_dates]
        print(f"To fetch: {len(missing_dates)} days")
    if not missing_dates:
        print("Nothing to fetch, exiting")
        return

    # Read the SQL template
    sql_template = sql_file.read_text(encoding="utf-8")
    # Detect whether the SQL contains the ${dt} variable
    has_dt_var = "${dt}" in sql_template

    # Reset the counters
    success_count = 0
    fail_count = 0

    # If the SQL does not contain ${dt}, execute it once only
    if not has_dt_var:
        print("\nSQL does not contain the ${dt} variable, executing once...")
        target_dates = ["20000101"]  # placeholder date
        missing_dates = target_dates
        output_file = output_dir / f"{sql_file.stem}.csv"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            client = ODPSClient(config=args.config)
            if args.parallel > 0:
                client.execute_sql_result_save_file_parallel(sql_template, str(output_file), workers=args.parallel)
            else:
                client.execute_sql_result_save_file(sql_template, str(output_file))
            print(f"Data file: {output_file}")
            # Upload to Feishu if requested
            if args.feishu and output_file.exists():
                upload_to_feishu(output_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit)
        except Exception as e:
            print(f"✗ Execution failed: {e}")
        return

    # Fetch concurrently
    print(f"Target dates: {target_dates[0]} ~ {target_dates[-1]} ({len(target_dates)} days)")
    workers = min(args.workers, len(missing_dates))
    if args.parallel > 0:
        print(f"\nFetching (day-level concurrency: {workers}, per-day threads: {args.parallel})...")
    else:
        print(f"\nFetching (concurrency: {workers})...")
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(fetch_single_day, dt, sql_template, daily_dir, args.parallel, args.config, args.hh): dt
            for dt in missing_dates
        }
        completed = 0
        for future in as_completed(futures):
            completed += 1
            dt, status, info = future.result()
            if status == "success":
                print(f"  [{completed}/{len(missing_dates)}] ✓ {dt}: {info} rows")
            elif status == "empty":
                print(f"  [{completed}/{len(missing_dates)}] ⚠ {dt}: no data")
            elif status == "error":
                print(f"  [{completed}/{len(missing_dates)}] ✗ {dt}: {info}")
            else:
                print(f"  [{completed}/{len(missing_dates)}] ✗ {dt}: failed")
    print(f"\nDone! Succeeded: {success_count}, failed: {fail_count}")
    print(f"Data directory: {daily_dir}")

    # If Feishu upload was requested, merge first and then upload
    if args.feishu:
        merged_file = merge_csv_files(daily_dir)
        if merged_file:
            upload_to_feishu(merged_file, args.feishu, args.sheet_id, args.sort, args.cols, args.filter, args.limit)

if __name__ == "__main__":
    main()