丁云鹏 il y a 4 mois
Parent
commit
d214badbb0

+ 22 - 5
recommend-model-produce/src/main/python/models/dssm/milvus_import.py

@@ -66,12 +66,29 @@ def thread_task(name, file_list):
     print(f"Thread {name}: finishing")
 
 def main():
+    sub_dirs,file_list = hdfs_client.ls_dir('/dw/recommend/model/56_dssm_i2i_itempredData/20241212')   
+    all_file=[]
+    file_extensions=[".gz"]
+    for file in file_list:
+        # 扩展名过滤
+        if file_extensions and not any(file.endswith(ext) for ext in file_extensions):
+            continue    
+        all_file.append(file)
+    print(f"File list : {all_file}")
+
+    max_workers = 8
+    chunk_size = len(all_file) // max_workers
+    remaining = len(all_file) % max_workers
+
+    # 分割列表
+    split_file_list = []
+    for i in range(max_workers):
+        # 计算每份的起始和结束索引
+        start = i * chunk_size + min(i, remaining)
+        end = start + chunk_size + (1 if i < remaining else 0)
+        # 添加分割后的子列表
+        split_file_list.append(all_file[start:end])
 
-    max_workers = 2
-    split_file_list = [
-        ['/dyp/vec/part-00017.gz'],
-        ['/dyp/vec/part-00018.gz']
-    ]
     future_list = []
     with ThreadPoolExecutor(max_workers=max_workers) as executor:
         for i, file_list in enumerate(split_file_list):