#!/usr/bin/env python
# coding=utf-8
"""
SQL runner: read a SQL file, substitute ``${name}`` placeholders, execute it
through ODPSClient and save the result as a CSV.

By default the CSV is written to an ``output/`` directory next to the SQL file.

Usage examples:
    python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql
    python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql --start 20251222 --end 20260103
"""

import argparse
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional


def get_default_dates():
    """Return the default ``(start, end)`` date range as ``YYYYMMDD`` strings.

    ``start`` is 7 days before today and ``end`` is yesterday, both computed
    from the local clock.
    """
    today = datetime.now()
    end_date = today - timedelta(days=1)
    start_date = today - timedelta(days=7)
    return start_date.strftime('%Y%m%d'), end_date.strftime('%Y%m%d')


def parse_variables(var_list: Optional[list]) -> dict:
    """Parse a list of ``KEY=VALUE`` strings into a dict.

    Keys and values are stripped of surrounding whitespace. Only the first
    ``=`` splits, so values may themselves contain ``=``.

    Args:
        var_list: Items such as ``["apptype=36"]``; ``None`` or empty yields ``{}``.

    Returns:
        Mapping of variable name to its string value.
    """
    if not var_list:
        return {}

    variables = {}
    for item in var_list:
        key, sep, value = item.partition('=')
        key = key.strip()
        if not sep or not key:
            # A malformed item would otherwise vanish silently and leave its
            # placeholder unreplaced in the SQL; tell the user instead.
            print(f"警告: 忽略无效变量 (应为 KEY=VALUE): {item}", file=sys.stderr)
            continue
        variables[key] = value.strip()
    return variables


def replace_variables(sql: str, variables: dict) -> str:
    """Replace every ``${key}`` placeholder in *sql* with its value.

    Values are converted with ``str()`` so that non-string values (e.g. ints
    supplied by a programmatic caller) do not raise ``TypeError``.
    Placeholders with no matching key are left untouched.
    """
    for key, value in variables.items():
        sql = sql.replace(f'${{{key}}}', str(value))
    return sql


def _default_output_path(sql_path: Path, start: Optional[str], end: Optional[str]) -> Path:
    """Build the default CSV path: ``<sql dir>/output/<sql stem>_<suffix>.csv``."""
    if start and end:
        suffix = f"{start}_{end}"
    elif start:
        suffix = start
    else:
        suffix = "result"
    return sql_path.parent / "output" / f"{sql_path.stem}_{suffix}.csv"


def run_sql(sql_file: str, output_file: Optional[str] = None, variables: Optional[dict] = None,
            start: Optional[str] = None, end: Optional[str] = None, dry_run: bool = False):
    """
    Execute a SQL file and save the result.

    Args:
        sql_file: Path of the SQL file.
        output_file: Output file path. When omitted, a name derived from the
            SQL file name and the dates is used inside ``output/`` next to
            the SQL file.
        variables: Placeholder substitutions; the passed dict is not modified.
        start: Start date of the dt partition, substituted for ``${start}``.
        end: End date of the dt partition, substituted for ``${end}``.
        dry_run: Only print the final SQL and the output path; execute nothing
            and create nothing on disk.
    """
    sql_path = Path(sql_file)

    # Merge start/end into a copy so the caller's dict is never mutated.
    merged = dict(variables) if variables else {}
    if start:
        merged['start'] = start
    if end:
        merged['end'] = end

    use_default_output = output_file is None
    if use_default_output:
        output_file = _default_output_path(sql_path, start, end)
    else:
        output_file = Path(output_file)

    # Read the SQL text and apply placeholder substitution.
    sql = sql_path.read_text(encoding='utf-8')
    if merged:
        sql = replace_variables(sql, merged)

    # Dry-run mode: preview only, no side effects.
    if dry_run:
        print("=" * 50)
        print("SQL 预览 (dry-run 模式)")
        print("=" * 50)
        print(sql)
        print("=" * 50)
        print(f"输出文件: {output_file}")
        return

    # Create the default output directory only when we are really going to
    # write into it (a dry run must not touch the file system).
    if use_default_output:
        output_file.parent.mkdir(parents=True, exist_ok=True)

    # Imported here rather than at module level so that dry-run previews and
    # the pure helper functions work without the ODPS client module being
    # importable.
    from lib.odps_module import ODPSClient

    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 开始执行: {sql_path.name}")

    odps_client = ODPSClient()
    odps_client.execute_sql_result_save_file(sql, str(output_file))

    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 完成,结果保存至: {output_file}")


def main():
    """Command-line entry point: parse arguments, fill in default dates, run the SQL."""
    parser = argparse.ArgumentParser(
        description='执行 SQL 文件并输出结果',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql
  python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql --start 20251222 --end 20260103
  python run_sql.py tasks/渠道效果分析/渠道再分享回流.sql --dry-run
        """
    )

    parser.add_argument('sql_file', type=str, help='SQL 文件路径')
    parser.add_argument('--start', type=str, help='dt 分区起始日期,替换 ${start}')
    parser.add_argument('--end', type=str, help='dt 分区结束日期,替换 ${end}')
    parser.add_argument('-o', '--output', type=str, help='自定义输出路径')
    parser.add_argument('--vars', nargs='*', metavar='KEY=VALUE',
                        help='额外变量,如: apptype=36')
    parser.add_argument('--dry-run', action='store_true', help='仅打印 SQL,不执行')

    args = parser.parse_args()

    # Extra KEY=VALUE variables from --vars.
    variables = parse_variables(args.vars)

    # Fall back to the default range for whichever bound was not given.
    start = args.start
    end = args.end
    if start is None or end is None:
        default_start, default_end = get_default_dates()
        start = start or default_start
        end = end or default_end
        print(f"使用默认日期范围: {start} ~ {end}")

    run_sql(
        sql_file=args.sql_file,
        output_file=args.output,
        variables=variables,
        start=start,
        end=end,
        dry_run=args.dry_run
    )


if __name__ == "__main__":
    main()