123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- import os
- import sys
- import numpy as np
- __dir__ = os.path.dirname(os.path.abspath(__file__))
- sys.path.append(__dir__)
- from utils.oss_client import HangZhouOSSClient
- import utils.compress as compress
- from utils.my_hdfs_client import MyHDFSClient
- # Import the Paddle Inference runtime library
- import paddle.inference as paddle_infer
- import json
# Module-level HDFS configuration and client, used by main() to read input data.
hadoop_home = "/app/env/hadoop-3.2.4"  # Hadoop installation directory
configs = {
    "fs.default.name": "hdfs://192.168.141.208:9000",  # HDFS name-node host and port
    "hadoop.job.ugi": ""  # HDFS user and password (empty: anonymous / default)
}
hdfs_client = MyHDFSClient(hadoop_home, configs)
def main():
    """Run DSSM item-embedding inference end to end.

    Steps:
      1. Download the packaged DSSM model from OSS and unpack it to
         ``/app/output_model_dssm``.
      2. Build a Paddle Inference predictor from the unpacked
         ``dssm.pdmodel`` / ``dssm.pdiparams`` files.
      3. Read one gzipped TSV part file from HDFS; each line is
         ``<vid>\\t<comma-separated floats>``.
      4. Run the predictor once per item and collect
         ``<vid>\\t<embedding list>`` strings.
      5. Write the collected lines to ``/app/data.json`` (one per line).

    Raises:
        FileNotFoundError: if the model archive did not unpack to the
            expected directory.
    """
    init_model_path = "/app/output_model_dssm"
    client = HangZhouOSSClient("art-recommend")
    oss_object_name = "dyp/dssm.tar.gz"
    client.get_object_to_file(oss_object_name, "model.tar.gz")
    compress.uncompress_tar("model.tar.gz", init_model_path)
    # Explicit check instead of `assert`: asserts are stripped under -O.
    if not os.path.exists(init_model_path):
        raise FileNotFoundError(
            "model directory missing after unpack: " + init_model_path)

    model_file = os.path.join(init_model_path, "dssm.pdmodel")
    params_file = os.path.join(init_model_path, "dssm.pdiparams")
    # Build the predictor from the inference config.
    config = paddle_infer.Config(model_file, params_file)
    predictor = paddle_infer.create_predictor(config)
    # Single-input / single-output model: bind the first handles of each.
    input_names = predictor.get_input_names()
    input_handle = predictor.get_input_handle(input_names[0])
    output_names = predictor.get_output_names()
    output_handle = predictor.get_output_handle(output_names[0])

    # Stream the part file from HDFS; each line is "<vid>\t<f1,f2,...>".
    # NOTE(review): the return code `ret` is not checked here — the original
    # ignored it too; verify _run_cmd's failure semantics before relying on it.
    ret, out = hdfs_client._run_cmd(
        "text /dw/recommend/model/56_dssm_i2i_itempredData/20241206/part-00016.gz")
    input_data = {}
    for line in out:
        vid, left_features_str = line.rstrip('\n').split('\t')
        input_data[vid] = [float(x) for x in left_features_str.split(',')]

    result = []
    count = len(input_data)
    # enumerate(start=1) replaces the original manual `i = i + 1` counter;
    # progress is still printed after every 1000th item.
    for i, (vid, features) in enumerate(input_data.items(), start=1):
        # Batch of one: shape (1, feature_dim), float32 as paddle expects.
        input_handle.copy_from_cpu(np.array([features], dtype=np.float32))
        predictor.run()
        output_data = output_handle.copy_to_cpu()  # numpy.ndarray
        result.append(vid + "\t" + str(output_data.tolist()[0]))
        if i % 1000 == 0:
            print("write batch {}/{}".format(i, count))

    # Write one "<vid>\t<embedding>" line per item. (The original also built
    # an unused json.dumps(result) string; that dead code has been removed —
    # the output file was always plain tab-separated lines, not JSON.)
    with open('/app/data.json', 'w') as json_file:
        for s in result:
            json_file.write(s + "\n")
# Script entry point: run the full download → inference → write pipeline.
if __name__ == "__main__":
    main()
|