Source code for lhotse.cut.multi

import logging
import warnings
from dataclasses import dataclass
from functools import reduce
from itertools import groupby
from operator import add
from typing import Any, Callable, Iterable, List, Optional, Union

import numpy as np

from lhotse.audio import Recording
from lhotse.cut.data import DataCut
from lhotse.features import Features
from lhotse.supervision import SupervisionSegment
from lhotse.utils import (
    add_durations,
    fastcopy,
    is_equal_or_contains,
    merge_items_with_delimiter,
    overlaps,
    rich_exception_info,
    to_list,
)


@dataclass
class MultiCut(DataCut):
    """
    :class:`~lhotse.cut.MultiCut` is a :class:`~lhotse.cut.Cut` that is analogous to the MonoCut.
    While MonoCut represents a single channel of a recording, MultiCut represents multi-channel
    recordings where supervisions may or may not be shared across channels.
    It is intended to be used to store, for example, segments of a microphone array recording.
    The following diagrams illustrate some examples of MultiCut usage:

    >>> 2-channel telephone recording with 2 supervisions, one for each channel
    (e.g., Switchboard):

                    ╔══════════════════════════ MultiCut ═══════════════════════════╗
                    ║ ┌──────────────────────────┐                                  ║
        Channel 1 ──╬─│   Hello this is John.    │──────────────────────────────────╬────────
                    ║ └──────────────────────────┘                                  ║
                    ║                               ┌──────────────────────────┐    ║
        Channel 2 ──╬───────────────────────────────│ Hey, John. How are you?  │────╬────────
                    ║                               └──────────────────────────┘    ║
                    ╚═══════════════════════════════════════════════════════════════╝

    >>> Multi-array multi-microphone recording with shared supervisions (e.g., CHiME-6),
    along with close-talk microphones (A and B are distant arrays, C is close-talk):

              ╔═══════════════════════════════════════════════════════════════════════════╗
              ║ ┌───────────────────┐                         ┌───────────────────┐       ║
      A-1 ────╬─┤                   ├─────────────────────────┤                   ├───────╬─
              ║ │  What did you do? │                         │I cleaned my room. │       ║
      A-2 ────╬─┤                   ├─────────────────────────┤                   ├───────╬─
              ║ └───────────────────┘  ┌───────────────────┐  └───────────────────┘       ║
      B-1 ────╬────────────────────────┤Yeah, we were going├──────────────────────────────╬─
              ║                        │    to the mall.   │                              ║
      B-2 ────╬────────────────────────┤                   ├──────────────────────────────╬─
              ║                        └───────────────────┘       ┌───────────────────┐  ║
        C ────╬────────────────────────────────────────────────────┤       Right.      ├──╬─
              ║                                                    └───────────────────┘  ║
              ╚════════════════════════════ MultiCut ═══════════════════════════════════════╝

    By definition, a MultiCut has the same attributes as a MonoCut. The key difference is that
    the Recording object has multiple channels, and the Supervision objects may correspond to
    any of these channels. The channels of the MultiCut can be a subset of the Recording
    channels, but must be a superset of the Supervision channels.

    See also:

        - :class:`lhotse.cut.Cut`
        - :class:`lhotse.cut.MonoCut`
        - :class:`lhotse.cut.CutSet`
        - :class:`lhotse.cut.MixedCut`
    """

    channel: List[int]

    @property
    def num_channels(self) -> int:
        return len(to_list(self.channel))
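    # Illustrative usage sketch (not part of the library source). Assumes a
    # hypothetical 8-channel Recording named `rec`; in practice a MultiCut is
    # usually obtained from a CutSet rather than constructed by hand.
    #
    # >>> cut = MultiCut(
    # ...     id="array-utt-001",
    # ...     start=2.5,
    # ...     duration=4.0,
    # ...     channel=[0, 1, 2, 3],  # a subset of the recording's channels
    # ...     recording=rec,
    # ...     supervisions=[],
    # ... )
    # >>> cut.num_channels
    # 4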
    @rich_exception_info
    def load_features(
        self, channel: Optional[Union[int, List[int]]] = None
    ) -> Optional[np.ndarray]:
        """
        Load the features from the underlying storage and cut them to the
        relevant [begin, duration] region of the current MultiCut.

        :param channel: The channel to load the features for. If None, all channels will be loaded.
        """
        if self.has_features:
            feats = self.features.load(
                start=self.start,
                duration=self.duration,
                channel_id=self.channel if channel is None else channel,
            )
            # Note: we forgive off-by-one errors in the feature matrix frames
            # due to various hard-to-predict floating point arithmetic issues.
            # If needed, we will remove or duplicate the last frame to be
            # consistent with the manifest's declared "num_frames".
            if feats.shape[0] - self.num_frames == 1:
                feats = feats[: self.num_frames, ...]
            elif feats.shape[0] - self.num_frames == -1:
                feats = np.concatenate((feats, feats[-1:, ...]), axis=0)
            return feats
        return None
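    # Illustrative usage sketch (not part of the library source). Assumes `cut`
    # is a MultiCut whose manifest references precomputed features. The exact
    # returned shape depends on how the features were stored; typically a
    # single channel yields (num_frames, num_features) and requesting several
    # channels adds a leading channel axis.
    #
    # >>> f_all = cut.load_features()            # all channels in cut.channel
    # >>> f_one = cut.load_features(channel=0)   # a single channel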
    @rich_exception_info
    def load_audio(
        self, channel: Optional[Union[int, List[int]]] = None
    ) -> Optional[np.ndarray]:
        """
        Load the audio from the underlying Recording, trimmed to the
        [begin, end] range specified by the MultiCut.

        :param channel: The channel(s) to load the audio for. If None, all channels will be loaded.
        :return: a numpy ndarray with audio samples, with shape (C <channel>, N <samples>)
        """
        if self.has_recording:
            return self.recording.load_audio(
                channels=self.channel if channel is None else channel,
                offset=self.start,
                duration=self.duration,
            )
        return None
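    # Illustrative usage sketch (not part of the library source). Per the
    # docstring above, the result is channel-major; `cut` is an assumed
    # 4-channel MultiCut.
    #
    # >>> samples = cut.load_audio()              # shape: (4, num_samples)
    # >>> front = cut.load_audio(channel=[0, 1])  # shape: (2, num_samples)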
    def reverb_rir(
        self,
        rir_recording: Optional["Recording"] = None,
        normalize_output: bool = True,
        early_only: bool = False,
        affix_id: bool = True,
        rir_channels: List[int] = [0],
    ) -> "MultiCut":
        """
        Return a new ``MultiCut`` that will convolve the audio with the provided impulse response.
        If the ``rir_recording`` is multi-channel, the ``rir_channels`` argument determines which
        channels will be used. This list must be of the same length as the number of channels
        in the ``MultiCut``.

        If no ``rir_recording`` is provided, we will generate an impulse response using a fast
        random generator (https://arxiv.org/abs/2208.04101), but only if the MultiCut has exactly
        one channel. At the moment we do not support simulation of multi-channel impulse responses.

        :param rir_recording: The impulse response to use for convolving.
        :param normalize_output: When true, the output will be normalized to have the same
            energy as the input.
        :param early_only: When true, only the early reflections (first 50 ms) will be used.
        :param affix_id: When true, we will modify the ``MultiCut.id`` field by affixing it
            with "_rvb".
        :param rir_channels: The channels of the impulse response to use.
            The first channel is used by default.
        :return: a modified copy of the current ``MultiCut``.
        """
        # Pre-conditions
        assert (
            self.has_recording
        ), "Cannot apply reverberation on a MultiCut without Recording."
        if self.has_features:
            logging.warning(
                "Attempting to reverberate a MultiCut that references pre-computed features. "
                "The feature manifest will be detached, as we do not support feature-domain "
                "reverberation."
            )
            self.features = None

        if rir_recording is None:
            assert self.num_channels == 1, (
                "We do not support reverberation simulation for multi-channel recordings. "
                "Please provide an impulse response."
            )
            rir_channels = [0]
        else:
            assert all(
                c < rir_recording.num_channels for c in rir_channels
            ), "Invalid channel index in `rir_channels`."

        recording_rvb = self.recording.reverb_rir(
            rir_recording=rir_recording,
            normalize_output=normalize_output,
            early_only=early_only,
            affix_id=affix_id,
            rir_channels=rir_channels,
        )
        # Match the supervisions' ids (and their underlying recording ids).
        supervisions_rvb = [
            s.reverb_rir(
                affix_id=affix_id,
            )
            for s in self.supervisions
        ]

        return fastcopy(
            self,
            id=f"{self.id}_rvb" if affix_id else self.id,
            recording=recording_rvb,
            supervisions=supervisions_rvb,
        )
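    # Illustrative usage sketch (not part of the library source). `rir` is an
    # assumed impulse-response Recording with as many channels as the cut, per
    # the constraint described in the docstring above.
    #
    # >>> rvb = cut.reverb_rir(rir_recording=rir, rir_channels=[0, 1])
    # >>> rvb.id.endswith("_rvb")  # affix_id=True by default
    # True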
    def merge_supervisions(
        self,
        merge_channels: bool = True,
        custom_merge_fn: Optional[Callable[[str, Iterable[Any]], Any]] = None,
    ) -> "MultiCut":
        """
        Return a copy of the cut that has all of its supervisions merged into
        a single segment. The ``channel`` attribute of all the segments in this case
        will be set to the union of all channels. If ``merge_channels`` is set to ``False``,
        the supervisions will be merged into a single segment per channel group. The
        ``channel`` attribute will not change in this case.

        The new start is the start of the earliest supervision, and the new duration
        is a minimum spanning duration for all the supervisions.

        The text fields are concatenated with a whitespace, and all other string fields
        (including IDs) are prefixed with "cat#" and concatenated with a hash symbol "#".
        This is also applied to ``custom`` fields. Fields with a ``None`` value are omitted.

        :param merge_channels: If true, we will merge all supervisions into a single segment.
            If false, we will merge supervisions per channel group. Default: True.
        :param custom_merge_fn: a function that will be called to merge custom fields values.
            We expect ``custom_merge_fn`` to handle all possible custom keys.
            When not provided, we will treat all custom values as strings.
            It will be called roughly like:
            ``custom_merge_fn(custom_key, [s.custom[custom_key] for s in sups])``
        """
        # "m" stands for merged in variable names below

        if custom_merge_fn is not None:
            # Merge custom fields with the user-provided function.
            merge_custom = custom_merge_fn
        else:
            # Merge the string representations of custom fields.
            merge_custom = lambda k, vs: merge_items_with_delimiter(map(str, vs))

        sups = sorted(self.supervisions, key=lambda s: s.start)

        if len(sups) <= 1:
            return self

        if merge_channels:
            # Merge all supervisions into a single segment.
            all_channels = set()
            for s in sups:
                c = set(to_list(s.channel))
                all_channels.update(c)
            all_channels = sorted(all_channels)
            sups_by_channel = {tuple(all_channels): sups}  # `set` is not hashable
        else:
            # Merge supervisions per channel group. Channels are normalized to
            # lists so that int and list channel values group consistently.
            sups_by_channel = {
                tuple(c): list(csups)
                for c, csups in groupby(
                    sorted(sups, key=lambda s: to_list(s.channel)),
                    key=lambda s: to_list(s.channel),
                )
            }

        msups = []
        text_overlap_warning = False
        for channel, csups in sups_by_channel.items():
            mstart = csups[0].start
            # Use the latest end so the merged segment spans all supervisions,
            # even when an earlier segment ends after a later one starts.
            mend = max(s.end for s in csups)
            mduration = add_durations(mend, -mstart, sampling_rate=self.sampling_rate)

            custom_keys = set(
                k for s in csups if s.custom is not None for k in s.custom.keys()
            )
            alignment_keys = set(
                k for s in csups if s.alignment is not None for k in s.alignment.keys()
            )

            if (
                any(overlaps(s1, s2) for s1, s2 in zip(csups, csups[1:]))
                and any(s.text is not None for s in csups)
                and not text_overlap_warning
            ):
                warnings.warn(
                    "You are merging overlapping supervisions that have text transcripts. "
                    "The result is likely to be unusable if you are going to train speech "
                    f"recognition models (cut id: {self.id})."
                )
                text_overlap_warning = True

            msups.append(
                SupervisionSegment(
                    id=merge_items_with_delimiter(s.id for s in csups),
                    recording_id=csups[0].recording_id,
                    start=mstart,
                    duration=mduration,
                    channel=list(channel),
                    text=" ".join(s.text for s in csups if s.text),
                    speaker=merge_items_with_delimiter(
                        s.speaker for s in csups if s.speaker
                    ),
                    language=merge_items_with_delimiter(
                        s.language for s in csups if s.language
                    ),
                    gender=merge_items_with_delimiter(
                        s.gender for s in csups if s.gender
                    ),
                    custom={
                        k: merge_custom(
                            k,
                            (
                                s.custom[k]
                                for s in csups
                                if s.custom is not None and k in s.custom
                            ),
                        )
                        for k in custom_keys
                    },
                    alignment={
                        # Concatenate the lists of alignment units.
                        k: reduce(
                            add,
                            (
                                s.alignment[k]
                                for s in csups
                                if s.alignment is not None and k in s.alignment
                            ),
                        )
                        for k in alignment_keys
                    },
                )
            )

        return fastcopy(self, supervisions=msups)
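    # Illustrative usage sketch (not part of the library source). `cut` is an
    # assumed MultiCut with supervisions on channels [0] and [1].
    #
    # >>> merged = cut.merge_supervisions()                      # one segment; channel = [0, 1]
    # >>> per_ch = cut.merge_supervisions(merge_channels=False)  # one segment per channel group
    # >>> keep_lists = cut.merge_supervisions(
    # ...     custom_merge_fn=lambda key, values: list(values)   # keep custom values unserialized
    # ... )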
    @staticmethod
    def from_mono(*cuts: DataCut) -> "MultiCut":
        """
        Convert one or more MonoCuts to a MultiCut. If multiple mono cuts are provided,
        they must match in all fields except the channel. Each cut must have a distinct channel.

        :param cuts: the input cut(s).
        :return: a MultiCut containing all the input cuts' channels and supervisions.
        """
        from .mono import MonoCut

        assert all(isinstance(c, MonoCut) for c in cuts), "All cuts must be MonoCuts"
        assert (
            sum(
                1
                for _ in groupby(cuts, key=lambda c: (c.recording_id, c.start, c.end))
            )
            == 1
        ), "Cuts must match in all fields except channel"
        assert len(set(c.channel for c in cuts)) == len(
            cuts
        ), "All cuts must have a distinct channel"

        data = cuts[0].to_dict()
        data.pop("type")
        return MultiCut(
            **{
                **data,
                "channel": sorted([c.channel for c in cuts]),
                "supervisions": [s for c in cuts for s in c.supervisions],
            }
        )
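    # Illustrative usage sketch (not part of the library source). `cut_ch0` and
    # `cut_ch1` are assumed MonoCuts over the same recording span that differ
    # only in their `channel` field.
    #
    # >>> multi = MultiCut.from_mono(cut_ch0, cut_ch1)
    # >>> multi.channel
    # [0, 1]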
    def to_mono(self) -> List["DataCut"]:
        """
        Convert a MultiCut to a list of MonoCuts, one for each channel.
        """
        from .mono import MonoCut

        return [
            MonoCut(
                id=f"{self.id}-{channel}",
                recording=self.recording,
                start=self.start,
                duration=self.duration,
                channel=channel,
                supervisions=[
                    fastcopy(s, channel=channel)
                    for s in self.supervisions
                    if is_equal_or_contains(s.channel, channel)
                ],
            )
            for channel in to_list(self.channel)
        ]
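    # Illustrative usage sketch (not part of the library source), continuing
    # the example above: splitting back into per-channel MonoCuts. Each mono
    # cut keeps only the supervisions that cover its channel.
    #
    # >>> mono_cuts = multi.to_mono()
    # >>> [c.channel for c in mono_cuts]
    # [0, 1]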
    @staticmethod
    def from_dict(data: dict) -> "MultiCut":
        from lhotse.serialization import deserialize_custom_field

        # Remove the "type" field if it exists.
        data.pop("type", None)

        features = (
            Features.from_dict(data.pop("features")) if "features" in data else None
        )
        recording = (
            Recording.from_dict(data.pop("recording")) if "recording" in data else None
        )
        supervision_infos = data.pop("supervisions") if "supervisions" in data else []

        if "custom" in data:
            deserialize_custom_field(data["custom"])

        return MultiCut(
            **data,
            features=features,
            recording=recording,
            supervisions=[SupervisionSegment.from_dict(s) for s in supervision_infos],
        )
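# Illustrative usage sketch (not part of the library source): deserializing a
# MultiCut from one line of a hypothetical cuts JSONL manifest ("cuts.jsonl").
#
# >>> import json
# >>> with open("cuts.jsonl") as f:
# ...     cut = MultiCut.from_dict(json.loads(f.readline()))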