milvus_data_process.py

import os
import sys

__dir__ = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# sys.path.append(__dir__)
sys.path.append(os.path.join(__dir__, "tools"))

import json
from concurrent.futures import ThreadPoolExecutor

import utils.compress as compress
from utils.my_hdfs_client import MyHDFSClient

# Hadoop installation directory and HDFS configuration
hadoop_home = "/app/env/hadoop-3.2.4"
configs = {
    "fs.defaultFS": "hdfs://192.168.141.208:9000",
    "hadoop.job.ugi": ""
}
hdfs_client = MyHDFSClient(hadoop_home, configs)

def process_file(file_path):
    """Process a single HDFS file: read it with `hdfs dfs -text` and collect (vid, vec) rows."""
    ret, out = hdfs_client._run_cmd(f"text {file_path}")
    result = []
    for line in out:
        sample_values = line.rstrip('\n').split('\t')
        if len(sample_values) == 2:
            vid, vec = sample_values
            result.append({
                "vid": vid,
                "vec": vec
            })
        else:
            print(f"{sample_values}")
    return result
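
# process_file example (hypothetical values): an input line "v123\t0.12,0.34,..."
# is parsed into {"vid": "v123", "vec": "0.12,0.34,..."}; lines that do not split
# into exactly two tab-separated fields are only printed and skipped.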


def write_results(results, output_file):
    """Write the serialized JSON string to a local file."""
    with open(output_file, 'w') as json_file:
        json_file.write(results)


def thread_task(name, file_list):
    """Thread task: convert each HDFS file to Milvus row JSON, compress it, and upload it back to HDFS."""
    print(f"Thread {name}: starting file_list:{file_list}")
    i = 0
    count = len(file_list)
    for file_path in file_list:
        i = i + 1
        print(f"Thread {name}: starting file:{file_path} {i}/{count}")
        result = process_file(file_path)
        file_name, file_suffix = os.path.splitext(os.path.basename(file_path))
        output_file = f"/app/milvus-{file_name}.json"
        write_results(json.dumps({"rows": result}, indent=4), output_file)
        compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
        hdfs_client.delete(f"/dyp/milvus/{file_name}.gz")
        hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz", multi_processes=1, overwrite=True)
        print(f"Thread {name}: ending file:{file_path} {i}/{count}")
    print(f"Thread {name}: finishing")


def main():
    max_workers = 2
    split_file_list = [
        ['/dyp/vec/part-00017.gz'],
        ['/dyp/vec/part-00018.gz']
    ]
    future_list = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i, file_list in enumerate(split_file_list):
            future_list.append(executor.submit(thread_task, f"thread{i}", file_list))
    for future in future_list:
        future.result()
    print("Main program ending")


if __name__ == "__main__":
    main()
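
The JSON written for each part file follows Milvus's row-based import layout, {"rows": [...]}. Below is a minimal sketch of how such a file could later be bulk-inserted with pymilvus; the endpoint, the collection name vid_vectors, and the file path are assumptions, do_bulk_insert expects the file to be reachable from the storage backend configured for Milvus, and the collection schema must match the row fields.

from pymilvus import connections, utility

# Assumed Milvus endpoint; adjust host/port to the actual deployment.
connections.connect(host="127.0.0.1", port="19530")

# Hypothetical collection and file location; one row-based JSON file per call.
task_id = utility.do_bulk_insert(
    collection_name="vid_vectors",
    files=["milvus-part-00017.json"],
)
print(utility.get_bulk_insert_state(task_id))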