|
@@ -94,24 +94,26 @@ def main():
|
|
|
|
|
|
sub_dirs,file_list = hdfs_client.ls_dir('/dw/recommend/model/56_dssm_i2i_itempredData/20241212')
|
|
|
all_file=[]
|
|
|
+ file_extensions=[".gz"]
|
|
|
for file in files:
|
|
|
# 扩展名过滤
|
|
|
- if file_extensions and not any(file.endswith(ext) for ext in ".gz"):
|
|
|
+ if file_extensions and not any(file.endswith(ext) for ext in file_extensions):
|
|
|
continue
|
|
|
all_files.append(file)
|
|
|
print(f"File list : {all_files}")
|
|
|
+
|
|
|
max_workers = 16
|
|
|
- chunk_size = len(file_list) // max_workers
|
|
|
- remaining = len(file_list) % max_workers
|
|
|
+ chunk_size = len(all_file) // max_workers
|
|
|
+ remaining = len(all_file) % max_workers
|
|
|
|
|
|
# 分割列表
|
|
|
split_file_list = []
|
|
|
- for i in range(8):
|
|
|
+ for i in range(max_workers):
|
|
|
# 计算每份的起始和结束索引
|
|
|
start = i * chunk_size + min(i, remaining)
|
|
|
end = start + chunk_size + (1 if i < remaining else 0)
|
|
|
# 添加分割后的子列表
|
|
|
- split_file_list.append(file_list[start:end])
|
|
|
+ split_file_list.append(all_file[start:end])
|
|
|
|
|
|
future_list = []
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|