@@ -0,0 +1,94 @@
+import os
+import sys
+
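+# Make the "tools" directory importable: __dir__ resolves to two directory
+# levels above this script, and "tools" holds the utils.* modules used below.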
+__dir__ = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+# sys.path.append(__dir__)
+sys.path.append(os.path.join(__dir__, "tools"))
+
+import json
+from concurrent.futures import ThreadPoolExecutor
+import utils.compress as compress
+from utils.my_hdfs_client import MyHDFSClient
+from utils.oss_client import HangZhouOSSClient
+
+# Hadoop installation directory and HDFS connection settings
+hadoop_home = "/app/env/hadoop-3.2.4"
+configs = {
+    "fs.defaultFS": "hdfs://192.168.141.208:9000",
+    "hadoop.job.ugi": ""
+}
+hdfs_client = MyHDFSClient(hadoop_home, configs)
+oss_client = HangZhouOSSClient("art-recommend")
+
+
+def process_file(file_path):
+    """Process a single HDFS file: parse each tab-separated line into a row."""
+    ret, out = hdfs_client._run_cmd(f"text {file_path}")
+    result = []
+    for line in out:
+        sample_values = line.rstrip('\n').split('\t')
+        if len(sample_values) == 2:
+            vid, vec = sample_values
+            result.append({
+                "vid": vid,
+                "vec": json.loads(vec)
+            })
+        else:
+            # Skip malformed lines such as tar PaxHeader entries.
+            print(f"{sample_values}")
+    return result
+
+def write_results(data, output_file):
+    """Write the serialized results to a local file."""
+    with open(output_file, 'w') as json_file:
+        json_file.write(data)
+
+def thread_task(name, file_list):
+    """Thread task: convert each assigned HDFS file to JSON and upload it to OSS."""
+    print(f"Thread {name}: starting file_list:{file_list}")
+    count = len(file_list)
+    for i, file_path in enumerate(file_list, start=1):
+        print(f"Thread {name}: starting file:{file_path} {i}/{count}")
+        result = process_file(file_path)
+        file_name, _ = os.path.splitext(os.path.basename(file_path))
+        output_file = f"/app/milvus-{file_name}.json"
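+        # {"rows": [...]} matches the row-based JSON layout that Milvus
+        # bulk import expects.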
+        write_results(json.dumps({"rows": result}, indent=4), output_file)
+
+        # Give each file its own OSS object key so uploads do not overwrite one another.
+        oss_object_name = f"dyp/milvus/{file_name}.json"
+        oss_client.put_object_from_file(oss_object_name, output_file)
+
+        # compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
+        # hdfs_client.delete(f"/dyp/milvus/{file_name}.gz")
+        # hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz", multi_processes=1, overwrite=True)
+        # print(f"Thread {name}: ending file:{file_path} {i}/{count}")
+    print(f"Thread {name}: finishing")
+
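+# Entry point: split the file list across worker threads and wait for them.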
+def main():
+
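+    # Hard-coded work split: each inner list is handled by one worker thread.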
+    max_workers = 2
+    split_file_list = [
+        ['/dyp/vec/part-00017.gz'],
+        ['/dyp/vec/part-00018.gz']
+    ]
+    future_list = []
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        for i, file_list in enumerate(split_file_list):
+            future_list.append(executor.submit(thread_task, f"thread{i}", file_list))
+
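+    # result() re-raises any exception that occurred inside a worker thread.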
+    for future in future_list:
+        future.result()
+
+    print("Main program ending")
+
+if __name__ == "__main__":
+    main()