# data_download.py

  1. import time
  2. from concurrent.futures import ThreadPoolExecutor, as_completed
  3. from typing import Callable, Sequence
  4. from client import ODPSClient
  5. odps_client = ODPSClient.ODPSClient()
  6. def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int) -> None:
  7. """
  8. 通用任务处理器,将任务分批并发执行。
  9. :param tasks: 一个可迭代对象,每个元素是一个 callable(无需参数)
  10. :param max_workers: 最大并发数
  11. """
  12. total_tasks = len(tasks)
  13. task_counter = 0
  14. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  15. future_tasks = {}
  16. for task in tasks:
  17. task_counter += 1
  18. print(f"提交任务: 第 {task_counter}/{total_tasks} 个任务")
  19. # 提交任务
  20. future = executor.submit(task)
  21. future_tasks[future] = (task, task_counter)
  22. time.sleep(0.01)
  23. # 控制每批次提交的任务数
  24. if len(future_tasks) == max_workers or task_counter == total_tasks:
  25. # 等待当前批次完成
  26. for future in as_completed(future_tasks):
  27. task, counter = future_tasks[future]
  28. try:
  29. # 获取任务执行结果
  30. future.result()
  31. print(f"任务完成: 第 {counter}/{total_tasks} 个任务")
  32. except Exception as exc:
  33. print(f"任务出错: 第 {counter}/{total_tasks} 个任务出错, {exc}")
  34. # 清空当前批次任务
  35. future_tasks = {}
  36. def ad_download() -> None:
  37. max_workers = 24
  38. sql_file_path = "/Users/zhao/Desktop/tzld/ad/sql/特征平均值.sql"
  39. dts = ["20241206",
  40. "20241207",
  41. "20241208",
  42. "20241209",
  43. "20241210",
  44. "20241211",
  45. "20241212",
  46. "20241213",
  47. "20241214",
  48. "20241215",
  49. "20241216"]
  50. def create_task(dt: str) -> Callable[[], None]:
  51. def task() -> None:
  52. params = {
  53. "dt_1": dt,
  54. "dt_2": dt
  55. }
  56. result_file_path = f"/Users/zhao/Desktop/tzld/ad/特征/{dt}.csv"
  57. print(f"准备任务: {dt}")
  58. odps_client.execute_sql_file_result_save_fle(
  59. sql_file_path,
  60. params,
  61. result_file_path
  62. )
  63. return task
  64. tasks = [create_task(dt) for dt in dts]
  65. process_tasks(tasks, max_workers)
  66. print("数据下载完成。")
  67. def _main():
  68. ad_download()
  69. if __name__ == "__main__":
  70. _main()