
dssm train

丁云鹏 4 months ago
parent commit e1a43659f1

+ 0 - 90
recommend-model-produce/src/main/python/models/dssm/infer.py

@@ -1,90 +0,0 @@
-import numpy as np
-import argparse
-import cv2
-
-from paddle.inference import Config
-from paddle.inference import create_predictor
-
-from img_preprocess import preprocess
-
-
-def init_predictor(args):
-    if args.model_dir != "":
-        config = Config(args.model_dir)
-    else:
-        config = Config(args.model_file, args.params_file)
-
-    config.enable_memory_optim()
-    if args.use_onnxruntime:
-        config.enable_onnxruntime()
-        config.enable_ort_optimization()
-        config.set_cpu_math_library_num_threads(4)
-    else:
-        # If mkldnn is not enabled, you can set the BLAS thread num.
-        # The thread num should not be greater than the number of CPU cores.
-        config.set_cpu_math_library_num_threads(4)
-        config.enable_mkldnn()
-
-    predictor = create_predictor(config)
-    return predictor
-
-
-def run(predictor, img):
-    # copy img data to input tensor
-    input_names = predictor.get_input_names()
-    for i, name in enumerate(input_names):
-        input_tensor = predictor.get_input_handle(name)
-        input_tensor.reshape(img[i].shape)
-        input_tensor.copy_from_cpu(img[i])
-
-    # do the inference
-    predictor.run()
-
-    results = []
-    # get out data from output tensor
-    output_names = predictor.get_output_names()
-    for i, name in enumerate(output_names):
-        output_tensor = predictor.get_output_handle(name)
-        output_data = output_tensor.copy_to_cpu()
-        results.append(output_data)
-
-    return results
-
-
-def parse_args():
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--model_file",
-        type=str,
-        default="",
-        help="Model filename, Specify this when your model is a combined model."
-    )
-    parser.add_argument(
-        "--params_file",
-        type=str,
-        default="",
-        help=
-        "Parameter filename, Specify this when your model is a combined model."
-    )
-    parser.add_argument(
-        "--model_dir",
-        type=str,
-        default="",
-        help=
-        "Model dir, If you load a non-combined model, specify the directory of the model."
-    )
-    parser.add_argument("--use_onnxruntime",
-                        type=int,
-                        default=0,
-                        help="Whether use onnxruntime.")
-    return parser.parse_args()
-
-
-if __name__ == '__main__':
-    args = parse_args()
-    pred = init_predictor(args)
-    img = cv2.imread('./ILSVRC2012_val_00000247.jpeg')
-    img = preprocess(img)
-    #img = np.ones((1, 3, 224, 224)).astype(np.float32)
-    result = run(pred, [img])
-    print("class index: ", np.argmax(result[0][0]))

+ 84 - 0
recommend-model-produce/src/main/python/models/dssm/milvus_data_process.py

@@ -0,0 +1,84 @@
+import os
+import sys
+
+__dir__ = os.path.dirname(os.path.abspath(__file__))
+#sys.path.append(__dir__)
+sys.path.append(os.path.abspath(os.path.join(__dir__, '..')))
+
+
+import numpy as np
+import json
+from concurrent.futures import ThreadPoolExecutor
+from utils.oss_client import HangZhouOSSClient
+import utils.compress as compress
+from utils.my_hdfs_client import MyHDFSClient
+import paddle.inference as paddle_infer
+
+# Hadoop installation directory and connection settings
+hadoop_home = "/app/env/hadoop-3.2.4"
+configs = {
+    "fs.defaultFS": "hdfs://192.168.141.208:9000",
+    "hadoop.job.ugi": ""
+}
+hdfs_client = MyHDFSClient(hadoop_home, configs)
+
+def process_file(file_path, model_file, params_file):
+    """Process a single file: stream it via `hdfs dfs -text` and parse each
+    line as a tab-separated (vid, vec) pair. model_file/params_file are
+    unused here and kept only so the signature matches inferv2.py."""
+    ret, out = hdfs_client._run_cmd(f"text {file_path}")
+    result = []
+    for line in out:
+        sample_values = line.rstrip('\n').split('\t')
+        vid, vec = sample_values
+        result.append({
+            "vid": vid,
+            "vec": vec
+        })
+    return result
+
+def write_results(results, output_file):
+    """Write the results to a JSON file (Milvus bulk-import layout)."""
+    with open(output_file, 'w') as json_file:
+        json.dump(results, json_file, ensure_ascii=False)
+
+def thread_task(name, file_list, model_file, params_file):
+    """Thread task: process each file and upload the result to HDFS."""
+    print(f"Thread {name}: starting file_list:{file_list}")
+    i = 0
+    count = len(file_list)
+    for file_path in file_list:
+        i = i + 1
+        print(f"Thread {name}: starting file:{file_path} {i}/{count}")
+        result = process_file(file_path, model_file, params_file)
+        file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
+        output_file = f"/app/milvus-{file_name}.json"
+        write_results({"rows": result}, output_file)
+        # Upload under the local basename; embedding the absolute local path
+        # in the remote path would yield /dyp/vec//app/... on HDFS.
+        hdfs_client.upload(output_file, f"/dyp/vec/{os.path.basename(output_file)}", multi_processes=1, overwrite=True)
+        print(f"Thread {name}: ending file:{file_path} {i}/{count}")
+
+    print(f"Thread {name}: finishing")
+
+def main():
+    init_model_path = "/app/output_model_dssm"
+    client = HangZhouOSSClient("art-recommend")
+
+    # process_file ignores the model in this script, so pass placeholders
+    # rather than leaving model_file/params_file undefined.
+    model_file = None
+    params_file = None
+
+    max_workers = 2
+
+    split_file_list = [
+        ['/dyp/vec/data_part-00017.gz.json'],
+        ['/dyp/vec/data_part-00018.gz.json']
+    ]
+    future_list = []
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        for i, file_list in enumerate(split_file_list):
+            future_list.append(executor.submit(thread_task, f"thread{i}", file_list, model_file, params_file))
+
+    for future in future_list:
+        future.result()
+
+    print("Main program ending")
+
+if __name__ == "__main__":
+    main()

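For reference, each file written by write_results above is a single JSON object in the Milvus bulk-import row layout, {"rows": [{"vid": ..., "vec": ...}, ...]}, where vec is still the raw tab-split string read from HDFS. A minimal sketch for validating such a file locally, under those assumptions (the local path is hypothetical):

    import json

    # Hypothetical output of thread_task above.
    with open("/app/milvus-part-00017.json") as f:
        payload = json.load(f)

    rows = payload["rows"]
    print(len(rows), "rows")
    print(rows[0]["vid"], rows[0]["vec"][:64])  # id and a prefix of the raw vector string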
+ 4 - 2
recommend-model-produce/src/main/python/tools/inferv2.py

@@ -67,9 +67,11 @@ def thread_task(name, file_list, model_file, params_file):
         count=len(file_list)
         print(f"Thread {name}: starting file:{file_path} {i}/{count}")
         results.extend(process_file(file_path, model_file, params_file))
-        output_file = f"/app/data_{os.path.basename(file_path)}.json"
+        file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
+        output_file = f"/app/vec-{file_name}.json"
         write_results(results, output_file)
-        hdfs_client.upload(output_file, f"/dyp/vec/data_{os.path.basename(file_path)}.json", multi_processes=1, overwrite=True):
+        compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
+        hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/vec/{os.path.basename(output_file)}.gz", multi_processes=1, overwrite=True)
         results=[]
         print(f"Thread {name}: ending file:{file_path} {i}/{count}")
     

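The renaming above builds the output name from the input basename via os.path.splitext. A small sketch of the resulting local and remote paths, assuming a hypothetical input part file named part-00017.gz:

    import os

    file_path = "/dyp/input/part-00017.gz"                   # hypothetical input
    file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
    output_file = f"/app/vec-{file_name}.json"               # /app/vec-part-00017.json
    local_tar = f"{output_file}.tar.gz"                      # /app/vec-part-00017.json.tar.gz
    remote = f"/dyp/vec/{os.path.basename(output_file)}.gz"  # /dyp/vec/vec-part-00017.json.gz
    print(output_file, local_tar, remote)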
+ 9 - 0
recommend-model-produce/src/main/python/tools/utils/compress.py

@@ -15,6 +15,15 @@
 import os
 import tarfile
 
+def compress_file_tar(file_path, output_filename):
+    # Ensure the output filename ends with .tar.gz
+    if not output_filename.endswith('.tar.gz'):
+        output_filename += '.tar.gz'
+    # Open a tarfile in 'w:gz' mode to write a gzip-compressed tar archive
+    with tarfile.open(output_filename, "w:gz") as tar:
+        # Add the file; arcname sets its relative path inside the archive
+        tar.add(file_path, arcname=os.path.relpath(file_path))
+
 def compress_tar(folder_path, output_filename):
     # Ensure the output filename ends with .tar.gz
     if not output_filename.endswith('.tar.gz'):
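A quick usage sketch for the new compress_file_tar helper (paths are illustrative, and the import assumes the tools/utils package layout used by inferv2.py):

    import tarfile
    import utils.compress as compress

    # Pack a single result file into a gzip-compressed tar archive.
    compress.compress_file_tar("/app/vec-part-00017.json", "/app/vec-part-00017.json.tar.gz")

    # Verify: the archive should list the file under its relative arcname.
    with tarfile.open("/app/vec-part-00017.json.tar.gz", "r:gz") as tar:
        print(tar.getnames())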