data_download.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import math
  2. import time
  3. from concurrent.futures import ThreadPoolExecutor, as_completed
  4. from typing import Callable, Sequence
  5. from client import ODPSClient
# Module-level ODPS client, created once at import time and used by every
# download task built in ad_download().
# NOTE(review): those tasks run on ThreadPoolExecutor worker threads and all
# share this single instance -- confirm ODPSClient is safe to use concurrently.
odps_client = ODPSClient.ODPSClient()
  7. def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int) -> None:
  8. """
  9. 通用任务处理器,将任务分批并发执行。
  10. :param tasks: 一个可迭代对象,每个元素是一个 callable(无需参数)
  11. :param max_workers: 最大并发数
  12. """
  13. total_tasks = len(tasks)
  14. task_counter = 0
  15. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  16. future_tasks = {}
  17. for task in tasks:
  18. task_counter += 1
  19. print(f"提交任务: 第 {task_counter}/{total_tasks} 个任务")
  20. # 提交任务
  21. future = executor.submit(task)
  22. future_tasks[future] = (task, task_counter)
  23. time.sleep(0.01)
  24. # 控制每批次提交的任务数
  25. if len(future_tasks) == max_workers or task_counter == total_tasks:
  26. # 等待当前批次完成
  27. for future in as_completed(future_tasks):
  28. task, counter = future_tasks[future]
  29. try:
  30. # 获取任务执行结果
  31. future.result()
  32. print(f"任务完成: 第 {counter}/{total_tasks} 个任务")
  33. except Exception as exc:
  34. print(f"任务出错: 第 {counter}/{total_tasks} 个任务出错, {exc}")
  35. # 清空当前批次任务
  36. future_tasks = {}
  37. def ad_download() -> None:
  38. batch_size = 10000
  39. total_records = 6731154
  40. max_workers = 100
  41. sql_file_path = "/Users/zhao/Desktop/tzld/ad/人群选择/v1.sql"
  42. def create_task(task_index: int) -> Callable[[], None]:
  43. def task() -> None:
  44. offset = task_index * batch_size
  45. params = {
  46. "start_bizdate": "20240909",
  47. "end_bizdate": "20241208",
  48. "l_bizdate": "20241209",
  49. "offset": str(offset),
  50. "size": str(batch_size),
  51. }
  52. result_file_path = f"/Users/zhao/Desktop/tzld/ad/人群选择/csv/{task_index}.csv"
  53. print(f"准备任务: 第 {task_index + 1} 页,记录范围: {offset} - {offset + batch_size}")
  54. odps_client.execute_sql_file_result_save_fle(
  55. sql_file_path,
  56. params,
  57. result_file_path
  58. )
  59. return task
  60. total_pages = math.ceil(total_records / batch_size)
  61. tasks = [create_task(task_index) for task_index in range(total_pages)]
  62. print(f"总页数: {total_pages}")
  63. process_tasks(tasks, max_workers)
  64. print("数据下载完成。")
  65. def _main():
  66. ad_download()
  67. if __name__ == "__main__":
  68. _main()