import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Sequence

from client import ODPSClient

odps_client = ODPSClient.ODPSClient()


def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int) -> None:
    """
    Generic task runner: executes tasks concurrently in batches.

    :param tasks: a sequence of callables, each taking no arguments
    :param max_workers: maximum number of concurrent workers
    """
    total_tasks = len(tasks)
    task_counter = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_tasks = {}
        for task in tasks:
            task_counter += 1
            print(f"Submitting task {task_counter}/{total_tasks}")
            # Submit the task to the pool
            future = executor.submit(task)
            future_tasks[future] = (task, task_counter)
            # Small pause to stagger submissions
            time.sleep(0.01)
            # Limit the number of tasks submitted per batch
            if len(future_tasks) == max_workers or task_counter == total_tasks:
                # Wait for the current batch to complete before submitting the next one
                for future in as_completed(future_tasks):
                    task, counter = future_tasks[future]
                    try:
                        # Fetch the result; re-raises any exception from the worker
                        future.result()
                        print(f"Task {counter}/{total_tasks} finished")
                    except Exception as exc:
                        print(f"Task {counter}/{total_tasks} failed: {exc}")
                # Reset for the next batch
                future_tasks = {}


def ad_download() -> None:
    max_workers = 24
    sql_file_path = "/Users/zhao/Desktop/tzld/ad/sql/特征平均值.sql"
    dts = [
        "20241206",
        "20241207",
        "20241208",
        "20241209",
        "20241210",
        "20241211",
        "20241212",
        "20241213",
        "20241214",
        "20241215",
        "20241216",
    ]

    def create_task(dt: str) -> Callable[[], None]:
        # Factory function so each task captures its own dt value
        def task() -> None:
            params = {
                "dt_1": dt,
                "dt_2": dt
            }
            result_file_path = f"/Users/zhao/Desktop/tzld/ad/特征/{dt}.csv"
            print(f"Preparing task: {dt}")
            odps_client.execute_sql_file_result_save_fle(
                sql_file_path,
                params,
                result_file_path
            )

        return task

    tasks = [create_task(dt) for dt in dts]
    process_tasks(tasks, max_workers)
    print("Data download finished.")


def _main():
    ad_download()


if __name__ == "__main__":
    _main()
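

# The ODPSClient wrapper imported from the `client` module is not shown in this
# listing. Below is a minimal, hypothetical sketch of what its
# execute_sql_file_result_save_fle method might look like, assuming the SQL file
# uses ${dt_1}/${dt_2}-style placeholders and the standard PyODPS + pandas APIs.
# The class name, constructor arguments, and placeholder convention here are
# illustrative assumptions, not the actual implementation.
#
# from string import Template
#
# from odps import ODPS
#
#
# class _SketchODPSClient:
#     def __init__(self, access_id: str, secret_key: str, project: str, endpoint: str):
#         self._odps = ODPS(access_id, secret_key, project=project, endpoint=endpoint)
#
#     def execute_sql_file_result_save_fle(
#         self, sql_file_path: str, params: dict, result_file_path: str
#     ) -> None:
#         # Read the SQL template and substitute the date parameters
#         with open(sql_file_path, encoding="utf-8") as f:
#             sql = Template(f.read()).safe_substitute(params)
#         # execute_sql blocks until the query finishes; stream the result to CSV
#         with self._odps.execute_sql(sql).open_reader(tunnel=True) as reader:
#             reader.to_pandas().to_csv(result_file_path, index=False)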