Source code for lhotse.cut.mixed

import logging
import warnings
from dataclasses import dataclass
from functools import partial, reduce
from io import BytesIO
from operator import add
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    Iterable,
    List,
    Literal,
    Optional,
    Tuple,
    Union,
)

import numpy as np
import torch
from intervaltree import IntervalTree

from lhotse.array import Array, TemporalArray
from lhotse.audio import Recording, VideoInfo, get_audio_duration_mismatch_tolerance
from lhotse.audio.backend import save_audio
from lhotse.audio.mixer import AudioMixer, VideoMixer, audio_energy
from lhotse.augmentation import (
    AudioTransform,
    AugmentFn,
    LoudnessNormalization,
    ReverbWithImpulseResponse,
)
from lhotse.augmentation.compress import Codec
from lhotse.cut.base import Cut
from lhotse.cut.data import DataCut
from lhotse.cut.padding import PaddingCut
from lhotse.features import (
    FeatureExtractor,
    FeatureMixer,
    create_default_feature_extractor,
)
from lhotse.features.base import Features
from lhotse.features.io import FeaturesWriter
from lhotse.image import Image
from lhotse.supervision import SupervisionSegment
from lhotse.utils import (
    DEFAULT_PADDING_VALUE,
    LOG_EPSILON,
    Decibels,
    Pathlike,
    Seconds,
    add_durations,
    compute_num_frames,
    compute_num_samples,
    fastcopy,
    hash_str_to_int,
    merge_items_with_delimiter,
    overlaps,
    perturb_num_samples,
    rich_exception_info,
    uuid4,
)


@dataclass
class MixTrack:
    """
    Represents a single track in a mix of Cuts. Points to a specific DataCut or PaddingCut and holds information on
    how to mix it with other Cuts, relative to the first track in a mix.
    """

    cut: Cut
    # Class name of ``cut``; whatever is passed in is overwritten by ``__post_init__``.
    type: Optional[str] = None
    offset: Seconds = 0.0
    snr: Optional[Decibels] = None
    tag: Optional[str] = None
    is_snr_reference: bool = False
    mute: bool = False

    def __post_init__(self):
        # Always derive the type name from the actual cut object so that the two cannot disagree.
        self.type = type(self.cut).__name__

    @staticmethod
    def from_dict(data: dict):
        """
        Build a :class:`MixTrack` from the dict layout produced by :meth:`to_dict`.

        The input dict (and its nested ``"cut"`` dict) are not modified: we operate on
        shallow copies, so the same dict can be deserialized more than once.

        :param data: a dict with at least the ``"cut"`` and ``"type"`` keys; the remaining
            keys are forwarded to the :class:`MixTrack` constructor.
        :return: a new :class:`MixTrack`.
        """
        from .set import deserialize_cut

        fields = dict(data)
        # Take out `type` from the track dict and put it into the `cut` dict.
        cut_dict = dict(fields.pop("cut"))
        cut_dict["type"] = fields.pop("type")
        return MixTrack(deserialize_cut(cut_dict), **fields)

    def to_dict(self) -> Dict:
        """
        Serialize this track to a dict.

        ``cut``, ``type`` and ``offset`` are always present; ``snr`` and ``tag`` are written
        only when not ``None``, and ``is_snr_reference`` and ``mute`` only when truthy.
        """
        ans = {
            "cut": self.cut.to_dict(),
            "type": self.type,
            "offset": self.offset,
        }
        if self.snr is not None:
            ans["snr"] = self.snr
        if self.tag is not None:
            ans["tag"] = self.tag
        if self.is_snr_reference:
            ans["is_snr_reference"] = self.is_snr_reference
        if self.mute:
            ans["mute"] = self.mute
        return ans
@dataclass
class MixedCut(Cut):
    """
    :class:`~lhotse.cut.MixedCut` is a :class:`~lhotse.cut.Cut` that actually consists of multiple other cuts.
    Its primary purpose is to allow time-domain and feature-domain augmentation via mixing the training cuts
    with noise, music, and babble cuts. The actual mixing operations are performed on-the-fly.

    Internally, :class:`~lhotse.cut.MixedCut` holds other cuts in multiple tracks (:class:`~lhotse.cut.MixTrack`),
    each with its own offset and SNR that is relative to the first track.

    Please refer to the documentation of :class:`~lhotse.cut.Cut` to learn more about using cuts.

    In addition to methods available in :class:`~lhotse.cut.Cut`, :class:`~lhotse.cut.MixedCut` provides the methods to
    read all of its tracks audio and features as separate channels:

        >>> cut = MixedCut(...)
        >>> mono_features = cut.load_features()
        >>> assert len(mono_features.shape) == 2
        >>> multi_features = cut.load_features(mixed=False)
        >>> # Now, the first dimension is the channel.
        >>> assert len(multi_features.shape) == 3

    .. note:: MixedCut is different from MultiCut, which is intended to represent multi-channel recordings
        that share the same supervisions.

    .. note:: Each track in a MixedCut can be either a MonoCut, MultiCut, or PaddingCut.

    .. note:: The ``transforms`` field lists the transformations that should be applied after mixing
        the tracks. It is annotated as a list of ``AudioTransform`` objects, each of which is
        serialized with its own ``to_dict()``.

    See also:

        - :class:`lhotse.cut.Cut`
        - :class:`lhotse.cut.MonoCut`
        - :class:`lhotse.cut.MultiCut`
        - :class:`lhotse.cut.CutSet`
    """

    id: str
    tracks: List[MixTrack]
    transforms: Optional[List[AudioTransform]] = None

    @property
    def supervisions(self) -> List[SupervisionSegment]:
        """
        Lists the supervisions of the underlying source cuts.
        Each segment start time will be adjusted by the track offset.
        Only the tracks selected by ``_get_audible_tracks`` contribute.
        """
        return [
            segment.with_offset(track.offset)
            for track in _get_audible_tracks(self)
            for segment in track.cut.supervisions
        ]

    @property
    def start(self) -> Seconds:
        """A mixed cut always starts at 0 on its own timeline."""
        return 0

    @property
    def duration(self) -> Seconds:
        """
        The end time of the latest-ending audible track (track offset + cut duration),
        rounded to 8 decimal digits.
        """
        track_durations = (
            track.offset + track.cut.duration for track in _get_audible_tracks(self)
        )
        return round(max(track_durations), ndigits=8)

    @property
    def channel(self) -> Union[int, List[int]]:
        """``0`` for a single-channel mix, otherwise ``[0, 1, ..., num_channels - 1]``."""
        num_channels = self.num_channels
        return list(range(num_channels)) if num_channels > 1 else 0

    @property
    def has_features(self) -> bool:
        """Whether the first non-padding cut has features attached."""
        return self._first_non_padding_cut.has_features

    @property
    def has_recording(self) -> bool:
        """Whether the first non-padding cut has a recording attached."""
        return self._first_non_padding_cut.has_recording

    @property
    def has_video(self) -> bool:
        """Whether the first non-padding cut has video attached."""
        return self._first_non_padding_cut.has_video

    @property
    def is_in_memory(self) -> bool:
        """True when the cut of at least one audible track reports ``is_in_memory``."""
        return any(track.cut.is_in_memory for track in _get_audible_tracks(self))
def has(self, field: str) -> bool:
    """
    Tell whether ``field`` is available on this mixed cut.

    The question is forwarded to ``has()`` of the first non-padding cut.

    :param field: name of the field to check.
    :return: whatever the first non-padding cut's ``has(field)`` returns.
    """
    reference_cut = self._first_non_padding_cut
    return reference_cut.has(field)
@property
def num_frames(self) -> Optional[int]:
    """Number of feature frames spanning the whole mix, or ``None`` when there are no features."""
    if not self.has_features:
        return None
    return compute_num_frames(
        duration=self.duration,
        frame_shift=self.frame_shift,
        sampling_rate=self.sampling_rate,
    )


@property
def frame_shift(self) -> Optional[Seconds]:
    """Frame shift, taken from the first non-padding cut."""
    reference_cut = self._first_non_padding_cut
    return reference_cut.frame_shift


@property
def sampling_rate(self) -> Optional[int]:
    """Sampling rate, taken from the first non-padding cut."""
    reference_cut = self._first_non_padding_cut
    return reference_cut.sampling_rate


@property
def num_samples(self) -> Optional[int]:
    """Number of audio samples corresponding to the mix duration at the mix sampling rate."""
    total_duration = self.duration
    return compute_num_samples(total_duration, self.sampling_rate)


@property
def num_features(self) -> Optional[int]:
    """Feature dimension, taken from the first non-padding cut."""
    reference_cut = self._first_non_padding_cut
    return reference_cut.num_features


@property
def num_channels(self) -> Optional[int]:
    """The largest channel count among the cuts of the audible tracks."""
    # PaddingCut and MonoCut have 1 channel each, MultiCut has N. We don't support MixedCut with MixedCut tracks.
    channel_counts = [track.cut.num_channels for track in _get_audible_tracks(self)]
    return max(channel_counts)


@property
def features_type(self) -> Optional[str]:
    """Type of the features of the first non-padding cut, or ``None`` when there are no features."""
    if self.has_features:
        return self._first_non_padding_cut.features.type
    return None
def to_dict(self) -> dict:
    """
    Serialize this mixed cut into a plain dict.

    The result always holds ``id``, the serialized ``tracks`` and the class name under ``type``.
    ``transforms`` is written only when the list is non-empty.
    """
    serialized = {"id": self.id}
    serialized["tracks"] = [track.to_dict() for track in self.tracks]
    serialized["type"] = type(self).__name__
    if not self.transforms:
        return serialized
    serialized["transforms"] = [transform.to_dict() for transform in self.transforms]
    return serialized
def unmix(self, tag: Optional[str] = None) -> List[Cut]:
    """
    Split this mixed cut into time-aligned constituent cuts.

    Without a ``tag``, every audible non-padding track becomes its own cut. Each returned
    cut preserves the original offsets and overall duration, so the loaded audio/features
    can be summed to reconstruct the original mix.

    With a ``tag``, exactly two cuts are returned, in the order ``[without_tag, with_tag]``;
    a track belongs to the second group when its :attr:`MixTrack.tag` equals ``tag``.
    For exact SNR preservation, the grouped outputs may carry an internal muted
    SNR-reference track that is ignored by the public track views but retained
    for mixing math.

    :param tag: Optional track-group label to split on.
    :return: A list of one cut per track, or two grouped cuts when ``tag`` is provided.
    """
    data_tracks = []
    for track in _get_audible_tracks(self):
        if isinstance(track.cut, PaddingCut):
            continue
        data_tracks.append(track)

    if tag is None:
        return [_to_unmixed_cut(self, [track]) for track in data_tracks]

    without_tag, with_tag = [], []
    for track in data_tracks:
        if track.tag == tag:
            with_tag.append(track)
        else:
            without_tag.append(track)

    return [
        _to_unmixed_cut(self, without_tag),
        _to_unmixed_cut(self, with_tag),
    ]
def iter_data(
    self,
) -> Generator[
    Tuple[str, Union[Recording, Features, Array, TemporalArray, Image]], None, None
]:
    """
    Iterate over each data piece attached to this cut.

    The iteration is delegated to the first non-padding cut. It yields ``(key, manifest)``
    tuples, where ``key`` is the name of the attribute under which ``manifest`` is found and
    ``manifest`` is of type :class:`~lhotse.Recording`, :class:`~lhotse.Features`,
    :class:`~lhotse.TemporalArray`, :class:`~lhotse.Array`, or :class:`~lhotse.Image`.
    For example, if ``key`` is ``recording``, then ``manifest`` is ``self.recording``.
    """
    reference_cut = self._first_non_padding_cut
    return reference_cut.iter_data()
def __setattr__(self, key: str, value: Any) -> None:
    """
    This magic function is called when the user tries to set an attribute.
    We use it as syntactic sugar to store custom attributes in ``self._first_non_padding_cut.custom``
    field, so that they can be (de)serialized later.

    Names that are dataclass fields of this class are set on the mixed cut itself; every other
    name is forwarded with ``setattr`` to the first non-padding cut. Whatever happens for a
    ``None`` value (the attribute is expected to be removed from ``custom``) is therefore
    decided by that cut, not by this method.

    .. note:: MixedCut doesn't have its own ``custom`` field, and by convention always refers
        to the ``custom`` field on its first non padding cut.
    """
    if key in self.__dataclass_fields__:
        super().__setattr__(key, value)
    else:
        setattr(self._first_non_padding_cut, key, value)


def __getattr__(self, name: str) -> Any:
    """
    This magic function is called when the user tries to access an attribute
    of :class:`.MixedCut` that doesn't exist. It is used for accessing the custom
    attributes of cuts.

    The lookup proceeds in this order:

    - dunder names (starting with ``__``) raise :class:`AttributeError` immediately;
    - ``load_<attr>`` returns a callable equivalent to ``self.load_custom("<attr>")``;
    - ``custom`` returns a new dict merging the ``custom`` dicts of the audible tracks
      (on key collisions, later tracks overwrite earlier ones);
    - any other name is read from the first track returned by
      ``_get_tracks_with_custom_attr(name)``; when there is no such track,
      :class:`AttributeError` is raised.

    If one of the custom attributes is of type :class:`~lhotse.array.Array` or
    :class:`~lhotse.array.TemporalArray` we'll also support loading those arrays
    (see example below). Additionally, we will incorporate extra padding as
    dictated by padding cuts.

    Example:

        >>> cut = MonoCut('cut1', start=0, duration=4, channel=0)
        >>> cut.alignment = TemporalArray(...)
        >>> mixed_cut = cut.pad(10, pad_value_dict={'alignment': -1})
        >>> ali = mixed_cut.load_alignment()
    """
    # Python will sometimes try to call undefined magic functions,
    # just fail for them (e.g. __setstate__ when pickling).
    if name.startswith("__"):
        raise AttributeError()

    # Loading a custom array attribute + performing padding.
    if name.startswith("load_"):
        attr_name = name[5:]
        return partial(self.load_custom, attr_name)

    if name == "custom":
        # Merge custom dicts of underlying data.
        ans = {}
        for t in _get_audible_tracks(self):
            if cstm := t.cut.custom:
                ans.update(cstm)
        return ans

    # Returning the contents of "mono_cut.custom[name]",
    # or raising AttributeError.
    tracks_with_attr = self._get_tracks_with_custom_attr(name)
    if tracks_with_attr:
        _, mono_cut = tracks_with_attr[0]
        return getattr(mono_cut, name)

    raise AttributeError(f"No such attribute: '{name}'")
def has_custom(self, name: str) -> bool:
    """
    Check whether a custom attribute called ``name`` is reachable through this mixed cut.

    Only the first track reported by ``_get_tracks_with_custom_attr`` is consulted.

    :param name: name of the custom attribute.
    :return: ``False`` when no track carries the attribute, otherwise the result of
        ``hasattr`` on the first matching cut.
    """
    candidates = self._get_tracks_with_custom_attr(name)
    if candidates:
        first_cut = candidates[0][1]
        return hasattr(first_cut, name)
    return False
def load_custom(self, name: str) -> np.ndarray:
    """
    Load custom data as numpy array. The custom data is expected to have
    been stored in cuts ``custom`` field as an :class:`~lhotse.array.Array` or
    :class:`~lhotse.array.TemporalArray` manifest.

    .. note:: It works with Array manifests stored via attribute assignments,
        e.g.: ``cut.my_custom_data = Array(...)``.

    .. note:: For :class:`.MixedCut` with :class:`~lhotse.audio.Recording`-type
        custom attributes, this supports multiple non-overlapping tracks
        (e.g. from :meth:`Cut.append`). The audio from each track's custom
        Recording is loaded and placed at the correct offset in the output
        buffer, similar to how :meth:`load_audio` works for the main recording.

    .. warning:: For :class:`~lhotse.array.Array` (non-temporal) and
        :class:`~lhotse.array.TemporalArray` custom attributes, this will only
        work if the mixed cut has a single non-padding track with that attribute.

    :param name: name of the custom attribute.
    :return: a numpy array with the data (after padding).
    :raises AssertionError: when no non-padding track has the attribute.
    :raises ValueError: when several tracks have the attribute and it is not a Recording.
    """
    from lhotse.array import Array, pad_array

    tracks_with_attr = self._get_tracks_with_custom_attr(name)

    assert len(tracks_with_attr) > 0, (
        f"No non-padding tracks with custom attribute '{name}' found "
        f"in this MixedCut."
    )

    # Use getattr to propagate AttributeError if "name" is not defined.
    first_idx, first_cut = tracks_with_attr[0]
    manifest = getattr(first_cut, name)

    # Multiple tracks with the attribute: only supported for Recording-type.
    if len(tracks_with_attr) > 1:
        if isinstance(manifest, Recording):
            return self._load_custom_recording_multi_track(name, tracks_with_attr)
        raise ValueError(
            f"This MixedCut has {len(tracks_with_attr)} non-padding tracks "
            f"with a custom attribute '{name}'. Mixing custom attributes is only "
            f"supported for Recording-type attributes. Problematic cut:\n{self}"
        )

    # Single track with the attribute.

    # Check if the corresponding manifest for 'load_something' is of type
    # Array; if yes, just return the loaded data.
    # This is likely an embedding without a temporal dimension.
    if isinstance(manifest, Array):
        return first_cut.load_custom(name)

    # We are loading either an array with a temporal dimension, or a recording:
    # We need to pad it.
    left_padding = self.tracks[first_idx].offset
    padded_duration = self.duration

    # Then, check if it's a Recording. In that case we convert it to a cut,
    # leverage existing padding methods, and load padded audio data.
    if isinstance(manifest, Recording):
        return (
            manifest.to_cut()
            .pad(duration=manifest.duration + left_padding, direction="left")
            .pad(duration=padded_duration, direction="right")
            .load_audio()
        )

    # Load the array and retrieve the manifest from the only non-padding cut.
    # We'll also need to fetch the dict defining what padding value to use (if present).
    array = first_cut.load_custom(name)
    try:
        pad_value_dict = [
            t.cut for t in self.tracks if isinstance(t.cut, PaddingCut)
        ][0].custom
        pad_value = pad_value_dict[name]
    except (IndexError, KeyError, TypeError, AttributeError):
        # No padding track, no ``custom`` dict on it, or no entry for ``name``:
        # fall back to the default. Anything else (e.g. KeyboardInterrupt) propagates.
        pad_value = DEFAULT_PADDING_VALUE

    return pad_array(
        array,
        temporal_dim=manifest.temporal_dim,
        frame_shift=manifest.frame_shift,
        offset=left_padding,
        padded_duration=padded_duration,
        pad_value=pad_value,
    )
def _load_custom_recording_multi_track(
    self,
    name: str,
    tracks_with_attr: list,
) -> np.ndarray:
    """
    Load and combine audio from a custom Recording attribute across multiple tracks.

    This handles the case where a MixedCut was created via :meth:`Cut.append`
    and each constituent cut has its own custom Recording (e.g., ``target_audio``
    at a potentially different sampling rate than the main recording).

    :param name: name of the custom Recording attribute.
    :param tracks_with_attr: ``(track_idx, cut)`` tuples; the first one seeds the mixer
        and its recording defines the sampling rate used for the whole result.
    :return: the mixed audio, trimmed or zero-padded when its length is within the
        mismatch tolerance of the expected number of samples.
    """
    base_idx, base_cut = tracks_with_attr[0]
    base_audio = base_cut.load_custom(name)
    custom_sr = getattr(base_cut, name).sampling_rate

    mixer = AudioMixer(
        base_audio=base_audio,
        sampling_rate=custom_sr,
        base_offset=self.tracks[base_idx].offset,
    )
    for track_idx, cut in tracks_with_attr[1:]:
        track_audio = cut.load_custom(name)
        mixer.add_to_mix(audio=track_audio, offset=self.tracks[track_idx].offset)

    mixed = mixer.mixed_audio

    # Fix off-by-one due to float arithmetic, same as in load_audio().
    expected_num_samples = compute_num_samples(self.duration, custom_sr)
    tolerance = compute_num_samples(get_audio_duration_mismatch_tolerance(), custom_sr)
    surplus = mixed.shape[1] - expected_num_samples
    if 0 < surplus < tolerance:
        # Slightly too long: drop the excess samples.
        mixed = mixed[:, :expected_num_samples]
    elif -tolerance < surplus < 0:
        # Slightly too short: append zeros along the sample axis.
        mixed = np.pad(mixed, [(0, 0), (0, -surplus)])

    return mixed


def _get_tracks_with_custom_attr(
    self,
    attr_name: str,
) -> list:
    """
    Return a list of ``(track_idx, cut)`` tuples for all non-padding tracks
    that have a custom attribute with the given name.

    A track qualifies when its cut is a ``DataCut``, the track is not muted, and the
    cut's ``custom`` is not ``None`` and contains ``attr_name``.
    """
    matches = []
    for idx, track in enumerate(self.tracks):
        if not isinstance(track.cut, DataCut) or track.mute:
            continue
        if track.cut.custom is None or attr_name not in track.cut.custom:
            continue
        matches.append((idx, track.cut))
    return matches
def move_to_memory(
    self,
    audio_format: str = "flac",
    load_audio: bool = True,
    load_features: bool = True,
    load_custom: bool = True,
) -> "MixedCut":
    """
    Load data (audio, features, or custom arrays) into memory and attach them
    to a copy of the manifest. This is useful when you want to store cuts together
    with the actual data in some binary format that enables sequential data reads.

    Audio is encoded with ``audio_format`` (compatible with ``torchaudio.save``),
    floating point features are encoded with lilcom, and other arrays are pickled.

    Every track is copied with its cut replaced by the result of that cut's own
    ``move_to_memory`` called with the same options.
    """

    def _in_memory_track(track):
        in_memory_cut = track.cut.move_to_memory(
            audio_format=audio_format,
            load_audio=load_audio,
            load_features=load_features,
            load_custom=load_custom,
        )
        return fastcopy(track, cut=in_memory_cut)

    in_memory_tracks = [_in_memory_track(track) for track in self.tracks]
    return fastcopy(self, tracks=in_memory_tracks)
def to_mono(
    self,
    encoding: str = "flac",
    **kwargs,
) -> "Cut":
    """
    Convert this MixedCut to a MonoCut by mixing all tracks and channels into a single one.
    The result audio array is stored in memory, and can be saved to disk by calling
    ``cut.save_audio(path, ...)`` on the result.

    .. hint:: the resulting MonoCut will have ``custom`` field populated with the
        ``custom`` value from the first non-padding track of the MixedCut.

    :param encoding: any of "wav", "flac", or "opus".
    :param kwargs: accepted but not used by this method.
    :return: a new MonoCut instance.
    """
    # Mix everything down to a single channel and encode it into an in-memory buffer.
    mixed_samples = self.load_audio(mono_downmix=True)
    buffer = BytesIO()
    save_audio(buffer, mixed_samples, self.sampling_rate, format=encoding)

    in_memory_recording = Recording.from_bytes(buffer.getvalue(), recording_id=self.id)
    mono_cut = in_memory_recording.to_cut()
    # After the downmix there is only channel 0, so every supervision is re-assigned to it.
    mono_supervisions = [fastcopy(sup, channel=0) for sup in self.supervisions]
    return fastcopy(
        mono_cut,
        supervisions=mono_supervisions,
        custom=self._first_non_padding_track.cut.custom,
    )
def truncate(
    self,
    *,
    offset: Seconds = 0.0,
    duration: Optional[Seconds] = None,
    keep_excessive_supervisions: bool = True,
    preserve_id: bool = False,
    _supervisions_index: Optional[Dict[str, IntervalTree]] = None,
) -> Cut:
    """
    Returns a new MixedCut that is a sub-region of the current MixedCut. This method truncates the underlying Cuts
    and modifies their offsets in the mix, as needed. Tracks that do not fit in the truncated cut are removed.

    Note that no operation is done on the actual features - it's only during the call to load_features()
    when the actual changes happen (a subset of features is loaded).

    The result is not always a MixedCut: when no track with data survives, a ``PaddingCut`` is
    returned; when exactly one track survives, that track's cut is returned directly.

    :param offset: float (seconds), controls the start of the new cut relative to the current MixedCut's start.
    :param duration: optional float (seconds), controls the duration of the resulting MixedCut.
        By default, the duration is (end of the cut before truncation) - (offset).
    :param keep_excessive_supervisions: bool. Since trimming may happen inside a SupervisionSegment, the caller has
        an option to either keep or discard such supervisions.
    :param preserve_id: bool. Should the truncated cut keep the same ID or get a new, random one.
    :return: a new MixedCut instance.
    """

    assert (
        offset >= 0
    ), f"Offset for truncate must be non-negative (provided {offset})."
    old_duration = self.duration
    new_mix_end = (
        add_durations(old_duration, -offset, sampling_rate=self.sampling_rate)
        if duration is None
        else add_durations(offset, duration, sampling_rate=self.sampling_rate)
    )

    def truncate_track(track: MixTrack) -> Optional[MixTrack]:
        # First, determine how much of the beginning of the current track we're going to truncate:
        # when the track offset is larger than the truncation offset, we are not truncating the cut;
        # just decreasing the track offset.

        # 'cut_offset' determines how much we're going to truncate the Cut for the current track.
        cut_offset = max(
            add_durations(offset, -track.offset, sampling_rate=self.sampling_rate),
            0,
        )
        # 'track_offset' determines the new track's offset after truncation.
        track_offset = max(
            add_durations(track.offset, -offset, sampling_rate=self.sampling_rate),
            0,
        )
        # 'track_end' is expressed relative to the beginning of the mix
        # (not to be confused with the 'start' of the underlying MonoCut)
        track_end = add_durations(
            track.offset, track.cut.duration, sampling_rate=self.sampling_rate
        )

        if track_end < offset:
            # Omit a Cut that ends before the truncation offset.
            return None

        cut_duration_decrease = 0
        if track_end > new_mix_end:
            if duration is not None:
                cut_duration_decrease = add_durations(
                    track_end, -new_mix_end, sampling_rate=self.sampling_rate
                )
            else:
                cut_duration_decrease = add_durations(
                    track_end, -old_duration, sampling_rate=self.sampling_rate
                )

        # Compute the new Cut's duration after trimming the start and the end.
        new_duration = add_durations(
            track.cut.duration,
            -cut_offset,
            -cut_duration_decrease,
            sampling_rate=self.sampling_rate,
        )
        if new_duration <= 0:
            # Omit a Cut that is completely outside the time span of the new truncated MixedCut.
            return None

        return MixTrack(
            cut=track.cut.truncate(
                offset=cut_offset,
                duration=new_duration,
                keep_excessive_supervisions=keep_excessive_supervisions,
                preserve_id=preserve_id,
                _supervisions_index=_supervisions_index,
            ),
            offset=track_offset,
            snr=track.snr,
            tag=track.tag,
            is_snr_reference=track.is_snr_reference,
            mute=track.mute,
        )

    new_tracks = [
        new_track
        for new_track in (
            truncate_track(track)
            for track in sorted(self.tracks, key=lambda t: t.offset)
        )
        if new_track is not None
    ]

    # Edge case: no tracks with data left after truncation. This can happen if we truncated an offset region.
    # In this case, return a PaddingCut of the requested duration
    if all(isinstance(t.cut, PaddingCut) for t in new_tracks):
        # ``duration`` is optional; when omitted, fall back to the documented default
        # (end of the cut before truncation minus the offset) instead of passing None on.
        padding_duration = (
            duration
            if duration is not None
            else add_durations(
                old_duration, -offset, sampling_rate=self.sampling_rate
            )
        )
        return PaddingCut(
            id=self.id if preserve_id else str(uuid4()),
            duration=padding_duration,
            sampling_rate=self.sampling_rate,
            feat_value=0.0,
            num_samples=compute_num_samples(padding_duration, self.sampling_rate),
        )

    if len(new_tracks) == 1:
        # The truncation resulted in just a single cut - simply return it.
        return new_tracks[0].cut

    new_cut = MixedCut(
        id=self.id if preserve_id else str(uuid4()),
        tracks=new_tracks,
    )

    # Final edge-case check. Scenario:
    # - some of the original MixedCut tracks had specified an SNR
    # - we truncated away the track that served as an SNR reference
    # - we are left only with PaddingCuts and MonoCuts that have specified SNR
    # Solution:
    # - find first non padding cut and reset its SNR to None (make it the new reference)
    if not any(track.is_snr_reference for track in new_cut.tracks) and all(
        t.snr is not None or isinstance(t.cut, PaddingCut) for t in new_cut.tracks
    ):
        # A non-padding track is guaranteed to exist: the all-padding case returned above.
        first_non_padding_track_idx = next(
            idx
            for idx, t in enumerate(new_cut.tracks)
            if not isinstance(t.cut, PaddingCut)
        )
        new_cut.tracks[first_non_padding_track_idx] = fastcopy(
            new_cut.tracks[first_non_padding_track_idx],
            snr=None,
            is_snr_reference=True,
        )

    return new_cut
def extend_by(
    self,
    *,
    duration: Seconds,
    direction: str = "both",
    preserve_id: bool = False,
    pad_silence: bool = True,
) -> "MixedCut":
    """
    Extending a MixedCut is not defined, so this method unconditionally raises.

    The parameters exist only to mirror the signature used by other cut types.

    :param duration: float (seconds), duration (in seconds) to extend the MixedCut.
    :param direction: string, 'left', 'right' or 'both'. Determines whether to extend on the left,
        right, or both sides. If 'both', extend on both sides by the duration specified in `duration`.
    :param preserve_id: bool. Should the extended cut keep the same ID or get a new, random one.
    :param pad_silence: bool. See usage in `lhotse.cut.MonoCut.extend_by`.
    :raises ValueError: always.
    """
    message = "The extend_by() method is not defined for a MixedCut."
    raise ValueError(message)
def pad(
    self,
    duration: Seconds = None,
    num_frames: int = None,
    num_samples: int = None,
    pad_feat_value: float = LOG_EPSILON,
    direction: str = "right",
    preserve_id: bool = False,
    pad_value_dict: Optional[Dict[str, Union[int, float]]] = None,
) -> Cut:
    """
    Return a new MixedCut, padded with zeros in the recording, and ``pad_feat_value`` in each feature bin.

    The user can choose to pad either to a specific `duration`; a specific number of frames `num_frames`;
    or a specific number of samples `num_samples`. The three arguments are mutually exclusive.

    All the work is delegated to ``pad`` from the sibling ``set`` module, called with this
    cut and the arguments unchanged.

    :param duration: The cut's minimal duration after padding.
    :param num_frames: The cut's total number of frames after padding.
    :param num_samples: The cut's total number of samples after padding.
    :param pad_feat_value: A float value that's used for padding the features.
        By default we assume a log-energy floor of approx. -23 (1e-10 after exp).
    :param direction: string, 'left', 'right' or 'both'. Determines whether the padding is added before or after
        the cut.
    :param preserve_id: When ``True``, preserves the cut ID from before padding.
        Otherwise, generates a new random ID (default).
    :param pad_value_dict: Optional dict that specifies what value should be used
        for padding arrays in custom attributes.
    :return: a padded MixedCut if duration is greater than this cut's duration, otherwise ``self``.
    """
    # Imported locally under an alias so that it doesn't clash with this method's name.
    from .set import pad as pad_cut

    padding_options = dict(
        duration=duration,
        num_frames=num_frames,
        num_samples=num_samples,
        pad_feat_value=pad_feat_value,
        direction=direction,
        preserve_id=preserve_id,
        pad_value_dict=pad_value_dict,
    )
    return pad_cut(self, **padding_options)
def resample(
    self,
    sampling_rate: int,
    affix_id: bool = False,
    recording_field: Optional[str] = None,
) -> "MixedCut":
    """
    Return a new ``MixedCut`` that will lazily resample the audio while reading it.
    This operation will drop the feature manifest, if attached.
    It does not affect the supervision.

    The returned cut is built from ``id`` and the resampled ``tracks`` only.

    :param sampling_rate: The new sampling rate.
    :param affix_id: Should we modify the ID (useful if both versions of the same
        cut are going to be present in a single manifest).
    :param recording_field: which recording field to resample.
    :return: a modified copy of the current ``MixedCut``.
    """
    assert self.has_recording, "Cannot resample a MixedCut without Recording."

    new_id = f"{self.id}_rs{sampling_rate}" if affix_id else self.id
    resampled_tracks = []
    for track in self.tracks:
        resampled_cut = track.cut.resample(
            sampling_rate, recording_field=recording_field
        )
        resampled_tracks.append(fastcopy(track, cut=resampled_cut))
    return MixedCut(id=new_id, tracks=resampled_tracks)
def compress(
    self,
    codec: Codec = "opus",
    compression_level: float = 0.99,
    compress_custom_fields: bool = False,
):
    """
    Return a copy of this Cut that has Recordings in its sub-Cuts processed by a lossy encoding.

    Each track is copied with its cut replaced by the result of that cut's own ``compress``;
    the ID of the mixed cut is kept.

    :param codec: The codec to use for compression. Supported codecs are "opus", "mp3", "vorbis", "gsm".
    :param compression_level: The level of compression (from 0.0 to 1.0, higher values correspond to higher compression).
    :param compress_custom_fields: Whether to also compress any custom recording fields in sub-Cuts.
    :return: A modified :class:`~lhotse.MixedCut` containing audio processed by a codec
    """
    assert self.has_recording, "Cannot compress a MixedCut without a Recording."

    compressed_tracks = []
    for track in self.tracks:
        compressed_cut = track.cut.compress(
            codec, compression_level, compress_custom_fields
        )
        compressed_tracks.append(fastcopy(track, cut=compressed_cut))
    return MixedCut(id=self.id, tracks=compressed_tracks)
def perturb_speed(self, factor: float, affix_id: bool = True) -> "MixedCut":
    """
    Return a new ``MixedCut`` that will lazily perturb the speed while loading audio.
    The ``num_samples``, ``start`` and ``duration`` fields of the underlying Cuts
    (and their Recordings and SupervisionSegments) are updated to reflect
    the shrinking/extending effect of speed.
    We are also updating the offsets of all underlying tracks.

    :param factor: The speed will be adjusted this many times (e.g. factor=1.1 means 1.1x faster).
    :param affix_id: When true, we will modify the ``MixedCut.id`` field
        by affixing it with "_sp{factor}".
    :return: a modified copy of the current ``MixedCut``.
    """
    # TODO(pzelasko): test most extensively for edge cases
    # Pre-conditions
    assert (
        self.has_recording
    ), "Cannot perturb speed on a MixedCut without Recording."
    if self.has_features:
        logging.warning(
            "Attempting to perturb speed on a MixedCut that references pre-computed features. "
            "The feature manifest(s) will be detached, as we do not support feature-domain "
            "speed perturbation."
        )

    def _perturbed_offset(track_offset):
        # Scale the offset in the sample domain, then convert back to seconds.
        offset_samples = compute_num_samples(track_offset, self.sampling_rate)
        scaled_samples = perturb_num_samples(
            num_samples=offset_samples, factor=factor
        )
        return round(scaled_samples / self.sampling_rate, ndigits=8)

    new_id = f"{self.id}_sp{factor}" if affix_id else self.id
    perturbed_tracks = []
    for track in self.tracks:
        perturbed_cut = track.cut.perturb_speed(factor=factor, affix_id=affix_id)
        perturbed_tracks.append(
            fastcopy(
                track,
                cut=perturbed_cut,
                offset=_perturbed_offset(track.offset),
            )
        )
    return MixedCut(id=new_id, tracks=perturbed_tracks)
def perturb_tempo(self, factor: float, affix_id: bool = True) -> "MixedCut":
    """
    Return a new ``MixedCut`` that will lazily perturb the tempo while loading audio.

    Compared to speed perturbation, tempo preserves pitch.
    The ``num_samples``, ``start`` and ``duration`` fields of the underlying Cuts
    (and their Recordings and SupervisionSegments) are updated to reflect
    the shrinking/extending effect of tempo.
    We are also updating the offsets of all underlying tracks.

    :param factor: The tempo will be adjusted this many times (e.g. factor=1.1 means 1.1x faster).
    :param affix_id: When true, we will modify the ``MixedCut.id`` field
        by affixing it with "_tp{factor}".
    :return: a modified copy of the current ``MixedCut``.
    """
    # TODO(pzelasko): test most extensively for edge cases
    # Pre-conditions
    assert (
        self.has_recording
    ), "Cannot perturb tempo on a MixedCut without Recording."
    if self.has_features:
        logging.warning(
            "Attempting to perturb tempo on a MixedCut that references pre-computed features. "
            "The feature manifest(s) will be detached, as we do not support feature-domain "
            "tempo perturbation."
        )

    def _perturbed_offset(track_offset):
        # Scale the offset in the sample domain, then convert back to seconds.
        offset_samples = compute_num_samples(track_offset, self.sampling_rate)
        scaled_samples = perturb_num_samples(
            num_samples=offset_samples, factor=factor
        )
        return round(scaled_samples / self.sampling_rate, ndigits=8)

    new_id = f"{self.id}_tp{factor}" if affix_id else self.id
    perturbed_tracks = []
    for track in self.tracks:
        perturbed_cut = track.cut.perturb_tempo(factor=factor, affix_id=affix_id)
        perturbed_tracks.append(
            fastcopy(
                track,
                cut=perturbed_cut,
                offset=_perturbed_offset(track.offset),
            )
        )
    return MixedCut(id=new_id, tracks=perturbed_tracks)
def perturb_volume(self, factor: float, affix_id: bool = True) -> "MixedCut":
    """
    Return a new ``MixedCut`` that will lazily perturb the volume while loading audio.
    Recordings of the underlying Cuts are updated to reflect volume change.

    :param factor: The volume will be adjusted this many times (e.g. factor=1.1 means 1.1x louder).
    :param affix_id: When true, we will modify the ``MixedCut.id`` field
        by affixing it with "_vp{factor}".
    :return: a modified copy of the current ``MixedCut``.
    """
    # Pre-conditions
    assert (
        self.has_recording
    ), "Cannot perturb volume on a MixedCut without Recording."
    if self.has_features:
        logging.warning(
            "Attempting to perturb volume on a MixedCut that references pre-computed features. "
            "The feature manifest(s) will be detached, as we do not support feature-domain "
            "volume perturbation."
        )

    new_id = f"{self.id}_vp{factor}" if affix_id else self.id
    perturbed_tracks = []
    for track in self.tracks:
        perturbed_cut = track.cut.perturb_volume(factor=factor, affix_id=affix_id)
        perturbed_tracks.append(fastcopy(track, cut=perturbed_cut))
    return MixedCut(id=new_id, tracks=perturbed_tracks)
def clip_amplitude(
    self,
    hard: bool = False,
    gain_db: float = 0.0,
    normalize: bool = True,
    oversampling: Optional[int] = 2,
    affix_id: bool = True,
) -> "MixedCut":
    """
    Return a new ``MixedCut`` that will lazily apply clipping while loading audio.
    Recordings of the underlying Cuts are updated to reflect clipping change.

    :param hard: If True, apply hard clipping (sharp cutoff); otherwise, apply soft clipping (saturation).
    :param gain_db: The amount of gain in decibels to apply before clipping.
    :param normalize: If True, normalize the input signal to 0 dBFS before applying clipping.
    :param oversampling: If provided, we will oversample the input signal by the given integer factor
        before applying saturation and then downsample back to the original sampling rate.
    :param affix_id: When true, we will modify the ``MixedCut.id`` field
        by affixing it with "_cl{gain_db}".
    :return: a modified copy of the current ``MixedCut``.
    """
    # Pre-conditions
    assert (
        self.has_recording
    ), "Cannot apply saturation on a MixedCut without Recording."
    if self.has_features:
        logging.warning(
            "Attempting to apply saturation on a MixedCut that references pre-computed features. "
            "The feature manifest(s) will be detached, as we do not support feature-domain "
            "saturation."
        )

    # The same clipping options are forwarded to every track's cut.
    clipping_options = dict(
        hard=hard,
        gain_db=gain_db,
        normalize=normalize,
        oversampling=oversampling,
        affix_id=affix_id,
    )
    new_id = f"{self.id}_cl{gain_db}" if affix_id else self.id
    clipped_tracks = []
    for track in self.tracks:
        clipped_cut = track.cut.clip_amplitude(**clipping_options)
        clipped_tracks.append(fastcopy(track, cut=clipped_cut))
    return MixedCut(id=new_id, tracks=clipped_tracks)
def normalize_loudness(
    self, target: float, mix_first: bool = True, affix_id: bool = False
) -> "MixedCut":
    """
    Return a new ``MixedCut`` that will lazily apply loudness normalization.

    With ``mix_first=True`` a ``LoudnessNormalization`` transform is appended to a copy of
    this cut's ``transforms`` list, so normalization is applied to the mix. With
    ``mix_first=False`` every track's cut is normalized separately and the result is built
    from ``id`` and ``tracks`` only.

    :param target: The target loudness in dBFS.
    :param mix_first: If true, we will mix the underlying cuts before applying
        loudness normalization. If false, we cannot guarantee that the resulting
        cut will have the target loudness.
    :param affix_id: When true, we will modify the ``MixedCut.id`` field
        by affixing it with "_ln{target}".
    :return: a modified copy of the current ``MixedCut``.
    """
    # Pre-conditions
    assert (
        self.has_recording
    ), "Cannot apply loudness normalization on a MixedCut without Recording."
    if self.has_features:
        logging.warning(
            "Attempting to normalize loudness on a MixedCut that references pre-computed features. "
            "The feature manifest will be detached, as we do not support feature-domain "
            "loudness normalization."
        )
        # NOTE(review): ``features`` is not a dataclass field of MixedCut, so this assignment
        # is forwarded by ``__setattr__`` to the first non-padding cut and changes that cut
        # in place, i.e. it also affects ``self``, not only the returned copy. Confirm
        # whether this side effect is intended.
        self.features = None

    if mix_first:
        # Copy the list so that the transforms of ``self`` are left untouched.
        transforms = self.transforms.copy() if self.transforms is not None else []
        transforms.append(LoudnessNormalization(target=target))
        return fastcopy(
            self,
            id=f"{self.id}_ln{target}" if affix_id else self.id,
            transforms=transforms,
        )
    else:
        return MixedCut(
            id=f"{self.id}_ln{target}" if affix_id else self.id,
            tracks=[
                fastcopy(
                    track,
                    cut=track.cut.normalize_loudness(
                        target=target, affix_id=affix_id
                    ),
                )
                for track in self.tracks
            ],
        )
def reverb_rir(
    self,
    rir_recording: Optional["Recording"] = None,
    normalize_output: bool = True,
    early_only: bool = False,
    affix_id: bool = True,
    rir_channels: List[int] = [0],
    room_rng_seed: Optional[int] = None,
    source_rng_seed: Optional[int] = None,
    mix_first: bool = True,
) -> "MixedCut":
    """
    Return a new ``MixedCut`` that will convolve the audio with the provided impulse response.
    If no ``rir_recording`` is provided, we will generate an impulse response using a fast random
    generator (https://arxiv.org/abs/2208.04101).

    :param rir_recording: The impulse response to use for convolving.
    :param normalize_output: When true, output will be normalized to have energy as input.
    :param early_only: When true, only the early reflections (first 50 ms) will be used.
    :param affix_id: When true, we will modify the ``MixedCut.id`` field
        by affixing it with "_rvb".
    :param rir_channels: The channels of the impulse response to use. By default, first channel is used.
        If only one channel is specified, all tracks will be convolved with this channel. If a list
        is provided, it must contain as many channels as there are audible (non-muted) tracks such
        that each of them will be convolved with one of the specified channels.
    :param room_rng_seed: Seed for the room configuration.
    :param source_rng_seed: Seed for the source position.
    :param mix_first: When true, the mixing will be done first before convolving with the RIR.
        This effectively means that all tracks will be convolved with the same RIR. If you
        are simulating multi-speaker mixtures, you should set this to False.
    :return: a modified copy of the current ``MixedCut``.
    """
    # Pre-conditions
    assert (
        self.has_recording
    ), "Cannot apply reverberation on a MixedCut without Recording."
    if self.has_features:
        logging.warning(
            "Attempting to reverberate a MixedCut that references pre-computed features. "
            "The feature manifest(s) will be detached, as we do not support feature-domain "
            "reverberation."
        )
    # Channel indices can only be validated when an actual RIR recording is given.
    assert rir_recording is None or all(
        c < rir_recording.num_channels for c in rir_channels
    ), "Invalid channel index in `rir_channels`."

    # The channel count is validated against the audible tracks only.
    audible_tracks = _get_audible_tracks(self)
    assert len(rir_channels) == 1 or len(rir_channels) == len(
        audible_tracks
    ), "Invalid number of channels in `rir_channels`, must be either 1 or equal to the number of tracks."

    # There are 2 ways to apply RIRs:
    # 1. Mix the tracks first, then apply RIRs. This is same as applying the same RIR
    #    to all tracks. It does not make sense if all tracks belong to different speakers,
    #    but it is useful for cases when we have a mixture of MonoCut and PaddingCut,
    #    and we want to apply the same RIR to all of them.
    # 2. Apply RIRs to each track separately. This is useful when we want to simulate
    #    different speakers in the same room.

    # First simulate the room config (will only be used if RIR is not provided)
    uuid4_str = str(uuid4())
    # The room RNG seed is based on the cut ID. This ensures that all tracks in the
    # mixed cut will have the same room configuration.
    if room_rng_seed is None:
        room_rng_seed = hash_str_to_int(uuid4_str + self.id)
    # The source RNG seed is based on the track ID. This ensures that each track
    # will have a different source position.
    # When the caller supplies a seed, every track shares it.
    source_rng_seeds = [source_rng_seed] * len(self.tracks)
    if source_rng_seed is None:
        source_rng_seeds = [
            hash_str_to_int(uuid4_str + track.cut.id) for track in self.tracks
        ]
        # The mix-first path needs a single seed; use the first track's.
        source_rng_seed = source_rng_seeds[0]

    # Apply same RIR to all tracks after mixing (default)
    if mix_first:
        if rir_recording is None:
            from lhotse.augmentation.utils import FastRandomRIRGenerator

            rir_generator = FastRandomRIRGenerator(
                sr=self.sampling_rate,
                room_seed=room_rng_seed,
                source_seed=source_rng_seed,
            )
        else:
            rir_generator = None

        # Copy the transform list so the current cut's list is not modified.
        transforms = self.transforms.copy() if self.transforms is not None else []
        transforms.append(
            ReverbWithImpulseResponse(
                rir=rir_recording,
                normalize_output=normalize_output,
                early_only=early_only,
                rir_channels=rir_channels if rir_channels is not None else [0],
                rir_generator=rir_generator,
            )
        )
        return fastcopy(
            self,
            id=f"{self.id}_rvb" if affix_id else self.id,
            transforms=transforms,
        )

    # Apply RIRs to each track separately. Note that we do not pass a `mix_first`
    # argument below since it is True by default.
    if len(rir_channels) == 1:
        # Broadcast the single channel to every track.
        rir_channels = rir_channels * len(self.tracks)
    else:
        # Channels were specified per audible track: hand them out in order to
        # non-muted tracks, and give muted tracks the first channel as a placeholder.
        audible_channels = iter(rir_channels)
        rir_channels = [
            next(audible_channels) if not track.mute else rir_channels[0]
            for track in self.tracks
        ]

    return MixedCut(
        id=f"{self.id}_rvb" if affix_id else self.id,
        tracks=[
            fastcopy(
                track,
                cut=track.cut.reverb_rir(
                    rir_recording=rir_recording,
                    normalize_output=normalize_output,
                    early_only=early_only,
                    affix_id=affix_id,
                    rir_channels=[channel],
                    room_rng_seed=room_rng_seed,
                    source_rng_seed=seed,
                ),
            )
            for track, channel, seed in zip(
                self.tracks, rir_channels, source_rng_seeds
            )
        ],
    )
@rich_exception_info
def load_features(self, mixed: bool = True) -> Optional[np.ndarray]:
    """
    Loads the features of the source cuts and mixes them on-the-fly.
    Only audible (non-muted) tracks take part in the mix.

    :param mixed: when True (default), the features are mixed together (as defined in
        the mixing function for the extractor). This could result in either a 2D or 3D
        array. For example, if all underlying tracks are single-channel, the output will
        be a 2D array of shape (num_frames, num_features). If any of the tracks are
        multi-channel, the output may be a 3D array of shape (num_frames, num_features,
        num_channels). When False, the mixer's un-mixed per-track features are returned.
    :return: A numpy ndarray with features and with shape ``(num_frames, num_features)``,
        or ``(num_tracks, num_frames, num_features)``; ``None`` when the cut has no features.
    """
    if not self.has_features:
        return None
    tracks = _get_audible_tracks(self)
    first_track = tracks[0]
    first_cut = first_track.cut

    # First, check for a simple scenario: just a single cut with padding.
    # When that is the case, we don't have to instantiate a feature extractor,
    # because we are not performing any actual mixing.
    # That makes life simpler for the users who have a custom feature extractor,
    # but don't actually care about feature-domain mixing; just want to pad.
    if (
        mixed
        and first_track.snr is None
        and tracks[1:]
        and all(isinstance(t.cut, PaddingCut) for t in tracks[1:])
    ):
        padding_val = tracks[1].cut.feat_value
        first_cut_feats = first_cut.load_features()
        if first_cut_feats.ndim == 2:  # 2D features
            feats = np.ones((self.num_frames, self.num_features)) * padding_val
        else:  # 3D features
            feats = (
                np.ones(
                    (self.num_frames, self.num_features, first_cut_feats.shape[-1])
                )
                * padding_val
            )
        # The first cut's features overwrite the leading frames of the padded buffer.
        feats[: first_cut.num_frames, ...] = first_cut_feats
        return feats

    # When there is more than one "regular" cut, we will perform an actual mix.

    # First, we have to make sure that the reference energy levels are appropriate.
    # They might not be if the first track is a padding track.
    reference_feats = None
    reference_energy = None
    _, reference_track = _get_snr_reference_track(self)
    feature_extractor = create_default_feature_extractor(
        reference_track.cut.features_type
    )
    if reference_track is not first_track:
        reference_feats = reference_track.cut.load_features()
        reference_energy = feature_extractor.compute_energy(reference_feats)

    # The mix itself.
    first_cut_feats = first_cut.load_features()
    first_cut_feats = _scale_features_for_snr(
        first_cut_feats,
        feature_extractor=feature_extractor,
        snr=first_track.snr,
        reference_energy=reference_energy,
    )
    mixer = FeatureMixer(
        feature_extractor=feature_extractor,
        base_feats=first_cut_feats,
        frame_shift=first_cut.frame_shift,
        reference_energy=reference_energy,
    )
    for track in tracks[1:]:
        if track is reference_track and reference_feats is not None:
            feats = reference_feats  # manual caching to avoid duplicated I/O
        else:
            feats = track.cut.load_features()
        mixer.add_to_mix(
            feats=feats,
            snr=track.snr,
            offset=track.offset,
            sampling_rate=track.cut.sampling_rate,
        )

    if mixed:
        # Checking for some edge cases below.
        feats = mixer.mixed_feats
        # Note: The slicing below is a work-around for an edge-case
        #  when two cuts have durations that ended with 0.005 (e.g. 10.125 and 5.715)
        #  - then, the feature extractor "squeezed in" a last extra frame and the simple
        #  relationship between num_frames and duration we strived for is not true;
        #  i.e. the duration is 10.125 + 5.715 = 15.84, but the number of frames is
        #  1013 + 572 = 1585. If the frame_shift is 0.01, we have gained an extra 0.01s...
        if feats.shape[0] - self.num_frames == 1:
            feats = feats[: self.num_frames, :]
        # TODO(pzelasko): This can sometimes happen in a MixedCut with >= 5 different Cuts,
        #   with a regular MonoCut at the end, when the mix offsets are floats with a lot of decimals.
        #   For now we're duplicating the last frame to match the declared "num_frames" of this cut.
        if feats.shape[0] - self.num_frames == -1:
            feats = np.concatenate((feats, feats[-1:, :]), axis=0)
        assert feats.shape[0] == self.num_frames, (
            "Inconsistent number of frames in a MixedCut: please report "
            "this issue at https://github.com/lhotse-speech/lhotse/issues "
            "showing the output of print(cut) or str(cut) on which"
            "load_features() was called."
        )
        return feats
    else:
        return mixer.unmixed_feats
@rich_exception_info
def load_audio(
    self, mixed: bool = True, mono_downmix: bool = False
) -> Optional[np.ndarray]:
    """
    Loads the audios of the source cuts and mix them on-the-fly.
    Only audible (non-muted) tracks take part in the mix.

    :param mixed: When True (default), returns a mix of the underlying tracks. This will
        return a numpy array with shape ``(num_channels, num_samples)``, where ``num_channels``
        is determined by the ``num_channels`` property of the MixedCut. Otherwise returns a
        numpy array with the number of channels equal to the total number of channels
        across all tracks in the MixedCut. For example, if it contains a MultiCut with 2
        channels and a MonoCut with 1 channel, the returned array will have shape
        ``(3, num_samples)``. The cut's ``transforms`` are applied only when ``mixed`` is True.
    :param mono_downmix: If the MixedCut contains > 1 channels (for e.g. when one of its tracks
        is a MultiCut), this parameter controls whether the returned array will be down-mixed
        to a single channel. This down-mixing is done by summing the channels together.
    :return: A numpy ndarray with audio samples and with shape ``(num_channels, num_samples)``,
        or ``None`` when the cut has no recording.
    """
    if not self.has_recording:
        return None
    tracks = _get_audible_tracks(self)
    first_track = tracks[0]
    first_cut = first_track.cut

    # First, we have to make sure that the reference energy levels are appropriate.
    # They might not be if the first track is a padding track.
    reference_audio = None
    reference_energy = None
    _, reference_track = _get_snr_reference_track(self)
    if reference_track is not first_track:
        reference_audio = reference_track.cut.load_audio()
        reference_energy = audio_energy(reference_audio)

    # The first track is scaled up-front since it seeds the mixer.
    first_cut_audio = _scale_audio_for_snr(
        first_cut.load_audio(),
        snr=first_track.snr,
        reference_energy=reference_energy,
    )
    mixer = AudioMixer(
        first_cut_audio,
        sampling_rate=first_cut.sampling_rate,
        reference_energy=reference_energy,
        base_offset=first_track.offset,
    )
    for track in tracks[1:]:
        if track is reference_track and reference_audio is not None:
            audio = reference_audio  # manual caching to avoid duplicated I/O
        else:
            audio = track.cut.load_audio()
        mixer.add_to_mix(
            audio=audio,
            snr=track.snr,
            offset=track.offset,
        )

    # Flattening a MixedCut without MultiCut tracks has no effect
    mono_downmix = mono_downmix and any(
        track.type == "MultiCut" for track in tracks
    )

    # Flattening a MixedCut without mixed=True has no effect
    mono_downmix = mono_downmix and mixed

    if mixed:
        # Off-by-one errors can happen during mixing due to imperfect float arithmetic and rounding;
        # we will fix them on-the-fly so that the manifest does not lie about the num_samples.
        audio = mixer.mixed_mono_audio if mono_downmix else mixer.mixed_audio
        tol_samples = compute_num_samples(
            get_audio_duration_mismatch_tolerance(),
            sampling_rate=self.sampling_rate,
        )
        num_samples_diff = audio.shape[1] - self.num_samples
        # Too long (within tolerance): truncate the tail.
        if 0 < num_samples_diff < tol_samples:
            audio = audio[:, : self.num_samples]
        # Too short (within tolerance): reflect-pad the tail.
        if -tol_samples < num_samples_diff < 0:
            audio = np.pad(audio, [(0, 0), (0, -num_samples_diff)], mode="reflect")
        assert audio.shape[1] == self.num_samples, (
            f"Inconsistent number of samples in a MixedCut. Expected {self.num_samples} "
            f"but the output of mix has {audio.shape[1]}. Please report "
            f"this issue at https://github.com/lhotse-speech/lhotse/issues "
            f"showing the cut below. MixedCut:\n{self}"
        )

        # We'll apply the transforms now (if any).
        # Transforms may be stored either as objects or as their dict form.
        transforms = [
            tnfm
            if isinstance(tnfm, AudioTransform)
            else AudioTransform.from_dict(tnfm)
            for tnfm in self.transforms or []
        ]
        for tfn in transforms:
            audio = tfn(audio, self.sampling_rate)
    else:
        audio = mixer.unmixed_audio

    return audio
@property
def video(self) -> Optional[VideoInfo]:
    """
    Video metadata of this cut, or ``None`` when it has no video.

    The info is taken from the first non-padding cut, with ``num_frames``
    recomputed from this cut's duration and the video's ``fps``.
    """
    if not self.has_video:
        return None
    info = self._first_non_padding_cut.video
    frame_count = compute_num_samples(self.duration, info.fps)
    return info.copy_with(num_frames=frame_count)
@rich_exception_info
def load_video(
    self,
    with_audio: bool = True,
    mixed: bool = True,
    mono_downmix: bool = False,
) -> Optional[Tuple[torch.Tensor, Optional[torch.Tensor]]]:
    """
    Load the video of the audible tracks and mix it on-the-fly, optionally
    together with the audio.

    :param with_audio: when True (default), the audio is loaded through
        :meth:`load_audio` and returned next to the video.
    :param mixed: forwarded to :meth:`load_audio`.
    :param mono_downmix: forwarded to :meth:`load_audio`.
    :return: ``None`` when the cut has no video; otherwise a tuple ``(video, audio)``
        in which ``audio`` is ``None`` when ``with_audio`` is False or when no
        audio could be loaded.
    """
    if not self.has_video:
        return None

    tracks = _get_audible_tracks(self)
    mixer = VideoMixer(
        tracks[0].cut.load_video(with_audio=False)[0],
        fps=self.video.fps,
        base_offset=tracks[0].offset,
    )
    for track in tracks[1:]:
        mixer.add_to_mix(
            video=track.cut.load_video(with_audio=False)[0],
            offset=track.offset,
        )
    video = mixer.mixed_video

    if not with_audio:
        # Keep the (video, audio) tuple shape: callers, including the code above,
        # index the result of ``load_video(with_audio=False)`` with ``[0]``.
        return video, None

    # For now load audio separately to re-use the complex logic of load_audio
    # This means the same video file is potentially opened twice, but given the
    # cost of video decoding, the extra file open cost could be negligible.
    audio = self.load_audio(mixed=mixed, mono_downmix=mono_downmix)
    if audio is None:
        # load_audio() returns None when there is no recording to read from.
        return video, None
    return video, torch.from_numpy(audio)
def plot_tracks_features(self):
    """
    Display the feature matrix of every audible track as an image, one subplot
    per track, sharing a common color scale. Requires matplotlib to be installed.

    :return: the value returned by ``plt.subplots`` for the axes (a single Axes
        object when there is exactly one audible track).
    """
    import matplotlib.pyplot as plt

    tracks = _get_audible_tracks(self)
    _, axes = plt.subplots(len(tracks))
    features = self.load_features(mixed=False)
    # Use one global value range so the tracks are visually comparable.
    fmin, fmax = features.min(), features.max()
    # ``plt.subplots(1)`` returns a bare Axes rather than an array, which is not
    # iterable; ``np.atleast_1d`` lets single-track cuts be plotted too.
    for idx, ax in enumerate(np.atleast_1d(axes)):
        ax.imshow(np.flip(features[idx].transpose(1, 0), 0), vmin=fmin, vmax=fmax)
    return axes
def plot_tracks_audio(self):
    """
    Display plots of the individual tracks' waveforms, one subplot per audible
    track, with each supervision highlighted as a shaded span.
    Requires matplotlib to be installed.

    :return: the value returned by ``plt.subplots`` for the axes (a single Axes
        object when there is exactly one audible track).
    """
    import matplotlib.pyplot as plt

    audio = self.load_audio(mixed=False)
    tracks = _get_audible_tracks(self)
    _, axes = plt.subplots(len(tracks), sharex=False, sharey=True)
    # ``plt.subplots(1)`` returns a bare Axes rather than an array, which cannot
    # be zipped; ``np.atleast_1d`` lets single-track cuts be plotted too.
    for idx, (track, ax) in enumerate(zip(tracks, np.atleast_1d(axes))):
        samples = audio[idx].squeeze(0)
        ax.plot(np.linspace(0, self.duration, len(samples)), samples)
        for supervision in track.cut.supervisions:
            supervision = supervision.trim(track.cut.duration)
            # Supervision times are relative to the track's cut; shift them by
            # the track offset to place them on the mixed cut's timeline.
            ax.axvspan(
                track.offset + supervision.start,
                track.offset + supervision.end,
                color="green",
                alpha=0.1,
            )
    return axes
def drop_features(self) -> "MixedCut":
    """
    Create a copy of this :class:`MixedCut` in which every track's cut has
    been detached from its ``features``. Requires the cut to have a recording.
    """
    assert (
        self.has_recording
    ), f"Cannot detach features from a MixedCut with no Recording (cut ID = {self.id})."
    stripped_tracks = []
    for track in self.tracks:
        stripped_tracks.append(fastcopy(track, cut=track.cut.drop_features()))
    return fastcopy(self, tracks=stripped_tracks)
def drop_recording(self) -> "MixedCut":
    """
    Create a copy of this :class:`.MixedCut` in which every track's cut has
    been detached from its ``recording``. Requires the cut to have features.
    """
    assert (
        self.has_features
    ), f"Cannot detach recording from a MixedCut with no Features (cut ID = {self.id})."
    stripped_tracks = []
    for track in self.tracks:
        stripped_tracks.append(fastcopy(track, cut=track.cut.drop_recording()))
    return fastcopy(self, tracks=stripped_tracks)
def drop_supervisions(self) -> "MixedCut":
    """
    Create a copy of this :class:`.MixedCut` in which every track's cut has
    been detached from its ``supervisions``.
    """

    def _without_supervisions(track):
        return fastcopy(track, cut=track.cut.drop_supervisions())

    return fastcopy(self, tracks=[_without_supervisions(t) for t in self.tracks])
def drop_alignments(self) -> "MixedCut":
    """
    Create a copy of this :class:`.MixedCut` in which ``drop_alignments()``
    has been applied to every track's cut.
    """

    def _without_alignments(track):
        return fastcopy(track, cut=track.cut.drop_alignments())

    return fastcopy(self, tracks=[_without_alignments(t) for t in self.tracks])
def drop_in_memory_data(self) -> "MixedCut":
    """
    Create a copy of this :class:`MixedCut` in which ``drop_in_memory_data()``
    has been applied to every track's cut, so no in-memory data is retained.
    """
    slim_tracks = []
    for track in self.tracks:
        slim_tracks.append(fastcopy(track, cut=track.cut.drop_in_memory_data()))
    return fastcopy(self, tracks=slim_tracks)
def compute_and_store_features(
    self,
    extractor: FeatureExtractor,
    storage: FeaturesWriter,
    augment_fn: Optional[AugmentFn] = None,
    mix_eagerly: bool = True,
) -> DataCut:
    """
    Extract features for this cut, write them through ``storage``, and return a
    cut that references the stored feature manifest(s). This cut has to be able
    to load audio.

    :param extractor: a ``FeatureExtractor`` instance used to compute the features.
    :param storage: a ``FeaturesWriter`` instance used to store the features.
        When the optional ``lilcom`` dependency is installed and on-disk size
        matters, ``LilcomChunkyWriter`` is the preferred backend.
    :param augment_fn: an optional callable used for audio augmentation.
    :param mix_eagerly: when False, extract and store the features for each track
        separately, and mix them dynamically when loading the features.
        When True, mix the audio first and store the mixed features,
        returning a new ``MonoCut`` instance with the same ID. The returned
        ``MonoCut`` will not have a ``Recording`` attached.
    :return: a new ``MonoCut`` instance if ``mix_eagerly`` is True, or a new
        ``MixedCut`` with the same ID whose tracks carry the ``Features`` manifests.
    """
    if not mix_eagerly:
        # Lazy mixing: every track gets its own stored features.
        lazy_tracks = []
        for track in self.tracks:
            cut_with_feats = track.cut.compute_and_store_features(
                extractor=extractor,
                storage=storage,
                augment_fn=augment_fn,
            )
            lazy_tracks.append(
                MixTrack(
                    cut=cut_with_feats,
                    offset=track.offset,
                    snr=track.snr,
                    tag=track.tag,
                    is_snr_reference=track.is_snr_reference,
                    mute=track.mute,
                )
            )
        return MixedCut(id=self.id, tracks=lazy_tracks)

    # Eager mixing: features are computed from the already mixed audio.
    from .mono import MonoCut

    features_info = extractor.extract_from_samples_and_store(
        samples=self.load_audio(),
        storage=storage,
        sampling_rate=self.sampling_rate,
        offset=0,
        channel=0,
        augment_fn=augment_fn,
    )
    # The mix has no recording of its own, so it is identified by the cut ID.
    features_info.recording_id = self.id
    relabeled_supervisions = [
        fastcopy(sup, recording_id=self.id) for sup in self.supervisions
    ]
    return MonoCut(
        id=self.id,
        start=0,
        duration=self.duration,
        channel=0,
        supervisions=relabeled_supervisions,
        features=features_info,
        recording=None,
        custom=getattr(self, "custom", None),
    )
def fill_supervision(
    self, add_empty: bool = True, shrink_ok: bool = False
) -> "MixedCut":
    """
    Fills the whole duration of a cut with a supervision segment.

    If the cut has one supervision, its start is set to 0 and duration is set to ``cut.duration``.
    Note: this may either expand a supervision that was shorter than a cut, or shrink a supervision
    that exceeds the cut.

    If there are no supervisions, we will add an empty one when ``add_empty==True``, otherwise
    we won't change anything.

    If there are two or more supervisions, we will raise an exception.

    .. note:: For :class:`.MixedCut`, we expect that only one track contains a supervision.
        That supervision will be expanded to cover the full MixedCut's duration.

    :param add_empty: should we add an empty supervision with identical time bounds as the cut.
    :param shrink_ok: when ``False`` (default), a ``ValueError`` is raised if a supervision
        would be shrunk as a result of calling this method; set to ``True`` to allow it.
    :return: a copy of this cut with updated tracks, or ``self`` when there are no
        supervisions and ``add_empty`` is ``False``.
    """
    n_sups = len(self.supervisions)
    if n_sups == 0:
        if not add_empty:
            return self
        # The empty supervision is attached to the first non-padding track only.
        first_non_padding_idx = self.tracks.index(self._first_non_padding_track)
        new_tracks = [
            fastcopy(
                t,
                cut=fastcopy(
                    t.cut,
                    supervisions=[
                        SupervisionSegment(
                            id=self.id,
                            recording_id=t.cut.recording_id,
                            # Negative track offset: in the track's own time frame
                            # this corresponds to the start of the mixed cut.
                            start=-t.offset,
                            duration=self.duration,
                            channel=-1,
                        )
                    ],
                ),
            )
            if idx == first_non_padding_idx
            else t
            for idx, t in enumerate(self.tracks)
        ]
    else:
        assert (
            n_sups == 1
        ), f"Cannot expand more than one supervision (found {len(self.supervisions)}."
        new_tracks = []
        for t in self.tracks:
            # Muted tracks are passed through untouched.
            if t.mute:
                new_tracks.append(t)
                continue
            if len(t.cut.supervisions) == 0:
                new_tracks.append(t)
            else:
                sup = t.cut.supervisions[0]
                # NOTE(review): sup.end is compared with self.duration without
                # subtracting t.offset, unlike the start check -- confirm intended.
                if not shrink_ok and (
                    sup.start < -t.offset or sup.end > self.duration
                ):
                    raise ValueError(
                        f"Cannot shrink supervision (start={sup.start}, end={sup.end}) to cut "
                        f"(start=0, duration={t.cut.duration}) because the argument `shrink_ok` is `False`. "
                        f"Note: this check prevents accidental data loss for speech recognition, "
                        f"as supervision exceeding a cut indicates there might be some spoken content "
                        f"beyond cuts start or end (an ASR model would be trained to predict more text than "
                        f"spoken in the audio). If this is okay, set `shrink_ok` to `True`."
                    )
                new_tracks.append(
                    fastcopy(
                        t,
                        cut=fastcopy(
                            t.cut,
                            supervisions=[
                                fastcopy(
                                    sup, start=-t.offset, duration=self.duration
                                )
                            ],
                        ),
                    )
                )
    return fastcopy(self, tracks=new_tracks)
def map_supervisions(
    self, transform_fn: Callable[[SupervisionSegment], SupervisionSegment]
) -> Cut:
    """
    Return a copy of this MixedCut whose SupervisionSegments were modified by
    `transform_fn`. Padding tracks and muted tracks are left untouched.

    The current cut is not modified: affected tracks and their cuts are copied
    before the new supervisions are attached.

    :param transform_fn: a function that modifies a supervision as an argument.
    :return: a modified MixedCut.
    """
    new_tracks = []
    for track in self.tracks:
        if isinstance(track.cut, PaddingCut) or track.mute:
            new_tracks.append(track)
            continue
        # Assigning ``track.cut.supervisions`` on a plain copy of ``self`` would
        # write through to the cut objects shared with the original MixedCut,
        # so build fresh track/cut copies instead.
        mapped = [segment.map(transform_fn) for segment in track.cut.supervisions]
        new_tracks.append(
            fastcopy(track, cut=fastcopy(track.cut, supervisions=mapped))
        )
    return fastcopy(self, tracks=new_tracks)
def merge_supervisions(
    self,
    merge_policy: str = "delimiter",
    custom_merge_fn: Optional[Callable[[str, Iterable[Any]], Any]] = None,
) -> "MixedCut":
    """
    Return a copy of the cut that has all of its supervisions
    merged into a single segment.

    The new start is the start of the earliest supervision, and the new duration
    is a minimum spanning duration for all the supervisions (i.e. it reaches the
    latest end among them, even when supervisions overlap or are nested).
    The text fields are concatenated with a whitespace.

    .. note:: If you're using individual tracks of a mixed cut, note that this transform
         drops all the supervisions in individual tracks and assigns the merged supervision
         in the first :class:`.DataCut` found in ``self.tracks``.

    :param merge_policy: one of "keep_first" or "delimiter". If "keep_first", we
        keep only the first segment's field value, otherwise all string fields
        (including IDs) are prefixed with "cat#" and concatenated with a hash symbol "#".
        This is also applied to ``custom`` fields. Fields with a ``None`` value are omitted.
    :param custom_merge_fn: a function that will be called to merge custom fields values.
        We expect ``custom_merge_fn`` to handle all possible custom keys.
        When not provided, we will treat all custom values as strings.
        It will be called roughly like:
        ``custom_merge_fn(custom_key, [s.custom[custom_key] for s in sups])``
    :return: ``self`` when there is at most one supervision, otherwise a copy with
        a single merged supervision.
    """
    merge_func_ = partial(
        merge_items_with_delimiter,
        delimiter="#",
        return_first=(merge_policy == "keep_first"),
    )

    # "m" stands for merged in variable names below
    if custom_merge_fn is not None:
        # Merge custom fields with the user-provided function.
        merge_custom = custom_merge_fn
    else:
        # Merge the string representations of custom fields.
        merge_custom = lambda k, vs: merge_func_(map(str, vs))

    sups = sorted(self.supervisions, key=lambda s: s.start)

    if len(sups) <= 1:
        return self

    # the sampling rate is arbitrary, ensures there are no float precision errors
    mstart = sups[0].start
    # The list is sorted by start time, so the last item does not necessarily end
    # last (e.g. a short segment nested inside a longer one); take the latest end.
    mend = max(s.end for s in sups)
    mduration = add_durations(mend, -mstart, sampling_rate=self.sampling_rate)

    custom_keys = set(
        k for s in sups if s.custom is not None for k in s.custom.keys()
    )
    alignment_keys = set(
        k for s in sups if s.alignment is not None for k in s.alignment.keys()
    )

    if any(overlaps(s1, s2) for s1, s2 in zip(sups, sups[1:])) and any(
        s.text is not None for s in sups
    ):
        warnings.warn(
            "You are merging overlapping supervisions that have text transcripts. "
            "The result is likely to be unusable if you are going to train speech "
            f"recognition models (cut id: {self.id})."
        )

    msup = SupervisionSegment(
        id=merge_func_(s.id for s in sups),
        # Make merged recording_id is a mix of recording_ids.
        recording_id=merge_func_(s.recording_id for s in sups),
        start=mstart,
        duration=mduration,
        # Hardcode -1 to indicate no specific channel, as the supervisions might have
        # come from different channels in their original recordings.
        channel=-1,
        text=" ".join(s.text for s in sups if s.text),
        speaker=merge_func_(s.speaker for s in sups if s.speaker),
        language=merge_func_(s.language for s in sups if s.language),
        gender=merge_func_(s.gender for s in sups if s.gender),
        custom={
            k: merge_custom(
                k,
                (
                    s.custom[k]
                    for s in sups
                    if s.custom is not None and k in s.custom
                ),
            )
            for k in custom_keys
        },
        alignment={
            # Concatenate the lists of alignment units.
            k: reduce(
                add,
                (
                    s.alignment[k]
                    for s in sups
                    if s.alignment is not None and k in s.alignment
                ),
            )
            for k in alignment_keys
        },
    )

    # drop_supervisions() copies the tracks, so the assignment below does not
    # touch the cuts referenced by the current MixedCut.
    new_cut = self.drop_supervisions()
    new_cut._first_non_padding_cut.supervisions = [msup]
    return new_cut
def filter_supervisions(
    self, predicate: Callable[[SupervisionSegment], bool]
) -> Cut:
    """
    Return a copy of the cut that keeps only the supervisions accepted by `predicate`.
    The filtering is delegated to ``filter_supervisions`` of every track's cut.

    Example:
        >>> cut = cut.filter_supervisions(lambda s: s.id in supervision_ids)
        >>> cut = cut.filter_supervisions(lambda s: s.duration < 5.0)
        >>> cut = cut.filter_supervisions(lambda s: s.text is not None)

    :param predicate: A callable that accepts `SupervisionSegment` and returns bool
    :return: a modified MixedCut
    """
    filtered_tracks = []
    for track in self.tracks:
        filtered_cut = track.cut.filter_supervisions(predicate)
        filtered_tracks.append(fastcopy(track, cut=filtered_cut))
    return fastcopy(self, tracks=filtered_tracks)
@staticmethod
def from_dict(data: dict) -> "MixedCut":
    """
    Build a ``MixedCut`` from its dictionary form.

    Reads the ``id`` and ``tracks`` keys, and optionally ``transforms`` (a list of
    serialized audio transforms, or ``None``) and ``snr_reference`` (a serialized
    track that is appended as a muted SNR-reference track).

    .. note:: The ``type`` key, when present, is removed from ``data`` in place.

    :param data: the dictionary to deserialize.
    :return: the deserialized ``MixedCut``.
    """
    data.pop("type", None)

    # ``transforms`` is optional: tolerate both a missing key and an explicit
    # ``None`` value instead of failing when trying to iterate over ``None``.
    serialized_transforms = data.get("transforms")
    transforms = None
    if serialized_transforms is not None:
        transforms = [AudioTransform.from_dict(t) for t in serialized_transforms]

    tracks = [MixTrack.from_dict(track) for track in data["tracks"]]
    if "snr_reference" in data:
        # The reference only contributes its energy level, hence it is muted.
        tracks.append(
            fastcopy(
                MixTrack.from_dict(data["snr_reference"]),
                is_snr_reference=True,
                mute=True,
            )
        )
    return MixedCut(
        id=data["id"],
        tracks=tracks,
        transforms=transforms,
    )
def with_features_path_prefix(self, path: Pathlike) -> "MixedCut":
    """
    Return a copy of this cut in which ``with_features_path_prefix(path)`` has
    been applied to every track's cut. All other fields of the cut (including
    ``transforms``) are preserved.

    :param path: the prefix forwarded to each track's cut.
    :return: ``self`` when the cut has no features, otherwise a modified copy.
    """
    if not self.has_features:
        return self
    # Copy ``self`` rather than rebuilding from ``id`` and ``tracks`` only,
    # so that re-rooting paths does not discard the attached transforms.
    return fastcopy(
        self,
        tracks=[
            fastcopy(t, cut=t.cut.with_features_path_prefix(path))
            for t in self.tracks
        ],
    )
def with_recording_path_prefix(self, path: Pathlike) -> "MixedCut":
    """
    Return a copy of this cut in which ``with_recording_path_prefix(path)`` has
    been applied to every track's cut. All other fields of the cut (including
    ``transforms``) are preserved.

    :param path: the prefix forwarded to each track's cut.
    :return: ``self`` when the cut has no recording, otherwise a modified copy.
    """
    if not self.has_recording:
        return self
    # Copy ``self`` rather than rebuilding from ``id`` and ``tracks`` only:
    # dropping the transforms here would change the audio that gets loaded.
    return fastcopy(
        self,
        tracks=[
            fastcopy(t, cut=t.cut.with_recording_path_prefix(path))
            for t in self.tracks
        ],
    )
@property
def first_non_padding_cut(self) -> DataCut:
    """The cut held by :attr:`first_non_padding_track`."""
    track = self._first_non_padding_track
    return track.cut

@property
def first_non_padding_track(self) -> MixTrack:
    """The track selected by ``_get_first_non_padding_track`` for this cut."""
    track = _get_first_non_padding_track(self)
    return track

# Note: the private properties below are kept for backward compatibility.
@property
def _first_non_padding_cut(self) -> DataCut:
    """Backward-compatible alias of :attr:`first_non_padding_cut`."""
    cut = self.first_non_padding_cut
    return cut

@property
def _first_non_padding_track(self) -> MixTrack:
    """Backward-compatible alias of :attr:`first_non_padding_track`."""
    track = self.first_non_padding_track
    return track
def _get_audible_tracks(mixed_cut: "MixedCut") -> List[MixTrack]:
    """Return the non-muted tracks, or all tracks when every track is muted."""
    audible = []
    for track in mixed_cut.tracks:
        if not track.mute:
            audible.append(track)
    return audible or mixed_cut.tracks


def _get_first_non_padding_track(mixed_cut: "MixedCut") -> MixTrack:
    """
    Return the first audible track that is not a ``PaddingCut``; when there is
    none, fall back to the first audible track.
    """
    audible = _get_audible_tracks(mixed_cut)
    for track in audible:
        if not isinstance(track.cut, PaddingCut):
            return track
    return audible[0]


def _get_snr_reference_track(mixed_cut: "MixedCut") -> Tuple[Optional[int], MixTrack]:
    """
    Locate the SNR reference track and return it as ``(index, track)``.

    A track explicitly flagged with ``is_snr_reference`` wins; otherwise the first
    non-padding track without an SNR is used.

    :raises ValueError: when no track qualifies.
    """
    indexed_tracks = list(enumerate(mixed_cut.tracks))

    explicit = next(
        ((idx, track) for idx, track in indexed_tracks if track.is_snr_reference),
        None,
    )
    if explicit is not None:
        return explicit

    implicit = next(
        (
            (idx, track)
            for idx, track in indexed_tracks
            if not isinstance(track.cut, PaddingCut) and track.snr is None
        ),
        None,
    )
    if implicit is not None:
        return implicit

    raise ValueError(
        f"Cannot determine SNR reference track for MixedCut '{mixed_cut.id}'."
    )


def _ensure_explicit_snr_reference(tracks: List[MixTrack]) -> List[MixTrack]:
    """
    Make sure the SNR reference is flagged explicitly.

    When no track has ``is_snr_reference`` set, the first non-padding track without
    an SNR is replaced (in place, in ``tracks``) by a copy with the flag set.
    The same list object is returned.
    """
    for track in tracks:
        if track.is_snr_reference:
            return tracks

    for idx, track in enumerate(tracks):
        if isinstance(track.cut, PaddingCut) or track.snr is not None:
            continue
        tracks[idx] = fastcopy(track, is_snr_reference=True)
        break
    return tracks


def _scale_audio_for_snr(
    audio: np.ndarray,
    snr: Optional[Decibels],
    reference_energy: Optional[float],
) -> np.ndarray:
    """
    Scale ``audio`` so that its energy equals ``reference_energy`` attenuated by
    ``snr`` decibels. The input is returned unchanged when no SNR or no positive
    reference energy is given, or when the audio itself has no positive energy.
    """
    if snr is None:
        return audio
    if reference_energy is None or reference_energy <= 0.0:
        return audio

    current_energy = audio_energy(audio)
    if current_energy <= 0.0:
        return audio

    wanted_energy = reference_energy * (10.0 ** (-snr / 10))
    gain = np.sqrt(wanted_energy / current_energy)
    return gain * audio


def _scale_features_for_snr(
    features: np.ndarray,
    feature_extractor: FeatureExtractor,
    snr: Optional[Decibels],
    reference_energy: Optional[float],
) -> np.ndarray:
    """
    Feature-domain counterpart of ``_scale_audio_for_snr``: the energy is measured
    with ``feature_extractor.compute_energy`` and the scaling is done by
    ``feature_extractor.scale``. The input is returned unchanged under the same
    conditions.
    """
    if snr is None:
        return features
    if reference_energy is None or reference_energy <= 0.0:
        return features

    current_energy = feature_extractor.compute_energy(features)
    if current_energy <= 0.0:
        return features

    wanted_energy = reference_energy * (10.0 ** (-snr / 10))
    return feature_extractor.scale(features, wanted_energy / current_energy)


def _make_padding_cut(mixed_cut: "MixedCut") -> PaddingCut:
    """
    Create a ``PaddingCut`` with a fresh random ID that mirrors the duration and
    sampling rate of ``mixed_cut``; feature, audio and video related fields are
    filled in only for the modalities that ``mixed_cut`` actually has.
    """
    with_feats = mixed_cut.has_features
    with_audio = mixed_cut.has_recording
    with_video = mixed_cut.has_video
    return PaddingCut(
        id=str(uuid4()),
        duration=mixed_cut.duration,
        sampling_rate=mixed_cut.sampling_rate,
        feat_value=LOG_EPSILON,
        num_frames=mixed_cut.num_frames if with_feats else None,
        num_features=mixed_cut.num_features if with_feats else None,
        frame_shift=mixed_cut.frame_shift if with_feats else None,
        num_samples=mixed_cut.num_samples if with_audio else None,
        video=mixed_cut.video if with_video else None,
    )


def _to_unmixed_cut(mixed_cut: "MixedCut", tracks: List[MixTrack]) -> Cut:
    """
    Turn a subset of ``mixed_cut``'s tracks into a standalone cut.

    An empty subset yields a padding cut. Otherwise the tracks are copied into a
    new ``MixedCut`` with a random ID; when every selected track has an SNR, the
    original SNR reference track is appended as a muted reference. The result is
    padded when it is shorter than ``mixed_cut``.
    """
    if not tracks:
        return _make_padding_cut(mixed_cut)

    selected = _ensure_explicit_snr_reference([fastcopy(track) for track in tracks])

    # No selected track can act as the reference, so borrow the original one.
    if not any(track.snr is None for track in selected):
        reference_track = _get_snr_reference_track(mixed_cut)[1]
        selected.append(fastcopy(reference_track, is_snr_reference=True, mute=True))

    result = MixedCut(
        id=str(uuid4()),
        tracks=selected,
    )
    if result.duration < mixed_cut.duration:
        result = result.pad(duration=mixed_cut.duration, preserve_id=True)
    return result