Source code for lhotse.cut.base

from pathlib import Path
from typing import Callable, Dict, List, Literal, Optional, Set, Tuple

import numpy as np
import torch
from intervaltree import Interval, IntervalTree

from lhotse.audio import AudioSource, Recording, VideoInfo
from lhotse.audio.backend import save_audio
from lhotse.augmentation import AugmentFn
from lhotse.features import FeatureExtractor
from lhotse.supervision import SupervisionSegment
from lhotse.utils import (
    Decibels,
    Pathlike,
    Seconds,
    SetContainingAnything,
    add_durations,
    asdict_nonull,
    compute_num_samples,
    compute_num_windows,
    compute_start_duration_for_extended_cut,
    fastcopy,
    ifnone,
    is_torchaudio_available,
    overlaps,
    to_hashable,
)

# One of the design principles for Cuts is a maximally "lazy" implementation, e.g. when mixing Cuts,
# we'd rather sum the feature matrices only after somebody actually calls "load_features". It helps to avoid
# an excessive storage size for data augmented in various ways.


[docs] class Cut: """ .. caution:: :class:`~lhotse.cut.Cut` is just an abstract class -- the actual logic is implemented by its child classes (scroll down for references). :class:`~lhotse.cut.Cut` is a base class for audio cuts. An "audio cut" is a subset of a :class:`~lhotse.audio.Recording` -- it can also be thought of as a "view" or a pointer to a chunk of audio. It is not limited to audio data -- cuts may also point to (sub-spans of) precomputed :class:`~lhotse.features.base.Features`. Cuts are different from :class:`~lhotse.supervision.SupervisionSegment` in that they may be arbitrarily longer or shorter than supervisions; cuts may even contain multiple supervisions for creating contextual training data, and unsupervised regions that provide real or synthetic acoustic background context for the supervised segments. The following example visualizes how a cut may represent a part of a single-channel recording with two utterances and some background noise in between:: Recording |-------------------------------------------| "Hey, Matt!" "Yes?" "Oh, nothing" |----------| |----| |-----------| Cut1 |------------------------| This scenario can be represented in code, using :class:`~lhotse.cut.MonoCut`, as:: >>> from lhotse import Recording, SupervisionSegment, MonoCut >>> rec = Recording(id='rec1', duration=10.0, sampling_rate=8000, num_samples=80000, sources=[...]) >>> sups = [ ... SupervisionSegment(id='sup1', recording_id='rec1', start=0, duration=3.37, text='Hey, Matt!'), ... SupervisionSegment(id='sup2', recording_id='rec1', start=4.5, duration=0.9, text='Yes?'), ... SupervisionSegment(id='sup3', recording_id='rec1', start=6.9, duration=2.9, text='Oh, nothing'), ... ] >>> cut = MonoCut(id='rec1-cut1', start=0.0, duration=6.0, channel=0, recording=rec, ... supervisions=[sups[0], sups[1]]) .. note:: All Cut classes assume that the :class:`~lhotse.supervision.SupervisionSegment` time boundaries are relative to the beginning of the cut. E.g. if the underlying :class:`~lhotse.audio.Recording` starts at 0s (always true), the cut starts at 100s, and the SupervisionSegment inside the cut starts at 3s, it really did start at 103rd second of the recording. In some cases, the supervision might have a negative start, or a duration exceeding the duration of the cut; this means that the supervision in the recording extends beyond the cut. Cut allows to check and read audio data or features data:: >>> assert cut.has_recording >>> samples = cut.load_audio() >>> if cut.has_features: ... feats = cut.load_features() It can be visualized, and listened to, inside Jupyter Notebooks:: >>> cut.plot_audio() >>> cut.play_audio() >>> cut.plot_features() Cuts can be used with Lhotse's :class:`~lhotse.features.base.FeatureExtractor` to compute features. >>> from lhotse import Fbank >>> feats = cut.compute_features(extractor=Fbank()) It is also possible to use a :class:`~lhotse.features.io.FeaturesWriter` to store the features and attach their manifest to a copy of the cut:: >>> from lhotse import LilcomChunkyWriter >>> with LilcomChunkyWriter('feats.lca') as storage: ... cut_with_feats = cut.compute_and_store_features( ... extractor=Fbank(), ... storage=storage ... ) Cuts have several methods that allow their manipulation, transformation, and mixing. 
    Some examples (see the respective methods' documentation for details)::

        >>> cut_2_to_4s = cut.truncate(offset=2, duration=2)
        >>> cut_padded = cut.pad(duration=10.0)
        >>> cut_extended = cut.extend_by(duration=5.0, direction='both')
        >>> cut_mixed = cut.mix(other_cut, offset_other_by=5.0, snr=20)
        >>> cut_append = cut.append(other_cut)
        >>> cut_24k = cut.resample(24000)
        >>> cut_sp = cut.perturb_speed(1.1)
        >>> cut_vp = cut.perturb_volume(2.)
        >>> cut_rvb = cut.reverb_rir(rir_recording)

    .. note::
        All cut transformations are performed lazily, on-the-fly, upon calling ``load_audio``
        or ``load_features``. The stored waveforms and features are untouched.

    .. caution::
        Operations on cuts are not mutating -- they return modified copies of :class:`.Cut`
        objects, leaving the original object unmodified.

    A :class:`.Cut` that contains multiple segments (:class:`SupervisionSegment`) can be broken down
    into smaller cuts that correspond directly to supervisions::

        >>> smaller_cuts = cut.trim_to_supervisions()

    Cuts can be detached from parts of their metadata::

        >>> cut_no_feat = cut.drop_features()
        >>> cut_no_rec = cut.drop_recording()
        >>> cut_no_sup = cut.drop_supervisions()

    Finally, cuts provide convenience methods to compute feature frame and audio sample masks
    for supervised regions::

        >>> sup_frames = cut.supervisions_feature_mask()
        >>> sup_samples = cut.supervisions_audio_mask()

    See also:

        - :class:`lhotse.cut.MonoCut`
        - :class:`lhotse.cut.MixedCut`
        - :class:`lhotse.cut.CutSet`
    """

    # The following is the list of members and properties implemented by the child classes.
    # They are not abstract properties because dataclasses do not work well with the "abc" module.
    id: str
    start: Seconds
    duration: Seconds
    sampling_rate: int
    supervisions: List[SupervisionSegment]
    num_samples: Optional[int]
    num_frames: Optional[int]
    num_features: Optional[int]
    frame_shift: Optional[Seconds]
    features_type: Optional[str]
    has_recording: bool
    has_features: bool
    has_video: bool
    video: Optional[VideoInfo]

    # The following is the list of methods implemented by the child classes.
    # They are not abstract methods because dataclasses do not work well with the "abc" module.
    # Check a specific child class for their documentation.
    from_dict: Callable[[Dict], "Cut"]
    load_audio: Callable[[], np.ndarray]
    load_video: Callable[[], Tuple[torch.Tensor, Optional[torch.Tensor]]]
    load_features: Callable[[], np.ndarray]
    compute_and_store_features: Callable
    drop_features: Callable
    drop_recording: Callable
    drop_supervisions: Callable
    truncate: Callable
    pad: Callable
    extend_by: Callable
    resample: Callable
    perturb_speed: Callable
    perturb_tempo: Callable
    perturb_volume: Callable
    reverb_rir: Callable
    map_supervisions: Callable
    merge_supervisions: Callable
    filter_supervisions: Callable
    fill_supervision: Callable
    with_features_path_prefix: Callable
    with_recording_path_prefix: Callable

    @property
    def end(self) -> Seconds:
        return add_durations(
            self.start, self.duration, sampling_rate=self.sampling_rate
        )
    def to_dict(self) -> dict:
        d = asdict_nonull(self)
        return {**d, "type": type(self).__name__}
    @property
    def has_overlapping_supervisions(self) -> bool:
        if len(self.supervisions) < 2:
            return False

        from cytoolz import sliding_window

        for left, right in sliding_window(
            2, sorted(self.supervisions, key=lambda s: s.start)
        ):
            if overlaps(left, right):
                return True
        return False

    @property
    def trimmed_supervisions(self) -> List[SupervisionSegment]:
        """
        Return the supervisions in this Cut that have modified time boundaries so as not to exceed
        the Cut's start or end.

        Note that when ``cut.supervisions`` is called, the supervisions may have negative ``start``
        values that indicate the supervision actually begins before the cut, or ``end`` values
        that exceed the Cut's duration (it means the supervision continued in the original recording
        after the Cut's ending).

        .. caution::
            For some tasks such as speech recognition (ASR), trimmed supervisions could result in
            corrupted training data. This is because a part of the transcript might actually reside
            outside of the cut.
        """
        return [s.trim(self.duration) for s in self.supervisions]
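A small illustration of these two properties (a sketch, assuming ``cut`` is the six-second ``MonoCut`` constructed in the class docstring above)::

    >>> cut.has_overlapping_supervisions
    False
    >>> # Trimmed supervisions are guaranteed to lie within [0, cut.duration]:
    >>> for s in cut.trimmed_supervisions:
    ...     assert 0 <= s.start and s.end <= cut.duration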
    def split(self, timestamp: Seconds) -> Tuple["Cut", "Cut"]:
        """
        Split a cut into two cuts at ``timestamp``, which is measured from the start of the cut.
        For example, a [0s - 10s] cut split at 4s yields:

            - left cut [0s - 4s]
            - right cut [4s - 10s]
        """
        assert 0 < timestamp < self.duration, f"0 < {timestamp} < {self.duration}"
        left = self.truncate(duration=timestamp)
        right = self.truncate(offset=timestamp)
        return left, right
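For example, splitting the six-second cut from the class docstring two seconds in (a sketch)::

    >>> left, right = cut.split(timestamp=2.0)
    >>> left.duration, right.duration
    (2.0, 4.0)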
    def mix(
        self,
        other: "Cut",
        offset_other_by: Seconds = 0.0,
        allow_padding: bool = False,
        snr: Optional[Decibels] = None,
        preserve_id: Optional[str] = None,
    ) -> "Cut":
        """Refer to :func:`~lhotse.cut.mix` documentation."""
        from .set import mix

        return mix(
            self,
            other,
            offset=offset_other_by,
            allow_padding=allow_padding,
            snr=snr,
            preserve_id=preserve_id,
        )
    def append(
        self,
        other: "Cut",
        snr: Optional[Decibels] = None,
        preserve_id: Optional[str] = None,
    ) -> "Cut":
        """
        Append the ``other`` Cut after the current Cut. Conceptually the same as ``mix``,
        but with an offset matching the current cut's length. Optionally scale down
        (positive SNR) or scale up (negative SNR) the ``other`` cut.
        Returns a MixedCut, which only keeps the information about the mix; the actual mixing
        is performed during the call to ``load_features``.

        :param preserve_id: optional string ("left", "right"). When specified, append will
            preserve the cut ID of the left- or right-hand side argument. Otherwise, a new
            random ID is generated.
        """
        from .set import mix

        return mix(self, other, offset=self.duration, snr=snr, preserve_id=preserve_id)
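A brief sketch contrasting ``mix`` and ``append`` (assumes ``cut`` and ``other_cut`` are two cuts with the same sampling rate)::

    >>> mixed = cut.mix(other_cut, offset_other_by=2.0, snr=10)  # overlap starting 2s in, other cut 10 dB quieter
    >>> appended = cut.append(other_cut)                         # other cut starts where `cut` ends
    >>> # Both return a MixedCut; nothing is summed until load_audio()/load_features() is called:
    >>> samples = appended.load_audio()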
    def compute_features(
        self,
        extractor: FeatureExtractor,
        augment_fn: Optional[AugmentFn] = None,
    ) -> np.ndarray:
        """
        Compute the features from this cut. This cut has to be able to load audio.

        :param extractor: a ``FeatureExtractor`` instance used to compute the features.
        :param augment_fn: optional ``WavAugmenter`` instance for audio augmentation.
        :return: a numpy ndarray with the computed features.
        """
        samples = self.load_audio()
        if augment_fn is not None:
            samples = augment_fn(samples, self.sampling_rate)
        return extractor.extract(samples, self.sampling_rate)
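For instance (a sketch; the cut must have a recording attached)::

    >>> from lhotse import Fbank
    >>> feats = cut.compute_features(extractor=Fbank())
    >>> feats.shape  # (num_frames, num_features), e.g. roughly (600, 80) for a 6s cut with default Fbank settings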
    def plot_audio(self):
        """
        Display a plot of the waveform. Requires matplotlib to be installed.
        """
        import matplotlib.pyplot as plt

        samples = self.load_audio().squeeze()
        fig, ax = plt.subplots()
        ax.plot(np.linspace(0, self.duration, len(samples)), samples)
        for supervision in self.supervisions:
            supervision = supervision.trim(self.duration)
            ax.axvspan(supervision.start, supervision.end, color="green", alpha=0.1)
        return ax
    def play_audio(self):
        """
        Display a Jupyter widget that allows you to listen to the waveform.
        Works only in Jupyter notebook/lab or similar (e.g. Colab).
        """
        from IPython.display import Audio

        samples = self.load_audio().squeeze()
        return Audio(samples, rate=self.sampling_rate)
    def plot_features(self):
        """
        Display the feature matrix as an image. Requires matplotlib to be installed.
        """
        import matplotlib.pyplot as plt

        features = np.flip(self.load_features().transpose(1, 0), 0)
        return plt.matshow(features)
[docs] def plot_alignment(self, alignment_type: str = "word"): """ Display the alignment on top of a spectrogram. Requires matplotlib to be installed. """ import matplotlib.pyplot as plt from lhotse import Fbank from lhotse.utils import compute_num_frames assert ( len(self.supervisions) == 1 ), "Cannot plot alignment: there has to be exactly one supervision in a Cut." sup = self.supervisions[0] assert ( sup.alignment is not None and alignment_type in sup.alignment ), f"Cannot plot alignment: missing alignment field or alignment type '{alignment_type}'" fbank = Fbank() sampling_rate = fbank.extractor.sampling_rate feats = self.resample(sampling_rate).compute_features(fbank) speaker = sup.speaker or "<unknown>" language = sup.language or "<unknown>" fig = plt.matshow(np.flip(feats.transpose(1, 0), 0)) plt.title( "Cut ID:" + self.id + ", Speaker:" + speaker + ", Language:" + language ) plt.tick_params( axis="both", which="major", labelbottom=True, labeltop=False, bottom=True, top=False, ) for idx, item in enumerate(sup.alignment[alignment_type]): is_even = bool(idx % 2) end_frame = compute_num_frames( item.end, frame_shift=fbank.frame_shift, sampling_rate=sampling_rate, ) plt.text( end_frame - 4, 70 if is_even else 45, item.symbol, fontsize=12, color="w", rotation="vertical", ) plt.axvline(end_frame, color="k") plt.show()
[docs] def trim_to_supervisions( self, keep_overlapping: bool = True, min_duration: Optional[Seconds] = None, context_direction: Literal["center", "left", "right", "random"] = "center", keep_all_channels: bool = False, ) -> "CutSet": # noqa: F821 """ Splits the current :class:`.Cut` into as many cuts as there are supervisions (:class:`.SupervisionSegment`). These cuts have identical start times and durations as the supervisions. When there are overlapping supervisions, they can be kept or discarded via ``keep_overlapping`` flag. For example, the following cut:: Cut |-----------------| Sup1 |----| Sup2 |-----------| is transformed into two cuts:: Cut1 |----| Sup1 |----| Sup2 |-| Cut2 |-----------| Sup1 |-| Sup2 |-----------| For the case of a multi-channel cut with multiple supervisions, we can either trim while respecting the supervision channels (in which case output cut has the same channels as the supervision) or ignore the channels (in which case output cut has the same channels as the input cut). .. hint:: If the resulting trimmed cut contains a single supervision, we set the cut id to the ``id`` of this supervision, for better compatibility with downstream tools, e.g. comparing the hypothesis of ASR with the reference in icefall. .. hint:: If a MultiCut is trimmed and the resulting trimmed cut contains a single channel, we convert it to a MonoCut. :param keep_overlapping: when ``False``, it will discard parts of other supervisions that overlap with the main supervision. In the illustration above, it would discard ``Sup2`` in ``Cut1`` and ``Sup1`` in ``Cut2``. In this mode, we guarantee that there will always be exactly one supervision per cut. :param min_duration: An optional duration in seconds; specifying this argument will extend the cuts that would have been shorter than ``min_duration`` with actual acoustic context in the recording/features. If there are supervisions present in the context, they are kept when ``keep_overlapping`` is true. If there is not enough context, the returned cut will be shorter than ``min_duration``. If the supervision segment is longer than ``min_duration``, the return cut will be longer. :param context_direction: Which direction should the cut be expanded towards to include context. The value of "center" implies equal expansion to left and right; random uniformly samples a value between "left" and "right". :param keep_all_channels: If ``True``, the output cut will have the same channels as the input cut. By default, the trimmed cut will have the same channels as the supervision. :return: a list of cuts. """ from .mixed import MixedCut from .multi import MultiCut from .set import CutSet cuts = [] supervisions_index = self.index_supervisions(index_mixed_tracks=True) for segment in self.supervisions: if min_duration is None: # Cut boundaries are equal to the supervision segment boundaries. new_start, new_duration = segment.start, segment.duration else: # Cut boundaries will be extended with some acoustic context. new_start, new_duration = compute_start_duration_for_extended_cut( start=segment.start, duration=segment.duration, new_duration=min_duration, direction=context_direction, ) trimmed = self.truncate( offset=new_start, duration=new_duration, keep_excessive_supervisions=keep_overlapping, _supervisions_index=supervisions_index, ) if not keep_overlapping: # Ensure that there is exactly one supervision per cut. 
trimmed = trimmed.filter_supervisions(lambda s: s.id == segment.id) if not keep_all_channels and not isinstance(trimmed, MixedCut): # For MixedCut, we can't change the channels since it is defined by the # number of channels in underlying tracks. # Ensure that all supervisions have the same channel. assert ( len(set(to_hashable(s.channel) for s in trimmed.supervisions)) == 1 ), ( "Trimmed cut has supervisions with different channels. Either set " "`keep_all_channels=True` to keep original channels or `keep_overlapping=False` " "to retain only 1 supervision per trimmed cut." ) trimmed.channel = trimmed.supervisions[0].channel # If we have a single-channel MultiCut, we will convert it into a MonoCut. if isinstance(trimmed, MultiCut) and trimmed.num_channels == 1: trimmed = trimmed.to_mono()[0] if len(trimmed.supervisions) == 1: # If the trimmed cut contains a single supervision, we set the cut id to # the id of this supervision. trimmed.id = segment.id cuts.append(trimmed) return CutSet.from_cuts(cuts)
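A typical usage sketch (assumes ``cut`` has several supervisions attached)::

    >>> utterance_cuts = cut.trim_to_supervisions(keep_overlapping=False)
    >>> for c in utterance_cuts:
    ...     assert len(c.supervisions) == 1
    ...     print(c.id, c.duration, c.supervisions[0].text)
    >>> # Optionally extend utterances shorter than 2s with acoustic context:
    >>> contextual_cuts = cut.trim_to_supervisions(keep_overlapping=False, min_duration=2.0)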
[docs] def trim_to_alignments( self, type: str, max_pause: Optional[Seconds] = None, max_segment_duration: Optional[Seconds] = None, delimiter: str = " ", keep_all_channels: bool = False, ) -> "CutSet": # noqa: F821 """ Splits the current :class:`.Cut` into its constituent alignment items (:class:`.AlignmentItem`). These cuts have identical start times and durations as the alignment item. Additionally, the `max_pause` option can be used to merge alignment items that are separated by a pause shorter than `max_pause`. If `max_segment_duration` is specified, we will keep merging consecutive segments until the duration of the merged segment exceeds `max_segment_duration`. For the case of a multi-channel cut with multiple alignments, we can either trim while respecting the supervision channels (in which case output cut has the same channels as the supervision) or ignore the channels (in which case output cut has the same channels as the input cut). .. hint:: If the resulting trimmed cut contains a single supervision, we set the cut id to the ``id`` of this supervision, for better compatibility with downstream tools, e.g. comparing the hypothesis of ASR with the reference in icefall. .. hint:: If a MultiCut is trimmed and the resulting trimmed cut contains a single channel, we convert it to a MonoCut. .. hint:: If you have a Cut with multiple supervision segments and you want to trim it to the word-level alignment, you can use the :meth:`.Cut.merge_supervisions` method first to merge the supervisions into a single one, followed by the :meth:`.Cut.trim_to_alignments` method. For example:: >>> cut = cut.merge_supervisions(type='word', delimiter=' ') >>> cut = cut.trim_to_alignments(type='word', max_pause=1.0) .. hint:: The above technique can also be used to segment long cuts into roughly equal duration segments, while respecting alignment boundaries. For example, to split a Cut into 10s segments, you can do:: >>> cut = cut.merge_supervisions(type='word', delimiter=' ') >>> cut = cut.trim_to_alignments(type='word', max_pause=10.0, max_segment_duration=10.0) :param type: The type of the alignment to trim to (e.g. "word"). :param max_pause: The maximum pause allowed between the alignments to merge them. If ``None``, no merging will be performed. [default: None] :param delimiter: The delimiter to use when joining the alignment items. :param keep_all_channels: If ``True``, the output cut will have the same channels as the input cut. By default, the trimmed cut will have the same channels as the supervision. :param num_jobs: Number of parallel workers to process the cuts. :return: a CutSet object. """ from lhotse.supervision import AlignmentItem if max_pause is None: # Set to a negative value so that no merging is performed. max_pause = -1.0 if max_segment_duration is None: # Set to the cut duration so that resulting segments are always smaller. max_segment_duration = self.duration # For the implementation, we first create new supervisions for the cut, and then # use the `trim_to_supervisions` method to do the actual trimming. new_supervisions = [] for segment in self.supervisions: if ( segment.alignment is None or type not in segment.alignment or not segment.alignment[type] ): continue alignments = sorted(segment.alignment[type], key=lambda a: a.start) # Merge the alignments if needed. We also keep track of the indices of the # merged alignments in the original list. This is needed to create the # `alignment` field in the new supervisions. 
# NOTE: We use the `AlignmentItem` class here for convenience --- the merged # alignments are not actual alignment items, but rather just a way to keep # track of merged segments. merged_alignments = [(alignments[0], [0])] for i, item in enumerate(alignments[1:]): # If alignment item is blank, skip it. Sometimes, blank alignment items # are used to denote pauses in the utterance. if item.symbol.strip() == "": continue prev_item, prev_indices = merged_alignments[-1] if ( item.start - prev_item.end <= max_pause and item.end - prev_item.start <= max_segment_duration ): new_item = AlignmentItem( symbol=delimiter.join([prev_item.symbol, item.symbol]), start=prev_item.start, duration=item.end - prev_item.start, ) merged_alignments[-1] = (new_item, prev_indices + [i + 1]) else: merged_alignments.append((item, [i + 1])) # Create new supervisions for the merged alignments. for i, (item, indices) in enumerate(merged_alignments): new_supervisions.append( SupervisionSegment( id=f"{segment.id}-{i}", recording_id=segment.recording_id, start=item.start - self.start, # relative to the cut duration=item.duration, channel=segment.channel, text=item.symbol, language=segment.language, speaker=segment.speaker, gender=segment.gender, alignment={type: [alignments[j] for j in indices]}, ) ) # Create a copy of this CutSet so that original supervisions do not get modified. new_cuts = fastcopy(self, supervisions=new_supervisions) return new_cuts.trim_to_supervisions( keep_overlapping=False, keep_all_channels=keep_all_channels, )
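Continuing the docstring example, a sketch of inspecting the resulting word-level cuts (assumes the supervisions carry a ``word`` alignment)::

    >>> word_cuts = cut.trim_to_alignments(type="word", max_pause=0.5)
    >>> for c in word_cuts:
    ...     print(round(c.duration, 2), c.supervisions[0].text)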
[docs] def trim_to_supervision_groups( self, max_pause: Seconds = 0.0, ) -> "CutSet": # noqa: F821 """ Return a new CutSet with Cuts based on supervision groups. A supervision group is a set of supervisions with no gaps between them (or gaps shorter than ``max_pause``). This is similar to the concept of an `utterance group` as described in this paper: https://arxiv.org/abs/2211.00482 For example, the following cut:: Cut ╔═════════════════════════════════════════════════════════════════════════════════╗ ║┌──────────────────────┐ ┌────────┐ ║ ║│ Hello this is John. │ │ Hi │ ║ ║└──────────────────────┘ └────────┘ ║ ║ ┌──────────────────────────────────┐ ┌───────────────────┐║ ║ │ Hey, John. How are you? │ │ What do you do? │║ ║ └──────────────────────────────────┘ └───────────────────┘║ ╚═════════════════════════════════════════════════════════════════════════════════╝ is transformed into two cuts:: Cut 1 Cut 2 ╔════════════════════════════════════════════════╗ ╔═══════════════════════════╗ ║┌──────────────────────┐ ║ ║┌────────┐ ║ ║│ Hello this is John. │ ║ ║│ Hi │ ║ ║└──────────────────────┘ ║ ║└────────┘ ║ ║ ┌──────────────────────────────────┐║ ║ ┌───────────────────┐║ ║ │ Hey, John. How are you? │║ ║ │ What do you do? │║ ║ └──────────────────────────────────┘║ ║ └───────────────────┘║ ╚════════════════════════════════════════════════╝ ╚═══════════════════════════╝ For the case of a multi-channel cut with multiple supervisions, we keep all the channels in the recording. :param max_pause: An optional duration in seconds; if the gap between two supervisions is longer than this, they will be treated as separate groups. By default, this is set to 0.0, which means that no gaps are allowed between supervisions. :return: a ``CutSet``. """ from .set import CutSet if not self.supervisions: return CutSet([self]) supervisions = sorted(self.supervisions, key=lambda s: s.start) supervision_group = [supervisions[0]] cur_end = supervisions[0].end new_cuts = [] group_idx = 0 for sup in supervisions[1:]: if sup.start - cur_end <= max_pause: supervision_group.append(sup) cur_end = max(cur_end, sup.end) else: offset = supervision_group[0].start duration = add_durations( cur_end, -offset, sampling_rate=self.sampling_rate ) new_cuts.append( self.truncate( offset=offset, duration=duration, keep_excessive_supervisions=False, ).with_id(f"{self.id}-{max_pause}-{group_idx}") ) group_idx += 1 supervision_group = [sup] cur_end = sup.end # Add the last group. if len(supervision_group) > 0: offset = supervision_group[0].start duration = add_durations(cur_end, -offset, sampling_rate=self.sampling_rate) new_cuts.append( self.truncate( offset=offset, duration=duration, keep_excessive_supervisions=False, ).with_id(f"{self.id}-{max_pause}-{group_idx}") ) # The total number of supervisions should be the same. assert sum(len(c.supervisions) for c in new_cuts) == len(self.supervisions), ( "The total number of supervisions decreased after trimming to supervision groups.\n" f"{sum(len(c.supervisions) for c in new_cuts)} != {len(self.supervisions)}\n" "This is likely a bug. Please report it here: https://github.com/lhotse-speech/lhotse/issues/new, " "and provide the following information:\n" f"original cut: {self},\n" f"new cuts: {new_cuts}" ) return CutSet.from_cuts(new_cuts)
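A usage sketch (assumes ``cut`` is a long conversational cut with several supervisions)::

    >>> groups = cut.trim_to_supervision_groups(max_pause=0.5)
    >>> for group in groups:
    ...     print(group.duration, [s.text for s in group.supervisions])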
    def cut_into_windows(
        self,
        duration: Seconds,
        hop: Optional[Seconds] = None,
        keep_excessive_supervisions: bool = True,
    ) -> "CutSet":  # noqa: F821
        """
        Return a list of shorter cuts, made by traversing this cut in windows of
        ``duration`` seconds by ``hop`` seconds.

        The last window might have a shorter duration if there was not enough audio,
        so you might want to either filter out or pad the resulting cuts.

        :param duration: Desired duration of the new cuts in seconds.
        :param hop: Shift between the windows in the new cuts in seconds.
        :param keep_excessive_supervisions: bool. When a cut is truncated in the middle
            of a supervision segment, should the supervision be kept.
        :return: a list of cuts made from shorter duration windows.
        """
        from .set import CutSet

        if not hop:
            hop = duration
        if self.has_video:
            assert (duration * self.video.fps).is_integer(), (
                f"[cut.id={self.id}] Window duration must be defined to result in an integer number of video frames "
                f"(duration={duration} * fps={self.video.fps} = {duration * self.video.fps})."
            )
            assert (hop * self.video.fps).is_integer(), (
                f"[cut.id={self.id}] Window hop must be defined to result in an integer number of video frames "
                f"(hop={hop} * fps={self.video.fps} = {hop * self.video.fps})."
            )
        new_cuts = []
        n_windows = compute_num_windows(self.duration, duration, hop)
        # Operation without an Interval Tree is O(nm), where `n` is the number
        # of resulting windows, and `m` is the number of supervisions in the cut.
        # With an Interval Tree, we can do it in O(nlog(m) + m)
        supervisions_index = self.index_supervisions(index_mixed_tracks=True)
        for i in range(n_windows):
            new_cuts.append(
                self.truncate(
                    offset=hop * i,
                    duration=duration,
                    keep_excessive_supervisions=keep_excessive_supervisions,
                    _supervisions_index=supervisions_index,
                ).with_id(f"{self.id}-{i}")
            )
        return CutSet.from_cuts(new_cuts)
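For example, to chop a long cut into 5-second windows with 50% overlap (a sketch)::

    >>> windows = cut.cut_into_windows(duration=5.0, hop=2.5)
    >>> durations = [w.duration for w in windows]  # the last window may be shorter
    >>> windows = windows.pad(duration=5.0)        # optionally pad every window to the full length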
[docs] def index_supervisions( self, index_mixed_tracks: bool = False, keep_ids: Optional[Set[str]] = None ) -> Dict[str, IntervalTree]: """ Create a two-level index of supervision segments. It is a mapping from a Cut's ID to an interval tree that contains the supervisions of that Cut. The interval tree can be efficiently queried for overlapping and/or enveloping segments. It helps speed up some operations on Cuts of very long recordings (1h+) that contain many supervisions. :param index_mixed_tracks: Should the tracks of MixedCut's be indexed as additional, separate entries. :param keep_ids: If specified, we will only index the supervisions with the specified IDs. :return: a mapping from Cut ID to an interval tree of SupervisionSegments. """ from .mixed import MixedCut keep_ids = ifnone(keep_ids, SetContainingAnything()) indexed = { self.id: IntervalTree( Interval(s.start, s.end, s) for s in self.supervisions if s.id in keep_ids ) } if index_mixed_tracks: if isinstance(self, MixedCut): for track in self.tracks: indexed[track.cut.id] = IntervalTree( Interval(s.start, s.end, s) for s in track.cut.supervisions if s.id in keep_ids ) return indexed
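A sketch of querying the returned index; the values are ``intervaltree.IntervalTree`` objects that can be sliced by time::

    >>> index = cut.index_supervisions()
    >>> tree = index[cut.id]
    >>> overlapping = tree[2.0:4.0]                      # supervisions overlapping the 2s-4s span
    >>> [interval.data.id for interval in overlapping]   # Interval.data holds the SupervisionSegment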
[docs] def save_audio( self, storage_path: Pathlike, format: Optional[str] = None, encoding: Optional[str] = None, augment_fn: Optional[AugmentFn] = None, **kwargs, ) -> "Cut": """ Store this cut's waveform as audio recording to disk. :param storage_path: The path to location where we will store the audio recordings. :param format: Audio format argument supported by ``torchaudio.save`` or ``soundfile.write``. Please refer to the relevant library's documentation depending on which audio backend you're using. :param encoding: Audio encoding argument supported by ``torchaudio.save`` or ``soundfile.write``. Please refer to the relevant library's documentation depending on which audio backend you're using. :param augment_fn: an optional callable used for audio augmentation. Be careful with the types of augmentations used: if they modify the start/end/duration times of the cut and its supervisions, you will end up with incorrect supervision information when using this API. E.g. for speed perturbation, use ``CutSet.perturb_speed()`` instead. :param kwargs: additional arguments passed to ``Cut.load_audio()``. Example, if saving a MixedCut, we can specify `mono_downmix=True` to downmix the tracks to mono before saving. :return: a new Cut instance. """ storage_path = Path(storage_path) samples = self.load_audio(**kwargs) # TODO(@desh2608): Here, `samples` may be a list of arrays (for example, the case # when cut is a MixedCut and we specify `mixed=False`). For such a case, we should # save all tracks separately. if augment_fn is not None: samples = augment_fn(samples, self.sampling_rate) save_audio( storage_path, samples, sampling_rate=self.sampling_rate, format=format, encoding=encoding, ) recording = Recording( id=storage_path.stem, sampling_rate=self.sampling_rate, num_samples=samples.shape[1], duration=samples.shape[1] / self.sampling_rate, sources=[ AudioSource( type="file", channels=list(range(self.num_channels)), source=str(storage_path), ) ], ) return fastcopy( recording.to_cut(), id=self.id, supervisions=self.supervisions, custom=self.custom if hasattr(self, "custom") else None, features=self.features if self.has_features else None, )
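A minimal sketch (the output path is illustrative)::

    >>> stored_cut = cut.save_audio("rec1-cut1.wav")
    >>> stored_cut.has_recording
    True
    >>> # The returned cut points at the newly written file, while keeping the original id and supervisions.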
[docs] def speakers_feature_mask( self, min_speaker_dim: Optional[int] = None, speaker_to_idx_map: Optional[Dict[str, int]] = None, use_alignment_if_exists: Optional[str] = None, ) -> np.ndarray: """ Return a matrix of per-speaker activity in a cut. The matrix shape is (num_speakers, num_frames), and its values are 0 for nonspeech **frames** and 1 for speech **frames** for each respective speaker. This is somewhat inspired by the TS-VAD setup: https://arxiv.org/abs/2005.07272 :param min_speaker_dim: optional int, when specified it will enforce that the matrix shape is at least that value (useful for datasets like CHiME 6 where the number of speakers is always 4, but some cuts might have less speakers than that). :param speaker_to_idx_map: optional dict mapping speaker names (strings) to their global indices (ints). Useful when you want to preserve the order of the speakers (e.g. speaker XYZ is always mapped to index 2) :param use_alignment_if_exists: optional str, key for alignment type to use for generating the mask. If not exists, fall back on supervision time spans. """ assert self.has_features, ( f"No features available. " f"Can't compute supervisions feature mask for cut with ID: {self.id}." ) if speaker_to_idx_map is None: speaker_to_idx_map = { spk: idx for idx, spk in enumerate( sorted(set(s.speaker for s in self.supervisions)) ) } num_speakers = len(speaker_to_idx_map) if min_speaker_dim is not None: num_speakers = min(min_speaker_dim, num_speakers) mask = np.zeros((num_speakers, self.num_frames)) for supervision in self.supervisions: speaker_idx = speaker_to_idx_map[supervision.speaker] if ( use_alignment_if_exists and supervision.alignment and use_alignment_if_exists in supervision.alignment ): for ali in supervision.alignment[use_alignment_if_exists]: st = round(ali.start / self.frame_shift) if ali.start > 0 else 0 et = ( round(ali.end / self.frame_shift) if ali.end < self.duration else self.num_frames ) mask[speaker_idx, st:et] = 1 else: st = ( round(supervision.start / self.frame_shift) if supervision.start > 0 else 0 ) et = ( round(supervision.end / self.frame_shift) if supervision.end < self.duration else self.num_frames ) mask[speaker_idx, st:et] = 1 return mask
[docs] def speakers_audio_mask( self, min_speaker_dim: Optional[int] = None, speaker_to_idx_map: Optional[Dict[str, int]] = None, use_alignment_if_exists: Optional[str] = None, ) -> np.ndarray: """ Return a matrix of per-speaker activity in a cut. The matrix shape is (num_speakers, num_samples), and its values are 0 for nonspeech **samples** and 1 for speech **samples** for each respective speaker. This is somewhat inspired by the TS-VAD setup: https://arxiv.org/abs/2005.07272 :param min_speaker_dim: optional int, when specified it will enforce that the matrix shape is at least that value (useful for datasets like CHiME 6 where the number of speakers is always 4, but some cuts might have less speakers than that). :param speaker_to_idx_map: optional dict mapping speaker names (strings) to their global indices (ints). Useful when you want to preserve the order of the speakers (e.g. speaker XYZ is always mapped to index 2) :param use_alignment_if_exists: optional str, key for alignment type to use for generating the mask. If not exists, fall back on supervision time spans. """ assert self.has_recording, ( f"No recording available. " f"Can't compute supervisions audio mask for cut with ID: {self.id}." ) if speaker_to_idx_map is None: speaker_to_idx_map = { spk: idx for idx, spk in enumerate( sorted(set(s.speaker for s in self.supervisions)) ) } num_speakers = len(speaker_to_idx_map) if min_speaker_dim is not None: num_speakers = min(min_speaker_dim, num_speakers) mask = np.zeros((num_speakers, self.num_samples)) for supervision in self.supervisions: speaker_idx = speaker_to_idx_map[supervision.speaker] if ( use_alignment_if_exists and supervision.alignment and use_alignment_if_exists in supervision.alignment ): for ali in supervision.alignment[use_alignment_if_exists]: st = ( compute_num_samples(ali.start, self.sampling_rate) if ali.start > 0 else 0 ) et = ( compute_num_samples(ali.end, self.sampling_rate) if ali.end < self.duration else compute_num_samples(self.duration, self.sampling_rate) ) mask[speaker_idx, st:et] = 1 else: st = ( compute_num_samples(supervision.start, self.sampling_rate) if supervision.start > 0 else 0 ) et = ( compute_num_samples(supervision.end, self.sampling_rate) if supervision.end < self.duration else compute_num_samples(self.duration, self.sampling_rate) ) mask[speaker_idx, st:et] = 1 return mask
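A sketch of building TS-VAD-style targets from these masks (assumes the cut has precomputed features and that its supervisions carry the listed speaker labels)::

    >>> spk_map = {"spk-A": 0, "spk-B": 1, "spk-C": 2, "spk-D": 3}
    >>> mask = cut.speakers_feature_mask(min_speaker_dim=4, speaker_to_idx_map=spk_map)
    >>> mask.shape  # (4, cut.num_frames); row i is 1 wherever speaker i is active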
    def supervisions_feature_mask(
        self, use_alignment_if_exists: Optional[str] = None
    ) -> np.ndarray:
        """
        Return a 1D numpy array with value 1 for **frames** covered by at least one supervision,
        and 0 for **frames** not covered by any supervision.

        :param use_alignment_if_exists: optional str, key for alignment type to use for generating
            the mask. If it does not exist, fall back on supervision time spans.
        """
        from .set import compute_supervisions_frame_mask

        return compute_supervisions_frame_mask(
            self, use_alignment_if_exists=use_alignment_if_exists
        )
[docs] def supervisions_audio_mask( self, use_alignment_if_exists: Optional[str] = None ) -> np.ndarray: """ Return a 1D numpy array with value 1 for **samples** covered by at least one supervision, and 0 for **samples** not covered by any supervision. :param use_alignment_if_exists: optional str, key for alignment type to use for generating the mask. If not exists, fall back on supervision time spans. """ assert self.has_recording, ( f"No recording available. " f"Can't compute supervisions audio mask for cut with ID: {self.id}." ) mask = np.zeros(self.num_samples, dtype=np.float32) for supervision in self.supervisions: if ( use_alignment_if_exists and supervision.alignment and use_alignment_if_exists in supervision.alignment ): for ali in supervision.alignment[use_alignment_if_exists]: st = round(ali.start * self.sampling_rate) if ali.start > 0 else 0 et = ( round(ali.end * self.sampling_rate) if ali.end < self.duration else round(self.duration * self.sampling_rate) ) mask[st:et] = 1.0 else: st = ( round(supervision.start * self.sampling_rate) if supervision.start > 0 else 0 ) et = ( round(supervision.end * self.sampling_rate) if supervision.end < self.duration else round(self.duration * self.sampling_rate) ) mask[st:et] = 1.0 return mask
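For example, these masks can be used to estimate the proportion of supervised (speech) material in a cut (a sketch)::

    >>> frame_mask = cut.supervisions_feature_mask()  # requires precomputed features
    >>> sample_mask = cut.supervisions_audio_mask()   # requires an attached recording
    >>> speech_fraction = sample_mask.mean()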
    def with_id(self, id_: str) -> "Cut":
        """Return a copy of the Cut with a new ID."""
        return fastcopy(self, id=id_)