import torch from torch import nn from fish_speech.models.vqgan.modules.modules import WN, Flip from fish_speech.models.vqgan.modules.normalization import LayerNorm from fish_speech.models.vqgan.modules.transformer import FFN, MultiHeadAttention class ResidualCouplingBlock(nn.Module): def __init__( self, channels, hidden_channels, kernel_size, dilation_rate, n_layers, n_flows=4, gin_channels=0, ): super().__init__() self.channels = channels self.hidden_channels = hidden_channels self.kernel_size = kernel_size self.dilation_rate = dilation_rate self.n_layers = n_layers self.n_flows = n_flows self.gin_channels = gin_channels self.flows = nn.ModuleList() for i in range(n_flows): self.flows.append( ResidualCouplingLayer( channels, hidden_channels, kernel_size, dilation_rate, n_layers, gin_channels=gin_channels, mean_only=True, ) ) self.flows.append(Flip()) def forward(self, x, x_mask, g=None, reverse=False): if not reverse: for flow in self.flows: x, _ = flow(x, x_mask, g=g, reverse=reverse) else: for flow in reversed(self.flows): x = flow(x, x_mask, g=g, reverse=reverse) return x class ResidualCouplingLayer(nn.Module): def __init__( self, channels, hidden_channels, kernel_size, dilation_rate, n_layers, p_dropout=0, gin_channels=0, mean_only=False, ): assert channels % 2 == 0, "channels should be divisible by 2" super().__init__() self.channels = channels self.hidden_channels = hidden_channels self.kernel_size = kernel_size self.dilation_rate = dilation_rate self.n_layers = n_layers self.half_channels = channels // 2 self.mean_only = mean_only self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1) self.enc = WN( hidden_channels, kernel_size, dilation_rate, n_layers, p_dropout=p_dropout, gin_channels=gin_channels, ) self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1) self.post.weight.data.zero_() self.post.bias.data.zero_() def forward(self, x, x_mask, g=None, reverse=False): x0, x1 = torch.split(x, [self.half_channels] * 2, 1) h = self.pre(x0) * x_mask h = self.enc(h, x_mask, g=g) stats = self.post(h) * x_mask if not self.mean_only: m, logs = torch.split(stats, [self.half_channels] * 2, 1) else: m = stats logs = torch.zeros_like(m) if not reverse: x1 = m + x1 * torch.exp(logs) * x_mask x = torch.cat([x0, x1], 1) logdet = torch.sum(logs, [1, 2]) return x, logdet else: x1 = (x1 - m) * torch.exp(-logs) * x_mask x = torch.cat([x0, x1], 1) return x class TransformerCouplingBlock(nn.Module): def __init__( self, channels, hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout, n_flows=4, gin_channels=0, ): super().__init__() self.channels = channels self.hidden_channels = hidden_channels self.kernel_size = kernel_size self.n_layers = n_layers self.n_flows = n_flows self.gin_channels = gin_channels self.flows = nn.ModuleList() for i in range(n_flows): self.flows.append( TransformerCouplingLayer( channels, hidden_channels, kernel_size, n_layers, n_heads, p_dropout, filter_channels, mean_only=True, gin_channels=self.gin_channels, ) ) self.flows.append(Flip()) def forward(self, x, x_mask, g=None, reverse=False): if not reverse: for flow in self.flows: x, _ = flow(x, x_mask, g=g, reverse=reverse) else: for flow in reversed(self.flows): x = flow(x, x_mask, g=g, reverse=reverse) return x class TransformerCouplingLayer(nn.Module): def __init__( self, channels, hidden_channels, kernel_size, n_layers, n_heads, p_dropout=0, filter_channels=0, mean_only=False, gin_channels=0, ): super().__init__() assert channels % 2 == 0, "channels should be divisible by 2" self.channels = channels self.hidden_channels = hidden_channels self.kernel_size = kernel_size self.n_layers = n_layers self.half_channels = channels // 2 self.mean_only = mean_only self.pre = nn.Conv1d(self.half_channels, hidden_channels, 1) self.enc = Encoder( hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout, gin_channels=gin_channels, ) self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1) self.post.weight.data.zero_() self.post.bias.data.zero_() def forward(self, x, x_mask, g=None, reverse=False): x0, x1 = torch.split(x, [self.half_channels] * 2, 1) h = self.pre(x0) * x_mask h = self.enc(h, x_mask, g=g) stats = self.post(h) * x_mask if not self.mean_only: m, logs = torch.split(stats, [self.half_channels] * 2, 1) else: m = stats logs = torch.zeros_like(m) if not reverse: x1 = m + x1 * torch.exp(logs) * x_mask x = torch.cat([x0, x1], 1) logdet = torch.sum(logs, [1, 2]) return x, logdet else: x1 = (x1 - m) * torch.exp(-logs) * x_mask x = torch.cat([x0, x1], 1) return x class Encoder(nn.Module): def __init__( self, hidden_channels, filter_channels, n_heads, n_layers, kernel_size=1, p_dropout=0.0, window_size=4, gin_channels=512, cond_layer_idx=2, ): super().__init__() self.hidden_channels = hidden_channels self.filter_channels = filter_channels self.n_heads = n_heads self.n_layers = n_layers self.kernel_size = kernel_size self.p_dropout = p_dropout self.window_size = window_size self.spk_emb_linear = nn.Linear(gin_channels, self.hidden_channels) self.cond_layer_idx = cond_layer_idx assert ( self.cond_layer_idx < self.n_layers ), "cond_layer_idx should be less than n_layers" self.drop = nn.Dropout(p_dropout) self.attn_layers = nn.ModuleList() self.norm_layers_1 = nn.ModuleList() self.ffn_layers = nn.ModuleList() self.norm_layers_2 = nn.ModuleList() for i in range(self.n_layers): self.attn_layers.append( MultiHeadAttention( hidden_channels, hidden_channels, n_heads, p_dropout=p_dropout, window_size=window_size, ) ) self.norm_layers_1.append(LayerNorm(hidden_channels)) self.ffn_layers.append( FFN( hidden_channels, hidden_channels, filter_channels, kernel_size, p_dropout=p_dropout, ) ) self.norm_layers_2.append(LayerNorm(hidden_channels)) def forward(self, x, x_mask, g=None): attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1) x = x * x_mask for i in range(self.n_layers): if i == self.cond_layer_idx and g is not None: g = self.spk_emb_linear(g.transpose(1, 2)) g = g.transpose(1, 2) x = x + g x = x * x_mask y = self.attn_layers[i](x, x, attn_mask) y = self.drop(y) x = self.norm_layers_1[i](x + y) y = self.ffn_layers[i](x, x_mask) y = self.drop(y) x = self.norm_layers_2[i](x + y) x = x * x_mask return x