import os import sys __dir__ = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) #sys.path.append(__dir__) sys.path.append(os.path.join(__dir__,"tools")) import json from concurrent.futures import ThreadPoolExecutor import utils.compress as compress from utils.my_hdfs_client import MyHDFSClient # Hadoop 安装目录和配置信息 hadoop_home = "/app/env/hadoop-3.2.4" configs = { "fs.defaultFS": "hdfs://192.168.141.208:9000", "hadoop.job.ugi": "" } hdfs_client = MyHDFSClient(hadoop_home, configs) def process_file(file_path): """处理单个文件""" ret, out = hdfs_client._run_cmd(f"text {file_path}") result=[] for line in out: sample_values = line.rstrip('\n').split('\t') if(len(sample_values) == 2): vid, vec = sample_values result.append({ "vid":vid, "vec":vec }) else : print(f"{sample_values}") return result def write_results(results, output_file): """将结果写入文件""" with open(output_file, 'w') as json_file: for s in results: json_file.write(s + "\n") def thread_task(name, file_list): """线程任务""" print(f"Thread {name}: starting file_list:{file_list}") i=0 for file_path in file_list: i=i+1 count=len(file_list) print(f"Thread {name}: starting file:{file_path} {i}/{count}") result=process_file(file_path) file_name, file_suffix = os.path.splitext(os.path.basename(file_path)) output_file = f"/app/milvus-{file_name}.json" write_results(json.dumps({"rows":result}, indent=4), output_file) compress.compress_file_tar(output_file, f"{output_file}.tar.gz") hdfs_client.delete(f"/dyp/milvus/{file_name}.gz") hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz", multi_processes=1, overwrite=True) print(f"Thread {name}: ending file:{file_path} {i}/{count}") print(f"Thread {name}: finishing") def main(): max_workers = 2 split_file_list = [ ['/dyp/vec/part-00017.gz'], ['/dyp/vec/part-00018.gz'] ] future_list = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: for i, file_list in enumerate(split_file_list): future_list.append(executor.submit(thread_task, f"thread{i}", file_list)) for future in future_list: future.result() print("Main program ending") if __name__ == "__main__": main()