@@ -5,14 +5,10 @@ __dir__ = os.path.dirname(os.path.abspath(__file__))
 #sys.path.append(__dir__)
 sys.path.append(os.path.abspath(os.path.join(__dir__, '..')))
 
-
-import numpy as np
 import json
 from concurrent.futures import ThreadPoolExecutor
-from utils.oss_client import HangZhouOSSClient
 import utils.compress as compress
 from utils.my_hdfs_client import MyHDFSClient
-import paddle.inference as paddle_infer
 
 # Hadoop installation directory and configuration
 hadoop_home = "/app/env/hadoop-3.2.4"
@@ -52,23 +48,18 @@ def thread_task(name, file_list, model_file, params_file):
         result=process_file(file_path, model_file, params_file))
         file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
         output_file = f"/app/milvus-{file_name}.json"
-        write_results({"rows":result}, output_file)
-        hdfs_client.upload(output_file, f"/dyp/vec/{output_file}", multi_processes=1, overwrite=True):
-        results=[]
+        write_results(json.dumps({"rows":result}), output_file)
+        compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
+        hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz", multi_processes=1, overwrite=True)
         print(f"Thread {name}: ending file:{file_path} {i}/{count}")
-
     print(f"Thread {name}: finishing")
 
 def main():
-    init_model_path = "/app/output_model_dssm"
-    client = HangZhouOSSClient("art-recommend")
-
 
     max_workers = 2
-
     split_file_list = [
-        ['/dyp/vec/data_part-00017.gz.json'],
-        ['/dyp/vec/data_part-00018.gz.json']
+        ['/dyp/vec/vec-part-00017.gz'],
+        ['/dyp/vec/vec-part-00018.gz']
     ]
     future_list = []
     with ThreadPoolExecutor(max_workers=max_workers) as executor:
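
For context, the per-file flow this hunk introduces in thread_task is: run inference, serialize the resulting rows with json.dumps, pack the JSON file into a tar.gz archive, and upload the archive to HDFS. Below is a minimal standalone sketch of that flow under stated assumptions: process_file is stubbed, compress_file_tar is reimplemented with the stdlib tarfile module to match the call shape in the diff, and the MyHDFSClient.upload call is left as a comment since that client lives in the repo's utils/my_hdfs_client.py and its exact behavior is not shown here.

import json
import os
import tarfile
from concurrent.futures import ThreadPoolExecutor

def process_file(file_path):
    # Hypothetical stub for the real inference step; the repo's process_file
    # also takes model_file/params_file and returns rows destined for Milvus.
    return [{"id": 1, "vector": [0.1, 0.2, 0.3]}]

def compress_file_tar(src_path, dst_path):
    # Assumed equivalent of utils.compress.compress_file_tar: pack a single
    # file into a gzipped tar archive.
    with tarfile.open(dst_path, "w:gz") as tar:
        tar.add(src_path, arcname=os.path.basename(src_path))

def handle_one(file_path):
    rows = process_file(file_path)
    file_name, _ = os.path.splitext(os.path.basename(file_path))
    output_file = f"/tmp/milvus-{file_name}.json"
    # Serialize before writing, mirroring the corrected write_results call.
    with open(output_file, "w") as f:
        f.write(json.dumps({"rows": rows}))
    compress_file_tar(output_file, f"{output_file}.tar.gz")
    # In the repo this is followed by (signature taken from the diff):
    # hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz",
    #                    multi_processes=1, overwrite=True)

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        list(executor.map(handle_one, ["vec-part-00017.gz"]))

Compressing before upload trades a little CPU per worker for smaller HDFS transfers, which is why the upload now ships {output_file}.tar.gz rather than the raw JSON.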