"""Read item-vector files from HDFS, convert them to JSON ({"rows": [...]}) payloads, and upload them to OSS."""
import os
import sys

__dir__ = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# sys.path.append(__dir__)
sys.path.append(os.path.join(__dir__, "tools"))

import json
from concurrent.futures import ThreadPoolExecutor

import utils.compress as compress
from utils.my_hdfs_client import MyHDFSClient
from utils.oss_client import HangZhouOSSClient

# Hadoop installation directory and HDFS connection settings
hadoop_home = "/app/env/hadoop-3.2.4"
configs = {
    "fs.defaultFS": "hdfs://192.168.141.208:9000",
    "hadoop.job.ugi": ""
}
hdfs_client = MyHDFSClient(hadoop_home, configs)
oss_client = HangZhouOSSClient("tz-milvus")


def process_file(file_path):
    """Parse a single HDFS file into a list of {vid, vec} rows."""
    ret, out = hdfs_client._run_cmd(f"text {file_path}")
    result = []
    for line in out:
        sample_values = line.rstrip('\n').split('\t')
        if len(sample_values) == 2:
            vid, vec = sample_values
            result.append({
                "vid": int(vid),
                "vec": json.loads(vec)
            })
        else:
            # Filter out malformed lines such as tar PaxHeader records
            print(f"{sample_values}")
    return result


def write_results(data, output_file):
    """Write the serialized results to a local file."""
    with open(output_file, 'w') as json_file:
        json_file.write(data)


def thread_task(name, file_list):
    """Worker task: convert each assigned HDFS file and upload the JSON to OSS."""
    print(f"Thread {name}: starting file_list:{file_list}")
    count = len(file_list)
    for i, file_path in enumerate(file_list, start=1):
        print(f"Thread {name}: starting file:{file_path} {i}/{count}")
        result = process_file(file_path)
        file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
        output_file = f"/app/milvus-{file_name}.json"
        write_results(json.dumps({"rows": result}, indent=4), output_file)
        oss_object_name = f"dyp/milvus/{file_name}.json"
        oss_client.put_object_from_file(oss_object_name, output_file)
        # Alternative path: compress the result and push it back to HDFS
        # compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
        # hdfs_client.delete(f"/dyp/milvus/{file_name}.gz")
        # hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz", multi_processes=1, overwrite=True)
        # print(f"Thread {name}: ending file:{file_path} {i}/{count}")
    print(f"Thread {name}: finishing")


def main():
    sub_dirs, file_list = hdfs_client.ls_dir('/dw/recommend/model/56_dssm_i2i_itempredData/20241212')
    all_file = []
    file_extensions = [".gz"]
    for file in file_list:
        # Keep only files whose extension is in the allow-list
        if file_extensions and not any(file.endswith(ext) for ext in file_extensions):
            continue
        all_file.append(file)
    print(f"File list : {all_file}")

    max_workers = 8
    chunk_size = len(all_file) // max_workers
    remaining = len(all_file) % max_workers

    # Split the file list into max_workers near-equal chunks;
    # the first `remaining` chunks each take one extra file.
    split_file_list = []
    for i in range(max_workers):
        start = i * chunk_size + min(i, remaining)
        end = start + chunk_size + (1 if i < remaining else 0)
        split_file_list.append(all_file[start:end])

    future_list = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i, file_list in enumerate(split_file_list):
            future_list.append(executor.submit(thread_task, f"thread{i}", file_list))

    # Propagate any exception raised inside a worker thread
    for future in future_list:
        future.result()

    print("Main program ending")


if __name__ == "__main__":
    main()
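

# --- Illustrative sketch (not part of the pipeline) --------------------------
# A minimal, self-contained demonstration of the line format process_file()
# expects and the payload shape thread_task() writes. The vid/vec values below
# are hypothetical; real lines come from running the HDFS `text` command over
# the .gz part files.
def _demo_parse():
    sample_lines = [
        "42\t[0.1, 0.2, 0.3]",  # well-formed: "<vid>\t<json vector>"
        "PaxHeader",            # malformed: skipped, like tar PaxHeader records
    ]
    rows = []
    for line in sample_lines:
        values = line.rstrip("\n").split("\t")
        if len(values) == 2:
            vid, vec = values
            rows.append({"vid": int(vid), "vec": json.loads(vec)})
    # Same structure that gets serialized to /app/milvus-<file_name>.json
    print(json.dumps({"rows": rows}, indent=4))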