| 
					
				 | 
			
			
				@@ -18,7 +18,7 @@ configs = { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 hdfs_client = MyHDFSClient(hadoop_home, configs) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-def process_file(file_path, model_file, params_file): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def process_file(file_path): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     """处理单个文件""" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     ret, out = hdfs_client._run_cmd(f"text {file_path}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     result=[] 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -37,7 +37,7 @@ def write_results(results, output_file): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         for s in results: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             json_file.write(s + "\n") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-def thread_task(name, file_list, model_file, params_file): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def thread_task(name, file_list): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     """线程任务""" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     print(f"Thread {name}: starting file_list:{file_list}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     i=0 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -45,7 +45,7 @@ def thread_task(name, file_list, model_file, params_file): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         i=i+1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         count=len(file_list) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         print(f"Thread {name}: starting file:{file_path} {i}/{count}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        result=process_file(file_path, model_file, params_file)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        result=process_file(file_path) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         file_name, file_suffix = os.path.splitext(os.path.basename(file_path)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         output_file = f"/app/milvus-{file_name}.json" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         write_results(json.dumps({"rows":result}), output_file) 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -64,7 +64,7 @@ def main(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     future_list = [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     with ThreadPoolExecutor(max_workers=max_workers) as executor: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         for i, file_list in enumerate(split_file_list): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            future_list.append(executor.submit(thread_task, f"thread{i}", file_list, model_file, params_file)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            future_list.append(executor.submit(thread_task, f"thread{i}", file_list)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     for future in future_list: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         future.result() 
			 |