| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- #!/usr/bin/env python
- # coding=utf-8
- """
- SQL 执行工具 - 输入 SQL 文件,输出查询结果到同目录下的 CSV
- 使用示例:
- python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql
- python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql --start 20251222 --end 20260103
- """
- import argparse
- from datetime import datetime, timedelta
- from pathlib import Path
- from lib.odps_module import ODPSClient
- def get_default_dates():
- """获取默认日期范围:最近 7 天(start=7天前, end=昨天)"""
- today = datetime.now()
- end_date = today - timedelta(days=1)
- start_date = today - timedelta(days=7)
- return start_date.strftime('%Y%m%d'), end_date.strftime('%Y%m%d')
- def parse_variables(var_list: list) -> dict:
- """解析变量列表为字典"""
- if not var_list:
- return {}
- variables = {}
- for item in var_list:
- if '=' in item:
- key, value = item.split('=', 1)
- variables[key.strip()] = value.strip()
- return variables
- def replace_variables(sql: str, variables: dict) -> str:
- """替换 SQL 中的 ${variable} 占位符"""
- for key, value in variables.items():
- sql = sql.replace(f'${{{key}}}', value)
- return sql
- def run_sql(sql_file: str, output_file: str = None, variables: dict = None,
- start: str = None, end: str = None, dry_run: bool = False):
- """
- 执行 SQL 文件并保存结果
- Args:
- sql_file: SQL 文件路径
- output_file: 输出文件路径(默认与 SQL 同目录同名)
- variables: 变量替换字典
- start: dt 分区起始日期
- end: dt 分区结束日期
- dry_run: 仅打印 SQL,不执行
- """
- sql_path = Path(sql_file)
- # 合并 start/end 到 variables
- if variables is None:
- variables = {}
- if start:
- variables['start'] = start
- if end:
- variables['end'] = end
- # 输出目录:SQL 同目录下的 output/;文件名:日期.csv
- if output_file is None:
- output_dir = sql_path.parent / "output"
- output_dir.mkdir(exist_ok=True)
- if start and end:
- output_file = output_dir / f"{start}_{end}.csv"
- elif start:
- output_file = output_dir / f"{start}.csv"
- else:
- output_file = output_dir / "result.csv"
- else:
- output_file = Path(output_file)
- # 读取 SQL
- with open(sql_path, 'r', encoding='utf-8') as f:
- sql = f.read()
- # 变量替换
- if variables:
- sql = replace_variables(sql, variables)
- # Dry run 模式
- if dry_run:
- print("=" * 50)
- print("SQL 预览 (dry-run 模式)")
- print("=" * 50)
- print(sql)
- print("=" * 50)
- print(f"输出文件: {output_file}")
- return
- # 执行 SQL
- print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始执行: {sql_path.name}")
- odps_client = ODPSClient()
- odps_client.execute_sql_result_save_file(sql, str(output_file))
- print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 完成,结果保存至: {output_file}")
- def main():
- parser = argparse.ArgumentParser(
- description='执行 SQL 文件并输出结果',
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- 示例:
- python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql
- python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql --start 20251222 --end 20260103
- python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql --dry-run
- """
- )
- parser.add_argument('sql_file', type=str, help='SQL 文件路径')
- parser.add_argument('--start', type=str, help='dt 分区起始日期,替换 ${start}')
- parser.add_argument('--end', type=str, help='dt 分区结束日期,替换 ${end}')
- parser.add_argument('-o', '--output', type=str, help='自定义输出路径')
- parser.add_argument('--vars', nargs='*', metavar='KEY=VALUE', help='额外变量,如: apptype=36')
- parser.add_argument('--dry-run', action='store_true', help='仅打印 SQL,不执行')
- args = parser.parse_args()
- # 解析变量
- variables = parse_variables(args.vars)
- # 默认日期
- start = args.start
- end = args.end
- if start is None or end is None:
- default_start, default_end = get_default_dates()
- start = start or default_start
- end = end or default_end
- print(f"使用默认日期范围: {start} ~ {end}")
- # 执行
- run_sql(
- sql_file=args.sql_file,
- output_file=args.output,
- variables=variables,
- start=start,
- end=end,
- dry_run=args.dry_run
- )
- if __name__ == "__main__":
- main()
|