mirror of
https://gitee.com/270580156/weiyu.git
synced 2026-05-19 05:37:53 +00:00
Sync from bytedesk-private: update
This commit is contained in:
226
modules/python/vendors/FunASR/funasr/frontends/utils/stft.py
vendored
Normal file
226
modules/python/vendors/FunASR/funasr/frontends/utils/stft.py
vendored
Normal file
@@ -0,0 +1,226 @@
|
||||
from distutils.version import LooseVersion
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
|
||||
import torch
|
||||
|
||||
try:
|
||||
from torch_complex.tensor import ComplexTensor
|
||||
except:
|
||||
print("Please install torch_complex firstly")
|
||||
from funasr.models.transformer.utils.nets_utils import make_pad_mask
|
||||
from funasr.frontends.utils.complex_utils import is_complex
|
||||
|
||||
import librosa
|
||||
import numpy as np
|
||||
|
||||
is_torch_1_9_plus = LooseVersion(torch.__version__) >= LooseVersion("1.9.0")
|
||||
|
||||
|
||||
is_torch_1_7_plus = LooseVersion(torch.__version__) >= LooseVersion("1.7")
|
||||
|
||||
|
||||
class Stft(torch.nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
n_fft: int = 512,
|
||||
win_length: int = None,
|
||||
hop_length: int = 128,
|
||||
window: Optional[str] = "hann",
|
||||
center: bool = True,
|
||||
normalized: bool = False,
|
||||
onesided: bool = True,
|
||||
):
|
||||
super().__init__()
|
||||
self.n_fft = n_fft
|
||||
if win_length is None:
|
||||
self.win_length = n_fft
|
||||
else:
|
||||
self.win_length = win_length
|
||||
self.hop_length = hop_length
|
||||
self.center = center
|
||||
self.normalized = normalized
|
||||
self.onesided = onesided
|
||||
if window is not None and not hasattr(torch, f"{window}_window"):
|
||||
if window.lower() != "povey":
|
||||
raise ValueError(f"{window} window is not implemented")
|
||||
self.window = window
|
||||
|
||||
def extra_repr(self):
|
||||
return (
|
||||
f"n_fft={self.n_fft}, "
|
||||
f"win_length={self.win_length}, "
|
||||
f"hop_length={self.hop_length}, "
|
||||
f"center={self.center}, "
|
||||
f"normalized={self.normalized}, "
|
||||
f"onesided={self.onesided}"
|
||||
)
|
||||
|
||||
def forward(
|
||||
self, input: torch.Tensor, ilens: torch.Tensor = None
|
||||
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
|
||||
"""STFT forward function.
|
||||
|
||||
Args:
|
||||
input: (Batch, Nsamples) or (Batch, Nsample, Channels)
|
||||
ilens: (Batch)
|
||||
Returns:
|
||||
output: (Batch, Frames, Freq, 2) or (Batch, Frames, Channels, Freq, 2)
|
||||
|
||||
"""
|
||||
bs = input.size(0)
|
||||
if input.dim() == 3:
|
||||
multi_channel = True
|
||||
# input: (Batch, Nsample, Channels) -> (Batch * Channels, Nsample)
|
||||
input = input.transpose(1, 2).reshape(-1, input.size(1))
|
||||
else:
|
||||
multi_channel = False
|
||||
|
||||
# NOTE(kamo):
|
||||
# The default behaviour of torch.stft is compatible with librosa.stft
|
||||
# about padding and scaling.
|
||||
# Note that it's different from scipy.signal.stft
|
||||
|
||||
# output: (Batch, Freq, Frames, 2=real_imag)
|
||||
# or (Batch, Channel, Freq, Frames, 2=real_imag)
|
||||
if self.window is not None:
|
||||
if self.window.lower() == "povey":
|
||||
window = torch.hann_window(
|
||||
self.win_length, periodic=False, device=input.device, dtype=input.dtype
|
||||
).pow(0.85)
|
||||
else:
|
||||
window_func = getattr(torch, f"{self.window}_window")
|
||||
window = window_func(self.win_length, dtype=input.dtype, device=input.device)
|
||||
else:
|
||||
window = None
|
||||
|
||||
# For the compatibility of ARM devices, which do not support
|
||||
# torch.stft() due to the lake of MKL.
|
||||
if input.is_cuda or torch.backends.mkl.is_available():
|
||||
stft_kwargs = dict(
|
||||
n_fft=self.n_fft,
|
||||
win_length=self.win_length,
|
||||
hop_length=self.hop_length,
|
||||
center=self.center,
|
||||
window=window,
|
||||
normalized=self.normalized,
|
||||
onesided=self.onesided,
|
||||
)
|
||||
if is_torch_1_7_plus:
|
||||
stft_kwargs["return_complex"] = False
|
||||
output = torch.stft(input, **stft_kwargs)
|
||||
else:
|
||||
if self.training:
|
||||
raise NotImplementedError(
|
||||
"stft is implemented with librosa on this device, which does not "
|
||||
"support the training mode."
|
||||
)
|
||||
|
||||
# use stft_kwargs to flexibly control different PyTorch versions' kwargs
|
||||
stft_kwargs = dict(
|
||||
n_fft=self.n_fft,
|
||||
win_length=self.win_length,
|
||||
hop_length=self.hop_length,
|
||||
center=self.center,
|
||||
window=window,
|
||||
)
|
||||
|
||||
if window is not None:
|
||||
# pad the given window to n_fft
|
||||
n_pad_left = (self.n_fft - window.shape[0]) // 2
|
||||
n_pad_right = self.n_fft - window.shape[0] - n_pad_left
|
||||
stft_kwargs["window"] = torch.cat(
|
||||
[torch.zeros(n_pad_left), window, torch.zeros(n_pad_right)], 0
|
||||
).numpy()
|
||||
else:
|
||||
win_length = self.win_length if self.win_length is not None else self.n_fft
|
||||
stft_kwargs["window"] = torch.ones(win_length)
|
||||
|
||||
output = []
|
||||
# iterate over istances in a batch
|
||||
for i, instance in enumerate(input):
|
||||
stft = librosa.stft(input[i].numpy(), **stft_kwargs)
|
||||
output.append(torch.tensor(np.stack([stft.real, stft.imag], -1)))
|
||||
output = torch.stack(output, 0)
|
||||
if not self.onesided:
|
||||
len_conj = self.n_fft - output.shape[1]
|
||||
conj = output[:, 1 : 1 + len_conj].flip(1)
|
||||
conj[:, :, :, -1].data *= -1
|
||||
output = torch.cat([output, conj], 1)
|
||||
if self.normalized:
|
||||
output = output * (stft_kwargs["window"].shape[0] ** (-0.5))
|
||||
|
||||
# output: (Batch, Freq, Frames, 2=real_imag)
|
||||
# -> (Batch, Frames, Freq, 2=real_imag)
|
||||
output = output.transpose(1, 2)
|
||||
if multi_channel:
|
||||
# output: (Batch * Channel, Frames, Freq, 2=real_imag)
|
||||
# -> (Batch, Frame, Channel, Freq, 2=real_imag)
|
||||
output = output.view(bs, -1, output.size(1), output.size(2), 2).transpose(1, 2)
|
||||
|
||||
if ilens is not None:
|
||||
if self.center:
|
||||
pad = self.n_fft // 2
|
||||
ilens = ilens + 2 * pad
|
||||
|
||||
olens = (ilens - self.n_fft) // self.hop_length + 1
|
||||
output.masked_fill_(make_pad_mask(olens, output, 1), 0.0)
|
||||
else:
|
||||
olens = None
|
||||
|
||||
return output, olens
|
||||
|
||||
def inverse(
|
||||
self, input: Union[torch.Tensor, ComplexTensor], ilens: torch.Tensor = None
|
||||
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
|
||||
"""Inverse STFT.
|
||||
|
||||
Args:
|
||||
input: Tensor(batch, T, F, 2) or ComplexTensor(batch, T, F)
|
||||
ilens: (batch,)
|
||||
Returns:
|
||||
wavs: (batch, samples)
|
||||
ilens: (batch,)
|
||||
"""
|
||||
if LooseVersion(torch.__version__) >= LooseVersion("1.6.0"):
|
||||
istft = torch.functional.istft
|
||||
else:
|
||||
try:
|
||||
import torchaudio
|
||||
except ImportError:
|
||||
raise ImportError("Please install torchaudio>=0.3.0 or use torch>=1.6.0")
|
||||
|
||||
if not hasattr(torchaudio.functional, "istft"):
|
||||
raise ImportError("Please install torchaudio>=0.3.0 or use torch>=1.6.0")
|
||||
istft = torchaudio.functional.istft
|
||||
|
||||
if self.window is not None:
|
||||
window_func = getattr(torch, f"{self.window}_window")
|
||||
if is_complex(input):
|
||||
datatype = input.real.dtype
|
||||
else:
|
||||
datatype = input.dtype
|
||||
window = window_func(self.win_length, dtype=datatype, device=input.device)
|
||||
else:
|
||||
window = None
|
||||
|
||||
if is_complex(input):
|
||||
input = torch.stack([input.real, input.imag], dim=-1)
|
||||
elif input.shape[-1] != 2:
|
||||
raise TypeError("Invalid input type")
|
||||
input = input.transpose(1, 2)
|
||||
|
||||
wavs = istft(
|
||||
input,
|
||||
n_fft=self.n_fft,
|
||||
hop_length=self.hop_length,
|
||||
win_length=self.win_length,
|
||||
window=window,
|
||||
center=self.center,
|
||||
normalized=self.normalized,
|
||||
onesided=self.onesided,
|
||||
length=ilens.max() if ilens is not None else ilens,
|
||||
)
|
||||
|
||||
return wavs, ilens
|
||||
Reference in New Issue
Block a user