Source code for lhotse.cut.mixed

import logging
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

import numpy as np
from intervaltree import IntervalTree

from lhotse.audio import AudioMixer, Recording, audio_energy
from lhotse.augmentation import AugmentFn
from lhotse.cut.base import Cut
from lhotse.cut.mono import MonoCut
from lhotse.cut.padding import PaddingCut
from lhotse.features import (
    FeatureExtractor,
    FeatureMixer,
    create_default_feature_extractor,
)
from lhotse.features.io import FeaturesWriter
from lhotse.supervision import SupervisionSegment
from lhotse.utils import (
    DEFAULT_PADDING_VALUE,
    LOG_EPSILON,
    Decibels,
    NonPositiveEnergyError,
    Pathlike,
    Seconds,
    add_durations,
    compute_num_frames,
    compute_num_samples,
    fastcopy,
    perturb_num_samples,
    rich_exception_info,
    uuid4,
)


@dataclass
class MixTrack:
    """
    Represents a single track in a mix of Cuts. Points to a specific MonoCut and holds information on
    how to mix it with other Cuts, relative to the first track in a mix.
    """

    cut: Union[MonoCut, PaddingCut]
    offset: Seconds = 0.0
    snr: Optional[Decibels] = None

    @staticmethod
    def from_dict(data: dict):
        raw_cut = data.pop("cut")
        try:
            cut = MonoCut.from_dict(raw_cut)
        except TypeError:
            cut = PaddingCut.from_dict(raw_cut)
        return MixTrack(cut, **data)


[docs]@dataclass class MixedCut(Cut): """ :class:`~lhotse.cut.MixedCut` is a :class:`~lhotse.cut.Cut` that actually consists of multiple other cuts. It can be interpreted as a multi-channel cut, but its primary purpose is to allow time-domain and feature-domain augmentation via mixing the training cuts with noise, music, and babble cuts. The actual mixing operations are performed on-the-fly. Internally, :class:`~lhotse.cut.MixedCut` holds other cuts in multiple trakcs (:class:`~lhotse.cut.MixTrack`), each with its own offset and SNR that is relative to the first track. Please refer to the documentation of :class:`~lhotse.cut.Cut` to learn more about using cuts. In addition to methods available in :class:`~lhotse.cut.Cut`, :class:`~lhotse.cut.MixedCut` provides the methods to read all of its tracks audio and features as separate channels: >>> cut = MixedCut(...) >>> mono_features = cut.load_features() >>> assert len(mono_features.shape) == 2 >>> multi_features = cut.load_features(mixed=False) >>> # Now, the first dimension is the channel. >>> assert len(multi_features.shape) == 3 See also: - :class:`lhotse.cut.Cut` - :class:`lhotse.cut.MonoCut` - :class:`lhotse.cut.CutSet` """ id: str tracks: List[MixTrack] @property def supervisions(self) -> List[SupervisionSegment]: """ Lists the supervisions of the underlying source cuts. Each segment start time will be adjusted by the track offset. """ return [ segment.with_offset(track.offset) for track in self.tracks for segment in track.cut.supervisions ] @property def start(self) -> Seconds: return 0 @property def duration(self) -> Seconds: track_durations = (track.offset + track.cut.duration for track in self.tracks) return round(max(track_durations), ndigits=8) @property def has_features(self) -> bool: return self._first_non_padding_cut.has_features @property def has_recording(self) -> bool: return self._first_non_padding_cut.has_recording def has(self, field: str) -> bool: return self._first_non_padding_cut.has(field) @property def num_frames(self) -> Optional[int]: if self.has_features: return compute_num_frames( duration=self.duration, frame_shift=self.frame_shift, sampling_rate=self.sampling_rate, ) return None @property def frame_shift(self) -> Optional[Seconds]: return self.tracks[0].cut.frame_shift @property def sampling_rate(self) -> Optional[int]: return self.tracks[0].cut.sampling_rate @property def num_samples(self) -> Optional[int]: return compute_num_samples(self.duration, self.sampling_rate) @property def num_features(self) -> Optional[int]: return self.tracks[0].cut.num_features @property def features_type(self) -> Optional[str]: return self._first_non_padding_cut.features.type if self.has_features else None def __getattr__(self, name: str) -> Any: """ This magic function is called when the user tries to access an attribute of :class:`.MixedCut` that doesn't exist. It is used for accessing the custom attributes of cuts. We support exactly one scenario for mixed cuts: If :attr:`tracks` contains exactly one :class:`.MonoCut` object (and an arbitrary number of :class:`.PaddingCut` objects), we will look up the custom attributes of that cut. If one of the custom attributes is of type :class:`~lhotse.array.Array` or :class:`~lhotse.array.TemporalArray` we'll also support loading those arrays (see example below). Additionally, we will incorporate extra padding as dictated by padding cuts. Example: >>> cut = MonoCut('cut1', start=0, duration=4, channel=0) >>> cut.alignment = TemporalArray(...) >>> mixed_cut = cut.pad(10, pad_value_dict={'alignment': -1}) >>> ali = mixed_cut.load_alignment() """ # Python will sometimes try to call undefined magic functions, # just fail for them (e.g. __setstate__ when pickling). if name.startswith("__"): raise AttributeError() # Loading a custom array attribute + performing padding. if name.startswith("load_"): attr_name = name[5:] return partial(self.load_custom, attr_name) # Returning the contents of "mono_cut.custom[name]", # or raising AttributeError. try: ( non_padding_idx, mono_cut, ) = self._assert_one_mono_cut_with_attr_and_return_it_with_track_index(name) return getattr(mono_cut, name) except AssertionError: raise AttributeError( f"No such attribute: '{name}' (note: custom attributes are not supported " f"when a MixedCut consists of more than one MonoCut with that attribute)." ) def load_custom(self, name: str) -> np.ndarray: """ Load custom data as numpy array. The custom data is expected to have been stored in cuts ``custom`` field as an :class:`~lhotse.array.Array` or :class:`~lhotse.array.TemporalArray` manifest. .. note:: It works with Array manifests stored via attribute assignments, e.g.: ``cut.my_custom_data = Array(...)``. .. warning:: For :class:`.MixedCut`, this will only work if the mixed cut consists of a single :class:`.MonoCut` and an arbitrary number of :class:`.PaddingCuts`. This is because it is generally undefined how to mix arbitrary arrays. :param name: name of the custom attribute. :return: a numpy array with the data (after padding). """ from lhotse.array import Array, pad_array ( non_padding_idx, mono_cut, ) = self._assert_one_mono_cut_with_attr_and_return_it_with_track_index(name) # Use getattr to propagate AttributeError if "name" is not defined. manifest = getattr(mono_cut, name) # Check if the corresponding manifest for 'load_something' is of type # Array; if yes, just return the loaded data. # This is likely an embedding without a temporal dimension. if isinstance(manifest, Array): return mono_cut.load_custom(name) # We are loading either an array with a temporal dimension, or a recording: # We need to pad it. left_padding = self.tracks[non_padding_idx].offset padded_duration = self.duration # Then, check if it's a Recording. In that case we convert it to a cut, # leverage existing padding methods, and load padded audio data. if isinstance(manifest, Recording): return ( manifest.to_cut() .pad(duration=manifest.duration + left_padding, direction="left") .pad(duration=padded_duration, direction="right") .load_audio() ) # Load the array and retrieve the manifest from the only non-padding cut. # We'll also need to fetch the dict defining what padding value to use (if present). array = mono_cut.load_custom(name) try: pad_value_dict = [ t.cut for t in self.tracks if isinstance(t.cut, PaddingCut) ][0].custom pad_value = pad_value_dict[name] except: pad_value = DEFAULT_PADDING_VALUE return pad_array( array, temporal_dim=manifest.temporal_dim, frame_shift=manifest.frame_shift, offset=left_padding, padded_duration=padded_duration, pad_value=pad_value, ) def _assert_one_mono_cut_with_attr_and_return_it_with_track_index( self, attr_name: str, ) -> Tuple[int, MonoCut]: # TODO(pzelasko): consider relaxing this condition to # supporting mixed cuts that are not overlapping non_padding_cuts = [ (idx, t.cut) for idx, t in enumerate(self.tracks) if isinstance(t.cut, MonoCut) ] non_padding_cuts_with_custom_attr = [ (idx, cut) for idx, cut in non_padding_cuts if cut.custom is not None and attr_name in cut.custom ] assert len(non_padding_cuts_with_custom_attr) == 1, ( f"This MixedCut has {len(non_padding_cuts_with_custom_attr)} non-padding cuts " f"with a custom attribute '{attr_name}'. We currently don't support mixing custom attributes. " f"Consider dropping the attribute on all but one of MonoCuts. Problematic cut:\n{self}" ) non_padding_idx, mono_cut = non_padding_cuts_with_custom_attr[0] return non_padding_idx, mono_cut def truncate( self, *, offset: Seconds = 0.0, duration: Optional[Seconds] = None, keep_excessive_supervisions: bool = True, preserve_id: bool = False, _supervisions_index: Optional[Dict[str, IntervalTree]] = None, ) -> Cut: """ Returns a new MixedCut that is a sub-region of the current MixedCut. This method truncates the underlying Cuts and modifies their offsets in the mix, as needed. Tracks that do not fit in the truncated cut are removed. Note that no operation is done on the actual features - it's only during the call to load_features() when the actual changes happen (a subset of features is loaded). :param offset: float (seconds), controls the start of the new cut relative to the current MixedCut's start. :param duration: optional float (seconds), controls the duration of the resulting MixedCut. By default, the duration is (end of the cut before truncation) - (offset). :param keep_excessive_supervisions: bool. Since trimming may happen inside a SupervisionSegment, the caller has an option to either keep or discard such supervisions. :param preserve_id: bool. Should the truncated cut keep the same ID or get a new, random one. :return: a new MixedCut instance. """ assert ( offset >= 0 ), f"Offset for truncate must be non-negative (provided {offset})." new_tracks = [] old_duration = self.duration new_mix_end = ( add_durations(old_duration, -offset, sampling_rate=self.sampling_rate) if duration is None else add_durations(offset, duration, sampling_rate=self.sampling_rate) ) for track in sorted(self.tracks, key=lambda t: t.offset): # First, determine how much of the beginning of the current track we're going to truncate: # when the track offset is larger than the truncation offset, we are not truncating the cut; # just decreasing the track offset. # 'cut_offset' determines how much we're going to truncate the Cut for the current track. cut_offset = max( add_durations(offset, -track.offset, sampling_rate=self.sampling_rate), 0, ) # 'track_offset' determines the new track's offset after truncation. track_offset = max( add_durations(track.offset, -offset, sampling_rate=self.sampling_rate), 0, ) # 'track_end' is expressed relative to the beginning of the mix # (not to be confused with the 'start' of the underlying MonoCut) track_end = add_durations( track.offset, track.cut.duration, sampling_rate=self.sampling_rate ) if track_end < offset: # Omit a MonoCut that ends before the truncation offset. continue cut_duration_decrease = 0 if track_end > new_mix_end: if duration is not None: cut_duration_decrease = add_durations( track_end, -new_mix_end, sampling_rate=self.sampling_rate ) else: cut_duration_decrease = add_durations( track_end, -old_duration, sampling_rate=self.sampling_rate ) # Compute the new MonoCut's duration after trimming the start and the end. new_duration = add_durations( track.cut.duration, -cut_offset, -cut_duration_decrease, sampling_rate=self.sampling_rate, ) if new_duration <= 0: # Omit a MonoCut that is completely outside the time span of the new truncated MixedCut. continue new_tracks.append( MixTrack( cut=track.cut.truncate( offset=cut_offset, duration=new_duration, keep_excessive_supervisions=keep_excessive_supervisions, preserve_id=preserve_id, _supervisions_index=_supervisions_index, ), offset=track_offset, snr=track.snr, ) ) if len(new_tracks) == 1: # The truncation resulted in just a single cut - simply return it. return new_tracks[0].cut new_cut = MixedCut( id=self.id if preserve_id else str(uuid4()), tracks=new_tracks ) # Final edge-case check. Scenario: # - some of the original MixedCut tracks had specified an SNR # - we truncated away the track that served as an SNR reference # - we are left only with PaddingCuts and MonoCuts that have specified SNR # Solution: # - find first non padding cut and reset its SNR to None (make it the new reference) if all( t.snr is not None or isinstance(t.cut, PaddingCut) for t in new_cut.tracks ): first_non_padding_track_idx = [ idx for idx, t in enumerate(new_cut.tracks) if not isinstance(t.cut, PaddingCut) ][0] new_cut.tracks[first_non_padding_track_idx] = fastcopy( new_cut.tracks[first_non_padding_track_idx], snr=None ) return new_cut def extend_by( self, *, duration: Seconds, direction: str = "both", preserve_id: bool = False, pad_silence: bool = True, ) -> "MixedCut": """ This raises a ValueError since extending a MixedCut is not defined. :param duration: float (seconds), duration (in seconds) to extend the MixedCut. :param direction: string, 'left', 'right' or 'both'. Determines whether to extend on the left, right, or both sides. If 'both', extend on both sides by the duration specified in `duration`. :param preserve_id: bool. Should the extended cut keep the same ID or get a new, random one. :param pad_silence: bool. See usage in `lhotse.cut.MonoCut.extend_by`. :return: a new MixedCut instance. """ raise ValueError("The extend_by() method is not defined for a MixedCut.") def pad( self, duration: Seconds = None, num_frames: int = None, num_samples: int = None, pad_feat_value: float = LOG_EPSILON, direction: str = "right", preserve_id: bool = False, pad_value_dict: Optional[Dict[str, Union[int, float]]] = None, ) -> Cut: """ Return a new MixedCut, padded with zeros in the recording, and ``pad_feat_value`` in each feature bin. The user can choose to pad either to a specific `duration`; a specific number of frames `max_frames`; or a specific number of samples `num_samples`. The three arguments are mutually exclusive. :param duration: The cut's minimal duration after padding. :param num_frames: The cut's total number of frames after padding. :param num_samples: The cut's total number of samples after padding. :param pad_feat_value: A float value that's used for padding the features. By default we assume a log-energy floor of approx. -23 (1e-10 after exp). :param direction: string, 'left', 'right' or 'both'. Determines whether the padding is added before or after the cut. :param preserve_id: When ``True``, preserves the cut ID from before padding. Otherwise, generates a new random ID (default). :param pad_value_dict: Optional dict that specifies what value should be used for padding arrays in custom attributes. :return: a padded MixedCut if duration is greater than this cut's duration, otherwise ``self``. """ from .set import pad return pad( self, duration=duration, num_frames=num_frames, num_samples=num_samples, pad_feat_value=pad_feat_value, direction=direction, preserve_id=preserve_id, pad_value_dict=pad_value_dict, ) def resample(self, sampling_rate: int, affix_id: bool = False) -> "MixedCut": """ Return a new ``MixedCut`` that will lazily resample the audio while reading it. This operation will drop the feature manifest, if attached. It does not affect the supervision. :param sampling_rate: The new sampling rate. :param affix_id: Should we modify the ID (useful if both versions of the same cut are going to be present in a single manifest). :return: a modified copy of the current ``MixedCut``. """ assert self.has_recording, "Cannot resample a MixedCut without Recording." return MixedCut( id=f"{self.id}_rs{sampling_rate}" if affix_id else self.id, tracks=[ fastcopy(t, cut=t.cut.resample(sampling_rate)) for t in self.tracks ], ) def perturb_speed(self, factor: float, affix_id: bool = True) -> "MixedCut": """ Return a new ``MixedCut`` that will lazily perturb the speed while loading audio. The ``num_samples``, ``start`` and ``duration`` fields of the underlying Cuts (and their Recordings and SupervisionSegments) are updated to reflect the shrinking/extending effect of speed. We are also updating the offsets of all underlying tracks. :param factor: The speed will be adjusted this many times (e.g. factor=1.1 means 1.1x faster). :param affix_id: When true, we will modify the ``MixedCut.id`` field by affixing it with "_sp{factor}". :return: a modified copy of the current ``MixedCut``. """ # TODO(pzelasko): test most extensively for edge cases # Pre-conditions assert ( self.has_recording ), "Cannot perturb speed on a MonoCut without Recording." if self.has_features: logging.warning( "Attempting to perturb speed on a MixedCut that references pre-computed features. " "The feature manifest(s) will be detached, as we do not support feature-domain " "speed perturbation." ) return MixedCut( id=f"{self.id}_sp{factor}" if affix_id else self.id, tracks=[ MixTrack( cut=track.cut.perturb_speed(factor=factor, affix_id=affix_id), offset=round( perturb_num_samples( num_samples=compute_num_samples( track.offset, self.sampling_rate ), factor=factor, ) / self.sampling_rate, ndigits=8, ), snr=track.snr, ) for track in self.tracks ], ) def perturb_tempo(self, factor: float, affix_id: bool = True) -> "MixedCut": """ Return a new ``MixedCut`` that will lazily perturb the tempo while loading audio. Compared to speed perturbation, tempo preserves pitch. The ``num_samples``, ``start`` and ``duration`` fields of the underlying Cuts (and their Recordings and SupervisionSegments) are updated to reflect the shrinking/extending effect of tempo. We are also updating the offsets of all underlying tracks. :param factor: The tempo will be adjusted this many times (e.g. factor=1.1 means 1.1x faster). :param affix_id: When true, we will modify the ``MixedCut.id`` field by affixing it with "_tp{factor}". :return: a modified copy of the current ``MixedCut``. """ # TODO(pzelasko): test most extensively for edge cases # Pre-conditions assert ( self.has_recording ), "Cannot perturb tempo on a MonoCut without Recording." if self.has_features: logging.warning( "Attempting to perturb tempo on a MixedCut that references pre-computed features. " "The feature manifest(s) will be detached, as we do not support feature-domain " "tempo perturbation." ) return MixedCut( id=f"{self.id}_tp{factor}" if affix_id else self.id, tracks=[ MixTrack( cut=track.cut.perturb_tempo(factor=factor, affix_id=affix_id), offset=round( perturb_num_samples( num_samples=compute_num_samples( track.offset, self.sampling_rate ), factor=factor, ) / self.sampling_rate, ndigits=8, ), snr=track.snr, ) for track in self.tracks ], ) def perturb_volume(self, factor: float, affix_id: bool = True) -> "MixedCut": """ Return a new ``MixedCut`` that will lazily perturb the volume while loading audio. Recordings of the underlying Cuts are updated to reflect volume change. :param factor: The volume will be adjusted this many times (e.g. factor=1.1 means 1.1x louder). :param affix_id: When true, we will modify the ``MixedCut.id`` field by affixing it with "_vp{factor}". :return: a modified copy of the current ``MixedCut``. """ # Pre-conditions assert ( self.has_recording ), "Cannot perturb volume on a MonoCut without Recording." if self.has_features: logging.warning( "Attempting to perturb volume on a MixedCut that references pre-computed features. " "The feature manifest(s) will be detached, as we do not support feature-domain " "volume perturbation." ) return MixedCut( id=f"{self.id}_vp{factor}" if affix_id else self.id, tracks=[ fastcopy( track, cut=track.cut.perturb_volume(factor=factor, affix_id=affix_id), ) for track in self.tracks ], ) def reverb_rir( self, rir_recording: Optional["Recording"] = None, normalize_output: bool = True, early_only: bool = False, affix_id: bool = True, rir_channels: List[int] = [0], ) -> "MixedCut": """ Return a new ``MixedCut`` that will convolve the audio with the provided impulse response. If no ``rir_recording`` is provided, we will generate an impulse response using a fast random generator (https://arxiv.org/abs/2208.04101). :param rir_recording: The impulse response to use for convolving. :param normalize_output: When true, output will be normalized to have energy as input. :param early_only: When true, only the early reflections (first 50 ms) will be used. :param affix_id: When true, we will modify the ``MixedCut.id`` field by affixing it with "_rvb". :param rir_channels: The channels of the impulse response to use. By default, first channel is used. If only one channel is specified, all tracks will be convolved with this channel. If a list is provided, it must contain as many channels as there are tracks such that each track will be convolved with one of the specified channels. :return: a modified copy of the current ``MixedCut``. """ # Pre-conditions assert ( self.has_recording ), "Cannot apply reverberation on a MixedCut without Recording." if self.has_features: logging.warning( "Attempting to reverberate a MixedCut that references pre-computed features. " "The feature manifest(s) will be detached, as we do not support feature-domain " "reverberation." ) assert rir_recording is None or all( c < rir_recording.num_channels for c in rir_channels ), "Invalid channel index in `rir_channels`." assert len(rir_channels) == 1 or len(rir_channels) == len( self.tracks ), "Invalid number of channels in `rir_channels`. Must be 1 or equal to number of tracks." if len(rir_channels) == 1: rir_channels = rir_channels * len(self.tracks) # NOTE: Currently, if no RIR is provided, this method will generate a # random one for each track in the MixedCut. This is not ideal since the room # configuration for all the RIRs should be the same. But we ignore this for now # since it simplifies the implementation considerably. return MixedCut( id=f"{self.id}_rvb" if affix_id else self.id, tracks=[ fastcopy( track, cut=track.cut.reverb_rir( rir_recording=rir_recording, normalize_output=normalize_output, early_only=early_only, affix_id=affix_id, rir_channels=[channel], ), ) for track, channel in zip(self.tracks, rir_channels) ], ) @rich_exception_info def load_features(self, mixed: bool = True) -> Optional[np.ndarray]: """ Loads the features of the source cuts and mixes them on-the-fly. :param mixed: when True (default), returns a 2D array of features mixed in the feature domain. Otherwise returns a 3D array with the first dimension equal to the number of tracks. :return: A numpy ndarray with features and with shape ``(num_frames, num_features)``, or ``(num_tracks, num_frames, num_features)`` """ if not self.has_features: return None first_cut = self.tracks[0].cut # First, check for a simple scenario: just a single cut with padding. # When that is the case, we don't have to instantiate a feature extractor, # because we are not performing any actual mixing. # That makes life simpler for the users who have a custom feature extractor, # but don't actually care about feature-domain mixing; just want to pad. if mixed and all(isinstance(t.cut, PaddingCut) for t in self.tracks[1:]): padding_val = self.tracks[1].cut.feat_value feats = np.ones((self.num_frames, self.num_features)) * padding_val feats[: first_cut.num_frames, :] = first_cut.load_features() return feats # When there is more than one "regular" cut, we will perform an actual mix. # First, we have to make sure that the reference energy levels are appropriate. # They might not be if the first track is a padding track. reference_feats = None reference_energy = None reference_pos, reference_cut = [ (idx, t.cut) for idx, t in enumerate(self.tracks) if not isinstance(t.cut, PaddingCut) and t.snr is None ][0] feature_extractor = create_default_feature_extractor( reference_cut.features.type ) if first_cut.id != reference_cut.id: reference_feats = reference_cut.load_features() reference_energy = feature_extractor.compute_energy(reference_feats) # The mix itself. mixer = FeatureMixer( feature_extractor=create_default_feature_extractor( self._first_non_padding_cut.features.type ), base_feats=first_cut.load_features(), frame_shift=first_cut.frame_shift, reference_energy=reference_energy, ) for pos, track in enumerate(self.tracks[1:], start=1): try: if pos == reference_pos and reference_feats is not None: feats = reference_feats # manual caching to avoid duplicated I/O else: feats = track.cut.load_features() mixer.add_to_mix( feats=feats, snr=track.snr, offset=track.offset, sampling_rate=track.cut.sampling_rate, ) except NonPositiveEnergyError as e: logging.warning( str(e) + f' MonoCut with id "{track.cut.id}" will not be mixed in.' ) if mixed: # Checking for some edge cases below. feats = mixer.mixed_feats # Note: The slicing below is a work-around for an edge-case # when two cuts have durations that ended with 0.005 (e.g. 10.125 and 5.715) # - then, the feature extractor "squeezed in" a last extra frame and the simple # relationship between num_frames and duration we strived for is not true; # i.e. the duration is 10.125 + 5.715 = 15.84, but the number of frames is # 1013 + 572 = 1585. If the frame_shift is 0.01, we have gained an extra 0.01s... if feats.shape[0] - self.num_frames == 1: feats = feats[: self.num_frames, :] # TODO(pzelasko): This can sometimes happen in a MixedCut with >= 5 different Cuts, # with a regular MonoCut at the end, when the mix offsets are floats with a lot of decimals. # For now we're duplicating the last frame to match the declared "num_frames" of this cut. if feats.shape[0] - self.num_frames == -1: feats = np.concatenate((feats, feats[-1:, :]), axis=0) assert feats.shape[0] == self.num_frames, ( "Inconsistent number of frames in a MixedCut: please report " "this issue at https://github.com/lhotse-speech/lhotse/issues " "showing the output of print(cut) or str(cut) on which" "load_features() was called." ) return feats else: return mixer.unmixed_feats @rich_exception_info def load_audio(self, mixed: bool = True) -> Optional[np.ndarray]: """ Loads the audios of the source cuts and mix them on-the-fly. :param mixed: When True (default), returns a mono mix of the underlying tracks. Otherwise returns a numpy array with the number of channels equal to the number of tracks. :return: A numpy ndarray with audio samples and with shape ``(num_channels, num_samples)`` """ if not self.has_recording: return None first_cut = self.tracks[0].cut # First, we have to make sure that the reference energy levels are appropriate. # They might not be if the first track is a padding track. reference_audio = None reference_energy = None reference_pos, reference_cut = [ (idx, t.cut) for idx, t in enumerate(self.tracks) if not isinstance(t.cut, PaddingCut) and t.snr is None ][0] if first_cut.id != reference_cut.id: reference_audio = reference_cut.load_audio() reference_energy = audio_energy(reference_audio) try: mixer = AudioMixer( self.tracks[0].cut.load_audio(), sampling_rate=self.tracks[0].cut.sampling_rate, reference_energy=reference_energy, ) except NonPositiveEnergyError as e: logging.warning( f"{e}\nNote: we cannot mix signal with a given SNR to the reference audio with zero energy. " f'Cut ID: "{self.tracks[0].cut.id}"' ) raise for pos, track in enumerate(self.tracks[1:], start=1): try: if pos == reference_pos and reference_audio is not None: audio = reference_audio # manual caching to avoid duplicated I/O else: audio = track.cut.load_audio() mixer.add_to_mix( audio=audio, snr=track.snr, offset=track.offset, ) except NonPositiveEnergyError as e: logging.warning( f'{e} MonoCut with id "{track.cut.id}" will not be mixed in.' ) if mixed: # Off-by-one errors can happen during mixing due to imperfect float arithmetic and rounding; # we will fix them on-the-fly so that the manifest does not lie about the num_samples. audio = mixer.mixed_audio if audio.shape[1] - self.num_samples == 1: audio = audio[:, : self.num_samples] if audio.shape[1] - self.num_samples == -1: audio = np.concatenate((audio, audio[:, -1:]), axis=1) assert audio.shape[1] == self.num_samples, ( f"Inconsistent number of samples in a MixedCut: please report " f"this issue at https://github.com/lhotse-speech/lhotse/issues " f"showing the cut below. MixedCut:\n{self}" ) else: audio = mixer.unmixed_audio return audio def plot_tracks_features(self): """ Display the feature matrix as an image. Requires matplotlib to be installed. """ import matplotlib.pyplot as plt fig, axes = plt.subplots(len(self.tracks)) features = self.load_features(mixed=False) fmin, fmax = features.min(), features.max() for idx, ax in enumerate(axes): ax.imshow(np.flip(features[idx].transpose(1, 0), 0), vmin=fmin, vmax=fmax) return axes def plot_tracks_audio(self): """ Display plots of the individual tracks' waveforms. Requires matplotlib to be installed. """ import matplotlib.pyplot as plt audio = self.load_audio(mixed=False) fig, axes = plt.subplots(len(self.tracks), sharex=False, sharey=True) for idx, (track, ax) in enumerate(zip(self.tracks, axes)): samples = audio[idx].squeeze(0) ax.plot(np.linspace(0, self.duration, len(samples)), samples) for supervision in track.cut.supervisions: supervision = supervision.trim(track.cut.duration) ax.axvspan( track.offset + supervision.start, track.offset + supervision.end, color="green", alpha=0.1, ) return axes def drop_features(self) -> "MixedCut": """Return a copy of the current :class:`MixedCut`, detached from ``features``.""" assert ( self.has_recording ), f"Cannot detach features from a MixedCut with no Recording (cut ID = {self.id})." return fastcopy( self, tracks=[fastcopy(t, cut=t.cut.drop_features()) for t in self.tracks] ) def drop_recording(self) -> "MixedCut": """Return a copy of the current :class:`.MixedCut`, detached from ``recording``.""" assert ( self.has_features ), f"Cannot detach recording from a MixedCut with no Features (cut ID = {self.id})." return fastcopy( self, tracks=[fastcopy(t, cut=t.cut.drop_recording()) for t in self.tracks] ) def drop_supervisions(self) -> "MixedCut": """Return a copy of the current :class:`.MixedCut`, detached from ``supervisions``.""" return fastcopy( self, tracks=[fastcopy(t, cut=t.cut.drop_supervisions()) for t in self.tracks], ) def compute_and_store_features( self, extractor: FeatureExtractor, storage: FeaturesWriter, augment_fn: Optional[AugmentFn] = None, mix_eagerly: bool = True, ) -> Cut: """ Compute the features from this cut, store them on disk, and create a new `MonoCut` object with the feature manifest attached. This cut has to be able to load audio. :param extractor: a ``FeatureExtractor`` instance used to compute the features. :param storage: a ``FeaturesWriter`` instance used to store the features. :param augment_fn: an optional callable used for audio augmentation. :param mix_eagerly: when False, extract and store the features for each track separately, and mix them dynamically when loading the features. When True, mix the audio first and store the mixed features, returning a new ``MonoCut`` instance with the same ID. The returned ``MonoCut`` will not have a ``Recording`` attached. :return: a new ``MonoCut`` instance if ``mix_eagerly`` is True, or returns ``self`` with each of the tracks containing the ``Features`` manifests. """ if mix_eagerly: features_info = extractor.extract_from_samples_and_store( samples=self.load_audio(), storage=storage, sampling_rate=self.sampling_rate, offset=0, channel=0, augment_fn=augment_fn, ) features_info.recording_id = self.id return MonoCut( id=self.id, start=0, duration=self.duration, channel=0, supervisions=[ fastcopy(s, recording_id=self.id) for s in self.supervisions ], features=features_info, recording=None, custom=self.custom if hasattr(self, "custom") else None, ) else: # mix lazily new_tracks = [ MixTrack( cut=track.cut.compute_and_store_features( extractor=extractor, storage=storage, augment_fn=augment_fn, ), offset=track.offset, snr=track.snr, ) for track in self.tracks ] return MixedCut(id=self.id, tracks=new_tracks) def fill_supervision( self, add_empty: bool = True, shrink_ok: bool = False ) -> "MixedCut": """ Fills the whole duration of a cut with a supervision segment. If the cut has one supervision, its start is set to 0 and duration is set to ``cut.duration``. Note: this may either expand a supervision that was shorter than a cut, or shrink a supervision that exceeds the cut. If there are no supervisions, we will add an empty one when ``add_empty==True``, otherwise we won't change anything. If there are two or more supervisions, we will raise an exception. .. note:: For :class:`.MixedCut`, we expect that only one track contains a supervision. That supervision will be expanded to cover the full MixedCut's duration. :param add_empty: should we add an empty supervision with identical time bounds as the cut. :param shrink_ok: should we raise an error if a supervision would be shrank as a result of calling this method. """ n_sups = len(self.supervisions) if n_sups == 0: if not add_empty: return self first_non_padding_idx = [ idx for idx, t in enumerate(self.tracks) if isinstance(t.cut, MonoCut) ][0] new_tracks = [ fastcopy( t, cut=fastcopy( t.cut, supervisions=[ SupervisionSegment( id=self.id, recording_id=t.cut.recording_id, start=-t.offset, duration=self.duration, channel=-1, ) ], ), ) if idx == first_non_padding_idx else t for idx, t in enumerate(self.tracks) ] else: assert ( n_sups == 1 ), f"Cannot expand more than one supervision (found {len(self.supervisions)}." new_tracks = [] for t in self.tracks: if len(t.cut.supervisions) == 0: new_tracks.append(t) else: sup = t.cut.supervisions[0] if not shrink_ok and ( sup.start < -t.offset or sup.end > self.duration ): raise ValueError( f"Cannot shrink supervision (start={sup.start}, end={sup.end}) to cut " f"(start=0, duration={t.cut.duration}) because the argument `shrink_ok` is `False`. " f"Note: this check prevents accidental data loss for speech recognition, " f"as supervision exceeding a cut indicates there might be some spoken content " f"beyond cuts start or end (an ASR model would be trained to predict more text than " f"spoken in the audio). If this is okay, set `shrink_ok` to `True`." ) new_tracks.append( fastcopy( t, cut=fastcopy( t.cut, supervisions=[ fastcopy( sup, start=-t.offset, duration=self.duration ) ], ), ) ) return fastcopy(self, tracks=new_tracks) def map_supervisions( self, transform_fn: Callable[[SupervisionSegment], SupervisionSegment] ) -> Cut: """ Modify the SupervisionSegments by `transform_fn` of this MixedCut. :param transform_fn: a function that modifies a supervision as an argument. :return: a modified MixedCut. """ new_mixed_cut = fastcopy(self) for track in new_mixed_cut.tracks: if isinstance(track.cut, PaddingCut): continue track.cut.supervisions = [ segment.map(transform_fn) for segment in track.cut.supervisions ] return new_mixed_cut def merge_supervisions( self, custom_merge_fn: Optional[Callable[[str, Iterable[Any]], Any]] = None ) -> "MixedCut": """ Return a copy of the cut that has all of its supervisions merged into a single segment. The new start is the start of the earliest superivion, and the new duration is a minimum spanning duration for all the supervisions. The text fields are concatenated with a whitespace, and all other string fields (including IDs) are prefixed with "cat#" and concatenated with a hash symbol "#". This is also applied to ``custom`` fields. Fields with a ``None`` value are omitted. .. note:: If you're using individual tracks of a mixed cut, note that this transform drops all the supervisions in individual tracks and assigns the merged supervision in the first :class:`.MonoCut` found in ``self.tracks``. :param custom_merge_fn: a function that will be called to merge custom fields values. We expect ``custom_merge_fn`` to handle all possible custom keys. When not provided, we will treat all custom values as strings. It will be called roughly like: ``custom_merge_fn(custom_key, [s.custom[custom_key] for s in sups])`` """ from .set import merge_supervisions return merge_supervisions(self, custom_merge_fn=custom_merge_fn) def filter_supervisions( self, predicate: Callable[[SupervisionSegment], bool] ) -> Cut: """ Modify cut to store only supervisions accepted by `predicate` Example: >>> cut = cut.filter_supervisions(lambda s: s.id in supervision_ids) >>> cut = cut.filter_supervisions(lambda s: s.duration < 5.0) >>> cut = cut.filter_supervisions(lambda s: s.text is not None) :param predicate: A callable that accepts `SupervisionSegment` and returns bool :return: a modified MonoCut """ new_mixed_cut = fastcopy( self, tracks=[ fastcopy(track, cut=track.cut.filter_supervisions(predicate)) for track in self.tracks ], ) return new_mixed_cut @staticmethod def from_dict(data: dict) -> "MixedCut": if "type" in data: data.pop("type") return MixedCut( id=data["id"], tracks=[MixTrack.from_dict(track) for track in data["tracks"]], ) def with_features_path_prefix(self, path: Pathlike) -> "MixedCut": if not self.has_features: return self return MixedCut( id=self.id, tracks=[ fastcopy(t, cut=t.cut.with_features_path_prefix(path)) for t in self.tracks ], ) def with_recording_path_prefix(self, path: Pathlike) -> "MixedCut": if not self.has_recording: return self return MixedCut( id=self.id, tracks=[ fastcopy(t, cut=t.cut.with_recording_path_prefix(path)) for t in self.tracks ], ) @property def _first_non_padding_cut(self) -> MonoCut: return self._first_non_padding_track.cut @property def _first_non_padding_track(self) -> MixTrack: return [t for t in self.tracks if not isinstance(t.cut, PaddingCut)][0]