# data_download.py (3.6 KB)
  1. import math
  2. import pandas as pd
  3. from client import ODPSClient
# Module-level ODPS client instance shared by read_odps() and ad_download(),
# which call it to execute SQL and save the result to a file path.
odps_client = ODPSClient.ODPSClient()
  5. dt_list = ["20241115", "20241116"]
  6. hh_list = ["00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16", "17",
  7. "18", "19", "20", "21", "22", "23"]
  8. VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"
  9. APP_TYPE = "4"
  10. # 定义查询函数
  11. def read_odps(dt: str, hh: str):
  12. # 读取SQL文件
  13. sql = ""
  14. with open(f"{VOV_BASE_PATH}/sql/vovh24_小时_top1000.sql", "r") as f:
  15. sql = f.read()
  16. real_sql = (
  17. sql.replace("${bizdate}", dt)
  18. .replace("${hh}", hh)
  19. .replace("${apptype}", APP_TYPE)
  20. )
  21. print(f"Executing for dt: {dt}, hh: {hh}")
  22. odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
  23. # 并行执行函数
  24. def run_parallel():
  25. # 定义线程池的最大线程数
  26. with ThreadPoolExecutor(max_workers=24) as executor:
  27. # 创建任务列表
  28. future_to_task = {executor.submit(read_odps, dt, hh): (dt, hh)
  29. for dt in dt_list
  30. for hh in hh_list
  31. }
  32. # 监控每个任务的完成情况
  33. for future in as_completed(future_to_task):
  34. dt, hh = future_to_task[future]
  35. try:
  36. future.result() # 获取执行结果
  37. print(f"Completed for dt: {dt}, hh: {hh}")
  38. except Exception as exc:
  39. print(f"Error for dt: {dt}, hh: {hh} - {exc}")
  40. def download():
  41. # 执行并行任务
  42. run_parallel()
  43. for dt in dt_list:
  44. csv_list = []
  45. for hh in hh_list:
  46. csv_list.append(f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
  47. df_list = [pd.read_csv(file) for file in csv_list]
  48. df = pd.concat(df_list, ignore_index=True)
  49. df.to_csv(f"{VOV_BASE_PATH}/csv/{dt}.csv", index=False)
  50. from concurrent.futures import ThreadPoolExecutor, as_completed
  51. def ad_download():
  52. batch_size = 10000
  53. # 计算总页数
  54. total_pages = math.ceil(7633541 / batch_size)
  55. print(f"总页数: {total_pages}")
  56. # 最大线程数
  57. max_workers = 24
  58. # SQL 文件路径
  59. sql_file_path = "/Users/zhao/Desktop/tzld/ad/人群选择/v1.sql"
  60. # 线程池
  61. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  62. for page in range(total_pages):
  63. offset = page * batch_size
  64. print(f"正在下载第 {page + 1}/{total_pages} 页,记录范围: {offset} - {offset + batch_size}")
  65. # 存储任务
  66. future_tasks = {}
  67. params = {
  68. "start_bizdate": "20240909",
  69. "end_bizdate": "20241208",
  70. "offset": str(offset),
  71. "size": str(batch_size),
  72. }
  73. result_file_path = f"/Users/zhao/Desktop/{page}.csv"
  74. # 提交任务
  75. future = executor.submit(
  76. odps_client.execute_sql_file_result_save_fle,
  77. sql_file_path,
  78. params,
  79. result_file_path
  80. )
  81. future_tasks[future] = (params, result_file_path)
  82. # 监控任务完成情况
  83. for future in as_completed(future_tasks):
  84. params, result_file_path = future_tasks[future]
  85. try:
  86. # 获取任务执行结果
  87. future.result()
  88. print(f"Completed: {result_file_path} for date {page}")
  89. except Exception as exc:
  90. print(f"Error: {exc} for date {page}")
  91. print("数据下载完成。")
  92. def _main():
  93. ad_download()
  94. if __name__ == "__main__":
  95. _main()