# Download hourly VOV top-1000 data and ad-analysis results from ODPS in a
# thread pool, saving each query result as a local CSV file.
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple

import pandas as pd

from client import ODPSClient

odps_client = ODPSClient.ODPSClient()

dt_list = ["20241115", "20241116"]
# Hours "00" .. "23", zero-padded.
hh_list = [f"{hour:02d}" for hour in range(24)]

VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"
APP_TYPE = "4"


def read_odps(dt: str, hh: str) -> None:
    """Run the hourly VOV top-1000 query for one (dt, hh) and save it as CSV.

    The ``${bizdate}``, ``${hh}`` and ``${apptype}`` placeholders in the SQL
    template are replaced with *dt*, *hh* and ``APP_TYPE``; the result is
    written to ``{VOV_BASE_PATH}/csv/{dt}{hh}.csv``.
    """
    # Read explicitly as UTF-8 rather than the platform locale default
    # (assumes the template file is UTF-8 encoded).
    sql_path = f"{VOV_BASE_PATH}/sql/vovh24_小时_top1000.sql"
    with open(sql_path, "r", encoding="utf-8") as f:
        sql = f.read()
    real_sql = (
        sql.replace("${bizdate}", dt)
        .replace("${hh}", hh)
        .replace("${apptype}", APP_TYPE)
    )
    print(f"Executing for dt: {dt}, hh: {hh}")
    odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")


def run_parallel(max_workers: int = 24) -> List[Tuple[str, str]]:
    """Run ``read_odps`` for every (dt, hh) in ``dt_list`` x ``hh_list``.

    Args:
        max_workers: Thread-pool size.

    Returns:
        The (dt, hh) pairs whose query raised. Each completion and failure is
        also printed as it finishes.
    """
    failed: List[Tuple[str, str]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_task = {
            executor.submit(read_odps, dt, hh): (dt, hh)
            for dt in dt_list
            for hh in hh_list
        }
        for future in as_completed(future_to_task):
            dt, hh = future_to_task[future]
            try:
                future.result()
            except Exception as exc:
                # Report and keep collecting the remaining hours.
                print(f"Error for dt: {dt}, hh: {hh} - {exc}")
                failed.append((dt, hh))
            else:
                print(f"Completed for dt: {dt}, hh: {hh}")
    return failed


def download() -> None:
    """Run all hourly queries, then merge each day's hourly CSVs into one file.

    A day's merged file ``{VOV_BASE_PATH}/csv/{dt}.csv`` is written only when
    every hourly query for that day succeeded in this run. Otherwise the
    merge would either fail on the missing hourly file (aborting all later
    days) or silently include a stale file from an earlier run.
    """
    failed_days = {dt for dt, _ in run_parallel()}
    for dt in dt_list:
        if dt in failed_days:
            print(f"Skipping merge for dt: {dt} - some hourly queries failed")
            continue
        csv_list = [f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv" for hh in hh_list]
        df = pd.concat([pd.read_csv(file) for file in csv_list], ignore_index=True)
        df.to_csv(f"{VOV_BASE_PATH}/csv/{dt}.csv", index=False)


def ad_download() -> None:
    """Run the ad-analysis SQL file once per date and save each result as CSV.

    Dates are submitted to a thread pool. Completions and failures are
    printed as they finish; failures do not stop the other dates.
    """
    # Dates to query (yyyymmdd).
    date_list = ["20241123", "20241124", "20241125", "20241126", "20241127", "20241128"]
    max_workers = 24
    sql_file_path = "/Users/zhao/Desktop/广告分析.sql"

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Maps each future to the (params, output path) it was submitted with.
        future_tasks = {}
        for date in date_list:
            params = {
                "bizdate": date,
                "hh": "23",
                "hhl": "00",
                "filter_id": "XXXXXXXX",
                "apptype": "3,36,6,17",
            }
            result_file_path = f"/Users/zhao/Desktop/{date}.csv"
            # NOTE(review): "save_fle" differs from the
            # execute_sql_result_save_file method used in read_odps; confirm
            # this is the real ODPSClient method name and not a typo.
            future = executor.submit(
                odps_client.execute_sql_file_result_save_fle,
                sql_file_path,
                params,
                result_file_path,
            )
            future_tasks[future] = (params, result_file_path)

        for future in as_completed(future_tasks):
            params, result_file_path = future_tasks[future]
            try:
                future.result()
            except Exception as exc:
                print(f"Error: {exc} for date {params['bizdate']}")
            else:
                print(f"Completed: {result_file_path} for date {params['bizdate']}")


def _main():
    ad_download()


if __name__ == "__main__":
    _main()