12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- import math
- import time
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from typing import Callable, Sequence
- from client import ODPSClient
# Single module-level client instance, used by every download task below.
# NOTE(review): the tasks run on worker threads and all share this object --
# confirm that ODPSClient is safe for concurrent use.
odps_client = ODPSClient.ODPSClient()
def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int) -> None:
    """Run zero-argument callables on a thread pool, one batch at a time.

    Tasks are submitted in order. After every ``max_workers`` submissions, and
    again after the final task, the function blocks until the whole current
    batch has finished before submitting any more. A task that raises is
    reported on stdout and does not prevent the remaining tasks from running.

    :param tasks: zero-argument callables to execute. Any iterable is accepted
        at runtime; it is copied into a list up front so the total count is
        known even for generators.
    :param max_workers: thread-pool size, which is also the batch size.
    """
    # Materialise once: the progress messages need the total, and a plain
    # iterable (e.g. a generator) has no len().
    pending = list(tasks)
    total_tasks = len(pending)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        batch = {}  # future -> 1-based task number
        for task_number, task in enumerate(pending, start=1):
            print(f"提交任务: 第 {task_number}/{total_tasks} 个任务")
            batch[executor.submit(task)] = task_number
            # Short pause between submissions, kept from the original code
            # (presumably to stagger task start-up -- confirm before removing).
            time.sleep(0.01)

            # Keep filling the batch until it is full or the tasks run out.
            if len(batch) < max_workers and task_number < total_tasks:
                continue

            # Wait for every task of the current batch and report its outcome.
            for future in as_completed(batch):
                finished_number = batch[future]
                try:
                    future.result()
                except Exception as exc:
                    print(f"任务出错: 第 {finished_number}/{total_tasks} 个任务出错, {exc}")
                else:
                    print(f"任务完成: 第 {finished_number}/{total_tasks} 个任务")

            # Start a fresh batch.
            batch = {}
def ad_download() -> None:
    """Run the paged SQL download, one task per page.

    The fixed record total is split into pages of a fixed size. For each page
    a zero-argument callable is built that passes the SQL file path, the query
    parameters (date bounds, offset, size) and a per-page CSV path to
    ``odps_client.execute_sql_file_result_save_fle``. All callables are then
    handed to ``process_tasks``.
    """
    page_size = 10000
    record_count = 7633541
    worker_count = 24
    sql_path = "/Users/zhao/Desktop/tzld/ad/人群选择/v1.sql"

    def download_page(page_index: int) -> None:
        """Execute the SQL file for the page at ``page_index``."""
        start = page_index * page_size
        csv_path = f"/Users/zhao/Desktop/tzld/ad/人群选择/csv/{page_index}.csv"
        sql_params = {
            "start_bizdate": "20240909",
            "end_bizdate": "20241208",
            "l_bizdate": "20241209",
            "offset": str(start),
            "size": str(page_size),
        }
        print(f"准备任务: 第 {page_index + 1} 页,记录范围: {start} - {start + page_size}")
        odps_client.execute_sql_file_result_save_fle(sql_path, sql_params, csv_path)

    page_count = math.ceil(record_count / page_size)
    # Bind the index as a default argument so each callable keeps its own page
    # rather than the loop variable's final value.
    page_jobs = [
        lambda index=page_index: download_page(index)
        for page_index in range(page_count)
    ]
    print(f"总页数: {page_count}")
    process_tasks(page_jobs, worker_count)
    print("数据下载完成。")
def _main():
    """Script entry point; delegates to ``ad_download``."""
    ad_download()
# Run the download only when executed as a script, not when imported.
if __name__ == "__main__":
    _main()
|