from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd

from client import ODPSClient

odps_client = ODPSClient.ODPSClient()

# Date partitions to export; substituted for ${bizdate} in the SQL template.
dt_list = ['20241028', "20241027", "20241026"]
# Every hour of the day as a zero-padded two-digit string: "00" .. "23".
hh_list = [f"{hour:02d}" for hour in range(24)]

VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"


def read_odps(dt: str, hh: str) -> None:
    """Run the SQL template for one (dt, hh) partition and save it as CSV.

    The template is read from ``{VOV_BASE_PATH}/sql/vov排序_552_562.sql``;
    every ``${bizdate}`` is replaced by *dt* and every ``${hh}`` by *hh*.
    The resulting SQL is handed to
    ``odps_client.execute_sql_result_save_file`` together with the target
    path ``{VOV_BASE_PATH}/csv/{dt}{hh}.csv``.

    Args:
        dt: Value substituted for ``${bizdate}``.
        hh: Value substituted for ``${hh}``.

    Raises:
        OSError: If the SQL template cannot be opened.
        Exception: Whatever the ODPS client raises is propagated unchanged.
    """
    # Explicit encoding: the template name (and likely its content) is
    # non-ASCII, so do not depend on the platform's default locale.
    with open(f"{VOV_BASE_PATH}/sql/vov排序_552_562.sql", "r", encoding="utf-8") as f:
        sql = f.read()

    real_sql = sql.replace("${bizdate}", dt).replace("${hh}", hh)
    print(f"Executing for dt: {dt}, hh: {hh}")

    # NOTE(review): presumably executes the query and writes the result set
    # to the given path -- confirm against ODPSClient.
    odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")


def run_parallel(max_workers: int = 24) -> list:
    """Run ``read_odps`` for every (dt, hh) combination on a thread pool.

    One task is submitted per pair from ``dt_list`` x ``hh_list``. A failing
    task does not stop the others: its error is printed and the pair is
    recorded.

    Args:
        max_workers: Maximum number of worker threads in the pool.

    Returns:
        A list of ``(dt, hh)`` tuples whose task raised; empty when every
        task succeeded.
    """
    failed = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its partition so results can be attributed.
        future_to_task = {
            executor.submit(read_odps, dt, hh): (dt, hh)
            for dt in dt_list
            for hh in hh_list
        }

        # Report tasks in completion order rather than submission order.
        for future in as_completed(future_to_task):
            dt, hh = future_to_task[future]
            try:
                future.result()  # re-raises any exception from the worker
            except Exception as exc:
                print(f"Error for dt: {dt}, hh: {hh} - {exc}")
                failed.append((dt, hh))
            else:
                print(f"Completed for dt: {dt}, hh: {hh}")
    return failed


def download() -> None:
    """Export every hourly partition, then merge them into one CSV per date.

    For each date in ``dt_list`` the hourly files
    ``{VOV_BASE_PATH}/csv/{dt}{hh}.csv`` are concatenated in ``hh_list``
    order and written to ``{VOV_BASE_PATH}/csv/{dt}.csv`` without the index.

    A date with at least one failed hourly export is skipped: merging it
    would either fail on a missing file or silently pick up a stale file
    left over from an earlier run.
    """
    failed_hours_by_dt = {}
    for dt, hh in run_parallel():
        failed_hours_by_dt.setdefault(dt, []).append(hh)

    for dt in dt_list:
        if dt in failed_hours_by_dt:
            print(f"Skip merge for dt: {dt} - failed hours: {sorted(failed_hours_by_dt[dt])}")
            continue

        csv_list = [f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv" for hh in hh_list]
        df = pd.concat([pd.read_csv(file) for file in csv_list], ignore_index=True)
        df.to_csv(f"{VOV_BASE_PATH}/csv/{dt}.csv", index=False)


def _main() -> None:
    """Script entry point: run the full export and merge."""
    download()


if __name__ == "__main__":
    _main()