
feat: modify the data download script

zhaohaipeng, 7 months ago
commit 4b97843cee
1 file changed, 54 insertions and 88 deletions
    vov/data_download.py

vov/data_download.py  +54 -88

@@ -1,116 +1,82 @@
 import math
-
-import pandas as pd
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Callable, Sequence
 
 from client import ODPSClient
 
 odps_client = ODPSClient.ODPSClient()
 
-dt_list = ["20241115", "20241116"]
-hh_list = ["00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16", "17",
-           "18", "19", "20", "21", "22", "23"]
-
-VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"
-
-APP_TYPE = "4"
-
-
-# Query function: run the SQL for one (dt, hh) slot
-def read_odps(dt: str, hh: str):
-    # Read the SQL file
-    sql = ""
-    with open(f"{VOV_BASE_PATH}/sql/vovh24_小时_top1000.sql", "r") as f:
-        sql = f.read()
-
-    real_sql = (
-        sql.replace("${bizdate}", dt)
-        .replace("${hh}", hh)
-        .replace("${apptype}", APP_TYPE)
-    )
-    print(f"Executing for dt: {dt}, hh: {hh}")
-    odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
-
-
-# Run the hourly queries in parallel
-def run_parallel():
-    # Cap the thread pool at 24 workers
-    with ThreadPoolExecutor(max_workers=24) as executor:
-        # Build the task map
-        future_to_task = {executor.submit(read_odps, dt, hh): (dt, hh)
-                          for dt in dt_list
-                          for hh in hh_list
-                          }
-
-        # Monitor each task as it completes
-        for future in as_completed(future_to_task):
-            dt, hh = future_to_task[future]
-            try:
-                future.result()  # fetch the result (re-raises any exception)
-                print(f"Completed for dt: {dt}, hh: {hh}")
-            except Exception as exc:
-                print(f"Error for dt: {dt}, hh: {hh} - {exc}")
-
-
-def download():
-    # Run the parallel downloads
-    run_parallel()
-    for dt in dt_list:
-        csv_list = []
-        for hh in hh_list:
-            csv_list.append(f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
 
-        df_list = [pd.read_csv(file) for file in csv_list]
-        df = pd.concat(df_list, ignore_index=True)
-        df.to_csv(f"{VOV_BASE_PATH}/csv/{dt}.csv", index=False)
+def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int) -> None:
+    """
+    Generic task runner: executes the given tasks concurrently in batches.
 
+    :param tasks: a sequence of zero-argument callables
+    :param max_workers: maximum number of concurrent workers
+    """
+    total_tasks = len(tasks)
+    task_counter = 0
 
-from concurrent.futures import ThreadPoolExecutor, as_completed
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        future_tasks = {}
 
+        for task in tasks:
+            task_counter += 1
+            print(f"提交任务: 第 {task_counter}/{total_tasks} 个任务")
 
-def ad_download():
+            # Submit the task
+            future = executor.submit(task)
+            future_tasks[future] = (task, task_counter)
+
+            time.sleep(0.01)  # brief pause to stagger submissions
+
+            # Cap the number of in-flight tasks per batch
+            if len(future_tasks) == max_workers or task_counter == total_tasks:
+                # Wait for the current batch to finish
+                for future in as_completed(future_tasks):
+                    task, counter = future_tasks[future]
+                    try:
+                        # Fetch the result (re-raises any task exception)
+                        future.result()
+                        print(f"Task {counter}/{total_tasks} completed")
+                    except Exception as exc:
+                        print(f"Task {counter}/{total_tasks} failed: {exc}")
+                # Reset for the next batch
+                future_tasks = {}
+
+
+def ad_download() -> None:
     batch_size = 10000
-    # Compute the total number of pages
-    total_pages = math.ceil(7633541 / batch_size)
-    print(f"Total pages: {total_pages}")
-    # Maximum number of worker threads
+    total_records = 7633541
     max_workers = 24
-    # Path to the SQL file
     sql_file_path = "/Users/zhao/Desktop/tzld/ad/人群选择/v1.sql"
-    # Thread pool
-    with ThreadPoolExecutor(max_workers=max_workers) as executor:
-        for page in range(total_pages):
-            offset = page * batch_size
-            print(f"正在下载第 {page + 1}/{total_pages} 页,记录范围: {offset} - {offset + batch_size}")
-
-            # Track submitted tasks
-            future_tasks = {}
 
+    def create_task(task_index: int) -> Callable[[], None]:
+        def task() -> None:
+            offset = task_index * batch_size
             params = {
                 "start_bizdate": "20240909",
                 "end_bizdate": "20241208",
+                "l_bizdate": "20241209",
                 "offset": str(offset),
                 "size": str(batch_size),
             }
-            result_file_path = f"/Users/zhao/Desktop/{page}.csv"
-
-            # Submit the task
-            future = executor.submit(
-                odps_client.execute_sql_file_result_save_fle,
+            result_file_path = f"/Users/zhao/Desktop/tzld/ad/人群选择/csv/{task_index}.csv"
+            print(f"准备任务: 第 {task_index + 1} 页,记录范围: {offset} - {offset + batch_size}")
+            odps_client.execute_sql_file_result_save_fle(
                 sql_file_path,
                 params,
                 result_file_path
             )
-            future_tasks[future] = (params, result_file_path)
-
-        # Monitor task completion
-        for future in as_completed(future_tasks):
-            params, result_file_path = future_tasks[future]
-            try:
-                # Fetch the result
-                future.result()
-                print(f"Completed: {result_file_path} for date {page}")
-            except Exception as exc:
-                print(f"Error: {exc} for date {page}")
+
+        return task
+
+    total_pages = math.ceil(total_records / batch_size)
+    tasks = [create_task(task_index) for task_index in range(total_pages)]
+
+    print(f"总页数: {total_pages}")
+    process_tasks(tasks, max_workers)
     print("数据下载完成。")