from dataclasses import dataclass
from io import BytesIO
from math import ceil, isclose
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union
import numpy as np
import torch
from _decimal import ROUND_HALF_UP
from lhotse.audio.backend import info, save_audio, torchaudio_info
from lhotse.audio.source import AudioSource
from lhotse.audio.utils import (
AudioLoadingError,
DurationMismatchError,
VideoInfo,
get_audio_duration_mismatch_tolerance,
)
from lhotse.augmentation import (
AudioTransform,
DereverbWPE,
LoudnessNormalization,
Resample,
ReverbWithImpulseResponse,
Speed,
Tempo,
Volume,
)
from lhotse.utils import (
Pathlike,
Seconds,
SetContainingAnything,
asdict_nonull,
compute_num_samples,
fastcopy,
ifnone,
perturb_num_samples,
rich_exception_info,
)
Channels = Union[int, List[int]]
[docs]
@dataclass
class Recording:
"""
The :class:`~lhotse.audio.Recording` manifest describes the recordings in a given corpus.
It contains information about the recording, such as its path(s), duration, the number of samples, etc.
It allows to represent multiple channels coming from one or more files.
This manifest does not specify any segmentation information or supervision such as the transcript or the speaker
-- we use :class:`~lhotse.supervision.SupervisionSegment` for that.
Note that :class:`~lhotse.audio.Recording` can represent both a single utterance (e.g., in LibriSpeech)
and a 1-hour session with multiple channels and speakers (e.g., in AMI).
In the latter case, it is partitioned into data suitable for model training using :class:`~lhotse.cut.Cut`.
Internally, Lhotse supports multiple audio backends to read audio file.
By default, we try to use libsoundfile, then torchaudio (with FFMPEG integration starting with torchaudio 2.1),
and then audioread (which is an ffmpeg CLI wrapper).
For sphere files we prefer to use sph2pipe binary as it can work with certain unique encodings such as "shorten".
Audio backends in Lhotse are configurable. See:
* :func:`~lhotse.audio.backend.available_audio_backends`
* :func:`~lhotse.audio.backend.audio_backend`,
* :func:`~lhotse.audio.backend.get_current_audio_backend`
* :func:`~lhotse.audio.backend.set_current_audio_backend`
* :func:`~lhotse.audio.backend.get_default_audio_backend`
Examples
A :class:`~lhotse.audio.Recording` can be simply created from a local audio file::
>>> from lhotse import RecordingSet, Recording, AudioSource
>>> recording = Recording.from_file('meeting.wav')
>>> recording
Recording(
id='meeting',
sources=[AudioSource(type='file', channels=[0], source='meeting.wav')],
sampling_rate=16000,
num_samples=57600000,
duration=3600.0,
transforms=None
)
This manifest can be easily converted to a Python dict and serialized to JSON/JSONL/YAML/etc::
>>> recording.to_dict()
{'id': 'meeting',
'sources': [{'type': 'file',
'channels': [0],
'source': 'meeting.wav'}],
'sampling_rate': 16000,
'num_samples': 57600000,
'duration': 3600.0}
Recordings can be also created programatically, e.g. when they refer to URLs stored in S3 or somewhere else::
>>> s3_audio_files = ['s3://my-bucket/123-5678.flac', ...]
>>> recs = RecordingSet.from_recordings(
... Recording(
... id=url.split('/')[-1].replace('.flac', ''),
... sources=[AudioSource(type='url', source=url, channels=[0])],
... sampling_rate=16000,
... num_samples=get_num_samples(url),
... duration=get_duration(url)
... )
... for url in s3_audio_files
... )
It allows reading a subset of the audio samples as a numpy array::
>>> samples = recording.load_audio()
>>> assert samples.shape == (1, 16000)
>>> samples2 = recording.load_audio(offset=0.5)
>>> assert samples2.shape == (1, 8000)
See also: :class:`~lhotse.audio.recording.Recording`, :class:`~lhotse.cut.Cut`, :class:`~lhotse.cut.CutSet`.
"""
id: str
sources: List[AudioSource]
sampling_rate: int
num_samples: int
duration: Seconds
channel_ids: Optional[List[int]] = None
transforms: Optional[List[Dict]] = None
def __post_init__(self):
if self.channel_ids is None:
self.channel_ids = sorted(
cid for source in self.sources for cid in source.channels
)
assert (
sum(source.has_video for source in self.sources) < 2
), "Lhotse does not currently support recordings with more than a single video stream."
@property
def video(self) -> Optional[VideoInfo]:
s = self._video_source
if s is None:
return None
return s.video
@property
def has_video(self) -> bool:
return self._video_source is not None
@property
def _video_source(self) -> Optional[AudioSource]:
for s in self.sources:
if s.has_video:
return s
return None
@property
def num_channels(self) -> int:
return len(self.channel_ids)
[docs]
@staticmethod
def from_file(
path: Pathlike,
recording_id: Optional[Union[str, Callable[[Path], str]]] = None,
relative_path_depth: Optional[int] = None,
force_opus_sampling_rate: Optional[int] = None,
force_read_audio: bool = False,
) -> "Recording":
"""
Read an audio file's header and create the corresponding ``Recording``.
Suitable to use when each physical file represents a separate recording session.
.. caution::
If a recording session consists of multiple files (e.g. one per channel),
it is advisable to create the ``Recording`` object manually, with each
file represented as a separate ``AudioSource`` object.
:param path: Path to an audio file supported by libsoundfile (pysoundfile).
:param recording_id: recording id, when not specified ream the filename's stem ("x.wav" -> "x").
It can be specified as a string or a function that takes the recording path and returns a string.
:param relative_path_depth: optional int specifying how many last parts of the file path
should be retained in the ``AudioSource``. By default writes the path as is.
:param force_opus_sampling_rate: when specified, this value will be used as the sampling rate
instead of the one we read from the manifest. This is useful for OPUS files that always
have 48kHz rate and need to be resampled to the real one -- we will perform that operation
"under-the-hood". For non-OPUS files this input is undefined.
:param force_read_audio: Set it to ``True`` for audio files that do not have any metadata
in their headers (e.g., "The People's Speech" FLAC files).
:return: a new ``Recording`` instance pointing to the audio file.
"""
path = Path(path)
recording_id = (
path.stem
if recording_id is None
else recording_id(path)
if callable(recording_id)
else recording_id
)
audio_info = info(
path,
force_opus_sampling_rate=force_opus_sampling_rate,
force_read_audio=force_read_audio,
)
if audio_info.video is not None:
duration = audio_info.video.duration
num_samples = compute_num_samples(duration, audio_info.samplerate)
else:
duration = audio_info.duration
num_samples = audio_info.frames
return Recording(
id=recording_id,
sampling_rate=audio_info.samplerate,
num_samples=num_samples,
duration=duration,
sources=[
AudioSource(
type="file",
channels=list(range(audio_info.channels)),
source=(
"/".join(path.parts[-relative_path_depth:])
if relative_path_depth is not None and relative_path_depth > 0
else str(path)
),
video=audio_info.video,
)
],
)
[docs]
@staticmethod
def from_bytes(
data: bytes,
recording_id: str,
) -> "Recording":
"""
Like :meth:`.Recording.from_file`, but creates a manifest for a byte string with
raw encoded audio data. This data is first decoded to obtain info such as the
sampling rate, number of channels, etc. Then, the binary data is attached to the
manifest. Calling :meth:`.Recording.load_audio` does not perform any I/O and
instead decodes the byte string contents in memory.
.. note:: Intended use of this method is for packing Recordings into archives
where metadata and data should be available together
(e.g., in WebDataset style tarballs).
.. caution:: Manifest created with this method cannot be stored as JSON
because JSON doesn't allow serializing binary data.
:param data: bytes, byte string containing encoded audio contents.
:param recording_id: recording id, unique string identifier.
:return: a new ``Recording`` instance that owns the byte string data.
"""
stream = BytesIO(data)
audio_info = torchaudio_info(stream)
return Recording(
id=recording_id,
sampling_rate=audio_info.samplerate,
num_samples=audio_info.frames,
duration=audio_info.duration,
sources=[
AudioSource(
type="memory",
channels=list(range(audio_info.channels)),
source=data,
)
],
)
[docs]
def move_to_memory(
self,
channels: Optional[Channels] = None,
offset: Seconds = None,
duration: Optional[Seconds] = None,
format: Optional[str] = None,
) -> "Recording":
"""
Read audio data and return a copy of the manifest with binary data attached.
Calling :meth:`.Recording.load_audio` on that copy will not trigger I/O.
If all arguments are left as defaults, we won't decode the audio and attach
the bytes we read from disk/other source as-is.
If ``channels``, ``duration``, or ``offset`` are specified, we'll decode the
audio and re-encode it into ``format`` before attaching.
The default format is FLAC, other formats compatible with torchaudio.save are
also accepted.
"""
if all(src.type == "memory" for src in self.sources):
return self # nothing to do
def _aslist(x):
if isinstance(x, int):
return [x]
return x
# Case #1: no opts specified, read audio without decoding and move it in memory.
if all(opt is None for opt in (channels, offset, duration)) or (
(channels is None or _aslist(channels) == self.channel_ids)
and (offset is None or isclose(offset, 0.0))
and (duration is None or isclose(duration, self.duration))
):
memory_sources = [
AudioSource(
type="memory",
channels=old_source.channels,
source=open(old_source.source, "rb").read(),
)
for old_source in self.sources
]
return fastcopy(self, sources=memory_sources)
# Case #2: user specified some subset of the recording, decode audio,
# subset it, and encode it again but save in memory.
audio = self.load_audio(
channels=channels, offset=ifnone(offset, 0), duration=duration
)
stream = BytesIO()
save_audio(stream, torch.from_numpy(audio), self.sampling_rate, format=format)
channels = ifnone(channels, self.channel_ids)
if isinstance(channels, int):
channels = [channels]
return Recording(
id=self.id,
sources=[
AudioSource(
type="memory",
channels=channels,
source=stream.getvalue(),
)
],
sampling_rate=self.sampling_rate,
num_samples=audio.shape[1],
duration=ifnone(duration, self.duration),
)
[docs]
def to_dict(self) -> dict:
return asdict_nonull(self)
[docs]
def to_cut(self):
"""
Create a Cut out of this recording --- MonoCut or MultiCut, depending on the
number of channels.
"""
from lhotse.cut import MonoCut, MultiCut
cls = MonoCut if self.num_channels == 1 else MultiCut
return cls(
id=self.id,
start=0.0,
duration=self.duration,
channel=self.channel_ids[0] if self.num_channels == 1 else self.channel_ids,
recording=self,
)
[docs]
@rich_exception_info
def load_audio(
self,
channels: Optional[Channels] = None,
offset: Seconds = 0.0,
duration: Optional[Seconds] = None,
) -> np.ndarray:
"""
Read the audio samples from the underlying audio source (path, URL, unix pipe/command).
:param channels: int or iterable of ints, a subset of channel IDs to read (reads all by default).
:param offset: seconds, where to start reading the audio (at offset 0 by default).
Note that it is only efficient for local filesystem files, i.e. URLs and commands will read
all the samples first and discard the unneeded ones afterwards.
:param duration: seconds, indicates the total audio time to read (starting from ``offset``).
:return: a numpy array of audio samples with shape ``(num_channels, num_samples)``.
"""
assert offset <= self.duration, (
f"Cannot load audio because the Recording's duration {self.duration}s "
f"is smaller than the requested offset {offset}s."
)
# Micro-optimization for a number of audio loading cases:
# if duration is very close to full recording,
# just read everything, and we'll discard some samples at the end.
orig_duration = duration
if duration is not None and isclose(duration, self.duration, abs_tol=1e-3):
duration = None
if channels is None:
channels = SetContainingAnything()
else:
channels = frozenset([channels] if isinstance(channels, int) else channels)
recording_channels = frozenset(self.channel_ids)
assert channels.issubset(recording_channels), (
"Requested to load audio from a channel "
"that does not exist in the recording: "
f"(recording channels: {recording_channels} -- "
f"requested channels: {channels})"
)
transforms = [
AudioTransform.from_dict(params) for params in self.transforms or []
]
# Do a "backward pass" over data augmentation transforms to get the
# offset and duration for loading a piece of the original audio.
offset_aug, duration_aug = offset, duration
for tfn in reversed(transforms):
offset_aug, duration_aug = tfn.reverse_timestamps(
offset=offset_aug,
duration=duration_aug,
sampling_rate=self.sampling_rate,
)
samples_per_source = []
for source in self.sources:
# Case: source not requested
if not channels.intersection(source.channels):
continue
samples = source.load_audio(
offset=offset_aug,
duration=duration_aug,
force_opus_sampling_rate=self.sampling_rate,
)
# Case: two-channel audio file but only one channel requested
# it might not be optimal to load all channels, but IDK if there's anything we can do about it
channels_to_remove = [
idx for idx, cid in enumerate(source.channels) if cid not in channels
]
if channels_to_remove:
samples = np.delete(samples, channels_to_remove, axis=0)
samples_per_source.append(samples)
# Stack all the samples from all the sources into a single array.
audio = self._stack_audio_channels(samples_per_source)
# We'll apply the transforms now (if any).
for tfn in transforms:
audio = tfn(audio, self.sampling_rate)
if self.has_video:
# It's possible the audio and video durations are quite mismatched.
# We'll pad audio with zeroes or truncate audio to accomodate the video,
# when it's available
audio = assert_and_maybe_fix_num_samples(
audio,
offset=offset,
duration=orig_duration,
recording=self,
tolerance=1e6,
pad_mode="constant",
)
else:
# Transformation chains can introduce small mismatches in the number of samples:
# we'll fix them here, or raise an error if they exceeded a tolerance threshold.
audio = assert_and_maybe_fix_num_samples(
audio, offset=offset, duration=orig_duration, recording=self
)
return audio
[docs]
@rich_exception_info
def load_video(
self,
channels: Optional[Channels] = None,
offset: Seconds = 0.0,
duration: Optional[Seconds] = None,
with_audio: bool = True,
force_consistent_duration: bool = True,
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
"""
Read the video frames and audio samples from the underlying source (path, URL, unix pipe/command).
:param channels: int or iterable of ints, a subset of channel IDs to read (reads all by default).
:param offset: seconds, where to start reading the video (at offset 0 by default).
Note that it is only efficient for local filesystem files, i.e. URLs and commands will read
all the samples first and discard the unneeded ones afterwards.
:param duration: seconds, indicates the total video time to read (starting from ``offset``).
:param with_audio: bool, whether to load and return audio alongside video. True by default.
:param force_consistent_duration: bool, if audio duration is different than video duration
(as counted by ``num_frames / fps``), we'll either truncate or pad the audio with zeros.
True by default.
:return: a tuple of video tensor and optional audio tensor (or None).
"""
assert self.has_video, f"Recording {self.id} has no video to load."
assert offset <= self.duration, (
f"Cannot load audio because the Recording's duration {self.duration}s "
f"is smaller than the requested offset {offset}s."
)
for t in ifnone(self.transforms, ()):
assert t["name"] not in (
"Speed",
"Tempo",
), "Recording.load_video() does not support speed/tempo perturbation."
if not with_audio:
video, _ = self._video_source.load_video(
offset=offset, duration=duration, with_audio=False
)
return video, None
# Micro-optimization for a number of audio loading cases:
# if duration is very close to full recording,
# just read everything, and we'll discard some samples at the end.
orig_duration = duration
if duration is not None and isclose(duration, self.duration, abs_tol=1e-3):
duration = None
if channels is None:
channels = SetContainingAnything()
else:
channels = frozenset([channels] if isinstance(channels, int) else channels)
recording_channels = frozenset(self.channel_ids)
assert channels.issubset(recording_channels), (
"Requested to load audio from a channel "
"that does not exist in the recording: "
f"(recording channels: {recording_channels} -- "
f"requested channels: {channels})"
)
transforms = [
AudioTransform.from_dict(params) for params in self.transforms or []
]
# Do a "backward pass" over data augmentation transforms to get the
# offset and duration for loading a piece of the original audio.
offset_aug, duration_aug = offset, duration
for tfn in reversed(transforms):
offset_aug, duration_aug = tfn.reverse_timestamps(
offset=offset_aug,
duration=duration_aug,
sampling_rate=self.sampling_rate,
)
samples_per_source = []
video = None
for source in self.sources:
if source.has_video:
video, samples = source.load_video(
offset=offset_aug,
duration=duration_aug,
)
else:
samples = source.load_audio(offset=offset_aug, duration=duration_aug)
# Case: source not requested (for audio, but it might be the only one with video)
if not channels.intersection(source.channels):
continue
# Case: two-channel audio file but only one channel requested
# it might not be optimal to load all channels, but IDK if there's anything we can do about it
channels_to_remove = [
idx for idx, cid in enumerate(source.channels) if cid not in channels
]
if channels_to_remove:
samples = np.delete(samples, channels_to_remove, axis=0)
samples_per_source.append(samples)
assert video is not None
# Stack all the samples from all the sources into a single array.
audio = self._stack_audio_channels(samples_per_source)
# We'll apply the transforms now (if any).
for tfn in transforms:
audio = tfn(audio, self.sampling_rate)
if force_consistent_duration:
# We want to keep audio and video duration identical by truncating/padding audio.
audio = assert_and_maybe_fix_num_samples(
audio,
offset=offset,
duration=video.shape[0] / self.video.fps,
recording=self,
# hack: "infinite" tolerance disables exceptions, i.e. 1min video and 1h audio => 1min audio
tolerance=1e6,
pad_mode="zero",
)
else:
# Transformation chains can introduce small mismatches in the number of samples:
# we'll fix them here, or raise an error if they exceeded a tolerance threshold.
audio = assert_and_maybe_fix_num_samples(
audio,
offset=offset,
duration=orig_duration,
recording=self,
pad_mode="reflect",
)
return video, torch.from_numpy(audio)
[docs]
def play_video(self):
if self.has_video:
from IPython.display import Video
return Video(filename=self._video_source.source)
def _stack_audio_channels(self, samples_per_source: List[np.ndarray]) -> np.ndarray:
# There may be a mismatch in the number of samples between different channels. We
# check if the mismatch is within a reasonable tolerance and if so, we pad
# all channels to the length of the longest one.
allowed_diff = int(
compute_num_samples(
get_audio_duration_mismatch_tolerance(),
sampling_rate=self.sampling_rate,
)
)
if len(samples_per_source) > 1:
# Make all arrays 2D
samples_per_source = [
s[None, :] if s.ndim == 1 else s for s in samples_per_source
]
max_samples = max(s.shape[1] for s in samples_per_source)
for i, s in enumerate(samples_per_source):
if max_samples - s.shape[1] <= allowed_diff:
s = np.pad(s, ((0, 0), (0, max_samples - s.shape[1])), "constant")
samples_per_source[i] = s
else:
raise DurationMismatchError(
f"The mismatch between the number of samples in the "
f"different channels of the recording {self.id} is "
f"greater than the allowed tolerance {get_audio_duration_mismatch_tolerance()}."
)
audio = np.concatenate(samples_per_source, axis=0)
else:
# shape: (n_channels, n_samples)
audio = np.vstack(samples_per_source)
return audio
def _expected_num_samples(
self, offset: Seconds, duration: Optional[Seconds]
) -> int:
if offset == 0 and duration is None:
return self.num_samples
duration = duration if duration is not None else self.duration - offset
return compute_num_samples(duration, sampling_rate=self.sampling_rate)
[docs]
def with_path_prefix(self, path: Pathlike) -> "Recording":
return fastcopy(self, sources=[s.with_path_prefix(path) for s in self.sources])
[docs]
def with_video_resolution(self, width: int, height: int) -> "Recording":
return fastcopy(
self,
sources=[
s.with_video_resolution(width=width, height=height)
for s in self.sources
],
)
[docs]
def perturb_speed(self, factor: float, affix_id: bool = True) -> "Recording":
"""
Return a new ``Recording`` that will lazily perturb the speed while loading audio.
The ``num_samples`` and ``duration`` fields are updated to reflect the
shrinking/extending effect of speed.
:param factor: The speed will be adjusted this many times (e.g. factor=1.1 means 1.1x faster).
:param affix_id: When true, we will modify the ``Recording.id`` field
by affixing it with "_sp{factor}".
:return: a modified copy of the current ``Recording``.
"""
transforms = self.transforms.copy() if self.transforms is not None else []
transforms.append(Speed(factor=factor).to_dict())
new_num_samples = perturb_num_samples(self.num_samples, factor)
new_duration = new_num_samples / self.sampling_rate
return fastcopy(
self,
id=f"{self.id}_sp{factor}" if affix_id else self.id,
num_samples=new_num_samples,
duration=new_duration,
transforms=transforms,
)
[docs]
def perturb_tempo(self, factor: float, affix_id: bool = True) -> "Recording":
"""
Return a new ``Recording`` that will lazily perturb the tempo while loading audio.
Compared to speed perturbation, tempo preserves pitch.
The ``num_samples`` and ``duration`` fields are updated to reflect the
shrinking/extending effect of tempo.
:param factor: The tempo will be adjusted this many times (e.g. factor=1.1 means 1.1x faster).
:param affix_id: When true, we will modify the ``Recording.id`` field
by affixing it with "_tp{factor}".
:return: a modified copy of the current ``Recording``.
"""
transforms = self.transforms.copy() if self.transforms is not None else []
transforms.append(Tempo(factor=factor).to_dict())
new_num_samples = perturb_num_samples(self.num_samples, factor)
new_duration = new_num_samples / self.sampling_rate
return fastcopy(
self,
id=f"{self.id}_tp{factor}" if affix_id else self.id,
num_samples=new_num_samples,
duration=new_duration,
transforms=transforms,
)
[docs]
def perturb_volume(self, factor: float, affix_id: bool = True) -> "Recording":
"""
Return a new ``Recording`` that will lazily perturb the volume while loading audio.
:param factor: The volume scale to be applied (e.g. factor=1.1 means 1.1x louder).
:param affix_id: When true, we will modify the ``Recording.id`` field
by affixing it with "_tp{factor}".
:return: a modified copy of the current ``Recording``.
"""
transforms = self.transforms.copy() if self.transforms is not None else []
transforms.append(Volume(factor=factor).to_dict())
return fastcopy(
self,
id=f"{self.id}_vp{factor}" if affix_id else self.id,
transforms=transforms,
)
[docs]
def normalize_loudness(self, target: float, affix_id: bool = False) -> "Recording":
"""
Return a new ``Recording`` that will lazily apply WPE dereverberation.
:param target: The target loudness (in dB) to normalize to.
:param affix_id: When true, we will modify the ``Recording.id`` field
by affixing it with "_ln{factor}".
:return: a modified copy of the current ``Recording``.
"""
transforms = self.transforms.copy() if self.transforms is not None else []
transforms.append(LoudnessNormalization(target=target).to_dict())
return fastcopy(
self,
id=f"{self.id}_ln{target}" if affix_id else self.id,
transforms=transforms,
)
[docs]
def dereverb_wpe(self, affix_id: bool = True) -> "Recording":
"""
Return a new ``Recording`` that will lazily apply WPE dereverberation.
:param affix_id: When true, we will modify the ``Recording.id`` field
by affixing it with "_wpe".
:return: a modified copy of the current ``Recording``.
"""
transforms = self.transforms.copy() if self.transforms is not None else []
transforms.append(DereverbWPE().to_dict())
return fastcopy(
self,
id=f"{self.id}_wpe" if affix_id else self.id,
transforms=transforms,
)
[docs]
def reverb_rir(
self,
rir_recording: Optional["Recording"] = None,
normalize_output: bool = True,
early_only: bool = False,
affix_id: bool = True,
rir_channels: Optional[List[int]] = None,
room_rng_seed: Optional[int] = None,
source_rng_seed: Optional[int] = None,
) -> "Recording":
"""
Return a new ``Recording`` that will lazily apply reverberation based on provided
impulse response while loading audio. If no impulse response is provided, we will
generate an RIR using a fast random generator (https://arxiv.org/abs/2208.04101).
:param rir_recording: The impulse response to be used.
:param normalize_output: When true, output will be normalized to have energy as input.
:param early_only: When true, only the early reflections (first 50 ms) will be used.
:param affix_id: When true, we will modify the ``Recording.id`` field
by affixing it with "_rvb".
:param rir_channels: The channels of the impulse response to be used (in case of multi-channel
impulse responses). By default, only the first channel is used. If no RIR is
provided, we will generate one with as many channels as this argument specifies.
:param room_rng_seed: The seed to be used for the room configuration.
:param source_rng_seed: The seed to be used for the source position.
:return: the perturbed ``Recording``.
"""
if rir_recording is not None:
assert (
rir_recording.sampling_rate == self.sampling_rate
), f"Sampling rate mismatch between RIR vs recording: {rir_recording.sampling_rate} vs {self.sampling_rate}."
# We may need to change the `channel_ids` field according to whether we are convolving
# with a multi-channel RIR or not.
# The following cases are possible:
# Case 1: input is mono, rir is mono -> mono output, no need to change
# Case 2: input is mono, rir is multi-channel -> multi-channel output, change channel_ids
# Case 3: input is multi-channel, rir is mono -> multi-channel output, no need to change
# Case 4: input is multi-channel, rir is multi-channel -> multi-channel output,
# no need to change (since we assume that the RIR has the same number of channels as the input)
if self.num_channels > 1 or rir_channels is None or len(rir_channels) == 1:
# Case 1, 3 or 4
new_channel_ids = self.channel_ids
else:
# Case 2
new_channel_ids = list(range(len(rir_channels)))
if rir_recording is None:
from lhotse.augmentation.utils import FastRandomRIRGenerator
rir_generator = FastRandomRIRGenerator(
sr=self.sampling_rate,
room_seed=room_rng_seed,
source_seed=source_rng_seed,
)
else:
rir_generator = None
transforms = self.transforms.copy() if self.transforms is not None else []
transforms.append(
ReverbWithImpulseResponse(
rir=rir_recording,
normalize_output=normalize_output,
early_only=early_only,
rir_channels=rir_channels if rir_channels is not None else [0],
rir_generator=rir_generator,
).to_dict()
)
return fastcopy(
self,
id=f"{self.id}_rvb" if affix_id else self.id,
channel_ids=new_channel_ids,
transforms=transforms,
)
[docs]
def resample(self, sampling_rate: int) -> "Recording":
"""
Return a new ``Recording`` that will be lazily resampled while loading audio.
:param sampling_rate: The new sampling rate.
:return: A resampled ``Recording``.
"""
if sampling_rate == self.sampling_rate:
return fastcopy(self)
transforms = self.transforms.copy() if self.transforms is not None else []
transforms.append(
Resample(
source_sampling_rate=self.sampling_rate,
target_sampling_rate=sampling_rate,
).to_dict()
)
new_num_samples = compute_num_samples(
self.duration, sampling_rate, rounding=ROUND_HALF_UP
)
# Duration might need an adjustment when doing a non-trivial resampling
# (e.g. 16000 -> 22050), where the resulting number of samples cannot
# correspond to old duration exactly.
new_duration = new_num_samples / sampling_rate
return fastcopy(
self,
duration=new_duration,
num_samples=new_num_samples,
sampling_rate=sampling_rate,
transforms=transforms,
)
[docs]
@staticmethod
def from_dict(data: dict) -> "Recording":
raw_sources = data.pop("sources")
return Recording(
sources=[AudioSource.from_dict(s) for s in raw_sources], **data
)
def assert_and_maybe_fix_num_samples(
audio: np.ndarray,
offset: Seconds,
duration: Optional[Seconds],
recording: Recording,
tolerance: Optional[Seconds] = None,
pad_mode: str = "reflect",
) -> np.ndarray:
# When resampling in high sampling rates (48k -> 44.1k)
# it is difficult to estimate how sox will perform rounding;
# we will just add/remove one sample to be consistent with
# what we have estimated.
# This effect is exacerbated by chaining multiple augmentations together.
if tolerance is None: # use Lhotse's default
tolerance = get_audio_duration_mismatch_tolerance()
expected_num_samples = compute_num_samples(
duration=duration if duration is not None else recording.duration - offset,
sampling_rate=recording.sampling_rate,
)
diff = expected_num_samples - audio.shape[1]
if diff == 0:
return audio # this is normal condition
allowed_diff = int(ceil(tolerance * recording.sampling_rate))
if 0 < diff <= allowed_diff:
audio = np.pad(audio, ((0, 0), (0, diff)), mode=pad_mode)
return audio
elif -allowed_diff <= diff < 0:
audio = audio[:, :diff]
return audio
else:
raise AudioLoadingError(
"The number of declared samples in the recording diverged from the one obtained "
f"when loading audio (offset={offset}, duration={duration}). "
f"This could be internal Lhotse's error or a faulty transform implementation. "
"Please report this issue in Lhotse and show the "
f"following: diff={diff}, audio.shape={audio.shape}, recording={recording}"
)