12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import os
- import sys
- import numpy as np
- import json
- from concurrent.futures import ThreadPoolExecutor
- from utils.oss_client import HangZhouOSSClient
- import utils.compress as compress
- from utils.my_hdfs_client import MyHDFSClient
- import paddle.inference as paddle_infer
# Hadoop installation directory and connection settings for the HDFS client.
hadoop_home = "/app/env/hadoop-3.2.4"
configs = {
    "fs.defaultFS": "hdfs://192.168.141.208:9000",  # NameNode address
    "hadoop.job.ugi": ""  # user/group info left empty — NOTE(review): confirm no auth is intended
}
# Module-level HDFS client shared by all worker threads
# (assumed thread-safe — verify MyHDFSClient before raising max_workers).
hdfs_client = MyHDFSClient(hadoop_home, configs)
def download_and_extract_model(init_model_path, oss_client, oss_object_name):
    """Download a tar.gz model archive from OSS and extract it locally.

    Args:
        init_model_path: Directory the archive is extracted into.
        oss_client: OSS client exposing ``get_object_to_file(key, path)``.
        oss_object_name: Object key of the model archive in the bucket.

    Raises:
        FileNotFoundError: If the expected model directory does not exist
            after extraction.
    """
    model_tar_path = "model.tar.gz"
    oss_client.get_object_to_file(oss_object_name, model_tar_path)
    compress.uncompress_tar(model_tar_path, init_model_path)
    # Explicit exception instead of `assert`: asserts are stripped under -O,
    # which would let a failed extraction go unnoticed.
    if not os.path.exists(init_model_path):
        raise FileNotFoundError(
            f"Model path {init_model_path} missing after extracting {model_tar_path}"
        )
def create_paddle_predictor(model_file, params_file):
    """Build a PaddlePaddle inference predictor from a model/params file pair."""
    inference_config = paddle_infer.Config(model_file, params_file)
    return paddle_infer.create_predictor(inference_config)
def process_file(file_path, model_file, params_file):
    """Score every sample in one HDFS text file with the DSSM model.

    Each input line is expected to be ``vid\tf1,f2,...`` — a tab-separated id
    and comma-separated float features. Malformed lines are skipped.

    Args:
        file_path: HDFS path of the (gzipped) text file to score.
        model_file: Path to the ``.pdmodel`` file.
        params_file: Path to the ``.pdiparams`` file.

    Returns:
        List of strings, one per sample, formatted ``vid\t<scores-list>``.
    """
    predictor = create_paddle_predictor(model_file, params_file)
    ret, out = hdfs_client._run_cmd(f"text {file_path}")
    input_data = {}
    for line in out:
        sample_values = line.rstrip('\n').split('\t')
        if len(sample_values) != 2:
            # Best-effort: skip malformed lines rather than aborting the file.
            continue
        vid, left_features_str = sample_values
        input_data[vid] = [float(x) for x in left_features_str.split(',')]
    # Tensor handles are invariant across samples — hoist them out of the loop
    # instead of re-resolving them on every iteration.
    input_handle = predictor.get_input_handle(predictor.get_input_names()[0])
    output_handle = predictor.get_output_handle(predictor.get_output_names()[0])
    result = []
    for vid, features in input_data.items():
        # Batch of one sample, float32 as required by the model input.
        input_handle.copy_from_cpu(np.array([features], dtype=np.float32))
        predictor.run()
        output_data = output_handle.copy_to_cpu()
        result.append(vid + "\t" + str(output_data.tolist()[0]))
    return result
def write_results(results, output_file):
    """Write result lines to *output_file*, one string per line (UTF-8).

    Args:
        results: Iterable of result strings (without trailing newlines).
        output_file: Destination path; overwritten if it already exists.
    """
    # Explicit encoding avoids locale-dependent output; writelines batches the
    # writes instead of one call per line. (The old local name `json_file` was
    # misleading — the content is plain tab-separated text, not JSON.)
    with open(output_file, 'w', encoding='utf-8') as fh:
        fh.writelines(s + "\n" for s in results)
def thread_task(name, file_list, model_file, params_file):
    """Worker: score each file in *file_list* and write one output per file.

    Args:
        name: Thread label used in log lines.
        file_list: HDFS paths to process sequentially in this thread.
        model_file: Path to the ``.pdmodel`` file.
        params_file: Path to the ``.pdiparams`` file.
    """
    print(f"Thread {name}: starting file_list:{file_list}")
    for file_path in file_list:
        # Bug fix: results were previously accumulated across files and the
        # growing list was re-written after every file, so each later output
        # file duplicated all earlier files' rows. Keep results per-file.
        results = process_file(file_path, model_file, params_file)
        output_file = f"/app/data_{os.path.basename(file_path)}.json"
        write_results(results, output_file)

    print(f"Thread {name}: finishing")
def main():
    """Entry point: fetch the DSSM model from OSS, then score files in parallel."""
    init_model_path = "/app/output_model_dssm"
    client = HangZhouOSSClient("art-recommend")
    oss_object_name = "dyp/dssm.tar.gz"
    download_and_extract_model(init_model_path, client, oss_object_name)

    model_file = os.path.join(init_model_path, "dssm.pdmodel")
    params_file = os.path.join(init_model_path, "dssm.pdiparams")

    # Each entry is the file list handled by one worker thread.
    split_file_list = [
        ['/dw/recommend/model/56_dssm_i2i_itempredData/20241206/part-00017.gz'],
        ['/dw/recommend/model/56_dssm_i2i_itempredData/20241206/part-00017.gz']
    ]
    max_workers = 2
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = [
            executor.submit(thread_task, f"thread{idx}", files, model_file, params_file)
            for idx, files in enumerate(split_file_list)
        ]
        # Block until every worker finishes (re-raises any worker exception).
        for fut in pending:
            fut.result()
    print("Main program ending")


if __name__ == "__main__":
    main()
|