import os
import sys
import json
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import paddle.inference as paddle_infer

from utils.oss_client import HangZhouOSSClient
import utils.compress as compress
from utils.my_hdfs_client import MyHDFSClient

# Hadoop installation directory and HDFS connection settings.
hadoop_home = "/app/env/hadoop-3.2.4"
configs = {
    "fs.defaultFS": "hdfs://192.168.141.208:9000",
    "hadoop.job.ugi": ""
}
hdfs_client = MyHDFSClient(hadoop_home, configs)


def download_and_extract_model(init_model_path, oss_client, oss_object_name):
    """Download the model tarball from OSS and extract it to init_model_path.

    Args:
        init_model_path: local directory the archive must extract into.
        oss_client: OSS client providing get_object_to_file().
        oss_object_name: object key of the .tar.gz archive in OSS.

    Raises:
        RuntimeError: if extraction did not produce init_model_path.
    """
    model_tar_path = "model.tar.gz"
    oss_client.get_object_to_file(oss_object_name, model_tar_path)
    compress.uncompress_tar(model_tar_path, init_model_path)
    # Explicit raise instead of `assert`: asserts are stripped under `python -O`,
    # which would silently let a failed download continue.
    if not os.path.exists(init_model_path):
        raise RuntimeError(f"model extraction failed: {init_model_path} not found")


def create_paddle_predictor(model_file, params_file):
    """Create a PaddlePaddle inference predictor from a model/params file pair."""
    config = paddle_infer.Config(model_file, params_file)
    return paddle_infer.create_predictor(config)


def process_file(file_path, model_file, params_file, name="worker"):
    """Run inference on every sample of one HDFS text file.

    Each input line is expected as `vid \\t comma-separated-floats`.

    Args:
        file_path: HDFS path of the (possibly .gz) text file.
        model_file: path to the .pdmodel file.
        params_file: path to the .pdiparams file.
        name: label used in progress logs; defaults to "worker" so existing
            three-argument callers keep working.

    Returns:
        List of "vid\\t[outputs...]" strings, one per input sample.
    """
    predictor = create_paddle_predictor(model_file, params_file)
    ret, out = hdfs_client._run_cmd(f"text {file_path}")
    input_data = {}
    for line in out:
        sample_values = line.rstrip('\n').split('\t')
        vid, left_features_str = sample_values
        left_features = [float(x) for x in left_features_str.split(',')]
        input_data[vid] = left_features

    # Hoisted out of the loop: the tensor names do not change per sample.
    input_handle = predictor.get_input_handle(predictor.get_input_names()[0])
    output_handle = predictor.get_output_handle(predictor.get_output_names()[0])

    count = len(input_data)
    result = []
    for i, (vid, features) in enumerate(input_data.items(), start=1):
        input_handle.copy_from_cpu(np.array([features], dtype=np.float32))
        predictor.run()
        output_data = output_handle.copy_to_cpu()
        result.append(vid + "\t" + str(output_data.tolist()[0]))
        if i % 1000 == 0:
            # BUG FIX: `name` was previously undefined in this function and
            # raised NameError on the 1000th sample; it is now a parameter.
            print(f"Thread {name}: write batch {i}/{count}")
    return result


def write_results(results, output_file):
    """Write one result string per line to output_file."""
    with open(output_file, 'w') as json_file:
        for s in results:
            json_file.write(s + "\n")


def thread_task(name, file_list, model_file, params_file):
    """Thread worker: run inference on every file in file_list, then write.

    NOTE(review): results from *all* files are accumulated and written to a
    single output file named after the *last* file_path — confirm that
    per-file output was not intended.
    """
    print(f"Thread {name}: starting file_list:{file_list}")
    results = []
    file_path = None
    for file_path in file_list:
        results.extend(process_file(file_path, model_file, params_file, name))
    # Guard against an empty file_list, which previously raised NameError on
    # `file_path` here.
    if file_path is not None:
        output_file = f"/app/data_{os.path.basename(file_path)}.json"
        write_results(results, output_file)
    print(f"Thread {name}: finishing")


def main():
    """Download the DSSM model, then fan inference out over worker threads."""
    init_model_path = "/app/output_model_dssm"
    client = HangZhouOSSClient("art-recommend")
    oss_object_name = "dyp/dssm.tar.gz"
    download_and_extract_model(init_model_path, client, oss_object_name)
    model_file = os.path.join(init_model_path, "dssm.pdmodel")
    params_file = os.path.join(init_model_path, "dssm.pdiparams")

    max_workers = 2
    split_file_list = [
        ['/dw/recommend/model/56_dssm_i2i_itempredData/20241206/part-00017.gz'],
        ['/dw/recommend/model/56_dssm_i2i_itempredData/20241206/part-00018.gz']
    ]
    future_list = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i, file_list in enumerate(split_file_list):
            future_list.append(
                executor.submit(thread_task, f"thread{i}", file_list,
                                model_file, params_file))
        # Propagate any worker exception instead of exiting silently.
        for future in future_list:
            future.result()

    print("Main program ending")


if __name__ == "__main__":
    main()