data_download.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. from concurrent.futures import ThreadPoolExecutor, as_completed
  2. import pandas as pd
  3. from client import ODPSClient
  4. odps_client = ODPSClient.ODPSClient()
  5. dt_list = ['20241029']
  6. hh_list = ["00", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12", "13", "14", "15", "16", "17",
  7. "18", "19", "20", "21", "22", "23"]
  8. VOV_BASE_PATH = "/Users/zhao/Desktop/tzld/vov"
  9. # 定义查询函数
  10. def read_odps(dt: str, hh: str):
  11. # 读取SQL文件
  12. sql = ""
  13. with open(f"{VOV_BASE_PATH}/sql/vov排序_552_562.sql", "r") as f:
  14. sql = f.read()
  15. real_sql = (sql.replace("${bizdate}", dt)
  16. .replace("${hh}", hh))
  17. print(f"Executing for dt: {dt}, hh: {hh}")
  18. odps_client.execute_sql_result_save_file(real_sql, f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
  19. # 并行执行函数
  20. def run_parallel():
  21. # 定义线程池的最大线程数
  22. with ThreadPoolExecutor(max_workers=24) as executor:
  23. # 创建任务列表
  24. future_to_task = {executor.submit(read_odps, dt, hh): (dt, hh)
  25. for dt in dt_list
  26. for hh in hh_list
  27. }
  28. # 监控每个任务的完成情况
  29. for future in as_completed(future_to_task):
  30. dt, hh = future_to_task[future]
  31. try:
  32. future.result() # 获取执行结果
  33. print(f"Completed for dt: {dt}, hh: {hh}")
  34. except Exception as exc:
  35. print(f"Error for dt: {dt}, hh: {hh} - {exc}")
  36. def download():
  37. # 执行并行任务
  38. run_parallel()
  39. for dt in dt_list:
  40. csv_list = []
  41. for hh in hh_list:
  42. csv_list.append(f"{VOV_BASE_PATH}/csv/{dt}{hh}.csv")
  43. df_list = [pd.read_csv(file) for file in csv_list]
  44. df = pd.concat(df_list, ignore_index=True)
  45. df.to_csv(f"{VOV_BASE_PATH}/csv/{dt}.csv", index=False)
  46. def _main():
  47. download()
  48. if __name__ == "__main__":
  49. _main()