data_download.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import pandas as pd
  2. from client import ODPSClient
  # Shared ODPS client used by the download functions in this module.
  odps_client = ODPSClient.ODPSClient()

  # Dates substituted for ${bizdate} when running the hourly query.
  dt_list = ["20241115", "20241116"]
  # Zero-padded hours "00" through "23", substituted for ${hh}.
  hh_list = [f"{hour:02d}" for hour in range(24)]

  # Directory containing the sql/ template and the csv/ output files.
  VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"
  # Value substituted for ${apptype} in the SQL template.
  APP_TYPE = "4"
  # Query function
  def read_odps(dt: str, hh: str) -> None:
      """Run the hourly top-1000 query for one (date, hour) pair.

      The SQL template under ``{VOV_BASE_PATH}/sql`` is read, its
      ``${bizdate}``, ``${hh}`` and ``${apptype}`` placeholders are filled in,
      and the statement is passed to ``odps_client`` together with the output
      path ``{VOV_BASE_PATH}/csv/{dt}{hh}.csv``.

      Args:
          dt: Value substituted for ``${bizdate}``, e.g. ``"20241115"``.
          hh: Value substituted for ``${hh}``, e.g. ``"00"``.

      Raises:
          OSError: If the SQL template cannot be opened.
          UnicodeDecodeError: If the template is not valid UTF-8.
      """
      template_path = f"{VOV_BASE_PATH}/sql/vovh24_小时_top1000.sql"
      # Decode explicitly instead of relying on the platform default encoding,
      # which is not UTF-8 everywhere.
      # NOTE(review): assumes the template is saved as UTF-8 -- confirm.
      with open(template_path, "r", encoding="utf-8") as f:
          sql = f.read()

      real_sql = (
          sql.replace("${bizdate}", dt)
          .replace("${hh}", hh)
          .replace("${apptype}", APP_TYPE)
      )
      print(f"Executing for dt: {dt}, hh: {hh}")
      odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
  # Parallel runner
  def run_parallel():
      """Run ``read_odps`` for every (dt, hh) combination on a thread pool.

      One task is submitted per entry of ``dt_list`` x ``hh_list``. As each
      task finishes, a completion or error line is printed; exceptions raised
      by a task are reported and not re-raised.
      """
      # Upper bound on the number of worker threads.
      with ThreadPoolExecutor(max_workers=24) as pool:
          # Map each submitted future back to the (dt, hh) it was created for.
          pending = {}
          for day in dt_list:
              for hour in hh_list:
                  pending[pool.submit(read_odps, day, hour)] = (day, hour)

          # Report on tasks in the order they finish.
          for finished in as_completed(pending):
              day, hour = pending[finished]
              try:
                  finished.result()  # re-raises whatever the task raised
                  print(f"Completed for dt: {day}, hh: {hour}")
              except Exception as exc:
                  print(f"Error for dt: {day}, hh: {hour} - {exc}")
  def download():
      """Fetch all hourly CSVs, then merge them into one CSV per date.

      After ``run_parallel`` returns, the hourly files
      ``{VOV_BASE_PATH}/csv/{dt}{hh}.csv`` of each date in ``dt_list`` are
      read, concatenated in ``hh_list`` order, and written to
      ``{VOV_BASE_PATH}/csv/{dt}.csv`` without the index column.
      """
      # Run the parallel hourly queries first.
      run_parallel()

      for day in dt_list:
          hourly_paths = [f"{VOV_BASE_PATH}/csv/{day}{hour}.csv" for hour in hh_list]
          hourly_frames = [pd.read_csv(path) for path in hourly_paths]
          merged = pd.concat(hourly_frames, ignore_index=True)
          merged.to_csv(f"{VOV_BASE_PATH}/csv/{day}.csv", index=False)
  49. from concurrent.futures import ThreadPoolExecutor, as_completed
  def ad_download():
      """Run the ad-analysis SQL file once per date on a thread pool.

      For each date a parameter dict and an output path
      ``/Users/zhao/Desktop/{date}.csv`` are passed to the ODPS client along
      with the SQL file path. A completion or error line is printed as each
      task finishes; task exceptions are reported and not re-raised.
      """
      # Dates to run, one task each.
      days = ["20241123", "20241124", "20241125", "20241126", "20241127", "20241128"]
      # Upper bound on the number of worker threads.
      pool_size = 24
      # SQL file handed to the client.
      sql_path = "/Users/zhao/Desktop/广告分析.sql"
      # Parameters that are the same for every date.
      shared_params = {
          "hh": "23",
          "hhl": "00",
          "filter_id": "XXXXXXXX",
          "apptype": "3,36,6,17",
      }

      with ThreadPoolExecutor(max_workers=pool_size) as pool:
          # Map each future to the (params, output path) it was submitted with.
          submitted = {}
          for day in days:
              # Fresh dict per task; "bizdate" stays the first key.
              task_params = {"bizdate": day, **shared_params}
              out_path = f"/Users/zhao/Desktop/{day}.csv"
              # NOTE(review): the method name ends in "_fle", which looks like a
              # typo of "_file" -- confirm against the ODPSClient API.
              task = pool.submit(
                  odps_client.execute_sql_file_result_save_fle,
                  sql_path,
                  task_params,
                  out_path,
              )
              submitted[task] = (task_params, out_path)

          # Report on tasks in the order they finish.
          for task in as_completed(submitted):
              task_params, out_path = submitted[task]
              try:
                  task.result()  # re-raises whatever the task raised
                  print(f"Completed: {out_path} for date {task_params['bizdate']}")
              except Exception as exc:
                  print(f"Error: {exc} for date {task_params['bizdate']}")
  def _main():
      """Script entry point: run the ad-analysis download."""
      ad_download()


  # Only run when executed as a script, not when imported.
  if __name__ == "__main__":
      _main()