|
@@ -51,8 +51,10 @@ def thread_task(name, file_list):
|
|
result=process_file(file_path)
|
|
result=process_file(file_path)
|
|
file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
|
|
file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
|
|
output_file = f"/app/milvus-{file_name}.json"
|
|
output_file = f"/app/milvus-{file_name}.json"
|
|
- write_results(json.dumps({"rows":result}), output_file)
|
|
|
|
|
|
+ write_results(json.dumps({"rows":result}, indent=4), output_file)
|
|
compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
|
|
compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
|
|
|
|
+
|
|
|
|
+ hdfs_client.delete(f"/dyp/milvus/{file_name}.gz")
|
|
hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz", multi_processes=1, overwrite=True)
|
|
hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz", multi_processes=1, overwrite=True)
|
|
print(f"Thread {name}: ending file:{file_path} {i}/{count}")
|
|
print(f"Thread {name}: ending file:{file_path} {i}/{count}")
|
|
print(f"Thread {name}: finishing")
|
|
print(f"Thread {name}: finishing")
|