export-onnx.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import torch
  2. import torch.nn.functional as F
  3. from fish_speech.conversation import CODEBOOK_PAD_TOKEN_ID
  4. from tools.vqgan.extract_vq import get_model
# One-element LongTensor wrapping the imported CODEBOOK_PAD_TOKEN_ID.
# NOTE(review): nothing in the visible part of this file reads this constant.
PAD_TOKEN_ID = torch.LongTensor([CODEBOOK_PAD_TOKEN_ID])
  6. class Encoder(torch.nn.Module):
  7. def __init__(self, model):
  8. super().__init__()
  9. self.model = model
  10. self.model.spec_transform.spectrogram.return_complex = False
  11. def forward(self, audios):
  12. mels = self.model.spec_transform(audios)
  13. encoded_features = self.model.backbone(mels)
  14. z = self.model.quantizer.downsample(encoded_features)
  15. _, indices = self.model.quantizer.residual_fsq(z.transpose(-2, -1))
  16. _, b, l, _ = indices.shape
  17. return indices.permute(1, 0, 3, 2).contiguous().view(b, -1, l)
  18. class Decoder(torch.nn.Module):
  19. def __init__(self, model):
  20. super().__init__()
  21. self.model = model
  22. self.model.head.training = False
  23. self.model.head.checkpointing = False
  24. def get_codes_from_indices(self, cur_index, indices):
  25. _, quantize_dim, _ = indices.shape
  26. d_dim = self.model.quantizer.residual_fsq.rvqs[cur_index].codebooks.shape[2]
  27. if (
  28. quantize_dim
  29. < self.model.quantizer.residual_fsq.rvqs[cur_index].num_quantizers
  30. ):
  31. assert (
  32. self.model.quantizer.residual_fsq.rvqs[cur_index].quantize_dropout > 0.0
  33. ), "quantize dropout must be greater than 0 if you wish to reconstruct from a signal with less fine quantizations"
  34. indices = F.pad(
  35. indices,
  36. (
  37. 0,
  38. self.model.quantizer.residual_fsq.rvqs[cur_index].num_quantizers
  39. - quantize_dim,
  40. ),
  41. value=-1,
  42. )
  43. mask = indices == -1
  44. indices = indices.masked_fill(mask, 0)
  45. all_codes = torch.gather(
  46. self.model.quantizer.residual_fsq.rvqs[cur_index].codebooks.unsqueeze(1),
  47. dim=2,
  48. index=indices.long().permute(2, 0, 1).unsqueeze(-1).repeat(1, 1, 1, d_dim),
  49. )
  50. all_codes = all_codes.masked_fill(mask.permute(2, 0, 1).unsqueeze(-1), 0.0)
  51. scales = (
  52. self.model.quantizer.residual_fsq.rvqs[cur_index]
  53. .scales.unsqueeze(1)
  54. .unsqueeze(1)
  55. )
  56. all_codes = all_codes * scales
  57. return all_codes
  58. def get_output_from_indices(self, cur_index, indices):
  59. codes = self.get_codes_from_indices(cur_index, indices)
  60. codes_summed = codes.sum(dim=0)
  61. return self.model.quantizer.residual_fsq.rvqs[cur_index].project_out(
  62. codes_summed
  63. )
  64. def forward(self, indices) -> torch.Tensor:
  65. batch_size, _, length = indices.shape
  66. dims = self.model.quantizer.residual_fsq.dim
  67. groups = self.model.quantizer.residual_fsq.groups
  68. dim_per_group = dims // groups
  69. # indices = rearrange(indices, "b (g r) l -> g b l r", g=groups)
  70. indices = indices.view(batch_size, groups, -1, length).permute(1, 0, 3, 2)
  71. # z_q = self.model.quantizer.residual_fsq.get_output_from_indices(indices)
  72. z_q = torch.empty((batch_size, length, dims))
  73. for i in range(groups):
  74. z_q[:, :, i * dim_per_group : (i + 1) * dim_per_group] = (
  75. self.get_output_from_indices(i, indices[i])
  76. )
  77. z = self.model.quantizer.upsample(z_q.transpose(1, 2))
  78. x = self.model.head(z)
  79. return x
  80. def main(firefly_gan_vq_path, llama_path, export_prefix):
  81. GanModel = get_model("firefly_gan_vq", firefly_gan_vq_path, device="cpu")
  82. enc = Encoder(GanModel)
  83. dec = Decoder(GanModel)
  84. audio_example = torch.randn(1, 1, 96000)
  85. indices = enc(audio_example)
  86. torch.onnx.export(
  87. enc,
  88. audio_example,
  89. f"{export_prefix}encoder.onnx",
  90. dynamic_axes={
  91. "audio": [0, 2],
  92. },
  93. do_constant_folding=False,
  94. opset_version=18,
  95. verbose=False,
  96. input_names=["audio"],
  97. output_names=["prompt"],
  98. )
  99. torch.onnx.export(
  100. dec,
  101. indices,
  102. f"{export_prefix}decoder.onnx",
  103. dynamic_axes={
  104. "prompt": [0, 2],
  105. },
  106. do_constant_folding=False,
  107. opset_version=18,
  108. verbose=False,
  109. input_names=["prompt"],
  110. output_names=["audio"],
  111. )
  112. if __name__ == "__main__":
  113. main("checkpoints/pre/firefly-gan-vq-fsq-8x1024-21hz-generator.pth", None, "test_")