import logging
import multiprocessing
import pickle
import warnings
from abc import ABCMeta, abstractmethod
from concurrent.futures.process import ProcessPoolExecutor
from dataclasses import asdict, dataclass, is_dataclass
from itertools import chain
from math import isclose
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Type, Union
import numpy as np
import torch
from tqdm.auto import tqdm
from lhotse.audio import Recording
from lhotse.augmentation import AugmentFn
from lhotse.features.io import FeaturesWriter, get_reader
from lhotse.serialization import Serializable, load_yaml, save_to_yaml
from lhotse.utils import (Pathlike, Seconds, asdict_nonull, compute_num_frames, exactly_one_not_null, fastcopy,
ifnone, split_sequence,
uuid4)
# Registry of available feature extractor types, keyed by name.
# NOTE(review): presumably populated by extractor registration elsewhere in the
# module (not visible in this chunk) — confirm before relying on its contents.
FEATURE_EXTRACTORS = {}
@dataclass(order=True)
class Features:
    """
    Represents features extracted for some particular time range in a given recording and channel.
    It contains metadata about how it's stored: storage_type describes "how to read it", for now
    it supports numpy arrays serialized with np.save, as well as arrays compressed with lilcom;
    storage_path is the path to the file on the local filesystem.
    """
    # Useful information about the features - their type (fbank, mfcc) and shape
    type: str
    num_frames: int
    num_features: int
    frame_shift: 'Seconds'
    sampling_rate: int

    # Information about the time range of the features.
    start: 'Seconds'
    duration: 'Seconds'

    # Parameters related to storage - they define how to load the feature matrix.
    #
    # Storage type defines which features reader type should be instantiated
    # e.g. 'lilcom_files', 'numpy_files', 'lilcom_hdf5'
    storage_type: str
    # Storage path is either the path to some kind of archive (like HDF5 file) or a path
    # to a directory holding files with feature matrices (exact semantics depend on storage_type).
    storage_path: str
    # Storage key is either the key used to retrieve a feature matrix from an archive like HDF5,
    # or the name of the file in a directory (exact semantics depend on the storage_type).
    storage_key: str

    # Information which recording and channels were used to extract the features.
    # When ``recording_id`` and ``channels`` are ``None``, it means that the
    # features were extracted from a cut (e.g. a ``MixedCut``), which might have consisted
    # of multiple recordings.
    recording_id: Optional[str] = None
    channels: Optional[Union[int, List[int]]] = None

    @property
    def end(self) -> 'Seconds':
        """End time (in seconds) of the span covered by these features."""
        return self.start + self.duration

    def load(
            self,
            start: Optional['Seconds'] = None,
            duration: Optional['Seconds'] = None,
    ) -> np.ndarray:
        """
        Load the feature matrix, or a sub-span of it, from the underlying storage.

        :param start: optional start time in seconds; defaults to the start of the features.
        :param duration: optional duration in seconds; by default, reads until the end.
        :return: a numpy ndarray with the (possibly trimmed) feature matrix.
        :raises ValueError: when ``start`` precedes the available time range.
        """
        # noinspection PyArgumentList
        storage = get_reader(self.storage_type)(self.storage_path)
        left_offset_frames, right_offset_frames = 0, None
        if start is None:
            start = self.start
        # In case the caller requested only a sub-span of the features, trim them.
        # Left trim
        if start < self.start - 1e-5:
            raise ValueError(f"Cannot load features for recording {self.recording_id} starting from {start}s. "
                             f"The available range is ({self.start}, {self.end}) seconds.")
        if not isclose(start, self.start):
            left_offset_frames = compute_num_frames(start - self.start, frame_shift=self.frame_shift,
                                                    sampling_rate=self.sampling_rate)
        # Right trim
        if duration is not None:
            right_offset_frames = left_offset_frames + compute_num_frames(duration, frame_shift=self.frame_shift,
                                                                          sampling_rate=self.sampling_rate)
        # Load and return the features (subset) from the storage
        return storage.read(
            self.storage_key,
            left_offset_frames=left_offset_frames,
            right_offset_frames=right_offset_frames
        )

    def with_path_prefix(self, path: 'Pathlike') -> 'Features':
        """Return a copy of this manifest with ``path`` prepended to ``storage_path``."""
        return fastcopy(self, storage_path=str(Path(path) / self.storage_path))

    def to_dict(self) -> dict:
        """Serialize to a dict, omitting fields that are ``None``."""
        return asdict_nonull(self)

    @staticmethod
    def from_dict(data: dict) -> 'Features':
        """
        Create a ``Features`` object from a serialized dict.
        The input dict is never mutated.
        """
        # The "storage_type" check is to ensure that the "data" dict actually contains
        # the data for a "Features" object, and not something else.
        # Some Lhotse utilities try to "guess" what is the right object type via trial-and-error,
        # and would have created a false alarm here.
        if 'frame_shift' not in data and 'storage_type' in data:
            warnings.warn('The "frame_shift" field was not found in a feature manifest; '
                          'we\'ll try to infer it for now, but you should recreate the manifests.')
            # Work on a shallow copy so the caller's dict is not mutated as a side effect.
            data = dict(data)
            data['frame_shift'] = round(data['duration'] / data['num_frames'], ndigits=3)
        return Features(**data)
class FeatureSet(Serializable, Sequence[Features]):
    """
    Represents a feature manifest, and allows to read features for given recordings
    within particular channels and time ranges.
    It also keeps information about the feature extractor parameters used to obtain this set.
    When a given recording/time-range/channel is unavailable, raises a KeyError.
    """

    def __init__(self, features: Optional[List[Features]] = None) -> None:
        """
        :param features: an optional list of ``Features``; it is stored sorted
            (``Features`` is an ``order=True`` dataclass).
        """
        self.features = sorted(ifnone(features, []))

    def __eq__(self, other: Any) -> bool:
        # Returning NotImplemented (rather than raising AttributeError on a foreign
        # type) lets Python fall back to the other operand's __eq__ or identity check.
        if not isinstance(other, FeatureSet):
            return NotImplemented
        return self.features == other.features

    @staticmethod
    def from_features(features: Iterable[Features]) -> 'FeatureSet':
        return FeatureSet(list(features))  # just for consistency with other *Sets

    @staticmethod
    def from_dicts(data: Iterable[dict]) -> 'FeatureSet':
        """Create a ``FeatureSet`` from an iterable of serialized ``Features`` dicts."""
        return FeatureSet(features=[Features.from_dict(feature_data) for feature_data in data])

    def to_dicts(self) -> Iterable[dict]:
        """Lazily serialize each ``Features`` manifest to a dict."""
        return (f.to_dict() for f in self)

    def with_path_prefix(self, path: Pathlike) -> 'FeatureSet':
        """Return a copy with ``path`` prepended to every ``storage_path``."""
        return FeatureSet.from_features(f.with_path_prefix(path) for f in self)

    def split(self, num_splits: int, shuffle: bool = False, drop_last: bool = False) -> List['FeatureSet']:
        """
        Split the :class:`~lhotse.FeatureSet` into ``num_splits`` pieces of equal size.

        :param num_splits: Requested number of splits.
        :param shuffle: Optionally shuffle the recordings order first.
        :param drop_last: determines how to handle splitting when ``len(seq)`` is not divisible
            by ``num_splits``. When ``False`` (default), the splits might have unequal lengths.
            When ``True``, it may discard the last element in some splits to ensure they are
            equally long.
        :return: A list of :class:`~lhotse.FeatureSet` pieces.
        """
        return [
            FeatureSet.from_features(subset) for subset in
            split_sequence(self, num_splits=num_splits, shuffle=shuffle, drop_last=drop_last)
        ]

    def subset(self, first: Optional[int] = None, last: Optional[int] = None) -> 'FeatureSet':
        """
        Return a new ``FeatureSet`` according to the selected subset criterion.
        Only a single argument to ``subset`` is supported at this time.

        :param first: int, the number of first features manifests to keep.
        :param last: int, the number of last features manifests to keep.
        :return: a new ``FeatureSet`` with the subset results.
        """
        assert exactly_one_not_null(first, last), "subset() can handle only one non-None arg."
        if first is not None:
            assert first > 0
            if first > len(self):
                logging.warning(f'FeatureSet has only {len(self)} items but first {first} required; '
                                f'not doing anything.')
                return self
            return FeatureSet.from_features(self.features[:first])
        if last is not None:
            assert last > 0
            if last > len(self):
                logging.warning(f'FeatureSet has only {len(self)} items but last {last} required; '
                                f'not doing anything.')
                return self
            return FeatureSet.from_features(self.features[-last:])

    def find(
            self,
            recording_id: str,
            channel_id: int = 0,
            start: Seconds = 0.0,
            duration: Optional[Seconds] = None,
            leeway: Seconds = 0.05
    ) -> Features:
        """
        Find and return a Features object that best satisfies the search criteria.
        Raise a KeyError when no such object is available.

        :param recording_id: str, requested recording ID.
        :param channel_id: int, requested channel.
        :param start: float, requested start time in seconds for the feature chunk.
        :param duration: optional float, requested duration in seconds for the feature chunk.
            By default, return everything from the start.
        :param leeway: float, controls how strictly we have to match the requested start and duration criteria.
            It is necessary to keep a small positive value here (default 0.05s), as there might be differences between
            the duration of recording/supervision segment, and the duration of features. The latter one is constrained
            to be a multiple of frame_shift, while the former can be arbitrary.
        :return: a Features object satisfying the search criteria.
        """
        if duration is not None:
            end = start + duration
        # TODO: naive linear search; will likely require optimization
        candidates = self._index_by_recording_id_and_cache()[recording_id]
        candidates = (
            f for f in candidates
            if f.channels == channel_id
            and f.start - leeway <= start < f.end + leeway
            # filter edge case: start 1.5, features available till 1.0, duration is None
        )
        if duration is not None:
            candidates = (f for f in candidates if f.end >= end - leeway)
        candidates = list(candidates)
        if not candidates:
            raise KeyError(
                f"No features available for recording '{recording_id}', channel {channel_id} in time range [{start}s,"
                f" {'end' if duration is None else duration}s]")
        # in case there is more than one candidate feature segment, select the best fit
        # by minimizing the MSE of the time markers...
        if duration is not None:
            feature_info = min(candidates, key=lambda f: (start - f.start) ** 2 + (end - f.end) ** 2)
        else:
            feature_info = min(candidates, key=lambda f: (start - f.start) ** 2)
        return feature_info

    # This is a cache that significantly speeds up repeated ``find()`` queries.
    # It is lazily built on the first call to ``_index_by_recording_id_and_cache()``.
    _features_by_recording_id: Optional[Dict[str, List[Features]]] = None

    def _index_by_recording_id_and_cache(self):
        """Group the manifests by recording ID, building the cache on first use."""
        if self._features_by_recording_id is None:
            from cytoolz import groupby
            self._features_by_recording_id = groupby(lambda feat: feat.recording_id, self)
        return self._features_by_recording_id

    def load(
            self,
            recording_id: str,
            channel_id: int = 0,
            start: Seconds = 0.0,
            duration: Optional[Seconds] = None,
    ) -> np.ndarray:
        """
        Find a Features object that best satisfies the search criteria and load the features as a numpy ndarray.
        Raise a KeyError when no such object is available.
        """
        feature_info = self.find(
            recording_id=recording_id,
            channel_id=channel_id,
            start=start,
            duration=duration
        )
        features = feature_info.load(start=start, duration=duration)
        return features

    def compute_global_stats(self, storage_path: Optional[Pathlike] = None) -> Dict[str, np.ndarray]:
        """
        Compute the global means and standard deviations for each feature bin in the manifest.
        It follows the implementation in scikit-learn:
        https://github.com/scikit-learn/scikit-learn/blob/0fb307bf39bbdacd6ed713c00724f8f871d60370/sklearn/utils/extmath.py#L715
        which follows the paper:
        "Algorithms for computing the sample variance: analysis and recommendations", by Chan, Golub, and LeVeque.

        :param storage_path: an optional path to a file where the stats will be stored with pickle.
        :return: a dict of ``{'norm_means': np.ndarray, 'norm_stds': np.ndarray}`` with the
            shape of the arrays equal to the number of feature bins in this manifest.
        """
        return compute_global_stats(feature_manifests=self, storage_path=storage_path)

    def __repr__(self) -> str:
        return f'FeatureSet(len={len(self)})'

    def __iter__(self) -> Iterable[Features]:
        return iter(self.features)

    def __getitem__(self, i: int) -> Features:
        return self.features[i]

    def __len__(self) -> int:
        return len(self.features)

    def __add__(self, other: 'FeatureSet') -> 'FeatureSet':
        return FeatureSet(features=self.features + other.features)
class FeatureSetBuilder:
    """
    An extended constructor for the FeatureSet. Think of it as a class wrapper for a feature
    extraction script. It consumes an iterable of Recordings, extracts the features specified
    by the FeatureExtractor config, and stores them on the disk.

    Eventually, we plan to extend it with the capability to extract only the features in
    specified regions of recordings and to perform some time-domain data augmentation.
    """

    def __init__(
            self,
            feature_extractor: FeatureExtractor,
            storage: FeaturesWriter,
            augment_fn: Optional[AugmentFn] = None
    ):
        self.feature_extractor = feature_extractor
        self.storage = storage
        self.augment_fn = augment_fn

    def process_and_store_recordings(
            self,
            recordings: Sequence[Recording],
            output_manifest: Optional[Pathlike] = None,
            num_jobs: int = 1
    ) -> FeatureSet:
        """
        Extract and store features for every channel of every recording,
        optionally saving the resulting manifest to ``output_manifest``.
        """
        if num_jobs == 1:
            # Single job: stay in-process, no subprocess overhead.
            feature_set = self._gather(
                map(self._process_and_store_recording, recordings),
                total=len(recordings),
                desc='Extracting and storing features',
            )
        else:
            mp_ctx = multiprocessing.get_context('spawn')
            with ProcessPoolExecutor(num_jobs, mp_context=mp_ctx) as ex:
                feature_set = self._gather(
                    ex.map(self._process_and_store_recording, recordings),
                    total=len(recordings),
                    desc='Extracting and storing features in parallel',
                )
        if output_manifest is not None:
            feature_set.to_file(output_manifest)
        return feature_set

    @staticmethod
    def _gather(per_recording_results, total: int, desc: str) -> FeatureSet:
        """Flatten per-recording feature lists into one FeatureSet, showing a progress bar."""
        return FeatureSet.from_features(
            tqdm(chain.from_iterable(per_recording_results), total=total, desc=desc)
        )

    def _process_and_store_recording(
            self,
            recording: Recording,
    ) -> List[Features]:
        """Extract and store features for each channel of a single recording."""
        return [
            self.feature_extractor.extract_from_recording_and_store(
                recording=recording,
                storage=self.storage,
                channels=channel,
                augment_fn=self.augment_fn,
            )
            for channel in recording.channel_ids
        ]
def store_feature_array(
        feats: np.ndarray,
        storage: 'FeaturesWriter',
) -> str:
    """
    Store the ``feats`` array using the provided writer (e.g. with ``lilcom`` compression).

    :param feats: a numpy ndarray containing features.
    :param storage: a ``FeaturesWriter`` object to use for array storage.
    :return: the storage key under which the writer stored the array; depending on the
        writer, this can be e.g. a file name within a directory or a key inside an archive
        (the original docstring incorrectly described it as a file path).
    """
    # A random UUID key avoids collisions between concurrently stored arrays.
    feats_id = str(uuid4())
    storage_key = storage.write(feats_id, feats)
    return storage_key
def compute_global_stats(
        feature_manifests: Iterable['Features'],
        storage_path: Optional['Pathlike'] = None
) -> Dict[str, np.ndarray]:
    """
    Compute the global means and standard deviations for each feature bin in the manifest.
    It performs only a single pass over the data and iteratively updates the estimate of the
    means and variances.

    We follow the implementation in scikit-learn:
    https://github.com/scikit-learn/scikit-learn/blob/0fb307bf39bbdacd6ed713c00724f8f871d60370/sklearn/utils/extmath.py#L715
    which follows the paper:
    "Algorithms for computing the sample variance: analysis and recommendations", by Chan, Golub, and LeVeque.

    :param feature_manifests: an iterable of ``Features`` objects.
    :param storage_path: an optional path to a file where the stats will be stored with pickle.
    :return: a dict of ``{'norm_means': np.ndarray, 'norm_stds': np.ndarray}`` with the
        shape of the arrays equal to the number of feature bins in this manifest.
    :raises ValueError: when ``feature_manifests`` is empty.
    """
    feature_manifests = iter(feature_manifests)
    first = next(feature_manifests, None)
    if first is None:
        # Explicit error instead of the bare StopIteration that next() would
        # otherwise raise on an empty manifest.
        raise ValueError("Cannot compute global stats of an empty feature manifest.")
    total_sum = np.zeros((first.num_features,), dtype=np.float64)
    total_unnorm_var = np.zeros((first.num_features,), dtype=np.float64)
    total_frames = 0
    with np.errstate(divide='ignore', invalid='ignore'):
        # Re-chain ``first`` so it is processed exactly once along with the rest.
        for features in chain([first], feature_manifests):
            # Read the features
            arr = features.load().astype(np.float64)
            # Update the sum for the means
            curr_sum = arr.sum(axis=0)
            updated_total_sum = total_sum + curr_sum
            # Update the number of frames
            curr_frames = arr.shape[0]
            updated_total_frames = total_frames + curr_frames
            # Update the unnormalized variance
            total_over_curr_frames = total_frames / curr_frames
            curr_unnorm_var = np.var(arr, axis=0) * curr_frames
            if total_frames > 0:
                total_unnorm_var = (
                        total_unnorm_var + curr_unnorm_var +
                        total_over_curr_frames / updated_total_frames *
                        (total_sum / total_over_curr_frames - curr_sum) ** 2)
            else:
                total_unnorm_var = curr_unnorm_var
            total_sum = updated_total_sum
            total_frames = updated_total_frames
    stats = {
        'norm_means': total_sum / total_frames,
        'norm_stds': np.sqrt(total_unnorm_var / total_frames)
    }
    if storage_path is not None:
        with open(storage_path, 'wb') as f:
            pickle.dump(stats, f)
    return stats