- import os
- import sys
- __dir__ = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- #sys.path.append(__dir__)
- sys.path.append(os.path.join(__dir__,"tools"))
- import json
- from concurrent.futures import ThreadPoolExecutor
- import utils.compress as compress
- from utils.my_hdfs_client import MyHDFSClient
# Hadoop installation directory and connection settings
hadoop_home = "/app/env/hadoop-3.2.4"
configs = {
    "fs.defaultFS": "hdfs://192.168.141.208:9000",  # NameNode endpoint
    "hadoop.job.ugi": ""  # no user/group credentials supplied
}
# Module-level HDFS client shared by all worker threads below
hdfs_client = MyHDFSClient(hadoop_home, configs)
def process_file(file_path):
    """Read an HDFS text file and parse each tab-separated line into a dict.

    Each well-formed line is ``<vid>\\t<vec>``; malformed lines are printed
    (original diagnostic behavior) and skipped.

    Args:
        file_path: HDFS path handed to the ``text`` HDFS command.

    Returns:
        list[dict]: one ``{"vid": ..., "vec": ...}`` per well-formed line.
    """
    # NOTE(review): relies on MyHDFSClient's private _run_cmd; the exit
    # status `ret` is returned but never checked — TODO confirm that command
    # failures surface elsewhere.
    ret, out = hdfs_client._run_cmd(f"text {file_path}")
    result = []
    for line in out:
        sample_values = line.rstrip('\n').split('\t')
        if len(sample_values) == 2:  # idiomatic: no parens around condition
            vid, vec = sample_values
            result.append({"vid": vid, "vec": vec})
        else:
            # Keep the original malformed-line diagnostic output unchanged.
            print(f"{sample_values}")
    return result
def write_results(results, output_file):
    """Write result content to ``output_file``, one entry per line.

    Bug fix: the caller passes a single JSON string produced by
    ``json.dumps``; iterating over a ``str`` wrote it one *character*
    per line, corrupting the JSON file. A string argument is now written
    as-is (plus trailing newline), while any other iterable of strings
    keeps the original one-item-per-line behavior.

    Args:
        results: a single string, or an iterable of strings.
        output_file: destination path; overwritten if it exists.
    """
    with open(output_file, 'w') as json_file:
        if isinstance(results, str):
            json_file.write(results + "\n")
        else:
            for s in results:
                json_file.write(s + "\n")
def thread_task(name, file_list):
    """Worker: convert each HDFS vector file to Milvus JSON and re-upload it.

    For every path in ``file_list``: parse the tab-separated vectors, dump
    them as ``{"rows": [...]}`` JSON to a local file, tar.gz that file, and
    push the archive back to HDFS under ``/dyp/milvus/``.

    Args:
        name: thread label used in progress logs.
        file_list: HDFS paths to process.
    """
    print(f"Thread {name}: starting file_list:{file_list}")
    count = len(file_list)  # loop-invariant: hoisted out of the loop
    for i, file_path in enumerate(file_list, start=1):
        print(f"Thread {name}: starting file:{file_path} {i}/{count}")
        result = process_file(file_path)
        file_name, _ = os.path.splitext(os.path.basename(file_path))
        output_file = f"/app/milvus-{file_name}.json"
        # Bug fix: wrap the JSON string in a one-element list. The original
        # passed the raw json.dumps string, which write_results iterated
        # character by character, writing one char per line.
        write_results([json.dumps({"rows": result}, indent=4)], output_file)
        compress.compress_file_tar(output_file, f"{output_file}.tar.gz")
        # NOTE(review): the uploaded object is a .tar.gz but the remote name
        # is plain .gz — presumably intentional; confirm with consumers of
        # /dyp/milvus/ before renaming.
        hdfs_client.delete(f"/dyp/milvus/{file_name}.gz")
        hdfs_client.upload(f"{output_file}.tar.gz", f"/dyp/milvus/{file_name}.gz",
                           multi_processes=1, overwrite=True)
        print(f"Thread {name}: ending file:{file_path} {i}/{count}")
    print(f"Thread {name}: finishing")
def main():
    """Fan the hard-coded HDFS file lists out to worker threads and wait.

    Each inner list of ``split_file_list`` is handled by one thread; any
    exception raised in a worker is re-raised here via ``Future.result()``.
    """
    max_workers = 2
    split_file_list = [
        ['/dyp/vec/part-00017.gz'],
        ['/dyp/vec/part-00018.gz']
    ]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(thread_task, f"thread{idx}", files)
            for idx, files in enumerate(split_file_list)
        ]
        for fut in futures:
            fut.result()  # block until done; propagate worker exceptions
    print("Main program ending")


if __name__ == "__main__":
    main()