import math
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Sequence

from client import ODPSClient

# Shared ODPS client used by every download task.
odps_client = ODPSClient.ODPSClient()


def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int) -> None:
    """Run *tasks* concurrently with at most ``max_workers`` worker threads.

    Failures are logged and do not abort the remaining tasks.

    :param tasks: sequence of zero-argument callables to execute
    :param max_workers: maximum number of threads running at once
    """
    total_tasks = len(tasks)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_tasks: dict = {}
        for task_counter, task in enumerate(tasks, start=1):
            print(f"提交任务: 第 {task_counter}/{total_tasks} 个任务")
            future = executor.submit(task)
            future_tasks[future] = task_counter
            # Throttle submission slightly so the downstream service is not
            # hit with a burst of simultaneous requests.
            time.sleep(0.01)
        # The executor itself caps concurrency at max_workers, so there is no
        # need to wait in fixed-size batches (which would stall on the slowest
        # task of each batch); drain all futures as they complete.
        for future in as_completed(future_tasks):
            counter = future_tasks[future]
            try:
                future.result()  # re-raises any exception from the task
                print(f"任务完成: 第 {counter}/{total_tasks} 个任务")
            except Exception as exc:
                # Log and continue: one failed page must not abort the run.
                print(f"任务出错: 第 {counter}/{total_tasks} 个任务出错, {exc}")


def ad_download() -> None:
    """Export ad audience-selection data from ODPS to CSV, one page per task.

    Pagination parameters (batch size, total record count, date range and
    file locations) are hard-coded for this one-off export.
    """
    batch_size = 10000
    total_records = 6731154
    max_workers = 100
    sql_file_path = "/Users/zhao/Desktop/tzld/ad/人群选择/v1.sql"

    def create_task(task_index: int) -> Callable[[], None]:
        # Closure factory: binds task_index per task, avoiding the
        # late-binding closure pitfall a bare lambda in a loop would have.
        def task() -> None:
            offset = task_index * batch_size
            params = {
                "start_bizdate": "20240909",
                "end_bizdate": "20241208",
                "l_bizdate": "20241209",
                "offset": str(offset),
                "size": str(batch_size),
            }
            result_file_path = f"/Users/zhao/Desktop/tzld/ad/人群选择/csv/{task_index}.csv"
            print(f"准备任务: 第 {task_index + 1} 页,记录范围: {offset} - {offset + batch_size}")
            odps_client.execute_sql_file_result_save_fle(
                sql_file_path, params, result_file_path
            )

        return task

    total_pages = math.ceil(total_records / batch_size)
    tasks = [create_task(task_index) for task_index in range(total_pages)]
    print(f"总页数: {total_pages}")
    process_tasks(tasks, max_workers)
    print("数据下载完成。")


def _main():
    ad_download()


if __name__ == "__main__":
    _main()