丁云鹏 4 months ago
parent
commit
44fa38f881

+ 4 - 1
recommend-model-produce/src/main/java/com/tzld/piaoquan/recommend/model/produce/i2i/I2IDSSMService.java

@@ -70,12 +70,15 @@ public class I2IDSSMService {
         JavaRDD<String> rdd = jsc.textFile(file);
 
         // Define the function that processes the data
-        JavaRDD<String> processedRdd = rdd.map(line -> processLine(line, predictor));
+        JavaRDD<String> processedRdd = rdd.map(line -> processLine(line));
 
         // Write the processed data to a new file, compressed with Gzip
         String outputPath = "hdfs:/dyp/vec2";
         processedRdd.coalesce(repartition).saveAsTextFile(outputPath, GzipCodec.class);
     }
+    private String processLine(String line) {
+        return "";
+    }
 
     private String processLine(String line, Predictor predictor) {
 

+ 103 - 0
recommend-model-produce/src/main/python/models/dssm/milvus_data.py

@@ -0,0 +1,103 @@
+import os
+import sys
+
+__dir__ = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+#sys.path.append(__dir__)
+sys.path.append(os.path.join(__dir__,"tools"))
+
+import json
+from concurrent.futures import ThreadPoolExecutor
+import utils.compress as compress
+from utils.my_hdfs_client import MyHDFSClient
+from utils.oss_client import HangZhouOSSClient
+
+# Hadoop install directory and configuration
+hadoop_home = "/app/env/hadoop-3.2.4"
+configs = {
+    "fs.defaultFS": "hdfs://192.168.141.208:9000",
+    "hadoop.job.ugi": ""
+}
+hdfs_client = MyHDFSClient(hadoop_home, configs)
+oss_client = HangZhouOSSClient("tz-milvus")
+
+
+def process_file(file_path):
+    """处理单个文件"""
+    ret, out = hdfs_client._run_cmd(f"text {file_path}")
+    result=[]
+    for line in out:
+        sample_values = line.rstrip('\n').split('\t')
+        if(len(sample_values) == 2):
+            vid, vec = sample_values
+            result.append({
+                "vid":int(vid),
+                "vec":json.loads(vec)
+            })
+        else :
+            # Filter out PaxHeader entries
+            print(f"{sample_values}")
+    return result
+
+def write_results(data, output_file):
+    """将结果写入文件"""
+    with open(output_file, 'w') as json_file:
+            json_file.write(data)
+
+def thread_task(name, file_list):
+    """线程任务"""
+    print(f"Thread {name}: starting file_list:{file_list}")
+    i=0
+    for file_path in file_list:
+        i=i+1
+        count=len(file_list)
+        print(f"Thread {name}: starting file:{file_path} {i}/{count}")
+        result=process_file(file_path)
+        file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
+        output_file = f"/app/milvus-{file_name}.json"
+        write_results(json.dumps({"rows":result}, indent=4), output_file)
+
+        oss_object_name=f"dyp/milvus/{file_name}.json"
+        oss_client.put_object_from_file(oss_object_name, output_file)
+
+        #compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
+        # hdfs_client.delete(f"/dyp/milvus/{file_name}.gz")
+        # hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz", multi_processes=1, overwrite=True)
+        # print(f"Thread {name}: ending file:{file_path} {i}/{count}")
+    print(f"Thread {name}: finishing")
+
+def main():
+    sub_dirs,file_list = hdfs_client.ls_dir('/dyp/vec')   
+    all_file=[]
+    file_extensions=[".gz"]
+    for file in file_list:
+        # Filter by file extension
+        if file_extensions and not any(file.endswith(ext) for ext in file_extensions):
+            continue    
+        all_file.append(file)
+    print(f"File list : {all_file}")
+
+    max_workers = 8
+    chunk_size = len(all_file) // max_workers
+    remaining = len(all_file) % max_workers
+
+    # Split the file list across workers
+    split_file_list = []
+    for i in range(max_workers):
+        # Compute the start and end indices for this chunk
+        start = i * chunk_size + min(i, remaining)
+        end = start + chunk_size + (1 if i < remaining else 0)
+        # Append this chunk's sub-list
+        split_file_list.append(all_file[start:end])
+
+    future_list = []
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        for i, file_list in enumerate(split_file_list):
+            future_list.append(executor.submit(thread_task, f"thread{i}", file_list))
+
+    for future in future_list:
+        future.result()
+
+    print("Main program ending")
+
+if __name__ == "__main__":
+    main()
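For reference, a minimal sketch of the per-file JSON layout that process_file/write_results emit (the vid/vec values below are made up; the real ones come from the tab-separated "vid<TAB>vector" lines read off HDFS). The "rows" wrapper and the "vid"/"vec" field names have to line up with the schema of the "video" collection that the bulk import below targets.

import json

# Hypothetical sample rows; the actual script fills these from HDFS part files.
payload = {
    "rows": [
        {"vid": 10001, "vec": [0.12, -0.03, 0.88]},
        {"vid": 10002, "vec": [0.05, 0.41, -0.27]},
    ]
}

# Same write pattern as write_results, just with a throwaway output path.
with open("/tmp/milvus-part-00000.json", "w") as json_file:
    json_file.write(json.dumps(payload, indent=4))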

+ 31 - 103
recommend-model-produce/src/main/python/models/dssm/milvus_import.py

@@ -1,103 +1,31 @@
-import os
-import sys
-
-__dir__ = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-#sys.path.append(__dir__)
-sys.path.append(os.path.join(__dir__,"tools"))
-
-import json
-from concurrent.futures import ThreadPoolExecutor
-import utils.compress as compress
-from utils.my_hdfs_client import MyHDFSClient
-from utils.oss_client import HangZhouOSSClient
-
-# Hadoop install directory and configuration
-hadoop_home = "/app/env/hadoop-3.2.4"
-configs = {
-    "fs.defaultFS": "hdfs://192.168.141.208:9000",
-    "hadoop.job.ugi": ""
-}
-hdfs_client = MyHDFSClient(hadoop_home, configs)
-oss_client = HangZhouOSSClient("tz-milvus")
-
-
-def process_file(file_path):
-    """处理单个文件"""
-    ret, out = hdfs_client._run_cmd(f"text {file_path}")
-    result=[]
-    for line in out:
-        sample_values = line.rstrip('\n').split('\t')
-        if(len(sample_values) == 2):
-            vid, vec = sample_values
-            result.append({
-                "vid":int(vid),
-                "vec":json.loads(vec)
-            })
-        else :
-            # Filter out PaxHeader entries
-            print(f"{sample_values}")
-    return result
-
-def write_results(data, output_file):
-    """将结果写入文件"""
-    with open(output_file, 'w') as json_file:
-            json_file.write(data)
-
-def thread_task(name, file_list):
-    """线程任务"""
-    print(f"Thread {name}: starting file_list:{file_list}")
-    i=0
-    for file_path in file_list:
-        i=i+1
-        count=len(file_list)
-        print(f"Thread {name}: starting file:{file_path} {i}/{count}")
-        result=process_file(file_path)
-        file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
-        output_file = f"/app/milvus-{file_name}.json"
-        write_results(json.dumps({"rows":result}, indent=4), output_file)
-
-        oss_object_name=f"dyp/milvus/{file_name}.json"
-        oss_client.put_object_from_file(oss_object_name, output_file)
-
-        #compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
-        # hdfs_client.delete(f"/dyp/milvus/{file_name}.gz")
-        # hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz", multi_processes=1, overwrite=True)
-        # print(f"Thread {name}: ending file:{file_path} {i}/{count}")
-    print(f"Thread {name}: finishing")
-
-def main():
-    sub_dirs,file_list = hdfs_client.ls_dir('/dyp/vec')   
-    all_file=[]
-    file_extensions=[".gz"]
-    for file in file_list:
-        # Filter by file extension
-        if file_extensions and not any(file.endswith(ext) for ext in file_extensions):
-            continue    
-        all_file.append(file)
-    print(f"File list : {all_file}")
-
-    max_workers = 8
-    chunk_size = len(all_file) // max_workers
-    remaining = len(all_file) % max_workers
-
-    # Split the file list across workers
-    split_file_list = []
-    for i in range(max_workers):
-        # Compute the start and end indices for this chunk
-        start = i * chunk_size + min(i, remaining)
-        end = start + chunk_size + (1 if i < remaining else 0)
-        # Append this chunk's sub-list
-        split_file_list.append(all_file[start:end])
-
-    future_list = []
-    with ThreadPoolExecutor(max_workers=max_workers) as executor:
-        for i, file_list in enumerate(split_file_list):
-            future_list.append(executor.submit(thread_task, f"thread{i}", file_list))
-
-    for future in future_list:
-        future.result()
-
-    print("Main program ending")
-
-if __name__ == "__main__":
-    main()
+from pymilvus.bulk_writer import bulk_import
+
+# Set the relevant parameters
+CLOUD_API_ENDPOINT="https://api.cloud.zilliz.com.cn"
+CLUSTER_ID = "in03-8f578d0b86f4589"
+API_KEY = "423a29de63a907e6662b9493c4f95caf799f64f8701cc70db930bb6da7f05914e6ed2374342dc438a8b9d37da0bf164c8ee531bd"
+ACCESS_KEY = "LTAI5tRP4dPZNxdEHw4ZFJXT"
+SECRET_KEY = "a9rtleVwVbNGpZtecChtFMaIaRgrXA"
+COLLECTION_NAME = "video"
+
+# Generate the sequence list from 00000 to 00064
+seq_list = [str(i).zfill(5) for i in range(65)]
+
+# Import the data in a loop
+for seq in seq_list:
+    # Construct the object_url, making sure the quoting is correct
+    object_url = f"https://tz-milvus.oss-cn-hangzhou.aliyuncs.com/dyp/milvus/part-{seq}.json"
+    
+    # Run bulk_import
+    res = bulk_import(
+        url=CLOUD_API_ENDPOINT,  # use the CLOUD_API_ENDPOINT variable
+        api_key=API_KEY,
+        object_url=object_url,  # use the constructed object_url
+        access_key=ACCESS_KEY,
+        secret_key=SECRET_KEY,
+        cluster_id=CLUSTER_ID,
+        collection_name=COLLECTION_NAME
+    )
+    
+    # Print the response
+    print(res.json())
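The loop above only prints each raw bulk_import response. A minimal follow-up sketch for checking how far the last submitted job has gotten, assuming this pymilvus version also ships get_import_progress in pymilvus.bulk_writer and that the response body carries the job id under data.jobId (both match current Zilliz Cloud examples, but worth verifying against the installed release):

import json
from pymilvus.bulk_writer import get_import_progress

# Assumed response shape from bulk_import: {"code": ..., "data": {"jobId": "..."}};
# res here is the last response from the loop above.
job_id = res.json()["data"]["jobId"]

progress = get_import_progress(
    url=CLOUD_API_ENDPOINT,
    api_key=API_KEY,
    cluster_id=CLUSTER_ID,
    job_id=job_id,
)
print(json.dumps(progress.json(), indent=4))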