data_download.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from concurrent.futures import ThreadPoolExecutor, as_completed
  2. from datetime import timedelta
  3. import pandas as pd
  4. from odps.src.types_c import datetime
  5. from client import ODPSClient
  6. odps_client = ODPSClient.ODPSClient()
  7. dt_list = ["20241108", "20241109", "20241110"]
  8. hh_list = ["00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16", "17",
  9. "18", "19", "20", "21", "22", "23"]
  10. VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"
# Query function: run the feature SQL for a single date/hour.
  12. def read_odps(dt: str, hh: str):
  13. # 读取SQL文件
  14. sql = ""
  15. with open(f"{VOV_BASE_PATH}/sql/vovh24_feature.sql", "r") as f:
  16. sql = f.read()
  17. real_sql = (
  18. sql.replace("${bizdate}", dt)
  19. .replace("${hh}", hh)
  20. .replace("${apptype}", "4")
  21. )
  22. print(f"Executing for dt: {dt}, hh: {hh}")
  23. odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}_feature.csv")
# Parallel execution: run the query for every date/hour combination.
  25. def run_parallel():
  26. # 定义线程池的最大线程数
  27. with ThreadPoolExecutor(max_workers=24) as executor:
  28. # 创建任务列表
  29. future_to_task = {executor.submit(read_odps, dt, hh): (dt, hh)
  30. for dt in dt_list
  31. for hh in hh_list
  32. }
  33. # 监控每个任务的完成情况
  34. for future in as_completed(future_to_task):
  35. dt, hh = future_to_task[future]
  36. try:
  37. future.result() # 获取执行结果
  38. print(f"Completed for dt: {dt}, hh: {hh}")
  39. except Exception as exc:
  40. print(f"Error for dt: {dt}, hh: {hh} - {exc}")
  41. def download():
  42. # 执行并行任务
  43. run_parallel()
  44. for dt in dt_list:
  45. csv_list = []
  46. for hh in hh_list:
  47. csv_list.append(f"{VOV_BASE_PATH}/csv/{dt}{hh}_feature.csv")
  48. df_list = [pd.read_csv(file) for file in csv_list]
  49. df = pd.concat(df_list, ignore_index=True)
  50. df.to_csv(f"{VOV_BASE_PATH}/csv/{dt}_feature.csv", index=False)
  51. def _main():
  52. download()
  53. if __name__ == "__main__":
  54. _main()