# compress_tar.py — pack paired .flac/.txt files into size-bounded tar chunks
# using multiple worker processes.
  1. import io
  2. import random
  3. import tarfile
  4. from multiprocessing import Process
  5. from pathlib import Path
  6. from tqdm import tqdm
  7. def chunked_tarring(rank, file_list, base_folder, output_folder, chunk_size=1024**3):
  8. chunk_count = 1
  9. total_size = 0
  10. saved_count = 0
  11. buffer = io.BytesIO()
  12. tar = tarfile.open(fileobj=buffer, mode="w")
  13. for audio_file in file_list:
  14. txt_file = audio_file.with_suffix(".txt")
  15. if not txt_file.exists():
  16. continue
  17. file_size = audio_file.stat().st_size + txt_file.stat().st_size
  18. if total_size + file_size > chunk_size:
  19. tar.close()
  20. # write the buffer to disk
  21. buffer.seek(0)
  22. with open(
  23. output_folder / f"chunk-{rank:03d}-{chunk_count:04d}.tar", "wb"
  24. ) as f:
  25. f.write(buffer.read())
  26. chunk_count += 1
  27. total_size = 0
  28. buffer.close()
  29. buffer = io.BytesIO()
  30. tar = tarfile.open(fileobj=buffer, mode="w")
  31. tar.add(audio_file, arcname=audio_file.relative_to(base_folder))
  32. tar.add(txt_file, arcname=txt_file.relative_to(base_folder))
  33. total_size += file_size
  34. if saved_count % 1000 == 0:
  35. print(f"Rank {rank}: {saved_count}/{len(file_list)}")
  36. saved_count += 1
  37. tar.close()
  38. buffer.seek(0)
  39. with open(output_folder / f"chunk-{rank:03d}-{chunk_count:04d}.tar", "wb") as f:
  40. f.write(buffer.read())
  41. print(f"Rank {rank}: {saved_count}/{len(file_list)}")
  42. if __name__ == "__main__":
  43. base_folder = Path("/mnt/nvme1/multi-modal-test/WenetSpeech/cleaned")
  44. output_folder = Path("/mnt/nvme1/multi-modal-test/WenetSpeech/compressed")
  45. output_folder.mkdir(exist_ok=True, parents=True)
  46. num_workers = 50
  47. file_list = list(tqdm(base_folder.rglob("*.flac")))
  48. random.shuffle(file_list)
  49. print(f"Total files: {len(file_list)}")
  50. chunk_size = len(file_list) // num_workers
  51. processes = []
  52. for i in range(num_workers):
  53. start = i * chunk_size
  54. end = (i + 1) * chunk_size
  55. if i == num_workers - 1:
  56. end = len(file_list)
  57. p = Process(
  58. target=chunked_tarring,
  59. args=(i, file_list[start:end], base_folder, output_folder),
  60. )
  61. p.start()
  62. processes.append(p)
  63. for p in processes:
  64. p.join()
  65. print("Done")