# milvus_import.py

import os
import sys

__dir__ = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
#sys.path.append(__dir__)
sys.path.append(os.path.join(__dir__, "tools"))

import json
from concurrent.futures import ThreadPoolExecutor

import utils.compress as compress
from utils.my_hdfs_client import MyHDFSClient
from utils.oss_client import HangZhouOSSClient

# Hadoop installation directory and connection configuration
hadoop_home = "/app/env/hadoop-3.2.4"
configs = {
    "fs.defaultFS": "hdfs://192.168.141.208:9000",
    "hadoop.job.ugi": ""
}
hdfs_client = MyHDFSClient(hadoop_home, configs)
oss_client = HangZhouOSSClient("tz-milvus")


def process_file(file_path):
    """Process a single HDFS file: parse each line into a {"vid", "vec"} row."""
    ret, out = hdfs_client._run_cmd(f"text {file_path}")
    result = []
    for line in out:
        sample_values = line.rstrip('\n').split('\t')
        if len(sample_values) == 2:
            vid, vec = sample_values
            result.append({
                "vid": int(vid),
                "vec": json.loads(vec)
            })
        else:
            # Filter out PaxHeader (tar metadata) lines that do not match the two-column layout
            print(f"{sample_values}")
    return result
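
# Illustrative only (hypothetical values): an input line such as
#   "42\t[0.12, -0.08, 0.33]"
# becomes {"vid": 42, "vec": [0.12, -0.08, 0.33]}; any line that does not split into
# exactly two tab-separated fields (e.g. tar PaxHeader noise) is printed and skipped.
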

def write_results(data, output_file):
    """Write the serialized result string to a local file."""
    with open(output_file, 'w') as json_file:
        json_file.write(data)

def thread_task(name, file_list):
    """Thread task: convert each assigned HDFS file and upload the resulting JSON to OSS."""
    print(f"Thread {name}: starting file_list:{file_list}")
    i = 0
    count = len(file_list)
    for file_path in file_list:
        i = i + 1
        print(f"Thread {name}: starting file:{file_path} {i}/{count}")
        result = process_file(file_path)
        file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
        output_file = f"/app/milvus-{file_name}.json"
        write_results(json.dumps({"rows": result}, indent=4), output_file)
        oss_object_name = f"dyp/milvus/{file_name}.json"
        oss_client.put_object_from_file(oss_object_name, output_file)
        #compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
        # hdfs_client.delete(f"/dyp/milvus/{file_name}.gz")
        # hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz", multi_processes=1, overwrite=True)
        # print(f"Thread {name}: ending file:{file_path} {i}/{count}")
    print(f"Thread {name}: finishing")

def main():
    sub_dirs, file_list = hdfs_client.ls_dir('/dw/recommend/model/56_dssm_i2i_itempredData/20241212')

    # Filter by extension
    all_file = []
    file_extensions = [".gz"]
    for file in file_list:
        if file_extensions and not any(file.endswith(ext) for ext in file_extensions):
            continue
        all_file.append(file)
    print(f"File list : {all_file}")

    max_workers = 8
    chunk_size = len(all_file) // max_workers
    remaining = len(all_file) % max_workers

    # Split the list into max_workers near-equal chunks
    split_file_list = []
    for i in range(max_workers):
        # Start and end indices of this chunk
        start = i * chunk_size + min(i, remaining)
        end = start + chunk_size + (1 if i < remaining else 0)
        # Append the sub-list for this chunk
        split_file_list.append(all_file[start:end])
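
    # Worked example of the split (hypothetical count): with 10 files and 8 workers,
    # chunk_size = 1 and remaining = 2, so the chunk sizes are [2, 2, 1, 1, 1, 1, 1, 1]
    # and the first `remaining` chunks each absorb one extra file.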

    future_list = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i, file_list in enumerate(split_file_list):
            future_list.append(executor.submit(thread_task, f"thread{i}", file_list))
    for future in future_list:
        future.result()
    print("Main program ending")

if __name__ == "__main__":
    main()