Browse Source

dssm train

丁云鹏 4 months ago
parent
commit
b2b542537c

+ 3 - 3
recommend-model-produce/src/main/python/models/dssm/infer.py

@@ -95,12 +95,12 @@ def main():
     sub_dirs,file_list = hdfs_client.ls_dir('/dw/recommend/model/56_dssm_i2i_itempredData/20241212')   
     all_file=[]
     file_extensions=[".gz"]
-    for file in files:
+    for file in file_list:
         # 扩展名过滤
         if file_extensions and not any(file.endswith(ext) for ext in file_extensions):
             continue    
-        all_files.append(file)
-    print(f"File list : {all_files}")
+        all_file.append(file)
+    print(f"File list : {all_file}")
 
     max_workers = 16
     chunk_size = len(all_file) // max_workers

+ 0 - 107
recommend-model-produce/src/main/python/tools/inferv2.py

@@ -1,107 +0,0 @@
-import os
-import sys
-import numpy as np
-import json
-from concurrent.futures import ThreadPoolExecutor
-from utils.oss_client import HangZhouOSSClient
-import utils.compress as compress
-from utils.my_hdfs_client import MyHDFSClient
-import paddle.inference as paddle_infer
-
-# Hadoop 安装目录和配置信息
-hadoop_home = "/app/env/hadoop-3.2.4"
-configs = {
-    "fs.defaultFS": "hdfs://192.168.141.208:9000",
-    "hadoop.job.ugi": ""
-}
-hdfs_client = MyHDFSClient(hadoop_home, configs)
-
-def download_and_extract_model(init_model_path, oss_client, oss_object_name):
-    """下载并解压模型"""
-    model_tar_path = "model.tar.gz"
-    oss_client.get_object_to_file(oss_object_name, model_tar_path)
-    compress.uncompress_tar(model_tar_path, init_model_path)
-    assert os.path.exists(init_model_path)
-
-def create_paddle_predictor(model_file, params_file):
-    """创建PaddlePaddle的predictor"""
-    config = paddle_infer.Config(model_file, params_file)
-    predictor = paddle_infer.create_predictor(config)
-    return predictor
-
-def process_file(file_path, model_file, params_file):
-    """处理单个文件"""
-    predictor = create_paddle_predictor(model_file, params_file)
-    ret, out = hdfs_client._run_cmd(f"text {file_path}")
-    input_data = {}
-    for line in out:
-        sample_values = line.rstrip('\n').split('\t')
-        vid, left_features_str = sample_values
-        left_features = [float(x) for x in left_features_str.split(',')]
-        input_data[vid] = left_features
-
-    result = []
-    for k, v in input_data.items():
-        v2 = np.array([v], dtype=np.float32)
-        input_handle = predictor.get_input_handle(predictor.get_input_names()[0])
-        input_handle.copy_from_cpu(v2)
-        predictor.run()
-        output_handle = predictor.get_output_handle(predictor.get_output_names()[0])
-        output_data = output_handle.copy_to_cpu()
-        result.append(k + "\t" + str(output_data.tolist()[0]))
-    return result
-
-def write_results(results, output_file):
-    """将结果写入文件"""
-    with open(output_file, 'w') as json_file:
-        for s in results:
-            json_file.write(s + "\n")
-
-def thread_task(name, file_list, model_file, params_file):
-    """线程任务"""
-    print(f"Thread {name}: starting file_list:{file_list}")
-    results = []
-    i=0
-    for file_path in file_list:
-        i=i+1
-        count=len(file_list)
-        print(f"Thread {name}: starting file:{file_path} {i}/{count}")
-        results.extend(process_file(file_path, model_file, params_file))
-        file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
-        output_file = f"/app/vec-{file_name}.json"
-        write_results(results, output_file)
-        compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
-        hdfs_client.delete(f"/dyp/vec/{file_name}.gz")
-        hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/vec/{file_name}.gz", multi_processes=1, overwrite=False)
-        results=[]
-        print(f"Thread {name}: ending file:{file_path} {i}/{count}")
-    
-    print(f"Thread {name}: finishing")
-
-def main():
-    init_model_path = "/app/output_model_dssm"
-    client = HangZhouOSSClient("art-recommend")
-    oss_object_name = "dyp/dssm.tar.gz"
-    download_and_extract_model(init_model_path, client, oss_object_name)
-
-    model_file = os.path.join(init_model_path, "dssm.pdmodel")
-    params_file = os.path.join(init_model_path, "dssm.pdiparams")
-
-    max_workers = 2
-
-    split_file_list = [
-        ['/dw/recommend/model/56_dssm_i2i_itempredData/20241206/part-00017.gz'],
-        ['/dw/recommend/model/56_dssm_i2i_itempredData/20241206/part-00018.gz']
-    ]
-    future_list = []
-    with ThreadPoolExecutor(max_workers=max_workers) as executor:
-        for i, file_list in enumerate(split_file_list):
-            future_list.append(executor.submit(thread_task, f"thread{i}", file_list, model_file, params_file))
-
-    for future in future_list:
-        future.result()
-
-    print("Main program ending")
-
-if __name__ == "__main__":
-    main()

BIN
recommend-model-produce/src/main/python/tools/utils/__pycache__/__init__.cpython-310.pyc


BIN
recommend-model-produce/src/main/python/tools/utils/__pycache__/compress.cpython-310.pyc


BIN
recommend-model-produce/src/main/python/tools/utils/__pycache__/my_hdfs_client.cpython-310.pyc


BIN
recommend-model-produce/src/main/python/tools/utils/__pycache__/oss_client.cpython-310.pyc