import logging
import warnings
from dataclasses import dataclass
from functools import partial, reduce
from io import BytesIO
from operator import add
from typing import (
Any,
Callable,
Dict,
Generator,
Iterable,
List,
Literal,
Optional,
Tuple,
Union,
)
import numpy as np
import torch
from intervaltree import IntervalTree
from lhotse.array import Array, TemporalArray
from lhotse.audio import Recording, VideoInfo, get_audio_duration_mismatch_tolerance
from lhotse.audio.backend import save_audio
from lhotse.audio.mixer import AudioMixer, VideoMixer, audio_energy
from lhotse.augmentation import (
AudioTransform,
AugmentFn,
LoudnessNormalization,
ReverbWithImpulseResponse,
)
from lhotse.augmentation.compress import Codec
from lhotse.cut.base import Cut
from lhotse.cut.data import DataCut
from lhotse.cut.padding import PaddingCut
from lhotse.features import (
FeatureExtractor,
FeatureMixer,
create_default_feature_extractor,
)
from lhotse.features.base import Features
from lhotse.features.io import FeaturesWriter
from lhotse.image import Image
from lhotse.supervision import SupervisionSegment
from lhotse.utils import (
DEFAULT_PADDING_VALUE,
LOG_EPSILON,
Decibels,
Pathlike,
Seconds,
add_durations,
compute_num_frames,
compute_num_samples,
fastcopy,
hash_str_to_int,
merge_items_with_delimiter,
overlaps,
perturb_num_samples,
rich_exception_info,
uuid4,
)
[docs]@dataclass
class MixTrack:
"""
Represents a single track in a mix of Cuts. Points to a specific DataCut or PaddingCut and holds information on
how to mix it with other Cuts, relative to the first track in a mix.
"""
cut: Cut
type: str = None
offset: Seconds = 0.0
snr: Optional[Decibels] = None
tag: Optional[str] = None
is_snr_reference: bool = False
mute: bool = False
def __post_init__(self):
self.type = type(self.cut).__name__
[docs] @staticmethod
def from_dict(data: dict):
from .set import deserialize_cut
# Take out `type` from data dict and put it into the `cut` dict.
cut_dict = data.pop("cut")
cut_dict["type"] = data.pop("type")
return MixTrack(deserialize_cut(cut_dict), **data)
[docs] def to_dict(self) -> Dict:
ans = {
"cut": self.cut.to_dict(),
"type": self.type,
"offset": self.offset,
}
if self.snr is not None:
ans["snr"] = self.snr
if self.tag is not None:
ans["tag"] = self.tag
if self.is_snr_reference:
ans["is_snr_reference"] = self.is_snr_reference
if self.mute:
ans["mute"] = self.mute
return ans
[docs]@dataclass
class MixedCut(Cut):
"""
:class:`~lhotse.cut.MixedCut` is a :class:`~lhotse.cut.Cut` that actually consists of multiple other cuts.
Its primary purpose is to allow time-domain and feature-domain augmentation via mixing the training cuts
with noise, music, and babble cuts. The actual mixing operations are performed on-the-fly.
Internally, :class:`~lhotse.cut.MixedCut` holds other cuts in multiple tracks (:class:`~lhotse.cut.MixTrack`),
each with its own offset and SNR that is relative to the first track.
Please refer to the documentation of :class:`~lhotse.cut.Cut` to learn more about using cuts.
In addition to methods available in :class:`~lhotse.cut.Cut`, :class:`~lhotse.cut.MixedCut` provides the methods to
read all of its tracks audio and features as separate channels:
>>> cut = MixedCut(...)
>>> mono_features = cut.load_features()
>>> assert len(mono_features.shape) == 2
>>> multi_features = cut.load_features(mixed=False)
>>> # Now, the first dimension is the channel.
>>> assert len(multi_features.shape) == 3
.. note:: MixedCut is different from MultiCut, which is intended to represent multi-channel recordings
that share the same supervisions.
.. note:: Each track in a MixedCut can be either a MonoCut, MultiCut, or PaddingCut.
.. note:: The ``transforms`` field is a list of dictionaries that describe the transformations
that should be applied to the track after mixing.
See also:
- :class:`lhotse.cut.Cut`
- :class:`lhotse.cut.MonoCut`
- :class:`lhotse.cut.MultiCut`
- :class:`lhotse.cut.CutSet`
"""
id: str
tracks: List[MixTrack]
transforms: Optional[List[AudioTransform]] = None
@property
def supervisions(self) -> List[SupervisionSegment]:
"""
Lists the supervisions of the underlying source cuts.
Each segment start time will be adjusted by the track offset.
"""
return [
segment.with_offset(track.offset)
for track in _get_audible_tracks(self)
for segment in track.cut.supervisions
]
@property
def start(self) -> Seconds:
return 0
@property
def duration(self) -> Seconds:
track_durations = (
track.offset + track.cut.duration for track in _get_audible_tracks(self)
)
return round(max(track_durations), ndigits=8)
@property
def channel(self) -> Union[int, List[int]]:
num_channels = self.num_channels
return list(range(num_channels)) if num_channels > 1 else 0
@property
def has_features(self) -> bool:
return self._first_non_padding_cut.has_features
@property
def has_recording(self) -> bool:
return self._first_non_padding_cut.has_recording
@property
def has_video(self) -> bool:
return self._first_non_padding_cut.has_video
@property
def is_in_memory(self) -> bool:
return any(track.cut.is_in_memory for track in _get_audible_tracks(self))
[docs] def has(self, field: str) -> bool:
return self._first_non_padding_cut.has(field)
@property
def num_frames(self) -> Optional[int]:
if self.has_features:
return compute_num_frames(
duration=self.duration,
frame_shift=self.frame_shift,
sampling_rate=self.sampling_rate,
)
return None
@property
def frame_shift(self) -> Optional[Seconds]:
return self._first_non_padding_cut.frame_shift
@property
def sampling_rate(self) -> Optional[int]:
return self._first_non_padding_cut.sampling_rate
@property
def num_samples(self) -> Optional[int]:
return compute_num_samples(self.duration, self.sampling_rate)
@property
def num_features(self) -> Optional[int]:
return self._first_non_padding_cut.num_features
@property
def num_channels(self) -> Optional[int]:
# PaddingCut and MonoCut have 1 channel each, MultiCut has N. We don't support MixedCut with MixedCut tracks.
return max(track.cut.num_channels for track in _get_audible_tracks(self))
@property
def features_type(self) -> Optional[str]:
return self._first_non_padding_cut.features.type if self.has_features else None
[docs] def to_dict(self) -> dict:
ans = {
"id": self.id,
"tracks": [t.to_dict() for t in self.tracks],
"type": type(self).__name__,
}
if self.transforms:
ans["transforms"] = [t.to_dict() for t in self.transforms]
return ans
[docs] def unmix(self, tag: Optional[str] = None) -> List[Cut]:
"""
Split this mixed cut into time-aligned constituent cuts.
When ``tag`` is ``None``, this returns one cut per non-padding audible track.
Each returned cut preserves the original offsets and overall duration, so the
loaded audio/features can be summed to reconstruct the original mix.
When ``tag`` is provided, this returns exactly two cuts in order:
``[without_tag, with_tag]``. Tracks are grouped by whether their
:attr:`MixTrack.tag` matches ``tag``. For exact SNR preservation, the grouped
outputs may carry an internal muted SNR-reference track that is ignored by the
public track views but retained for mixing math.
:param tag: Optional track-group label to split on.
:return: A list of one cut per track, or two grouped cuts when ``tag`` is provided.
"""
tracks = [
track
for track in _get_audible_tracks(self)
if not isinstance(track.cut, PaddingCut)
]
if tag is None:
return [_to_unmixed_cut(self, [track]) for track in tracks]
without_tag = [track for track in tracks if track.tag != tag]
with_tag = [track for track in tracks if track.tag == tag]
return [
_to_unmixed_cut(self, without_tag),
_to_unmixed_cut(self, with_tag),
]
[docs] def iter_data(
self,
) -> Generator[
Tuple[str, Union[Recording, Features, Array, TemporalArray, Image]], None, None
]:
"""
Iterate over each data piece attached to this cut.
Returns a generator yielding tuples of ``(key, manifest)``, where
``key`` is the name of the attribute under which ``manifest`` is found.
``manifest`` is of type :class:`~lhotse.Recording`, :class:`~lhotse.Features`,
:class:`~lhotse.TemporalArray`, :class:`~lhotse.Array`, or :class:`~lhotse.Image`.
For example, if ``key`` is ``recording``, then ``manifest`` is ``self.recording``.
"""
return self._first_non_padding_cut.iter_data()
def __setattr__(self, key: str, value: Any) -> None:
"""
This magic function is called when the user tries to set an attribute.
We use it as syntactic sugar to store custom attributes in ``self._first_non_padding_cut.custom``
field, so that they can be (de)serialized later.
Setting a ``None`` value will remove the attribute from ``custom``.
.. note:: MixedCut doesn't have its own ``custom`` field, and by convention
always refers to the ``custom`` field on its first non padding cut.
"""
if key in self.__dataclass_fields__:
super().__setattr__(key, value)
else:
setattr(self._first_non_padding_cut, key, value)
def __getattr__(self, name: str) -> Any:
"""
This magic function is called when the user tries to access an attribute
of :class:`.MixedCut` that doesn't exist. It is used for accessing the custom
attributes of cuts. We support exactly one scenario for mixed cuts:
If :attr:`tracks` contains exactly one :class:`.MonoCut` object (and an arbitrary
number of :class:`.PaddingCut` objects), we will look up the custom attributes
of that cut.
If one of the custom attributes is of type :class:`~lhotse.array.Array` or
:class:`~lhotse.array.TemporalArray` we'll also support loading those arrays
(see example below). Additionally, we will incorporate extra padding as
dictated by padding cuts.
Example:
>>> cut = MonoCut('cut1', start=0, duration=4, channel=0)
>>> cut.alignment = TemporalArray(...)
>>> mixed_cut = cut.pad(10, pad_value_dict={'alignment': -1})
>>> ali = mixed_cut.load_alignment()
"""
# Python will sometimes try to call undefined magic functions,
# just fail for them (e.g. __setstate__ when pickling).
if name.startswith("__"):
raise AttributeError()
# Loading a custom array attribute + performing padding.
if name.startswith("load_"):
attr_name = name[5:]
return partial(self.load_custom, attr_name)
if name == "custom":
# Merge custom dicts of underlying data.
ans = {}
for t in _get_audible_tracks(self):
if cstm := t.cut.custom:
ans.update(cstm)
return ans
# Returning the contents of "mono_cut.custom[name]",
# or raising AttributeError.
tracks_with_attr = self._get_tracks_with_custom_attr(name)
if tracks_with_attr:
_, mono_cut = tracks_with_attr[0]
return getattr(mono_cut, name)
raise AttributeError(f"No such attribute: '{name}'")
[docs] def has_custom(self, name: str) -> bool:
tracks_with_attr = self._get_tracks_with_custom_attr(name)
if not tracks_with_attr:
return False
_, mono_cut = tracks_with_attr[0]
return hasattr(mono_cut, name)
[docs] def load_custom(self, name: str) -> np.ndarray:
"""
Load custom data as numpy array. The custom data is expected to have
been stored in cuts ``custom`` field as an :class:`~lhotse.array.Array` or
:class:`~lhotse.array.TemporalArray` manifest.
.. note:: It works with Array manifests stored via attribute assignments,
e.g.: ``cut.my_custom_data = Array(...)``.
.. note:: For :class:`.MixedCut` with :class:`~lhotse.audio.Recording`-type
custom attributes, this supports multiple non-overlapping tracks (e.g.
from :meth:`Cut.append`). The audio from each track's custom Recording
is loaded and placed at the correct offset in the output buffer, similar
to how :meth:`load_audio` works for the main recording.
.. warning:: For :class:`~lhotse.array.Array` (non-temporal) and
:class:`~lhotse.array.TemporalArray` custom attributes, this will only
work if the mixed cut has a single non-padding track with that attribute.
:param name: name of the custom attribute.
:return: a numpy array with the data (after padding).
"""
from lhotse.array import Array, pad_array
tracks_with_attr = self._get_tracks_with_custom_attr(name)
assert len(tracks_with_attr) > 0, (
f"No non-padding tracks with custom attribute '{name}' found "
f"in this MixedCut."
)
# Use getattr to propagate AttributeError if "name" is not defined.
first_idx, first_cut = tracks_with_attr[0]
manifest = getattr(first_cut, name)
# Multiple tracks with the attribute: only supported for Recording-type.
if len(tracks_with_attr) > 1:
if isinstance(manifest, Recording):
return self._load_custom_recording_multi_track(name, tracks_with_attr)
raise ValueError(
f"This MixedCut has {len(tracks_with_attr)} non-padding tracks "
f"with a custom attribute '{name}'. Mixing custom attributes is only "
f"supported for Recording-type attributes. Problematic cut:\n{self}"
)
# Single track with the attribute — existing behavior.
# Check if the corresponding manifest for 'load_something' is of type
# Array; if yes, just return the loaded data.
# This is likely an embedding without a temporal dimension.
if isinstance(manifest, Array):
return first_cut.load_custom(name)
# We are loading either an array with a temporal dimension, or a recording:
# We need to pad it.
left_padding = self.tracks[first_idx].offset
padded_duration = self.duration
# Then, check if it's a Recording. In that case we convert it to a cut,
# leverage existing padding methods, and load padded audio data.
if isinstance(manifest, Recording):
return (
manifest.to_cut()
.pad(duration=manifest.duration + left_padding, direction="left")
.pad(duration=padded_duration, direction="right")
.load_audio()
)
# Load the array and retrieve the manifest from the only non-padding cut.
# We'll also need to fetch the dict defining what padding value to use (if present).
array = first_cut.load_custom(name)
try:
pad_value_dict = [
t.cut for t in self.tracks if isinstance(t.cut, PaddingCut)
][0].custom
pad_value = pad_value_dict[name]
except:
pad_value = DEFAULT_PADDING_VALUE
return pad_array(
array,
temporal_dim=manifest.temporal_dim,
frame_shift=manifest.frame_shift,
offset=left_padding,
padded_duration=padded_duration,
pad_value=pad_value,
)
def _load_custom_recording_multi_track(
self,
name: str,
tracks_with_attr: list,
) -> np.ndarray:
"""
Load and combine audio from a custom Recording attribute across multiple
tracks. This handles the case where a MixedCut was created via
:meth:`Cut.append` and each constituent cut has its own custom Recording
(e.g., ``target_audio`` at a potentially different sampling rate than the
main recording).
"""
first_idx, first_cut = tracks_with_attr[0]
first_audio = first_cut.load_custom(name)
first_recording = getattr(first_cut, name)
custom_sr = first_recording.sampling_rate
mixer = AudioMixer(
base_audio=first_audio,
sampling_rate=custom_sr,
base_offset=self.tracks[first_idx].offset,
)
for track_idx, cut in tracks_with_attr[1:]:
audio = cut.load_custom(name)
mixer.add_to_mix(
audio=audio,
offset=self.tracks[track_idx].offset,
)
audio = mixer.mixed_audio
# Fix off-by-one due to float arithmetic, same as in load_audio().
expected_num_samples = compute_num_samples(self.duration, custom_sr)
tol_samples = compute_num_samples(
get_audio_duration_mismatch_tolerance(), custom_sr
)
num_samples_diff = audio.shape[1] - expected_num_samples
if 0 < num_samples_diff < tol_samples:
audio = audio[:, :expected_num_samples]
if -tol_samples < num_samples_diff < 0:
audio = np.pad(audio, [(0, 0), (0, -num_samples_diff)])
return audio
def _get_tracks_with_custom_attr(
self,
attr_name: str,
) -> list:
"""
Return a list of ``(track_idx, cut)`` tuples for all non-padding tracks
that have a custom attribute with the given name.
"""
return [
(idx, t.cut)
for idx, t in enumerate(self.tracks)
if isinstance(t.cut, DataCut)
and not t.mute
and t.cut.custom is not None
and attr_name in t.cut.custom
]
[docs] def move_to_memory(
self,
audio_format: str = "flac",
load_audio: bool = True,
load_features: bool = True,
load_custom: bool = True,
) -> "MixedCut":
"""
Load data (audio, features, or custom arrays) into memory and attach them
to a copy of the manifest. This is useful when you want to store cuts together
with the actual data in some binary format that enables sequential data reads.
Audio is encoded with ``audio_format`` (compatible with ``torchaudio.save``),
floating point features are encoded with lilcom, and other arrays are pickled.
"""
return fastcopy(
self,
tracks=[
fastcopy(
t,
cut=t.cut.move_to_memory(
audio_format=audio_format,
load_audio=load_audio,
load_features=load_features,
load_custom=load_custom,
),
)
for t in self.tracks
],
)
[docs] def to_mono(
self,
encoding: str = "flac",
**kwargs,
) -> "Cut":
"""
Convert this MixedCut to a MonoCut by mixing all tracks and channels into a single one.
The result audio array is stored in memory, and can be saved to disk by calling
``cut.save_audio(path, ...)`` on the result.
.. hint:: the resulting MonoCut will have ``custom`` field populated with the
``custom`` value from the first track of the MixedCut.
:param encoding: any of "wav", "flac", or "opus".
:return: a new MonoCut instance.
"""
samples = self.load_audio(mono_downmix=True)
stream = BytesIO()
save_audio(
stream,
samples,
self.sampling_rate,
format=encoding,
)
recording = Recording.from_bytes(stream.getvalue(), recording_id=self.id)
return fastcopy(
recording.to_cut(),
supervisions=[fastcopy(s, channel=0) for s in self.supervisions],
custom=self._first_non_padding_track.cut.custom,
)
[docs] def truncate(
self,
*,
offset: Seconds = 0.0,
duration: Optional[Seconds] = None,
keep_excessive_supervisions: bool = True,
preserve_id: bool = False,
_supervisions_index: Optional[Dict[str, IntervalTree]] = None,
) -> Cut:
"""
Returns a new MixedCut that is a sub-region of the current MixedCut. This method truncates the underlying Cuts
and modifies their offsets in the mix, as needed. Tracks that do not fit in the truncated cut are removed.
Note that no operation is done on the actual features - it's only during the call to load_features()
when the actual changes happen (a subset of features is loaded).
:param offset: float (seconds), controls the start of the new cut relative to the current MixedCut's start.
:param duration: optional float (seconds), controls the duration of the resulting MixedCut.
By default, the duration is (end of the cut before truncation) - (offset).
:param keep_excessive_supervisions: bool. Since trimming may happen inside a SupervisionSegment, the caller has
an option to either keep or discard such supervisions.
:param preserve_id: bool. Should the truncated cut keep the same ID or get a new, random one.
:return: a new MixedCut instance.
"""
assert (
offset >= 0
), f"Offset for truncate must be non-negative (provided {offset})."
old_duration = self.duration
new_mix_end = (
add_durations(old_duration, -offset, sampling_rate=self.sampling_rate)
if duration is None
else add_durations(offset, duration, sampling_rate=self.sampling_rate)
)
def truncate_track(track: MixTrack) -> Optional[MixTrack]:
# First, determine how much of the beginning of the current track we're going to truncate:
# when the track offset is larger than the truncation offset, we are not truncating the cut;
# just decreasing the track offset.
# 'cut_offset' determines how much we're going to truncate the Cut for the current track.
cut_offset = max(
add_durations(offset, -track.offset, sampling_rate=self.sampling_rate),
0,
)
# 'track_offset' determines the new track's offset after truncation.
track_offset = max(
add_durations(track.offset, -offset, sampling_rate=self.sampling_rate),
0,
)
# 'track_end' is expressed relative to the beginning of the mix
# (not to be confused with the 'start' of the underlying MonoCut)
track_end = add_durations(
track.offset, track.cut.duration, sampling_rate=self.sampling_rate
)
if track_end < offset:
# Omit a Cut that ends before the truncation offset.
return None
cut_duration_decrease = 0
if track_end > new_mix_end:
if duration is not None:
cut_duration_decrease = add_durations(
track_end, -new_mix_end, sampling_rate=self.sampling_rate
)
else:
cut_duration_decrease = add_durations(
track_end, -old_duration, sampling_rate=self.sampling_rate
)
# Compute the new Cut's duration after trimming the start and the end.
new_duration = add_durations(
track.cut.duration,
-cut_offset,
-cut_duration_decrease,
sampling_rate=self.sampling_rate,
)
if new_duration <= 0:
# Omit a Cut that is completely outside the time span of the new truncated MixedCut.
return None
return MixTrack(
cut=track.cut.truncate(
offset=cut_offset,
duration=new_duration,
keep_excessive_supervisions=keep_excessive_supervisions,
preserve_id=preserve_id,
_supervisions_index=_supervisions_index,
),
offset=track_offset,
snr=track.snr,
tag=track.tag,
is_snr_reference=track.is_snr_reference,
mute=track.mute,
)
new_tracks = [
new_track
for new_track in (
truncate_track(track)
for track in sorted(self.tracks, key=lambda t: t.offset)
)
if new_track is not None
]
# Edge case: no tracks with data left after truncation. This can happen if we truncated an offset region.
# In this case, return a PaddingCut of the requested duration
if len([t for t in new_tracks if not isinstance(t.cut, PaddingCut)]) == 0:
return PaddingCut(
id=self.id if preserve_id else str(uuid4()),
duration=duration,
sampling_rate=self.sampling_rate,
feat_value=0.0,
num_samples=compute_num_samples(duration, self.sampling_rate),
)
if len(new_tracks) == 1:
# The truncation resulted in just a single cut - simply return it.
return new_tracks[0].cut
new_cut = MixedCut(
id=self.id if preserve_id else str(uuid4()),
tracks=new_tracks,
)
# Final edge-case check. Scenario:
# - some of the original MixedCut tracks had specified an SNR
# - we truncated away the track that served as an SNR reference
# - we are left only with PaddingCuts and MonoCuts that have specified SNR
# Solution:
# - find first non padding cut and reset its SNR to None (make it the new reference)
if not any(track.is_snr_reference for track in new_cut.tracks) and all(
t.snr is not None or isinstance(t.cut, PaddingCut) for t in new_cut.tracks
):
first_non_padding_track_idx = [
idx
for idx, t in enumerate(new_cut.tracks)
if not isinstance(t.cut, PaddingCut)
][0]
new_cut.tracks[first_non_padding_track_idx] = fastcopy(
new_cut.tracks[first_non_padding_track_idx],
snr=None,
is_snr_reference=True,
)
return new_cut
[docs] def extend_by(
self,
*,
duration: Seconds,
direction: str = "both",
preserve_id: bool = False,
pad_silence: bool = True,
) -> "MixedCut":
"""
This raises a ValueError since extending a MixedCut is not defined.
:param duration: float (seconds), duration (in seconds) to extend the MixedCut.
:param direction: string, 'left', 'right' or 'both'. Determines whether to extend on the left,
right, or both sides. If 'both', extend on both sides by the duration specified in `duration`.
:param preserve_id: bool. Should the extended cut keep the same ID or get a new, random one.
:param pad_silence: bool. See usage in `lhotse.cut.MonoCut.extend_by`.
:return: a new MixedCut instance.
"""
raise ValueError("The extend_by() method is not defined for a MixedCut.")
[docs] def pad(
self,
duration: Seconds = None,
num_frames: int = None,
num_samples: int = None,
pad_feat_value: float = LOG_EPSILON,
direction: str = "right",
preserve_id: bool = False,
pad_value_dict: Optional[Dict[str, Union[int, float]]] = None,
) -> Cut:
"""
Return a new MixedCut, padded with zeros in the recording, and ``pad_feat_value`` in each feature bin.
The user can choose to pad either to a specific `duration`; a specific number of frames `num_frames`;
or a specific number of samples `num_samples`. The three arguments are mutually exclusive.
:param duration: The cut's minimal duration after padding.
:param num_frames: The cut's total number of frames after padding.
:param num_samples: The cut's total number of samples after padding.
:param pad_feat_value: A float value that's used for padding the features.
By default we assume a log-energy floor of approx. -23 (1e-10 after exp).
:param direction: string, 'left', 'right' or 'both'. Determines whether the padding is added before or after
the cut.
:param preserve_id: When ``True``, preserves the cut ID from before padding.
Otherwise, generates a new random ID (default).
:param pad_value_dict: Optional dict that specifies what value should be used
for padding arrays in custom attributes.
:return: a padded MixedCut if duration is greater than this cut's duration, otherwise ``self``.
"""
from .set import pad
return pad(
self,
duration=duration,
num_frames=num_frames,
num_samples=num_samples,
pad_feat_value=pad_feat_value,
direction=direction,
preserve_id=preserve_id,
pad_value_dict=pad_value_dict,
)
[docs] def resample(
self,
sampling_rate: int,
affix_id: bool = False,
recording_field: Optional[str] = None,
) -> "MixedCut":
"""
Return a new ``MixedCut`` that will lazily resample the audio while reading it.
This operation will drop the feature manifest, if attached.
It does not affect the supervision.
:param sampling_rate: The new sampling rate.
:param affix_id: Should we modify the ID (useful if both versions of the same
cut are going to be present in a single manifest).
:param recording_field: which recording field to resample.
:return: a modified copy of the current ``MixedCut``.
"""
assert self.has_recording, "Cannot resample a MixedCut without Recording."
return MixedCut(
id=f"{self.id}_rs{sampling_rate}" if affix_id else self.id,
tracks=[
fastcopy(
t,
cut=t.cut.resample(sampling_rate, recording_field=recording_field),
)
for t in self.tracks
],
)
[docs] def compress(
self,
codec: Codec = "opus",
compression_level: float = 0.99,
compress_custom_fields: bool = False,
):
"""
Return a copy of this Cut that has Recordings in its sub-Cuts processed by a lossy encoding.
:param codec: The codec to use for compression. Supported codecs are "opus", "mp3", "vorbis", "gsm".
:param compression_level: The level of compression (from 0.0 to 1.0, higher values correspond to higher compression).
:param compress_custom_fields: Whether to also compress any custom recording fields in sub-Cuts.
:return: A modified :class:`~lhotse.MixedCut` containing audio processed by a codec
"""
assert self.has_recording, "Cannot compress a MixedCut without a Recording."
return MixedCut(
id=self.id,
tracks=[
fastcopy(
t,
cut=t.cut.compress(
codec, compression_level, compress_custom_fields
),
)
for t in self.tracks
],
)
[docs] def perturb_speed(self, factor: float, affix_id: bool = True) -> "MixedCut":
"""
Return a new ``MixedCut`` that will lazily perturb the speed while loading audio.
The ``num_samples``, ``start`` and ``duration`` fields of the underlying Cuts
(and their Recordings and SupervisionSegments) are updated to reflect
the shrinking/extending effect of speed.
We are also updating the offsets of all underlying tracks.
:param factor: The speed will be adjusted this many times (e.g. factor=1.1 means 1.1x faster).
:param affix_id: When true, we will modify the ``MixedCut.id`` field
by affixing it with "_sp{factor}".
:return: a modified copy of the current ``MixedCut``.
"""
# TODO(pzelasko): test most extensively for edge cases
# Pre-conditions
assert (
self.has_recording
), "Cannot perturb speed on a MixedCut without Recording."
if self.has_features:
logging.warning(
"Attempting to perturb speed on a MixedCut that references pre-computed features. "
"The feature manifest(s) will be detached, as we do not support feature-domain "
"speed perturbation."
)
return MixedCut(
id=f"{self.id}_sp{factor}" if affix_id else self.id,
tracks=[
fastcopy(
track,
cut=track.cut.perturb_speed(factor=factor, affix_id=affix_id),
offset=round(
perturb_num_samples(
num_samples=compute_num_samples(
track.offset, self.sampling_rate
),
factor=factor,
)
/ self.sampling_rate,
ndigits=8,
),
)
for track in self.tracks
],
)
[docs] def perturb_tempo(self, factor: float, affix_id: bool = True) -> "MixedCut":
"""
Return a new ``MixedCut`` that will lazily perturb the tempo while loading audio.
Compared to speed perturbation, tempo preserves pitch.
The ``num_samples``, ``start`` and ``duration`` fields of the underlying Cuts
(and their Recordings and SupervisionSegments) are updated to reflect
the shrinking/extending effect of tempo.
We are also updating the offsets of all underlying tracks.
:param factor: The tempo will be adjusted this many times (e.g. factor=1.1 means 1.1x faster).
:param affix_id: When true, we will modify the ``MixedCut.id`` field
by affixing it with "_tp{factor}".
:return: a modified copy of the current ``MixedCut``.
"""
# TODO(pzelasko): test most extensively for edge cases
# Pre-conditions
assert (
self.has_recording
), "Cannot perturb tempo on a MixedCut without Recording."
if self.has_features:
logging.warning(
"Attempting to perturb tempo on a MixedCut that references pre-computed features. "
"The feature manifest(s) will be detached, as we do not support feature-domain "
"tempo perturbation."
)
return MixedCut(
id=f"{self.id}_tp{factor}" if affix_id else self.id,
tracks=[
fastcopy(
track,
cut=track.cut.perturb_tempo(factor=factor, affix_id=affix_id),
offset=round(
perturb_num_samples(
num_samples=compute_num_samples(
track.offset, self.sampling_rate
),
factor=factor,
)
/ self.sampling_rate,
ndigits=8,
),
)
for track in self.tracks
],
)
[docs] def perturb_volume(self, factor: float, affix_id: bool = True) -> "MixedCut":
"""
Return a new ``MixedCut`` that will lazily perturb the volume while loading audio.
Recordings of the underlying Cuts are updated to reflect volume change.
:param factor: The volume will be adjusted this many times (e.g. factor=1.1 means 1.1x louder).
:param affix_id: When true, we will modify the ``MixedCut.id`` field
by affixing it with "_vp{factor}".
:return: a modified copy of the current ``MixedCut``.
"""
# Pre-conditions
assert (
self.has_recording
), "Cannot perturb volume on a MixedCut without Recording."
if self.has_features:
logging.warning(
"Attempting to perturb volume on a MixedCut that references pre-computed features. "
"The feature manifest(s) will be detached, as we do not support feature-domain "
"volume perturbation."
)
return MixedCut(
id=f"{self.id}_vp{factor}" if affix_id else self.id,
tracks=[
fastcopy(
track,
cut=track.cut.perturb_volume(factor=factor, affix_id=affix_id),
)
for track in self.tracks
],
)
[docs] def clip_amplitude(
self,
hard: bool = False,
gain_db: float = 0.0,
normalize: bool = True,
oversampling: Optional[int] = 2,
affix_id: bool = True,
) -> "MixedCut":
"""
Return a new ``MixedCut`` that will lazily apply clipping while loading audio.
Recordings of the underlying Cuts are updated to reflect clipping change.
:param hard: If True, apply hard clipping (sharp cutoff); otherwise, apply soft clipping (saturation).
:param gain_db: The amount of gain in decibels to apply before clipping.
:param normalize: If True, normalize the input signal to 0 dBFS before applying clipping.
:param oversampling: If provided, we will oversample the input signal by the given integer factor before applying saturation and then downsample back to the original sampling rate.
:param affix_id: When true, we will modify the ``MixedCut.id`` field
by affixing it with "_cl{gain_db}".
:return: a modified copy of the current ``MixedCut``.
"""
# Pre-conditions
assert (
self.has_recording
), "Cannot apply saturation on a MixedCut without Recording."
if self.has_features:
logging.warning(
"Attempting to apply saturation on a MixedCut that references pre-computed features. "
"The feature manifest(s) will be detached, as we do not support feature-domain "
"saturation."
)
return MixedCut(
id=f"{self.id}_cl{gain_db}" if affix_id else self.id,
tracks=[
fastcopy(
track,
cut=track.cut.clip_amplitude(
hard=hard,
gain_db=gain_db,
normalize=normalize,
oversampling=oversampling,
affix_id=affix_id,
),
)
for track in self.tracks
],
)
[docs] def normalize_loudness(
self, target: float, mix_first: bool = True, affix_id: bool = False
) -> "DataCut":
"""
Return a new ``MixedCut`` that will lazily apply loudness normalization.
:param target: The target loudness in dBFS.
:param mix_first: If true, we will mix the underlying cuts before applying
loudness normalization. If false, we cannot guarantee that the resulting
cut will have the target loudness.
:param affix_id: When true, we will modify the ``DataCut.id`` field
by affixing it with "_ln{target}".
:return: a modified copy of the current ``DataCut``.
"""
# Pre-conditions
assert (
self.has_recording
), "Cannot apply loudness normalization on a MixedCut without Recording."
if self.has_features:
logging.warning(
"Attempting to normalize loudness on a MixedCut that references pre-computed features. "
"The feature manifest will be detached, as we do not support feature-domain "
"loudness normalization."
)
self.features = None
if mix_first:
transforms = self.transforms.copy() if self.transforms is not None else []
transforms.append(LoudnessNormalization(target=target))
return fastcopy(
self,
id=f"{self.id}_ln{target}" if affix_id else self.id,
transforms=transforms,
)
else:
return MixedCut(
id=f"{self.id}_ln{target}" if affix_id else self.id,
tracks=[
fastcopy(
track,
cut=track.cut.normalize_loudness(
target=target, affix_id=affix_id
),
)
for track in self.tracks
],
)
[docs] def reverb_rir(
self,
rir_recording: Optional["Recording"] = None,
normalize_output: bool = True,
early_only: bool = False,
affix_id: bool = True,
rir_channels: List[int] = [0],
room_rng_seed: Optional[int] = None,
source_rng_seed: Optional[int] = None,
mix_first: bool = True,
) -> "MixedCut":
"""
Return a new ``MixedCut`` that will convolve the audio with the provided impulse response.
If no ``rir_recording`` is provided, we will generate an impulse response using a fast random
generator (https://arxiv.org/abs/2208.04101).
:param rir_recording: The impulse response to use for convolving.
:param normalize_output: When true, output will be normalized to have energy as input.
:param early_only: When true, only the early reflections (first 50 ms) will be used.
:param affix_id: When true, we will modify the ``MixedCut.id`` field
by affixing it with "_rvb".
:param rir_channels: The channels of the impulse response to use. By default, first channel is used.
If only one channel is specified, all tracks will be convolved with this channel. If a list
is provided, it must contain as many channels as there are tracks such that each track will
be convolved with one of the specified channels.
:param room_rng_seed: Seed for the room configuration.
:param source_rng_seed: Seed for the source position.
:param mix_first: When true, the mixing will be done first before convolving with the RIR.
This effectively means that all tracks will be convolved with the same RIR. If you
are simulating multi-speaker mixtures, you should set this to False.
:return: a modified copy of the current ``MixedCut``.
"""
# Pre-conditions
assert (
self.has_recording
), "Cannot apply reverberation on a MixedCut without Recording."
if self.has_features:
logging.warning(
"Attempting to reverberate a MixedCut that references pre-computed features. "
"The feature manifest(s) will be detached, as we do not support feature-domain "
"reverberation."
)
assert rir_recording is None or all(
c < rir_recording.num_channels for c in rir_channels
), "Invalid channel index in `rir_channels`."
audible_tracks = _get_audible_tracks(self)
assert len(rir_channels) == 1 or len(rir_channels) == len(
audible_tracks
), "Invalid number of channels in `rir_channels`, must be either 1 or equal to the number of tracks."
# There are 2 ways to apply RIRs:
# 1. Mix the tracks first, then apply RIRs. This is same as applying the same RIR
# to all tracks. It does not make sense if all tracks belong to different speakers,
# but it is useful for cases when we have a mixture of MonoCut and PaddingCut,
# and we want to apply the same RIR to all of them.
# 2. Apply RIRs to each track separately. This is useful when we want to simulate
# different speakers in the same room.
# First simulate the room config (will only be used if RIR is not provided)
uuid4_str = str(uuid4())
# The room RNG seed is based on the cut ID. This ensures that all tracks in the
# mixed cut will have the same room configuration.
if room_rng_seed is None:
room_rng_seed = hash_str_to_int(uuid4_str + self.id)
# The source RNG seed is based on the track ID. This ensures that each track
# will have a different source position.
source_rng_seeds = [source_rng_seed] * len(self.tracks)
if source_rng_seed is None:
source_rng_seeds = [
hash_str_to_int(uuid4_str + track.cut.id) for track in self.tracks
]
source_rng_seed = source_rng_seeds[0]
# Apply same RIR to all tracks after mixing (default)
if mix_first:
if rir_recording is None:
from lhotse.augmentation.utils import FastRandomRIRGenerator
rir_generator = FastRandomRIRGenerator(
sr=self.sampling_rate,
room_seed=room_rng_seed,
source_seed=source_rng_seed,
)
else:
rir_generator = None
transforms = self.transforms.copy() if self.transforms is not None else []
transforms.append(
ReverbWithImpulseResponse(
rir=rir_recording,
normalize_output=normalize_output,
early_only=early_only,
rir_channels=rir_channels if rir_channels is not None else [0],
rir_generator=rir_generator,
)
)
return fastcopy(
self,
id=f"{self.id}_rvb" if affix_id else self.id,
transforms=transforms,
)
# Apply RIRs to each track separately. Note that we do not pass a `mix_first`
# argument below since it is True by default.
if len(rir_channels) == 1:
rir_channels = rir_channels * len(self.tracks)
else:
audible_channels = iter(rir_channels)
rir_channels = [
next(audible_channels) if not track.mute else rir_channels[0]
for track in self.tracks
]
return MixedCut(
id=f"{self.id}_rvb" if affix_id else self.id,
tracks=[
fastcopy(
track,
cut=track.cut.reverb_rir(
rir_recording=rir_recording,
normalize_output=normalize_output,
early_only=early_only,
affix_id=affix_id,
rir_channels=[channel],
room_rng_seed=room_rng_seed,
source_rng_seed=seed,
),
)
for track, channel, seed in zip(
self.tracks, rir_channels, source_rng_seeds
)
],
)
[docs] @rich_exception_info
def load_features(self, mixed: bool = True) -> Optional[np.ndarray]:
"""
Loads the features of the source cuts and mixes them on-the-fly.
:param mixed: when True (default), the features are mixed together (as defined in
the mixing function for the extractor). This could result in either a 2D or 3D
array. For example, if all underlying tracks are single-channel, the output
will be a 2D array of shape (num_frames, num_features). If any of the tracks
are multi-channel, the output may be a 3D array of shape (num_frames, num_features,
num_channels).
:return: A numpy ndarray with features and with shape ``(num_frames, num_features)``,
or ``(num_tracks, num_frames, num_features)``
"""
if not self.has_features:
return None
tracks = _get_audible_tracks(self)
first_track = tracks[0]
first_cut = first_track.cut
# First, check for a simple scenario: just a single cut with padding.
# When that is the case, we don't have to instantiate a feature extractor,
# because we are not performing any actual mixing.
# That makes life simpler for the users who have a custom feature extractor,
# but don't actually care about feature-domain mixing; just want to pad.
if (
mixed
and first_track.snr is None
and tracks[1:]
and all(isinstance(t.cut, PaddingCut) for t in tracks[1:])
):
padding_val = tracks[1].cut.feat_value
first_cut_feats = first_cut.load_features()
if first_cut_feats.ndim == 2:
# 2D features
feats = np.ones((self.num_frames, self.num_features)) * padding_val
else:
# 3D features
feats = (
np.ones(
(self.num_frames, self.num_features, first_cut_feats.shape[-1])
)
* padding_val
)
feats[: first_cut.num_frames, ...] = first_cut_feats
return feats
# When there is more than one "regular" cut, we will perform an actual mix.
# First, we have to make sure that the reference energy levels are appropriate.
# They might not be if the first track is a padding track.
reference_feats = None
reference_energy = None
_, reference_track = _get_snr_reference_track(self)
feature_extractor = create_default_feature_extractor(
reference_track.cut.features_type
)
if reference_track is not first_track:
reference_feats = reference_track.cut.load_features()
reference_energy = feature_extractor.compute_energy(reference_feats)
# The mix itself.
first_cut_feats = first_cut.load_features()
first_cut_feats = _scale_features_for_snr(
first_cut_feats,
feature_extractor=feature_extractor,
snr=first_track.snr,
reference_energy=reference_energy,
)
mixer = FeatureMixer(
feature_extractor=feature_extractor,
base_feats=first_cut_feats,
frame_shift=first_cut.frame_shift,
reference_energy=reference_energy,
)
for track in tracks[1:]:
if track is reference_track and reference_feats is not None:
feats = reference_feats # manual caching to avoid duplicated I/O
else:
feats = track.cut.load_features()
mixer.add_to_mix(
feats=feats,
snr=track.snr,
offset=track.offset,
sampling_rate=track.cut.sampling_rate,
)
if mixed:
# Checking for some edge cases below.
feats = mixer.mixed_feats
# Note: The slicing below is a work-around for an edge-case
# when two cuts have durations that ended with 0.005 (e.g. 10.125 and 5.715)
# - then, the feature extractor "squeezed in" a last extra frame and the simple
# relationship between num_frames and duration we strived for is not true;
# i.e. the duration is 10.125 + 5.715 = 15.84, but the number of frames is
# 1013 + 572 = 1585. If the frame_shift is 0.01, we have gained an extra 0.01s...
if feats.shape[0] - self.num_frames == 1:
feats = feats[: self.num_frames, :]
# TODO(pzelasko): This can sometimes happen in a MixedCut with >= 5 different Cuts,
# with a regular MonoCut at the end, when the mix offsets are floats with a lot of decimals.
# For now we're duplicating the last frame to match the declared "num_frames" of this cut.
if feats.shape[0] - self.num_frames == -1:
feats = np.concatenate((feats, feats[-1:, :]), axis=0)
assert feats.shape[0] == self.num_frames, (
"Inconsistent number of frames in a MixedCut: please report "
"this issue at https://github.com/lhotse-speech/lhotse/issues "
"showing the output of print(cut) or str(cut) on which"
"load_features() was called."
)
return feats
else:
return mixer.unmixed_feats
[docs] @rich_exception_info
def load_audio(
self, mixed: bool = True, mono_downmix: bool = False
) -> Optional[np.ndarray]:
"""
Loads the audios of the source cuts and mix them on-the-fly.
:param mixed: When True (default), returns a mix of the underlying tracks. This will
return a numpy array with shape ``(num_channels, num_samples)``, where ``num_channels``
is determined by the ``num_channels`` property of the MixedCut. Otherwise returns a
numpy array with the number of channels equal to the total number of channels
across all tracks in the MixedCut. For example, if it contains a MultiCut with 2
channels and a MonoCut with 1 channel, the returned array will have shape
``(3, num_samples)``.
:param mono_downmix: If the MixedCut contains > 1 channels (for e.g. when one of its tracks
is a MultiCut), this parameter controls whether the returned array will be down-mixed
to a single channel. This down-mixing is done by summing the channels together.
:return: A numpy ndarray with audio samples and with shape ``(num_channels, num_samples)``
"""
if not self.has_recording:
return None
tracks = _get_audible_tracks(self)
first_track = tracks[0]
first_cut = first_track.cut
# First, we have to make sure that the reference energy levels are appropriate.
# They might not be if the first track is a padding track.
reference_audio = None
reference_energy = None
_, reference_track = _get_snr_reference_track(self)
if reference_track is not first_track:
reference_audio = reference_track.cut.load_audio()
reference_energy = audio_energy(reference_audio)
first_cut_audio = _scale_audio_for_snr(
first_cut.load_audio(),
snr=first_track.snr,
reference_energy=reference_energy,
)
mixer = AudioMixer(
first_cut_audio,
sampling_rate=first_cut.sampling_rate,
reference_energy=reference_energy,
base_offset=first_track.offset,
)
for track in tracks[1:]:
if track is reference_track and reference_audio is not None:
audio = reference_audio # manual caching to avoid duplicated I/O
else:
audio = track.cut.load_audio()
mixer.add_to_mix(
audio=audio,
snr=track.snr,
offset=track.offset,
)
# Flattening a MixedCut without MultiCut tracks has no effect
mono_downmix = mono_downmix and any(
track.type == "MultiCut" for track in tracks
)
# Flattening a MixedCut without mixed=True has no effect
mono_downmix = mono_downmix and mixed
if mixed:
# Off-by-one errors can happen during mixing due to imperfect float arithmetic and rounding;
# we will fix them on-the-fly so that the manifest does not lie about the num_samples.
audio = mixer.mixed_mono_audio if mono_downmix else mixer.mixed_audio
tol_samples = compute_num_samples(
get_audio_duration_mismatch_tolerance(),
sampling_rate=self.sampling_rate,
)
num_samples_diff = audio.shape[1] - self.num_samples
if 0 < num_samples_diff < tol_samples:
audio = audio[:, : self.num_samples]
if -tol_samples < num_samples_diff < 0:
audio = np.pad(audio, [(0, 0), (0, -num_samples_diff)], mode="reflect")
assert audio.shape[1] == self.num_samples, (
f"Inconsistent number of samples in a MixedCut. Expected {self.num_samples} "
f"but the output of mix has {audio.shape[1]}. Please report "
f"this issue at https://github.com/lhotse-speech/lhotse/issues "
f"showing the cut below. MixedCut:\n{self}"
)
# We'll apply the transforms now (if any).
transforms = [
tnfm
if isinstance(tnfm, AudioTransform)
else AudioTransform.from_dict(tnfm)
for tnfm in self.transforms or []
]
for tfn in transforms:
audio = tfn(audio, self.sampling_rate)
else:
audio = mixer.unmixed_audio
return audio
@property
def video(self) -> Optional[VideoInfo]:
if self.has_video:
v = self._first_non_padding_cut.video
return v.copy_with(num_frames=compute_num_samples(self.duration, v.fps))
return None
[docs] @rich_exception_info
def load_video(
self,
with_audio: bool = True,
mixed: bool = True,
mono_downmix: bool = False,
) -> Optional[Tuple[torch.Tensor, Optional[torch.Tensor]]]:
if not self.has_video:
return None
tracks = _get_audible_tracks(self)
mixer = VideoMixer(
tracks[0].cut.load_video(with_audio=False)[0],
fps=self.video.fps,
base_offset=tracks[0].offset,
)
for track in tracks[1:]:
mixer.add_to_mix(
video=track.cut.load_video(with_audio=False)[0],
offset=track.offset,
)
video = mixer.mixed_video
if with_audio:
# For now load audio separately to re-use the complex logic of load_audio
# This means the same video file is potentially opened twice, but given the
# cost of video decoding, the extra file open cost could be negligible.
audio = self.load_audio(mixed=mixed, mono_downmix=mono_downmix)
return video, torch.from_numpy(audio)
[docs] def plot_tracks_features(self):
"""
Display the feature matrix as an image. Requires matplotlib to be installed.
"""
import matplotlib.pyplot as plt
tracks = _get_audible_tracks(self)
fig, axes = plt.subplots(len(tracks))
features = self.load_features(mixed=False)
fmin, fmax = features.min(), features.max()
for idx, ax in enumerate(axes):
ax.imshow(np.flip(features[idx].transpose(1, 0), 0), vmin=fmin, vmax=fmax)
return axes
[docs] def plot_tracks_audio(self):
"""
Display plots of the individual tracks' waveforms. Requires matplotlib to be installed.
"""
import matplotlib.pyplot as plt
audio = self.load_audio(mixed=False)
tracks = _get_audible_tracks(self)
fig, axes = plt.subplots(len(tracks), sharex=False, sharey=True)
for idx, (track, ax) in enumerate(zip(tracks, axes)):
samples = audio[idx].squeeze(0)
ax.plot(np.linspace(0, self.duration, len(samples)), samples)
for supervision in track.cut.supervisions:
supervision = supervision.trim(track.cut.duration)
ax.axvspan(
track.offset + supervision.start,
track.offset + supervision.end,
color="green",
alpha=0.1,
)
return axes
[docs] def drop_features(self) -> "MixedCut":
"""Return a copy of the current :class:`MixedCut`, detached from ``features``."""
assert (
self.has_recording
), f"Cannot detach features from a MixedCut with no Recording (cut ID = {self.id})."
return fastcopy(
self,
tracks=[fastcopy(t, cut=t.cut.drop_features()) for t in self.tracks],
)
[docs] def drop_recording(self) -> "MixedCut":
"""Return a copy of the current :class:`.MixedCut`, detached from ``recording``."""
assert (
self.has_features
), f"Cannot detach recording from a MixedCut with no Features (cut ID = {self.id})."
return fastcopy(
self,
tracks=[fastcopy(t, cut=t.cut.drop_recording()) for t in self.tracks],
)
[docs] def drop_supervisions(self) -> "MixedCut":
"""Return a copy of the current :class:`.MixedCut`, detached from ``supervisions``."""
return fastcopy(
self,
tracks=[fastcopy(t, cut=t.cut.drop_supervisions()) for t in self.tracks],
)
[docs] def drop_alignments(self) -> "MixedCut":
"""Return a copy of the current :class:`.MixedCut`, detached from ``supervisions``."""
return fastcopy(
self,
tracks=[fastcopy(t, cut=t.cut.drop_alignments()) for t in self.tracks],
)
[docs] def drop_in_memory_data(self) -> "MixedCut":
"""Return a copy of the current :class:`MixedCut`, which doesn't contain any in-memory data."""
return fastcopy(
self,
tracks=[fastcopy(t, cut=t.cut.drop_in_memory_data()) for t in self.tracks],
)
[docs] def compute_and_store_features(
self,
extractor: FeatureExtractor,
storage: FeaturesWriter,
augment_fn: Optional[AugmentFn] = None,
mix_eagerly: bool = True,
) -> DataCut:
"""
Compute the features from this cut, store them on disk, and create a new `MonoCut` object with the
feature manifest attached. This cut has to be able to load audio.
:param extractor: a ``FeatureExtractor`` instance used to compute the features.
:param storage: a ``FeaturesWriter`` instance used to store the features.
When the optional ``lilcom`` dependency is installed and on-disk size matters,
``LilcomChunkyWriter`` is the preferred backend.
:param augment_fn: an optional callable used for audio augmentation.
:param mix_eagerly: when False, extract and store the features for each track separately,
and mix them dynamically when loading the features.
When True, mix the audio first and store the mixed features, returning a new ``MonoCut`` instance
with the same ID. The returned ``MonoCut`` will not have a ``Recording`` attached.
:return: a new ``MonoCut`` instance if ``mix_eagerly`` is True, or returns ``self``
with each of the tracks containing the ``Features`` manifests.
"""
if mix_eagerly:
from .mono import MonoCut
features_info = extractor.extract_from_samples_and_store(
samples=self.load_audio(),
storage=storage,
sampling_rate=self.sampling_rate,
offset=0,
channel=0,
augment_fn=augment_fn,
)
features_info.recording_id = self.id
return MonoCut(
id=self.id,
start=0,
duration=self.duration,
channel=0,
supervisions=[
fastcopy(s, recording_id=self.id) for s in self.supervisions
],
features=features_info,
recording=None,
custom=self.custom if hasattr(self, "custom") else None,
)
else: # mix lazily
new_tracks = [
MixTrack(
cut=track.cut.compute_and_store_features(
extractor=extractor,
storage=storage,
augment_fn=augment_fn,
),
offset=track.offset,
snr=track.snr,
tag=track.tag,
is_snr_reference=track.is_snr_reference,
mute=track.mute,
)
for track in self.tracks
]
return MixedCut(
id=self.id,
tracks=new_tracks,
)
[docs] def fill_supervision(
self, add_empty: bool = True, shrink_ok: bool = False
) -> "MixedCut":
"""
Fills the whole duration of a cut with a supervision segment.
If the cut has one supervision, its start is set to 0 and duration is set to ``cut.duration``.
Note: this may either expand a supervision that was shorter than a cut, or shrink a supervision
that exceeds the cut.
If there are no supervisions, we will add an empty one when ``add_empty==True``, otherwise
we won't change anything.
If there are two or more supervisions, we will raise an exception.
.. note:: For :class:`.MixedCut`, we expect that only one track contains a supervision.
That supervision will be expanded to cover the full MixedCut's duration.
:param add_empty: should we add an empty supervision with identical time bounds as the cut.
:param shrink_ok: should we raise an error if a supervision would be shrank as a result
of calling this method.
"""
n_sups = len(self.supervisions)
if n_sups == 0:
if not add_empty:
return self
first_non_padding_idx = self.tracks.index(self._first_non_padding_track)
new_tracks = [
fastcopy(
t,
cut=fastcopy(
t.cut,
supervisions=[
SupervisionSegment(
id=self.id,
recording_id=t.cut.recording_id,
start=-t.offset,
duration=self.duration,
channel=-1,
)
],
),
)
if idx == first_non_padding_idx
else t
for idx, t in enumerate(self.tracks)
]
else:
assert (
n_sups == 1
), f"Cannot expand more than one supervision (found {len(self.supervisions)}."
new_tracks = []
for t in self.tracks:
if t.mute:
new_tracks.append(t)
continue
if len(t.cut.supervisions) == 0:
new_tracks.append(t)
else:
sup = t.cut.supervisions[0]
if not shrink_ok and (
sup.start < -t.offset or sup.end > self.duration
):
raise ValueError(
f"Cannot shrink supervision (start={sup.start}, end={sup.end}) to cut "
f"(start=0, duration={t.cut.duration}) because the argument `shrink_ok` is `False`. "
f"Note: this check prevents accidental data loss for speech recognition, "
f"as supervision exceeding a cut indicates there might be some spoken content "
f"beyond cuts start or end (an ASR model would be trained to predict more text than "
f"spoken in the audio). If this is okay, set `shrink_ok` to `True`."
)
new_tracks.append(
fastcopy(
t,
cut=fastcopy(
t.cut,
supervisions=[
fastcopy(
sup, start=-t.offset, duration=self.duration
)
],
),
)
)
return fastcopy(self, tracks=new_tracks)
[docs] def map_supervisions(
self, transform_fn: Callable[[SupervisionSegment], SupervisionSegment]
) -> Cut:
"""
Modify the SupervisionSegments by `transform_fn` of this MixedCut.
:param transform_fn: a function that modifies a supervision as an argument.
:return: a modified MixedCut.
"""
new_mixed_cut = fastcopy(self)
for track in new_mixed_cut.tracks:
if isinstance(track.cut, PaddingCut) or track.mute:
continue
track.cut.supervisions = [
segment.map(transform_fn) for segment in track.cut.supervisions
]
return new_mixed_cut
[docs] def merge_supervisions(
self,
merge_policy: str = "delimiter",
custom_merge_fn: Optional[Callable[[str, Iterable[Any]], Any]] = None,
) -> "MixedCut":
"""
Return a copy of the cut that has all of its supervisions merged into
a single segment.
The new start is the start of the earliest superivion, and the new duration
is a minimum spanning duration for all the supervisions. The text fields are
concatenated with a whitespace.
.. note:: If you're using individual tracks of a mixed cut, note that this transform
drops all the supervisions in individual tracks and assigns the merged supervision
in the first :class:`.DataCut` found in ``self.tracks``.
:param merge_policy: one of "keep_first" or "delimiter". If "keep_first", we
keep only the first segment's field value, otherwise all string fields
(including IDs) are prefixed with "cat#" and concatenated with a hash symbol "#".
This is also applied to ``custom`` fields. Fields with a ``None`` value are omitted.
:param custom_merge_fn: a function that will be called to merge custom fields values.
We expect ``custom_merge_fn`` to handle all possible custom keys.
When not provided, we will treat all custom values as strings.
It will be called roughly like:
``custom_merge_fn(custom_key, [s.custom[custom_key] for s in sups])``
"""
merge_func_ = partial(
merge_items_with_delimiter,
delimiter="#",
return_first=(merge_policy == "keep_first"),
)
# "m" stands for merged in variable names below
if custom_merge_fn is not None:
# Merge custom fields with the user-provided function.
merge_custom = custom_merge_fn
else:
# Merge the string representations of custom fields.
merge_custom = lambda k, vs: merge_func_(map(str, vs))
sups = sorted(self.supervisions, key=lambda s: s.start)
if len(sups) <= 1:
return self
# the sampling rate is arbitrary, ensures there are no float precision errors
mstart = sups[0].start
mend = sups[-1].end
mduration = add_durations(mend, -mstart, sampling_rate=self.sampling_rate)
custom_keys = set(
k for s in sups if s.custom is not None for k in s.custom.keys()
)
alignment_keys = set(
k for s in sups if s.alignment is not None for k in s.alignment.keys()
)
if any(overlaps(s1, s2) for s1, s2 in zip(sups, sups[1:])) and any(
s.text is not None for s in sups
):
warnings.warn(
"You are merging overlapping supervisions that have text transcripts. "
"The result is likely to be unusable if you are going to train speech "
f"recognition models (cut id: {self.id})."
)
msup = SupervisionSegment(
id=merge_func_(s.id for s in sups),
# Make merged recording_id is a mix of recording_ids.
recording_id=merge_func_(s.recording_id for s in sups),
start=mstart,
duration=mduration,
# Hardcode -1 to indicate no specific channel, as the supervisions might have
# come from different channels in their original recordings.
channel=-1,
text=" ".join(s.text for s in sups if s.text),
speaker=merge_func_(s.speaker for s in sups if s.speaker),
language=merge_func_(s.language for s in sups if s.language),
gender=merge_func_(s.gender for s in sups if s.gender),
custom={
k: merge_custom(
k,
(
s.custom[k]
for s in sups
if s.custom is not None and k in s.custom
),
)
for k in custom_keys
},
alignment={
# Concatenate the lists of alignment units.
k: reduce(
add,
(
s.alignment[k]
for s in sups
if s.alignment is not None and k in s.alignment
),
)
for k in alignment_keys
},
)
new_cut = self.drop_supervisions()
new_cut._first_non_padding_cut.supervisions = [msup]
return new_cut
[docs] def filter_supervisions(
self, predicate: Callable[[SupervisionSegment], bool]
) -> Cut:
"""
Modify cut to store only supervisions accepted by `predicate`
Example:
>>> cut = cut.filter_supervisions(lambda s: s.id in supervision_ids)
>>> cut = cut.filter_supervisions(lambda s: s.duration < 5.0)
>>> cut = cut.filter_supervisions(lambda s: s.text is not None)
:param predicate: A callable that accepts `SupervisionSegment` and returns bool
:return: a modified MixedCut
"""
new_mixed_cut = fastcopy(
self,
tracks=[
fastcopy(track, cut=track.cut.filter_supervisions(predicate))
for track in self.tracks
],
)
return new_mixed_cut
[docs] @staticmethod
def from_dict(data: dict) -> "MixedCut":
if "type" in data:
data.pop("type")
transforms = None
if "transforms" in data:
transforms = [AudioTransform.from_dict(t) for t in data["transforms"]]
tracks = [MixTrack.from_dict(track) for track in data["tracks"]]
if "snr_reference" in data:
tracks.append(
fastcopy(
MixTrack.from_dict(data["snr_reference"]),
is_snr_reference=True,
mute=True,
)
)
return MixedCut(
id=data["id"],
tracks=tracks,
transforms=transforms,
)
[docs] def with_features_path_prefix(self, path: Pathlike) -> "MixedCut":
if not self.has_features:
return self
return MixedCut(
id=self.id,
tracks=[
fastcopy(t, cut=t.cut.with_features_path_prefix(path))
for t in self.tracks
],
)
[docs] def with_recording_path_prefix(self, path: Pathlike) -> "MixedCut":
if not self.has_recording:
return self
return MixedCut(
id=self.id,
tracks=[
fastcopy(t, cut=t.cut.with_recording_path_prefix(path))
for t in self.tracks
],
)
@property
def first_non_padding_cut(self) -> DataCut:
return self._first_non_padding_track.cut
@property
def first_non_padding_track(self) -> MixTrack:
return _get_first_non_padding_track(self)
# Note: the private properties below are kept for backward compatibility.
@property
def _first_non_padding_cut(self) -> DataCut:
return self.first_non_padding_cut
@property
def _first_non_padding_track(self) -> MixTrack:
return self.first_non_padding_track
def _get_audible_tracks(mixed_cut: "MixedCut") -> List[MixTrack]:
tracks = [track for track in mixed_cut.tracks if not track.mute]
return tracks if tracks else mixed_cut.tracks
def _get_first_non_padding_track(mixed_cut: "MixedCut") -> MixTrack:
tracks = [
track
for track in _get_audible_tracks(mixed_cut)
if not isinstance(track.cut, PaddingCut)
]
if tracks:
return tracks[0]
return _get_audible_tracks(mixed_cut)[0]
def _get_snr_reference_track(mixed_cut: "MixedCut") -> Tuple[Optional[int], MixTrack]:
for idx, track in enumerate(mixed_cut.tracks):
if track.is_snr_reference:
return idx, track
for idx, track in enumerate(mixed_cut.tracks):
if not isinstance(track.cut, PaddingCut) and track.snr is None:
return idx, track
raise ValueError(
f"Cannot determine SNR reference track for MixedCut '{mixed_cut.id}'."
)
def _ensure_explicit_snr_reference(tracks: List[MixTrack]) -> List[MixTrack]:
if any(track.is_snr_reference for track in tracks):
return tracks
for idx, track in enumerate(tracks):
if not isinstance(track.cut, PaddingCut) and track.snr is None:
tracks[idx] = fastcopy(track, is_snr_reference=True)
break
return tracks
def _scale_audio_for_snr(
audio: np.ndarray,
snr: Optional[Decibels],
reference_energy: Optional[float],
) -> np.ndarray:
if snr is None or reference_energy is None or reference_energy <= 0.0:
return audio
added_audio_energy = audio_energy(audio)
if added_audio_energy <= 0.0:
return audio
target_energy = reference_energy * (10.0 ** (-snr / 10))
return np.sqrt(target_energy / added_audio_energy) * audio
def _scale_features_for_snr(
features: np.ndarray,
feature_extractor: FeatureExtractor,
snr: Optional[Decibels],
reference_energy: Optional[float],
) -> np.ndarray:
if snr is None or reference_energy is None or reference_energy <= 0.0:
return features
added_features_energy = feature_extractor.compute_energy(features)
if added_features_energy <= 0.0:
return features
target_energy = reference_energy * (10.0 ** (-snr / 10))
return feature_extractor.scale(features, target_energy / added_features_energy)
def _make_padding_cut(mixed_cut: "MixedCut") -> PaddingCut:
return PaddingCut(
id=str(uuid4()),
duration=mixed_cut.duration,
sampling_rate=mixed_cut.sampling_rate,
feat_value=LOG_EPSILON,
num_frames=mixed_cut.num_frames if mixed_cut.has_features else None,
num_features=mixed_cut.num_features if mixed_cut.has_features else None,
frame_shift=mixed_cut.frame_shift if mixed_cut.has_features else None,
num_samples=mixed_cut.num_samples if mixed_cut.has_recording else None,
video=mixed_cut.video if mixed_cut.has_video else None,
)
def _to_unmixed_cut(mixed_cut: "MixedCut", tracks: List[MixTrack]) -> Cut:
if not tracks:
return _make_padding_cut(mixed_cut)
tracks = _ensure_explicit_snr_reference([fastcopy(track) for track in tracks])
needs_reference = all(track.snr is not None for track in tracks)
if needs_reference:
_, reference_track = _get_snr_reference_track(mixed_cut)
tracks.append(fastcopy(reference_track, is_snr_reference=True, mute=True))
cut = MixedCut(
id=str(uuid4()),
tracks=tracks,
)
if cut.duration < mixed_cut.duration:
cut = cut.pad(duration=mixed_cut.duration, preserve_id=True)
return cut