Source code for lhotse.cut.mixed

import logging
import warnings
from dataclasses import dataclass
from functools import partial, reduce
from io import BytesIO
from operator import add
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

import numpy as np
import torch
from intervaltree import IntervalTree

from lhotse.audio import Recording, VideoInfo, get_audio_duration_mismatch_tolerance
from lhotse.audio.backend import save_audio
from lhotse.audio.mixer import AudioMixer, VideoMixer, audio_energy
from lhotse.augmentation import (
    AudioTransform,
    AugmentFn,
    LoudnessNormalization,
    ReverbWithImpulseResponse,
)
from lhotse.cut.base import Cut
from lhotse.cut.data import DataCut
from lhotse.cut.padding import PaddingCut
from lhotse.features import (
    FeatureExtractor,
    FeatureMixer,
    create_default_feature_extractor,
)
from lhotse.features.io import FeaturesWriter
from lhotse.supervision import SupervisionSegment
from lhotse.utils import (
    DEFAULT_PADDING_VALUE,
    LOG_EPSILON,
    Decibels,
    Pathlike,
    Seconds,
    add_durations,
    compute_num_frames,
    compute_num_samples,
    fastcopy,
    hash_str_to_int,
    merge_items_with_delimiter,
    overlaps,
    perturb_num_samples,
    rich_exception_info,
    uuid4,
)


@dataclass
class MixTrack:
    """
    Represents a single track in a mix of Cuts. Points to a specific DataCut
    or PaddingCut and holds information on how to mix it with other Cuts,
    relative to the first track in a mix.
    """

    cut: Union[DataCut, PaddingCut]
    type: Optional[str] = None
    offset: Seconds = 0.0
    snr: Optional[Decibels] = None

    def __post_init__(self):
        self.type = type(self.cut).__name__

    @staticmethod
    def from_dict(data: dict):
        from .set import deserialize_cut

        # Take out `type` from the data dict and put it into the `cut` dict.
        cut_dict = data.pop("cut")
        cut_dict["type"] = data.pop("type")
        return MixTrack(deserialize_cut(cut_dict), **data)

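
# Illustrative sketch (not part of the original module): a MixTrack round-trips
# through plain dicts via ``from_dict``. Assumes ``mixed_cut`` is an existing
# MixedCut; the dict layout mirrors what ``from_dict`` expects (the track-level
# "type" key is folded back into the "cut" dict by ``from_dict``):
#
#   >>> track = mixed_cut.tracks[0]
#   >>> data = {"cut": track.cut.to_dict(), "type": track.type,
#   ...         "offset": track.offset, "snr": track.snr}
#   >>> restored = MixTrack.from_dict(data)
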
@dataclass
class MixedCut(Cut):
    """
    :class:`~lhotse.cut.MixedCut` is a :class:`~lhotse.cut.Cut` that actually consists of multiple other cuts.
    Its primary purpose is to allow time-domain and feature-domain augmentation via mixing the training cuts
    with noise, music, and babble cuts. The actual mixing operations are performed on-the-fly.

    Internally, :class:`~lhotse.cut.MixedCut` holds other cuts in multiple tracks (:class:`~lhotse.cut.MixTrack`),
    each with its own offset and SNR that is relative to the first track.

    Please refer to the documentation of :class:`~lhotse.cut.Cut` to learn more about using cuts.

    In addition to methods available in :class:`~lhotse.cut.Cut`, :class:`~lhotse.cut.MixedCut` provides
    methods to read all of its tracks' audio and features as separate channels:

        >>> cut = MixedCut(...)
        >>> mono_features = cut.load_features()
        >>> assert len(mono_features.shape) == 2
        >>> multi_features = cut.load_features(mixed=False)
        >>> # Now, the first dimension is the channel.
        >>> assert len(multi_features.shape) == 3

    .. note:: MixedCut is different from MultiCut, which is intended to represent multi-channel
        recordings that share the same supervisions.

    .. note:: Each track in a MixedCut can be either a MonoCut, MultiCut, or PaddingCut.

    .. note:: The ``transforms`` field is a list of dictionaries that describe the transformations
        that should be applied to the track after mixing.

    See also:

        - :class:`lhotse.cut.Cut`
        - :class:`lhotse.cut.MonoCut`
        - :class:`lhotse.cut.MultiCut`
        - :class:`lhotse.cut.CutSet`
    """

    id: str
    tracks: List[MixTrack]
    transforms: Optional[List[Dict]] = None

    @property
    def supervisions(self) -> List[SupervisionSegment]:
        """
        Lists the supervisions of the underlying source cuts.
        Each segment start time will be adjusted by the track offset.
        """
        return [
            segment.with_offset(track.offset)
            for track in self.tracks
            for segment in track.cut.supervisions
        ]

    @property
    def start(self) -> Seconds:
        return 0

    @property
    def duration(self) -> Seconds:
        track_durations = (track.offset + track.cut.duration for track in self.tracks)
        return round(max(track_durations), ndigits=8)

    @property
    def channel(self) -> Union[int, List[int]]:
        num_channels = self.num_channels
        return list(range(num_channels)) if num_channels > 1 else 0

    @property
    def has_features(self) -> bool:
        return self._first_non_padding_cut.has_features

    @property
    def has_recording(self) -> bool:
        return self._first_non_padding_cut.has_recording

    @property
    def has_video(self) -> bool:
        return self._first_non_padding_cut.has_video

    def has(self, field: str) -> bool:
        return self._first_non_padding_cut.has(field)

    @property
    def num_frames(self) -> Optional[int]:
        if self.has_features:
            return compute_num_frames(
                duration=self.duration,
                frame_shift=self.frame_shift,
                sampling_rate=self.sampling_rate,
            )
        return None

    @property
    def frame_shift(self) -> Optional[Seconds]:
        return self.tracks[0].cut.frame_shift

    @property
    def sampling_rate(self) -> Optional[int]:
        return self.tracks[0].cut.sampling_rate

    @property
    def num_samples(self) -> Optional[int]:
        return compute_num_samples(self.duration, self.sampling_rate)

    @property
    def num_features(self) -> Optional[int]:
        return self.tracks[0].cut.num_features

    @property
    def num_channels(self) -> Optional[int]:
        # PaddingCut and MonoCut have 1 channel each, MultiCut has N.
        # We don't support MixedCut with MixedCut tracks.
        return max(track.cut.num_channels for track in self.tracks)

    @property
    def features_type(self) -> Optional[str]:
        return self._first_non_padding_cut.features.type if self.has_features else None

    def __getattr__(self, name: str) -> Any:
        """
        This magic function is called when the user tries to access an attribute
        of :class:`.MixedCut` that doesn't exist. It is used for accessing the custom
        attributes of cuts.

        We support exactly one scenario for mixed cuts:

        If :attr:`tracks` contains exactly one :class:`.MonoCut` object (and an arbitrary
        number of :class:`.PaddingCut` objects), we will look up the custom attributes
        of that cut.

        If one of the custom attributes is of type :class:`~lhotse.array.Array` or
        :class:`~lhotse.array.TemporalArray`, we'll also support loading those arrays
        (see example below). Additionally, we will incorporate extra padding as dictated
        by padding cuts.

        Example:

            >>> cut = MonoCut('cut1', start=0, duration=4, channel=0)
            >>> cut.alignment = TemporalArray(...)
            >>> mixed_cut = cut.pad(10, pad_value_dict={'alignment': -1})
            >>> ali = mixed_cut.load_alignment()
        """
        # Python will sometimes try to call undefined magic functions,
        # just fail for them (e.g. __setstate__ when pickling).
        if name.startswith("__"):
            raise AttributeError()

        # Loading a custom array attribute + performing padding.
        if name.startswith("load_"):
            attr_name = name[5:]
            return partial(self.load_custom, attr_name)

        # Returning the contents of "mono_cut.custom[name]",
        # or raising AttributeError.
        try:
            (
                non_padding_idx,
                mono_cut,
            ) = self._assert_one_data_cut_with_attr_and_return_it_with_track_index(name)
            return getattr(mono_cut, name)
        except AssertionError:
            raise AttributeError(
                f"No such attribute: '{name}' (note: custom attributes are not supported "
                f"when a MixedCut consists of more than one MonoCut with that attribute)."
            )

    def load_custom(self, name: str) -> np.ndarray:
        """
        Load custom data as numpy array. The custom data is expected to have
        been stored in cuts ``custom`` field as an :class:`~lhotse.array.Array` or
        :class:`~lhotse.array.TemporalArray` manifest.

        .. note:: It works with Array manifests stored via attribute assignments,
            e.g.: ``cut.my_custom_data = Array(...)``.

        .. warning:: For :class:`.MixedCut`, this will only work if the mixed cut
            consists of a single :class:`.MonoCut` and an arbitrary number of
            :class:`.PaddingCut` objects. This is because it is generally undefined
            how to mix arbitrary arrays.

        :param name: name of the custom attribute.
        :return: a numpy array with the data (after padding).
        """
        from lhotse.array import Array, pad_array

        (
            non_padding_idx,
            mono_cut,
        ) = self._assert_one_data_cut_with_attr_and_return_it_with_track_index(name)

        # Use getattr to propagate AttributeError if "name" is not defined.
        manifest = getattr(mono_cut, name)

        # Check if the corresponding manifest for 'load_something' is of type
        # Array; if yes, just return the loaded data.
        # This is likely an embedding without a temporal dimension.
        if isinstance(manifest, Array):
            return mono_cut.load_custom(name)

        # We are loading either an array with a temporal dimension, or a recording:
        # we need to pad it.
        left_padding = self.tracks[non_padding_idx].offset
        padded_duration = self.duration

        # Then, check if it's a Recording. In that case we convert it to a cut,
        # leverage existing padding methods, and load padded audio data.
        if isinstance(manifest, Recording):
            return (
                manifest.to_cut()
                .pad(duration=manifest.duration + left_padding, direction="left")
                .pad(duration=padded_duration, direction="right")
                .load_audio()
            )

        # Load the array and retrieve the manifest from the only non-padding cut.
        # We'll also need to fetch the dict defining what padding value to use (if present).
        array = mono_cut.load_custom(name)
        try:
            pad_value_dict = [
                t.cut for t in self.tracks if isinstance(t.cut, PaddingCut)
            ][0].custom
            pad_value = pad_value_dict[name]
        except Exception:
            pad_value = DEFAULT_PADDING_VALUE

        return pad_array(
            array,
            temporal_dim=manifest.temporal_dim,
            frame_shift=manifest.frame_shift,
            offset=left_padding,
            padded_duration=padded_duration,
            pad_value=pad_value,
        )

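    # Illustrative sketch (not part of the original module): the single-DataCut
    # restriction in practice. Assumes ``cut`` is a MonoCut with a TemporalArray
    # stored under ``cut.alignment``:
    #
    #   >>> padded = cut.pad(duration=10.0)   # padded is a MixedCut
    #   >>> ali = padded.load_alignment()     # dispatched to load_custom("alignment")
    #   >>> # the array is padded with DEFAULT_PADDING_VALUE, or with the PaddingCut's
    #   >>> # custom["alignment"] value if one was supplied via pad_value_dict
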
    def _assert_one_data_cut_with_attr_and_return_it_with_track_index(
        self,
        attr_name: str,
    ) -> Tuple[int, DataCut]:
        # TODO(pzelasko): consider relaxing this condition to
        #                 support mixed cuts that are not overlapping
        non_padding_cuts = [
            (idx, t.cut)
            for idx, t in enumerate(self.tracks)
            if isinstance(t.cut, DataCut)
        ]
        non_padding_cuts_with_custom_attr = [
            (idx, cut)
            for idx, cut in non_padding_cuts
            if cut.custom is not None and attr_name in cut.custom
        ]
        assert len(non_padding_cuts_with_custom_attr) == 1, (
            f"This MixedCut has {len(non_padding_cuts_with_custom_attr)} non-padding cuts "
            f"with a custom attribute '{attr_name}'. We currently don't support mixing custom attributes. "
            f"Consider dropping the attribute on all but one of the DataCuts. Problematic cut:\n{self}"
        )
        non_padding_idx, mono_cut = non_padding_cuts_with_custom_attr[0]
        return non_padding_idx, mono_cut

    def move_to_memory(
        self,
        audio_format: str = "flac",
        load_audio: bool = True,
        load_features: bool = True,
        load_custom: bool = True,
    ) -> "MixedCut":
        """
        Load data (audio, features, or custom arrays) into memory and attach them
        to a copy of the manifest. This is useful when you want to store cuts together
        with the actual data in some binary format that enables sequential data reads.

        Audio is encoded with ``audio_format`` (compatible with ``torchaudio.save``),
        floating point features are encoded with lilcom, and other arrays are pickled.
        """
        return fastcopy(
            self,
            tracks=[
                fastcopy(
                    t,
                    cut=t.cut.move_to_memory(
                        audio_format=audio_format,
                        load_audio=load_audio,
                        load_features=load_features,
                        load_custom=load_custom,
                    ),
                )
                for t in self.tracks
            ],
        )

    def to_mono(
        self,
        encoding: str = "flac",
        **kwargs,
    ) -> "Cut":
        """
        Convert this MixedCut to a MonoCut by mixing all tracks and channels into a single one.
        The resulting audio array is stored in memory, and can be saved to disk by calling
        ``cut.save_audio(path, ...)`` on the result.

        .. hint:: the resulting MonoCut will have the ``custom`` field populated with the
            ``custom`` value from the first track of the MixedCut.

        :param encoding: any of "wav", "flac", or "opus".
        :return: a new MonoCut instance.
        """
        samples = self.load_audio(mono_downmix=True)
        stream = BytesIO()
        save_audio(
            stream,
            samples,
            self.sampling_rate,
            format=encoding,
        )
        recording = Recording.from_bytes(stream.getvalue(), recording_id=self.id)
        return fastcopy(
            recording.to_cut(),
            supervisions=[fastcopy(s, channel=0) for s in self.supervisions],
            custom=self.tracks[0].cut.custom,
        )

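    # Illustrative sketch (not part of the original module): flattening a mix
    # into a single in-memory MonoCut. Assumes ``speech`` and ``noise`` are
    # MonoCuts with the same sampling rate:
    #
    #   >>> mixed = speech.mix(noise, snr=10)     # a MixedCut
    #   >>> mono = mixed.to_mono(encoding="flac")
    #   >>> mono.save_audio("mixed.flac")         # persist the eagerly-mixed audio
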
    def truncate(
        self,
        *,
        offset: Seconds = 0.0,
        duration: Optional[Seconds] = None,
        keep_excessive_supervisions: bool = True,
        preserve_id: bool = False,
        _supervisions_index: Optional[Dict[str, IntervalTree]] = None,
    ) -> Cut:
        """
        Returns a new MixedCut that is a sub-region of the current MixedCut. This method truncates
        the underlying Cuts and modifies their offsets in the mix, as needed. Tracks that do not fit
        in the truncated cut are removed.

        Note that no operation is done on the actual features - it's only during the call to
        load_features() when the actual changes happen (a subset of features is loaded).

        :param offset: float (seconds), controls the start of the new cut relative to the current
            MixedCut's start.
        :param duration: optional float (seconds), controls the duration of the resulting MixedCut.
            By default, the duration is (end of the cut before truncation) - (offset).
        :param keep_excessive_supervisions: bool. Since trimming may happen inside a SupervisionSegment,
            the caller has an option to either keep or discard such supervisions.
        :param preserve_id: bool. Should the truncated cut keep the same ID or get a new, random one.
        :return: a new MixedCut instance.
        """
        assert (
            offset >= 0
        ), f"Offset for truncate must be non-negative (provided {offset})."
        new_tracks = []
        old_duration = self.duration
        new_mix_end = (
            add_durations(old_duration, -offset, sampling_rate=self.sampling_rate)
            if duration is None
            else add_durations(offset, duration, sampling_rate=self.sampling_rate)
        )

        for track in sorted(self.tracks, key=lambda t: t.offset):
            # First, determine how much of the beginning of the current track we're going to truncate:
            # when the track offset is larger than the truncation offset, we are not truncating the cut;
            # just decreasing the track offset.

            # 'cut_offset' determines how much we're going to truncate the Cut for the current track.
            cut_offset = max(
                add_durations(offset, -track.offset, sampling_rate=self.sampling_rate),
                0,
            )
            # 'track_offset' determines the new track's offset after truncation.
            track_offset = max(
                add_durations(track.offset, -offset, sampling_rate=self.sampling_rate),
                0,
            )
            # 'track_end' is expressed relative to the beginning of the mix
            # (not to be confused with the 'start' of the underlying MonoCut)
            track_end = add_durations(
                track.offset, track.cut.duration, sampling_rate=self.sampling_rate
            )

            if track_end < offset:
                # Omit a Cut that ends before the truncation offset.
                continue

            cut_duration_decrease = 0
            if track_end > new_mix_end:
                if duration is not None:
                    cut_duration_decrease = add_durations(
                        track_end, -new_mix_end, sampling_rate=self.sampling_rate
                    )
                else:
                    cut_duration_decrease = add_durations(
                        track_end, -old_duration, sampling_rate=self.sampling_rate
                    )

            # Compute the new Cut's duration after trimming the start and the end.
            new_duration = add_durations(
                track.cut.duration,
                -cut_offset,
                -cut_duration_decrease,
                sampling_rate=self.sampling_rate,
            )
            if new_duration <= 0:
                # Omit a Cut that is completely outside the time span of the new truncated MixedCut.
                continue

            new_tracks.append(
                MixTrack(
                    cut=track.cut.truncate(
                        offset=cut_offset,
                        duration=new_duration,
                        keep_excessive_supervisions=keep_excessive_supervisions,
                        preserve_id=preserve_id,
                        _supervisions_index=_supervisions_index,
                    ),
                    offset=track_offset,
                    snr=track.snr,
                )
            )

        # Edge case: no tracks with data left after truncation. This can happen
        # if we truncated an offset region. In this case, return a PaddingCut
        # of the requested duration.
        if len([t for t in new_tracks if not isinstance(t.cut, PaddingCut)]) == 0:
            return PaddingCut(
                id=self.id if preserve_id else str(uuid4()),
                duration=duration,
                sampling_rate=self.sampling_rate,
                feat_value=0.0,
                num_samples=compute_num_samples(duration, self.sampling_rate),
            )

        if len(new_tracks) == 1:
            # The truncation resulted in just a single cut - simply return it.
            return new_tracks[0].cut

        new_cut = MixedCut(
            id=self.id if preserve_id else str(uuid4()), tracks=new_tracks
        )

        # Final edge-case check. Scenario:
        # - some of the original MixedCut tracks had specified an SNR
        # - we truncated away the track that served as an SNR reference
        # - we are left only with PaddingCuts and MonoCuts that have a specified SNR
        # Solution:
        # - find the first non-padding cut and reset its SNR to None (make it the new reference)
        if all(
            t.snr is not None or isinstance(t.cut, PaddingCut) for t in new_cut.tracks
        ):
            first_non_padding_track_idx = [
                idx
                for idx, t in enumerate(new_cut.tracks)
                if not isinstance(t.cut, PaddingCut)
            ][0]
            new_cut.tracks[first_non_padding_track_idx] = fastcopy(
                new_cut.tracks[first_non_padding_track_idx], snr=None
            )

        return new_cut

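    # Illustrative sketch (not part of the original module): truncation keeps
    # only the tracks that overlap the requested window. Assumes ``mixed`` is a
    # 10 s MixedCut with several tracks:
    #
    #   >>> middle = mixed.truncate(offset=2.0, duration=5.0)
    #   >>> middle.duration   # -> 5.0; tracks ending before 2.0 were dropped,
    #   >>> # and the surviving track offsets were shifted left by 2.0
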
    def extend_by(
        self,
        *,
        duration: Seconds,
        direction: str = "both",
        preserve_id: bool = False,
        pad_silence: bool = True,
    ) -> "MixedCut":
        """
        This raises a ValueError since extending a MixedCut is not defined.

        :param duration: float (seconds), duration (in seconds) to extend the MixedCut.
        :param direction: string, 'left', 'right' or 'both'. Determines whether to extend
            on the left, right, or both sides. If 'both', extend on both sides by
            the duration specified in ``duration``.
        :param preserve_id: bool. Should the extended cut keep the same ID or get a new, random one.
        :param pad_silence: bool. See usage in ``lhotse.cut.MonoCut.extend_by``.
        :raises ValueError: always, since this operation is not defined for a MixedCut.
        """
        raise ValueError("The extend_by() method is not defined for a MixedCut.")

    def pad(
        self,
        duration: Seconds = None,
        num_frames: int = None,
        num_samples: int = None,
        pad_feat_value: float = LOG_EPSILON,
        direction: str = "right",
        preserve_id: bool = False,
        pad_value_dict: Optional[Dict[str, Union[int, float]]] = None,
    ) -> Cut:
        """
        Return a new MixedCut, padded with zeros in the recording, and ``pad_feat_value``
        in each feature bin.

        The user can choose to pad either to a specific ``duration``; a specific number of
        frames ``num_frames``; or a specific number of samples ``num_samples``. The three
        arguments are mutually exclusive.

        :param duration: The cut's minimal duration after padding.
        :param num_frames: The cut's total number of frames after padding.
        :param num_samples: The cut's total number of samples after padding.
        :param pad_feat_value: A float value that's used for padding the features.
            By default we assume a log-energy floor of approx. -23 (1e-10 after exp).
        :param direction: string, 'left', 'right' or 'both'. Determines whether the padding
            is added before or after the cut.
        :param preserve_id: When ``True``, preserves the cut ID from before padding.
            Otherwise, generates a new random ID (default).
        :param pad_value_dict: Optional dict that specifies what value should be used
            for padding arrays in custom attributes.
        :return: a padded MixedCut if ``duration`` is greater than this cut's duration, otherwise ``self``.
        """
        from .set import pad

        return pad(
            self,
            duration=duration,
            num_frames=num_frames,
            num_samples=num_samples,
            pad_feat_value=pad_feat_value,
            direction=direction,
            preserve_id=preserve_id,
            pad_value_dict=pad_value_dict,
        )

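    # Illustrative sketch (not part of the original module): padding delegates
    # to ``lhotse.cut.set.pad``, which wraps this cut together with a PaddingCut
    # in a new MixedCut. Assumes ``mixed`` is shorter than 30 s:
    #
    #   >>> padded = mixed.pad(duration=30.0, direction="right")
    #   >>> padded.duration   # -> 30.0
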
    def resample(self, sampling_rate: int, affix_id: bool = False) -> "MixedCut":
        """
        Return a new ``MixedCut`` that will lazily resample the audio while reading it.
        This operation will drop the feature manifest, if attached.
        It does not affect the supervision.

        :param sampling_rate: The new sampling rate.
        :param affix_id: Should we modify the ID (useful if both versions of the same
            cut are going to be present in a single manifest).
        :return: a modified copy of the current ``MixedCut``.
        """
        assert self.has_recording, "Cannot resample a MixedCut without Recording."
        return MixedCut(
            id=f"{self.id}_rs{sampling_rate}" if affix_id else self.id,
            tracks=[
                fastcopy(t, cut=t.cut.resample(sampling_rate)) for t in self.tracks
            ],
        )

    def perturb_speed(self, factor: float, affix_id: bool = True) -> "MixedCut":
        """
        Return a new ``MixedCut`` that will lazily perturb the speed while loading audio.
        The ``num_samples``, ``start`` and ``duration`` fields of the underlying Cuts
        (and their Recordings and SupervisionSegments) are updated to reflect
        the shrinking/extending effect of speed.
        We are also updating the offsets of all underlying tracks.

        :param factor: The speed will be adjusted this many times (e.g. factor=1.1 means 1.1x faster).
        :param affix_id: When true, we will modify the ``MixedCut.id`` field
            by affixing it with "_sp{factor}".
        :return: a modified copy of the current ``MixedCut``.
        """
        # TODO(pzelasko): test most extensively for edge cases
        # Pre-conditions
        assert (
            self.has_recording
        ), "Cannot perturb speed on a MixedCut without Recording."
        if self.has_features:
            logging.warning(
                "Attempting to perturb speed on a MixedCut that references pre-computed features. "
                "The feature manifest(s) will be detached, as we do not support feature-domain "
                "speed perturbation."
            )
        return MixedCut(
            id=f"{self.id}_sp{factor}" if affix_id else self.id,
            tracks=[
                fastcopy(
                    track,
                    cut=track.cut.perturb_speed(factor=factor, affix_id=affix_id),
                    offset=round(
                        perturb_num_samples(
                            num_samples=compute_num_samples(
                                track.offset, self.sampling_rate
                            ),
                            factor=factor,
                        )
                        / self.sampling_rate,
                        ndigits=8,
                    ),
                )
                for track in self.tracks
            ],
        )

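    # Illustrative sketch (not part of the original module): track offsets are
    # rescaled together with the audio, so the relative placement of the tracks
    # is preserved. Assumes ``mixed`` has a second track at offset 1.0:
    #
    #   >>> faster = mixed.perturb_speed(2.0)
    #   >>> faster.tracks[1].offset   # -> approx. 0.5 (offsets shrink with the audio)
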
    def perturb_tempo(self, factor: float, affix_id: bool = True) -> "MixedCut":
        """
        Return a new ``MixedCut`` that will lazily perturb the tempo while loading audio.
        Compared to speed perturbation, tempo preserves pitch.
        The ``num_samples``, ``start`` and ``duration`` fields of the underlying Cuts
        (and their Recordings and SupervisionSegments) are updated to reflect
        the shrinking/extending effect of tempo.
        We are also updating the offsets of all underlying tracks.

        :param factor: The tempo will be adjusted this many times (e.g. factor=1.1 means 1.1x faster).
        :param affix_id: When true, we will modify the ``MixedCut.id`` field
            by affixing it with "_tp{factor}".
        :return: a modified copy of the current ``MixedCut``.
        """
        # TODO(pzelasko): test most extensively for edge cases
        # Pre-conditions
        assert (
            self.has_recording
        ), "Cannot perturb tempo on a MixedCut without Recording."
        if self.has_features:
            logging.warning(
                "Attempting to perturb tempo on a MixedCut that references pre-computed features. "
                "The feature manifest(s) will be detached, as we do not support feature-domain "
                "tempo perturbation."
            )
        return MixedCut(
            id=f"{self.id}_tp{factor}" if affix_id else self.id,
            tracks=[
                fastcopy(
                    track,
                    cut=track.cut.perturb_tempo(factor=factor, affix_id=affix_id),
                    offset=round(
                        perturb_num_samples(
                            num_samples=compute_num_samples(
                                track.offset, self.sampling_rate
                            ),
                            factor=factor,
                        )
                        / self.sampling_rate,
                        ndigits=8,
                    ),
                )
                for track in self.tracks
            ],
        )

    def perturb_volume(self, factor: float, affix_id: bool = True) -> "MixedCut":
        """
        Return a new ``MixedCut`` that will lazily perturb the volume while loading audio.
        Recordings of the underlying Cuts are updated to reflect the volume change.

        :param factor: The volume will be adjusted this many times (e.g. factor=1.1 means 1.1x louder).
        :param affix_id: When true, we will modify the ``MixedCut.id`` field
            by affixing it with "_vp{factor}".
        :return: a modified copy of the current ``MixedCut``.
        """
        # Pre-conditions
        assert (
            self.has_recording
        ), "Cannot perturb volume on a MixedCut without Recording."
        if self.has_features:
            logging.warning(
                "Attempting to perturb volume on a MixedCut that references pre-computed features. "
                "The feature manifest(s) will be detached, as we do not support feature-domain "
                "volume perturbation."
            )
        return MixedCut(
            id=f"{self.id}_vp{factor}" if affix_id else self.id,
            tracks=[
                fastcopy(
                    track,
                    cut=track.cut.perturb_volume(factor=factor, affix_id=affix_id),
                )
                for track in self.tracks
            ],
        )

    def normalize_loudness(
        self, target: float, mix_first: bool = True, affix_id: bool = False
    ) -> "MixedCut":
        """
        Return a new ``MixedCut`` that will lazily apply loudness normalization.

        :param target: The target loudness in dBFS.
        :param mix_first: If true, we will mix the underlying cuts before applying
            loudness normalization. If false, we cannot guarantee that the resulting
            cut will have the target loudness.
        :param affix_id: When true, we will modify the ``MixedCut.id`` field
            by affixing it with "_ln{target}".
        :return: a modified copy of the current ``MixedCut``.
        """
        # Pre-conditions
        assert (
            self.has_recording
        ), "Cannot apply loudness normalization on a MixedCut without Recording."
        if self.has_features:
            logging.warning(
                "Attempting to normalize loudness on a MixedCut that references pre-computed features. "
                "The feature manifest will be detached, as we do not support feature-domain "
                "loudness normalization."
            )
            self.features = None

        if mix_first:
            transforms = self.transforms.copy() if self.transforms is not None else []
            transforms.append(LoudnessNormalization(target=target).to_dict())
            return fastcopy(
                self,
                id=f"{self.id}_ln{target}" if affix_id else self.id,
                transforms=transforms,
            )
        else:
            return MixedCut(
                id=f"{self.id}_ln{target}" if affix_id else self.id,
                tracks=[
                    fastcopy(
                        track,
                        cut=track.cut.normalize_loudness(
                            target=target, affix_id=affix_id
                        ),
                    )
                    for track in self.tracks
                ],
            )

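    # Illustrative sketch (not part of the original module): with the default
    # ``mix_first=True`` the normalization is appended to ``transforms`` and
    # applied to the mixed waveform, so the result hits the target loudness;
    # with ``mix_first=False`` each track is normalized independently instead.
    #
    #   >>> normed = mixed.normalize_loudness(target=-23.0)
    #   >>> audio = normed.load_audio()   # normalization happens here, lazily
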
    def reverb_rir(
        self,
        rir_recording: Optional["Recording"] = None,
        normalize_output: bool = True,
        early_only: bool = False,
        affix_id: bool = True,
        rir_channels: List[int] = [0],
        room_rng_seed: Optional[int] = None,
        source_rng_seed: Optional[int] = None,
        mix_first: bool = True,
    ) -> "MixedCut":
        """
        Return a new ``MixedCut`` that will convolve the audio with the provided impulse response.
        If no ``rir_recording`` is provided, we will generate an impulse response using a fast
        random generator (https://arxiv.org/abs/2208.04101).

        :param rir_recording: The impulse response to use for convolving.
        :param normalize_output: When true, the output will be normalized to have the same
            energy as the input.
        :param early_only: When true, only the early reflections (first 50 ms) will be used.
        :param affix_id: When true, we will modify the ``MixedCut.id`` field
            by affixing it with "_rvb".
        :param rir_channels: The channels of the impulse response to use. By default, the first
            channel is used. If only one channel is specified, all tracks will be convolved with
            this channel. If a list is provided, it must contain as many channels as there are
            tracks, such that each track will be convolved with one of the specified channels.
        :param room_rng_seed: Seed for the room configuration.
        :param source_rng_seed: Seed for the source position.
        :param mix_first: When true, the mixing will be done first before convolving with the RIR.
            This effectively means that all tracks will be convolved with the same RIR. If you are
            simulating multi-speaker mixtures, you should set this to False.
        :return: a modified copy of the current ``MixedCut``.
        """
        # Pre-conditions
        assert (
            self.has_recording
        ), "Cannot apply reverberation on a MixedCut without Recording."
        if self.has_features:
            logging.warning(
                "Attempting to reverberate a MixedCut that references pre-computed features. "
                "The feature manifest(s) will be detached, as we do not support feature-domain "
                "reverberation."
            )

        assert rir_recording is None or all(
            c < rir_recording.num_channels for c in rir_channels
        ), "Invalid channel index in `rir_channels`."

        assert len(rir_channels) == 1 or len(rir_channels) == len(
            self.tracks
        ), "Invalid number of channels in `rir_channels`, must be either 1 or equal to the number of tracks."

        # There are 2 ways to apply RIRs:
        # 1. Mix the tracks first, then apply RIRs. This is the same as applying the same RIR
        #    to all tracks. It does not make sense if all tracks belong to different speakers,
        #    but it is useful for cases when we have a mixture of MonoCut and PaddingCut,
        #    and we want to apply the same RIR to all of them.
        # 2. Apply RIRs to each track separately. This is useful when we want to simulate
        #    different speakers in the same room.

        # First simulate the room config (will only be used if RIR is not provided)
        uuid4_str = str(uuid4())
        # The room RNG seed is based on the cut ID. This ensures that all tracks in the
        # mixed cut will have the same room configuration.
        if room_rng_seed is None:
            room_rng_seed = hash_str_to_int(uuid4_str + self.id)
        # The source RNG seed is based on the track ID. This ensures that each track
        # will have a different source position.
        source_rng_seeds = [source_rng_seed] * len(self.tracks)
        if source_rng_seed is None:
            source_rng_seeds = [
                hash_str_to_int(uuid4_str + track.cut.id) for track in self.tracks
            ]
            source_rng_seed = source_rng_seeds[0]

        # Apply the same RIR to all tracks after mixing (default)
        if mix_first:
            if rir_recording is None:
                from lhotse.augmentation.utils import FastRandomRIRGenerator

                rir_generator = FastRandomRIRGenerator(
                    sr=self.sampling_rate,
                    room_seed=room_rng_seed,
                    source_seed=source_rng_seed,
                )
            else:
                rir_generator = None

            transforms = self.transforms.copy() if self.transforms is not None else []
            transforms.append(
                ReverbWithImpulseResponse(
                    rir=rir_recording,
                    normalize_output=normalize_output,
                    early_only=early_only,
                    rir_channels=rir_channels if rir_channels is not None else [0],
                    rir_generator=rir_generator,
                ).to_dict()
            )
            return fastcopy(
                self,
                id=f"{self.id}_rvb" if affix_id else self.id,
                transforms=transforms,
            )

        # Apply RIRs to each track separately. Note that we do not pass a `mix_first`
        # argument below since it is True by default.
        if len(rir_channels) == 1:
            rir_channels = rir_channels * len(self.tracks)

        return MixedCut(
            id=f"{self.id}_rvb" if affix_id else self.id,
            tracks=[
                fastcopy(
                    track,
                    cut=track.cut.reverb_rir(
                        rir_recording=rir_recording,
                        normalize_output=normalize_output,
                        early_only=early_only,
                        affix_id=affix_id,
                        rir_channels=[channel],
                        room_rng_seed=room_rng_seed,
                        source_rng_seed=seed,
                    ),
                )
                for track, channel, seed in zip(
                    self.tracks, rir_channels, source_rng_seeds
                )
            ],
        )

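    # Illustrative sketch (not part of the original module): simulating distinct
    # speakers in a shared room. With ``mix_first=False`` every track gets the
    # same room seed but a different source seed, i.e. one room, many positions:
    #
    #   >>> reverberant = mixed.reverb_rir(mix_first=False)  # no RIR given: fast random generator
    #   >>> audio = reverberant.load_audio()                 # convolution happens lazily
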
    @rich_exception_info
    def load_features(self, mixed: bool = True) -> Optional[np.ndarray]:
        """
        Loads the features of the source cuts and mixes them on-the-fly.

        :param mixed: when True (default), the features are mixed together (as defined in
            the mixing function for the extractor). This could result in either a 2D or 3D
            array. For example, if all underlying tracks are single-channel, the output will
            be a 2D array of shape (num_frames, num_features). If any of the tracks are
            multi-channel, the output may be a 3D array of shape
            (num_frames, num_features, num_channels).
        :return: A numpy ndarray with features of shape ``(num_frames, num_features)``,
            or ``(num_tracks, num_frames, num_features)``.
        """
        if not self.has_features:
            return None

        first_cut = self.tracks[0].cut

        # First, check for a simple scenario: just a single cut with padding.
        # When that is the case, we don't have to instantiate a feature extractor,
        # because we are not performing any actual mixing.
        # That makes life simpler for the users who have a custom feature extractor,
        # but don't actually care about feature-domain mixing; they just want to pad.
        if mixed and all(isinstance(t.cut, PaddingCut) for t in self.tracks[1:]):
            padding_val = self.tracks[1].cut.feat_value
            first_cut_feats = first_cut.load_features()
            if first_cut_feats.ndim == 2:  # 2D features
                feats = np.ones((self.num_frames, self.num_features)) * padding_val
            else:  # 3D features
                feats = (
                    np.ones(
                        (self.num_frames, self.num_features, first_cut_feats.shape[-1])
                    )
                    * padding_val
                )
            feats[: first_cut.num_frames, ...] = first_cut_feats
            return feats

        # When there is more than one "regular" cut, we will perform an actual mix.

        # First, we have to make sure that the reference energy levels are appropriate.
        # They might not be if the first track is a padding track.
        reference_feats = None
        reference_energy = None
        reference_pos, reference_cut = [
            (idx, t.cut)
            for idx, t in enumerate(self.tracks)
            if not isinstance(t.cut, PaddingCut) and t.snr is None
        ][0]
        feature_extractor = create_default_feature_extractor(
            reference_cut.features.type
        )
        if first_cut.id != reference_cut.id:
            reference_feats = reference_cut.load_features()
            reference_energy = feature_extractor.compute_energy(reference_feats)

        # The mix itself.
        mixer = FeatureMixer(
            feature_extractor=create_default_feature_extractor(
                self._first_non_padding_cut.features.type
            ),
            base_feats=first_cut.load_features(),
            frame_shift=first_cut.frame_shift,
            reference_energy=reference_energy,
        )
        for pos, track in enumerate(self.tracks[1:], start=1):
            if pos == reference_pos and reference_feats is not None:
                feats = reference_feats  # manual caching to avoid duplicated I/O
            else:
                feats = track.cut.load_features()
            mixer.add_to_mix(
                feats=feats,
                snr=track.snr,
                offset=track.offset,
                sampling_rate=track.cut.sampling_rate,
            )

        if mixed:
            # Checking for some edge cases below.
            feats = mixer.mixed_feats
            # Note: The slicing below is a work-around for an edge-case
            # when two cuts have durations that ended with 0.005 (e.g. 10.125 and 5.715)
            # - then, the feature extractor "squeezed in" a last extra frame and the simple
            # relationship between num_frames and duration we strived for is not true;
            # i.e. the duration is 10.125 + 5.715 = 15.84, but the number of frames is
            # 1013 + 572 = 1585. If the frame_shift is 0.01, we have gained an extra 0.01s...
            if feats.shape[0] - self.num_frames == 1:
                feats = feats[: self.num_frames, :]
            # TODO(pzelasko): This can sometimes happen in a MixedCut with >= 5 different Cuts,
            #                 with a regular MonoCut at the end, when the mix offsets are floats
            #                 with a lot of decimals. For now we're duplicating the last frame
            #                 to match the declared "num_frames" of this cut.
            if feats.shape[0] - self.num_frames == -1:
                feats = np.concatenate((feats, feats[-1:, :]), axis=0)
            assert feats.shape[0] == self.num_frames, (
                "Inconsistent number of frames in a MixedCut: please report "
                "this issue at https://github.com/lhotse-speech/lhotse/issues "
                "showing the output of print(cut) or str(cut) on which "
                "load_features() was called."
            )
            return feats
        else:
            return mixer.unmixed_feats

    @rich_exception_info
    def load_audio(
        self, mixed: bool = True, mono_downmix: bool = False
    ) -> Optional[np.ndarray]:
        """
        Loads the audio of the source cuts and mixes them on-the-fly.

        :param mixed: When True (default), returns a mix of the underlying tracks. This will
            return a numpy array with shape ``(num_channels, num_samples)``, where
            ``num_channels`` is determined by the ``num_channels`` property of the MixedCut.
            Otherwise, returns a numpy array with the number of channels equal to the total
            number of channels across all tracks in the MixedCut. For example, if it contains
            a MultiCut with 2 channels and a MonoCut with 1 channel, the returned array will
            have shape ``(3, num_samples)``.
        :param mono_downmix: If the MixedCut contains > 1 channels (e.g. when one of its
            tracks is a MultiCut), this parameter controls whether the returned array will be
            down-mixed to a single channel. This down-mixing is done by summing the channels
            together.
        :return: A numpy ndarray with audio samples and with shape ``(num_channels, num_samples)``.
        """
        if not self.has_recording:
            return None

        first_cut = self.tracks[0].cut

        # First, we have to make sure that the reference energy levels are appropriate.
        # They might not be if the first track is a padding track.
        reference_audio = None
        reference_energy = None
        reference_pos, reference_cut = [
            (idx, t.cut)
            for idx, t in enumerate(self.tracks)
            if not isinstance(t.cut, PaddingCut) and t.snr is None
        ][0]
        if first_cut.id != reference_cut.id:
            reference_audio = reference_cut.load_audio()
            reference_energy = audio_energy(reference_audio)

        mixer = AudioMixer(
            self.tracks[0].cut.load_audio(),
            sampling_rate=self.tracks[0].cut.sampling_rate,
            reference_energy=reference_energy,
            base_offset=self.tracks[0].offset,
        )

        for pos, track in enumerate(self.tracks[1:], start=1):
            if pos == reference_pos and reference_audio is not None:
                audio = reference_audio  # manual caching to avoid duplicated I/O
            else:
                audio = track.cut.load_audio()
            mixer.add_to_mix(
                audio=audio,
                snr=track.snr,
                offset=track.offset,
            )

        # Flattening a MixedCut without MultiCut tracks has no effect
        mono_downmix = mono_downmix and any(
            track.type == "MultiCut" for track in self.tracks
        )
        # Flattening a MixedCut without mixed=True has no effect
        mono_downmix = mono_downmix and mixed

        if mixed:
            # Off-by-one errors can happen during mixing due to imperfect float arithmetic
            # and rounding; we will fix them on-the-fly so that the manifest does not lie
            # about the num_samples.
            audio = mixer.mixed_mono_audio if mono_downmix else mixer.mixed_audio
            tol_samples = compute_num_samples(
                get_audio_duration_mismatch_tolerance(),
                sampling_rate=self.sampling_rate,
            )
            num_samples_diff = audio.shape[1] - self.num_samples
            if 0 < num_samples_diff < tol_samples:
                audio = audio[:, : self.num_samples]
            if -tol_samples < num_samples_diff < 0:
                audio = np.pad(audio, [(0, 0), (0, -num_samples_diff)], mode="reflect")
            assert audio.shape[1] == self.num_samples, (
                f"Inconsistent number of samples in a MixedCut. Expected {self.num_samples} "
                f"but the output of mix has {audio.shape[1]}. Please report "
                f"this issue at https://github.com/lhotse-speech/lhotse/issues "
                f"showing the cut below. MixedCut:\n{self}"
            )
            # We'll apply the transforms now (if any).
            transforms = [
                AudioTransform.from_dict(params) for params in self.transforms or []
            ]
            for tfn in transforms:
                audio = tfn(audio, self.sampling_rate)
        else:
            audio = mixer.unmixed_audio

        return audio

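    # Illustrative sketch (not part of the original module): the three output
    # layouts. Assumes ``mixed`` holds a 2-channel MultiCut track and a MonoCut
    # track, per the docstring's example:
    #
    #   >>> mixed.load_audio().shape                    # (2, num_samples): mixed, per channel
    #   >>> mixed.load_audio(mono_downmix=True).shape   # (1, num_samples): channels summed
    #   >>> mixed.load_audio(mixed=False).shape         # (3, num_samples): every track channel kept
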
    @property
    def video(self) -> Optional[VideoInfo]:
        if self.has_video:
            v = self._first_non_padding_cut.video
            return v.copy_with(num_frames=compute_num_samples(self.duration, v.fps))
        return None

    @rich_exception_info
    def load_video(
        self,
        with_audio: bool = True,
        mixed: bool = True,
        mono_downmix: bool = False,
    ) -> Optional[Tuple[torch.Tensor, Optional[torch.Tensor]]]:
        if not self.has_video:
            return None
        mixer = VideoMixer(
            self.tracks[0].cut.load_video(with_audio=False)[0],
            fps=self.video.fps,
            base_offset=self.tracks[0].offset,
        )
        for pos, track in enumerate(self.tracks[1:], start=1):
            mixer.add_to_mix(
                video=track.cut.load_video(with_audio=False)[0],
                offset=track.offset,
            )
        video = mixer.mixed_video
        if with_audio:
            # For now load audio separately to re-use the complex logic of load_audio.
            # This means the same video file is potentially opened twice, but given the
            # cost of video decoding, the extra file open cost could be negligible.
            audio = self.load_audio(mixed=mixed, mono_downmix=mono_downmix)
            return video, torch.from_numpy(audio)
        # No audio requested: return the video with a None placeholder, matching
        # the declared Optional[torch.Tensor] in the return type.
        return video, None

    def plot_tracks_features(self):
        """
        Display the feature matrix as an image. Requires matplotlib to be installed.
        """
        import matplotlib.pyplot as plt

        fig, axes = plt.subplots(len(self.tracks))
        features = self.load_features(mixed=False)
        fmin, fmax = features.min(), features.max()
        for idx, ax in enumerate(axes):
            ax.imshow(np.flip(features[idx].transpose(1, 0), 0), vmin=fmin, vmax=fmax)
        return axes

    def plot_tracks_audio(self):
        """
        Display plots of the individual tracks' waveforms. Requires matplotlib to be installed.
        """
        import matplotlib.pyplot as plt

        audio = self.load_audio(mixed=False)
        fig, axes = plt.subplots(len(self.tracks), sharex=False, sharey=True)
        for idx, (track, ax) in enumerate(zip(self.tracks, axes)):
            samples = audio[idx].squeeze(0)
            ax.plot(np.linspace(0, self.duration, len(samples)), samples)
            for supervision in track.cut.supervisions:
                supervision = supervision.trim(track.cut.duration)
                ax.axvspan(
                    track.offset + supervision.start,
                    track.offset + supervision.end,
                    color="green",
                    alpha=0.1,
                )
        return axes

    def drop_features(self) -> "MixedCut":
        """Return a copy of the current :class:`MixedCut`, detached from ``features``."""
        assert (
            self.has_recording
        ), f"Cannot detach features from a MixedCut with no Recording (cut ID = {self.id})."
        return fastcopy(
            self, tracks=[fastcopy(t, cut=t.cut.drop_features()) for t in self.tracks]
        )

    def drop_recording(self) -> "MixedCut":
        """Return a copy of the current :class:`.MixedCut`, detached from ``recording``."""
        assert (
            self.has_features
        ), f"Cannot detach recording from a MixedCut with no Features (cut ID = {self.id})."
        return fastcopy(
            self, tracks=[fastcopy(t, cut=t.cut.drop_recording()) for t in self.tracks]
        )

    def drop_supervisions(self) -> "MixedCut":
        """Return a copy of the current :class:`.MixedCut`, detached from ``supervisions``."""
        return fastcopy(
            self,
            tracks=[fastcopy(t, cut=t.cut.drop_supervisions()) for t in self.tracks],
        )

    def drop_alignments(self) -> "MixedCut":
        """Return a copy of the current :class:`.MixedCut`, detached from ``alignments``."""
        return fastcopy(
            self,
            tracks=[fastcopy(t, cut=t.cut.drop_alignments()) for t in self.tracks],
        )

    def compute_and_store_features(
        self,
        extractor: FeatureExtractor,
        storage: FeaturesWriter,
        augment_fn: Optional[AugmentFn] = None,
        mix_eagerly: bool = True,
    ) -> DataCut:
        """
        Compute the features from this cut, store them on disk, and create a new `MonoCut`
        object with the feature manifest attached. This cut has to be able to load audio.

        :param extractor: a ``FeatureExtractor`` instance used to compute the features.
        :param storage: a ``FeaturesWriter`` instance used to store the features.
        :param augment_fn: an optional callable used for audio augmentation.
        :param mix_eagerly: when False, extract and store the features for each track separately,
            and mix them dynamically when loading the features.
            When True, mix the audio first and store the mixed features,
            returning a new ``MonoCut`` instance with the same ID.
            The returned ``MonoCut`` will not have a ``Recording`` attached.
        :return: a new ``MonoCut`` instance if ``mix_eagerly`` is True, or returns ``self``
            with each of the tracks containing the ``Features`` manifests.
        """
        if mix_eagerly:
            from .mono import MonoCut

            features_info = extractor.extract_from_samples_and_store(
                samples=self.load_audio(),
                storage=storage,
                sampling_rate=self.sampling_rate,
                offset=0,
                channel=0,
                augment_fn=augment_fn,
            )
            features_info.recording_id = self.id
            return MonoCut(
                id=self.id,
                start=0,
                duration=self.duration,
                channel=0,
                supervisions=[
                    fastcopy(s, recording_id=self.id) for s in self.supervisions
                ],
                features=features_info,
                recording=None,
                custom=self.custom if hasattr(self, "custom") else None,
            )
        else:  # mix lazily
            new_tracks = [
                MixTrack(
                    cut=track.cut.compute_and_store_features(
                        extractor=extractor,
                        storage=storage,
                        augment_fn=augment_fn,
                    ),
                    offset=track.offset,
                    snr=track.snr,
                )
                for track in self.tracks
            ]
            return MixedCut(id=self.id, tracks=new_tracks)

    def fill_supervision(
        self, add_empty: bool = True, shrink_ok: bool = False
    ) -> "MixedCut":
        """
        Fills the whole duration of a cut with a supervision segment.

        If the cut has one supervision, its start is set to 0 and duration is set to ``cut.duration``.
        Note: this may either expand a supervision that was shorter than a cut, or shrink a supervision
        that exceeds the cut.

        If there are no supervisions, we will add an empty one when ``add_empty==True``,
        otherwise we won't change anything.

        If there are two or more supervisions, we will raise an exception.

        .. note:: For :class:`.MixedCut`, we expect that only one track contains a supervision.
            That supervision will be expanded to cover the full MixedCut's duration.

        :param add_empty: should we add an empty supervision with identical time bounds as the cut.
        :param shrink_ok: should we raise an error if a supervision would be shrunk as a result
            of calling this method.
        """
        n_sups = len(self.supervisions)
        if n_sups == 0:
            if not add_empty:
                return self
            first_non_padding_idx = [
                idx for idx, t in enumerate(self.tracks) if isinstance(t.cut, DataCut)
            ][0]
            new_tracks = [
                fastcopy(
                    t,
                    cut=fastcopy(
                        t.cut,
                        supervisions=[
                            SupervisionSegment(
                                id=self.id,
                                recording_id=t.cut.recording_id,
                                start=-t.offset,
                                duration=self.duration,
                                channel=-1,
                            )
                        ],
                    ),
                )
                if idx == first_non_padding_idx
                else t
                for idx, t in enumerate(self.tracks)
            ]
        else:
            assert (
                n_sups == 1
            ), f"Cannot expand more than one supervision (found {len(self.supervisions)})."
            new_tracks = []
            for t in self.tracks:
                if len(t.cut.supervisions) == 0:
                    new_tracks.append(t)
                else:
                    sup = t.cut.supervisions[0]
                    if not shrink_ok and (
                        sup.start < -t.offset or sup.end > self.duration
                    ):
                        raise ValueError(
                            f"Cannot shrink supervision (start={sup.start}, end={sup.end}) to cut "
                            f"(start=0, duration={t.cut.duration}) because the argument `shrink_ok` is `False`. "
                            f"Note: this check prevents accidental data loss for speech recognition, "
                            f"as a supervision exceeding a cut indicates there might be some spoken content "
                            f"beyond the cut's start or end (an ASR model would be trained to predict more text "
                            f"than is spoken in the audio). If this is okay, set `shrink_ok` to `True`."
                        )
                    new_tracks.append(
                        fastcopy(
                            t,
                            cut=fastcopy(
                                t.cut,
                                supervisions=[
                                    fastcopy(
                                        sup, start=-t.offset, duration=self.duration
                                    )
                                ],
                            ),
                        )
                    )
        return fastcopy(self, tracks=new_tracks)

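    # Illustrative sketch (not part of the original module): a common use is
    # making a padded cut's single supervision span the whole padded duration.
    # Assumes ``cut`` is a 5 s MonoCut with exactly one supervision:
    #
    #   >>> padded = cut.pad(duration=10.0)   # MixedCut: cut + PaddingCut
    #   >>> filled = padded.fill_supervision()
    #   >>> filled.supervisions[0].duration   # -> 10.0
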
    def map_supervisions(
        self, transform_fn: Callable[[SupervisionSegment], SupervisionSegment]
    ) -> Cut:
        """
        Modify the SupervisionSegments of this MixedCut by applying ``transform_fn`` to each of them.

        :param transform_fn: a function that takes a supervision as an argument and returns
            the modified supervision.
        :return: a modified MixedCut.
        """
        new_mixed_cut = fastcopy(self)
        for track in new_mixed_cut.tracks:
            if isinstance(track.cut, PaddingCut):
                continue
            track.cut.supervisions = [
                segment.map(transform_fn) for segment in track.cut.supervisions
            ]
        return new_mixed_cut

    def merge_supervisions(
        self,
        merge_policy: str = "delimiter",
        custom_merge_fn: Optional[Callable[[str, Iterable[Any]], Any]] = None,
    ) -> "MixedCut":
        """
        Return a copy of the cut that has all of its supervisions merged into
        a single segment.

        The new start is the start of the earliest supervision, and the new duration
        is a minimum spanning duration for all the supervisions. The text fields are
        concatenated with a whitespace.

        .. note:: If you're using individual tracks of a mixed cut, note that this transform
            drops all the supervisions in individual tracks and assigns the merged supervision
            in the first :class:`.DataCut` found in ``self.tracks``.

        :param merge_policy: one of "keep_first" or "delimiter". If "keep_first", we
            keep only the first segment's field value, otherwise all string fields
            (including IDs) are prefixed with "cat#" and concatenated with a hash symbol "#".
            This is also applied to ``custom`` fields. Fields with a ``None`` value are omitted.
        :param custom_merge_fn: a function that will be called to merge custom fields values.
            We expect ``custom_merge_fn`` to handle all possible custom keys.
            When not provided, we will treat all custom values as strings.
            It will be called roughly like:
            ``custom_merge_fn(custom_key, [s.custom[custom_key] for s in sups])``
        """
        merge_func_ = partial(
            merge_items_with_delimiter,
            delimiter="#",
            return_first=(merge_policy == "keep_first"),
        )

        # "m" stands for merged in variable names below

        if custom_merge_fn is not None:
            # Merge custom fields with the user-provided function.
            merge_custom = custom_merge_fn
        else:
            # Merge the string representations of custom fields.
            merge_custom = lambda k, vs: merge_func_(map(str, vs))

        sups = sorted(self.supervisions, key=lambda s: s.start)

        if len(sups) <= 1:
            return self

        # the sampling rate is arbitrary, ensures there are no float precision errors
        mstart = sups[0].start
        mend = sups[-1].end
        mduration = add_durations(mend, -mstart, sampling_rate=self.sampling_rate)

        custom_keys = set(
            k for s in sups if s.custom is not None for k in s.custom.keys()
        )
        alignment_keys = set(
            k for s in sups if s.alignment is not None for k in s.alignment.keys()
        )

        if any(overlaps(s1, s2) for s1, s2 in zip(sups, sups[1:])) and any(
            s.text is not None for s in sups
        ):
            warnings.warn(
                "You are merging overlapping supervisions that have text transcripts. "
                "The result is likely to be unusable if you are going to train speech "
                f"recognition models (cut id: {self.id})."
            )

        msup = SupervisionSegment(
            id=merge_func_(s.id for s in sups),
            # The merged recording_id is a mix of the source recording_ids.
            recording_id=merge_func_(s.recording_id for s in sups),
            start=mstart,
            duration=mduration,
            # Hardcode -1 to indicate no specific channel, as the supervisions might have
            # come from different channels in their original recordings.
            channel=-1,
            text=" ".join(s.text for s in sups if s.text),
            speaker=merge_func_(s.speaker for s in sups if s.speaker),
            language=merge_func_(s.language for s in sups if s.language),
            gender=merge_func_(s.gender for s in sups if s.gender),
            custom={
                k: merge_custom(
                    k,
                    (
                        s.custom[k]
                        for s in sups
                        if s.custom is not None and k in s.custom
                    ),
                )
                for k in custom_keys
            },
            alignment={
                # Concatenate the lists of alignment units.
                k: reduce(
                    add,
                    (
                        s.alignment[k]
                        for s in sups
                        if s.alignment is not None and k in s.alignment
                    ),
                )
                for k in alignment_keys
            },
        )

        new_cut = self.drop_supervisions()
        new_cut._first_non_padding_cut.supervisions = [msup]
        return new_cut

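    # Illustrative sketch (not part of the original module): merging two
    # supervisions with the default "delimiter" policy. Assumes ``mixed`` has
    # supervisions with ids "sup1" and "sup2":
    #
    #   >>> merged = mixed.merge_supervisions()
    #   >>> merged.supervisions[0].id     # -> "cat#sup1#sup2" per the docstring
    #   >>> merged.supervisions[0].text   # texts joined with a whitespace
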
    def filter_supervisions(
        self, predicate: Callable[[SupervisionSegment], bool]
    ) -> Cut:
        """
        Modify cut to store only supervisions accepted by `predicate`.

        Example:
            >>> cut = cut.filter_supervisions(lambda s: s.id in supervision_ids)
            >>> cut = cut.filter_supervisions(lambda s: s.duration < 5.0)
            >>> cut = cut.filter_supervisions(lambda s: s.text is not None)

        :param predicate: A callable that accepts `SupervisionSegment` and returns bool.
        :return: a modified MixedCut.
        """
        new_mixed_cut = fastcopy(
            self,
            tracks=[
                fastcopy(track, cut=track.cut.filter_supervisions(predicate))
                for track in self.tracks
            ],
        )
        return new_mixed_cut

    @staticmethod
    def from_dict(data: dict) -> "MixedCut":
        if "type" in data:
            data.pop("type")
        return MixedCut(
            id=data["id"],
            tracks=[MixTrack.from_dict(track) for track in data["tracks"]],
        )

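    # Illustrative sketch (not part of the original module): a dict round-trip,
    # as used by the JSON/JSONL (de)serialization machinery in ``lhotse.cut.set``:
    #
    #   >>> restored = MixedCut.from_dict(mixed.to_dict())
    #   >>> restored == mixed   # dataclass equality over id, tracks, transforms
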
    def with_features_path_prefix(self, path: Pathlike) -> "MixedCut":
        if not self.has_features:
            return self
        return MixedCut(
            id=self.id,
            tracks=[
                fastcopy(t, cut=t.cut.with_features_path_prefix(path))
                for t in self.tracks
            ],
        )

    def with_recording_path_prefix(self, path: Pathlike) -> "MixedCut":
        if not self.has_recording:
            return self
        return MixedCut(
            id=self.id,
            tracks=[
                fastcopy(t, cut=t.cut.with_recording_path_prefix(path))
                for t in self.tracks
            ],
        )

    @property
    def _first_non_padding_cut(self) -> DataCut:
        return self._first_non_padding_track.cut

    @property
    def _first_non_padding_track(self) -> MixTrack:
        return [t for t in self.tracks if not isinstance(t.cut, PaddingCut)][0]