# compress_tar.py

import io
import random
import tarfile
from multiprocessing import Process
from pathlib import Path

from tqdm import tqdm
  7. def chunked_tarring(rank, file_list, base_folder, output_folder, chunk_size=1024**3):
  8. chunk_count = 1
  9. total_size = 0
  10. saved_count = 0
  11. buffer = io.BytesIO()
  12. tar = tarfile.open(fileobj=buffer, mode="w")
  13. for audio_file in file_list:
  14. txt_file = audio_file.with_suffix(".txt")
  15. if not txt_file.exists():
  16. continue
  17. file_size = audio_file.stat().st_size + txt_file.stat().st_size
  18. if total_size + file_size > chunk_size:
  19. tar.close()
  20. # write the buffer to disk
  21. buffer.seek(0)
  22. with open(output_folder / f"chunk-{rank:03d}-{chunk_count:04d}.tar", "wb") as f:
  23. f.write(buffer.read())
  24. chunk_count += 1
  25. total_size = 0
  26. buffer.close()
  27. buffer = io.BytesIO()
  28. tar = tarfile.open(fileobj=buffer, mode="w")
  29. tar.add(audio_file, arcname=audio_file.relative_to(base_folder))
  30. tar.add(txt_file, arcname=txt_file.relative_to(base_folder))
  31. total_size += file_size
  32. if saved_count % 1000 == 0:
  33. print(f"Rank {rank}: {saved_count}/{len(file_list)}")
  34. saved_count += 1
  35. tar.close()
  36. buffer.seek(0)
  37. with open(output_folder / f"chunk-{rank:03d}-{chunk_count:04d}.tar", "wb") as f:
  38. f.write(buffer.read())
  39. print(f"Rank {rank}: {saved_count}/{len(file_list)}")
  40. if __name__ == "__main__":
  41. base_folder = Path("/mnt/nvme1/multi-modal-test/WenetSpeech/cleaned")
  42. output_folder = Path("/mnt/nvme1/multi-modal-test/WenetSpeech/compressed")
  43. output_folder.mkdir(exist_ok=True, parents=True)
  44. num_workers = 50
  45. file_list = list(tqdm(base_folder.rglob("*.flac")))
  46. random.shuffle(file_list)
  47. print(f"Total files: {len(file_list)}")
  48. chunk_size = len(file_list) // num_workers
  49. processes = []
  50. for i in range(num_workers):
  51. start = i * chunk_size
  52. end = (i + 1) * chunk_size
  53. if i == num_workers - 1:
  54. end = len(file_list)
  55. p = Process(target=chunked_tarring, args=(i, file_list[start:end], base_folder, output_folder))
  56. p.start()
  57. processes.append(p)
  58. for p in processes:
  59. p.join()
  60. print("Done")