import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, Optional, Sequence

from client import ODPSClient

# Shared client instance used by every download task in this module.
odps_client = ODPSClient.ODPSClient()


def process_tasks(tasks: Iterable[Callable[[], None]], max_workers: int) -> None:
    """Run zero-argument callables concurrently, one batch at a time.

    Tasks are submitted to a thread pool in batches of at most
    ``max_workers``; each batch is fully drained before the next one is
    submitted. A task that raises ``Exception`` is reported on stdout and
    does not prevent the remaining tasks from running.

    :param tasks: iterable of callables that take no arguments. It is
        materialized into a list once, so generators are accepted.
    :param max_workers: thread pool size and batch size.
    """
    # Materialize first: the progress messages need the total count, and a
    # plain generator has no len().
    task_list = list(tasks)
    total_tasks = len(task_list)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Maps each in-flight future to its 1-based submission index.
        pending = {}
        for index, task in enumerate(task_list, start=1):
            print(f"提交任务: 第 {index}/{total_tasks} 个任务")
            pending[executor.submit(task)] = index
            # Brief pause between submissions so tasks do not all start at
            # the same instant.
            time.sleep(0.01)

            # Drain once the batch is full or the last task was submitted.
            # Note: the pool already caps concurrency at max_workers; this
            # barrier additionally makes the next batch wait for the slowest
            # task of the current one.
            if len(pending) == max_workers or index == total_tasks:
                for finished in as_completed(pending):
                    finished_index = pending[finished]
                    try:
                        # result() re-raises any exception from the task.
                        finished.result()
                    except Exception as exc:
                        print(f"任务出错: 第 {finished_index}/{total_tasks} 个任务出错, {exc}")
                    else:
                        print(f"任务完成: 第 {finished_index}/{total_tasks} 个任务")
                # Start the next batch from an empty mapping.
                pending = {}


def ad_download(dts: Optional[Sequence[str]] = None, max_workers: int = 24) -> None:
    """Run the feature-average SQL file once per date and save each result as CSV.

    :param dts: date strings passed to the SQL file as both ``dt_1`` and
        ``dt_2`` and used as the CSV file name. Defaults to 20241206 through
        20241216.
    :param max_workers: concurrency passed to ``process_tasks``.
    """
    sql_file_path = "/Users/zhao/Desktop/tzld/ad/sql/特征平均值.sql"
    if dts is None:
        dts = [
            "20241206",
            "20241207",
            "20241208",
            "20241209",
            "20241210",
            "20241211",
            "20241212",
            "20241213",
            "20241214",
            "20241215",
            "20241216",
        ]

    def create_task(dt: str) -> Callable[[], None]:
        """Build the download callable for a single date.

        A factory is used so each closure captures its own ``dt``.
        """

        def task() -> None:
            params = {
                "dt_1": dt,
                "dt_2": dt,
            }
            result_file_path = f"/Users/zhao/Desktop/tzld/ad/特征/{dt}.csv"
            print(f"准备任务: {dt}")
            odps_client.execute_sql_file_result_save_fle(
                sql_file_path,
                params,
                result_file_path,
            )

        return task

    tasks = [create_task(dt) for dt in dts]

    process_tasks(tasks, max_workers)
    # Printed after all batches have been drained; individual task failures
    # are reported by process_tasks and do not suppress this message.
    print("数据下载完成。")


def _main():
    """Script entry point: run the download with its default settings."""
    ad_download()


if __name__ == "__main__":
    _main()