"""Batch ODPS download helpers.

* ``download`` runs the hourly VOV top-1000 SQL for every (dt, hh) pair in
  parallel and merges the per-hour CSVs into one CSV per day.
* ``ad_download`` runs an ad-audience SQL page by page (offset/size
  pagination) in parallel and saves one CSV per page.
"""
import math
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd

from client import ODPSClient

# Shared ODPS client used by every download task in this script.
# NOTE(review): this single instance is called from up to 24 worker threads;
# confirm ODPSClient is safe to share across threads.
odps_client = ODPSClient.ODPSClient()

dt_list = ["20241115", "20241116"]
# Two-digit hours "00" .. "23".
hh_list = [f"{hour:02d}" for hour in range(24)]

VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"
APP_TYPE = "4"


def read_odps(dt: str, hh: str):
    """Run the hourly top-1000 VOV SQL for one (dt, hh) and save the result as CSV.

    The SQL template ``vovh24_小时_top1000.sql`` under ``VOV_BASE_PATH/sql`` has
    its ``${bizdate}``, ``${hh}`` and ``${apptype}`` placeholders substituted,
    and the result is written to ``VOV_BASE_PATH/csv/{dt}{hh}.csv``.

    Args:
        dt: Business date string, e.g. ``"20241115"``.
        hh: Two-digit hour string, ``"00"`` .. ``"23"``.
    """
    # Explicit UTF-8: the template path (and presumably its content) contains
    # non-ASCII text, so don't depend on the platform's default locale.
    with open(f"{VOV_BASE_PATH}/sql/vovh24_小时_top1000.sql", "r", encoding="utf-8") as f:
        sql = f.read()
    real_sql = (
        sql.replace("${bizdate}", dt)
        .replace("${hh}", hh)
        .replace("${apptype}", APP_TYPE)
    )
    print(f"Executing for dt: {dt}, hh: {hh}")
    odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")


def run_parallel():
    """Run ``read_odps`` for every (dt, hh) combination on a thread pool.

    Each task's failure is printed and does not stop the other tasks.

    Returns:
        A list of ``(dt, hh)`` pairs whose task raised; empty when all succeeded.
    """
    failures = []
    with ThreadPoolExecutor(max_workers=24) as executor:
        future_to_task = {
            executor.submit(read_odps, dt, hh): (dt, hh)
            for dt in dt_list
            for hh in hh_list
        }
        # Report tasks in completion order, not submission order.
        for future in as_completed(future_to_task):
            dt, hh = future_to_task[future]
            try:
                future.result()  # re-raises any exception from the worker
                print(f"Completed for dt: {dt}, hh: {hh}")
            except Exception as exc:
                print(f"Error for dt: {dt}, hh: {hh} - {exc}")
                failures.append((dt, hh))
    return failures


def download():
    """Download all hourly CSVs, then merge them into one CSV per day.

    Raises:
        RuntimeError: if any hourly query failed. This prevents merging an
            incomplete day, or a stale per-hour file left over from an
            earlier run.
    """
    failures = run_parallel()
    if failures:
        raise RuntimeError(f"Hourly downloads failed, not merging: {sorted(failures)}")

    for dt in dt_list:
        csv_list = [f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv" for hh in hh_list]
        df = pd.concat([pd.read_csv(file) for file in csv_list], ignore_index=True)
        df.to_csv(f"{VOV_BASE_PATH}/csv/{dt}.csv", index=False)


def ad_download(total_rows: int = 7633541, batch_size: int = 10000, max_workers: int = 24):
    """Download the ad-audience query result in parallel, one CSV per page.

    The query in ``v1.sql`` is run once per page with ``offset``/``size``
    parameters. Page ``p`` is written to ``/Users/zhao/Desktop/{p}.csv``.

    Args:
        total_rows: Number of rows to page through. NOTE(review): the default
            is a hard-coded count; confirm it still matches the source data.
        batch_size: Rows per page (the ``size`` SQL parameter).
        max_workers: Number of pages downloaded concurrently.
    """
    total_pages = math.ceil(total_rows / batch_size)
    print(f"总页数: {total_pages}")

    sql_file_path = "/Users/zhao/Desktop/tzld/ad/人群选择/v1.sql"

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit every page before waiting on any of them. Previously the task
        # dict was recreated and awaited inside this loop, so pages ran one at
        # a time and the thread pool gave no parallelism.
        future_tasks = {}
        for page in range(total_pages):
            offset = page * batch_size
            print(f"正在下载第 {page + 1}/{total_pages} 页,记录范围: {offset} - {offset + batch_size}")
            params = {
                "start_bizdate": "20240909",
                "end_bizdate": "20241208",
                "offset": str(offset),
                "size": str(batch_size),
            }
            result_file_path = f"/Users/zhao/Desktop/{page}.csv"
            # NOTE(review): "save_fle" looks like a typo of "save_file"; kept
            # as-is because it must match the method name defined on ODPSClient.
            future = executor.submit(
                odps_client.execute_sql_file_result_save_fle,
                sql_file_path,
                params,
                result_file_path,
            )
            # Store the page with its future: the loop variable has moved on
            # by the time the result is reported.
            future_tasks[future] = (page, result_file_path)

        for future in as_completed(future_tasks):
            page, result_file_path = future_tasks[future]
            try:
                future.result()  # re-raises any exception from the worker
                print(f"Completed: {result_file_path} for date {page}")
            except Exception as exc:
                print(f"Error: {exc} for date {page}")

    print("数据下载完成。")


def _main():
    """Script entry point: run the paged ad-audience download."""
    ad_download()


if __name__ == "__main__":
    _main()