import warnings
from dataclasses import dataclass
from io import BytesIO, FileIO
from pathlib import Path
from subprocess import PIPE, run
from typing import List, Optional, Tuple, Union
import numpy as np
import torch
from lhotse.audio.backend import read_audio
from lhotse.audio.utils import (
DurationMismatchError,
VideoInfo,
VideoLoadingError,
get_audio_duration_mismatch_tolerance,
)
from lhotse.caching import AudioCache
from lhotse.utils import (
Pathlike,
Seconds,
SmartOpen,
asdict_nonull,
compute_num_samples,
fastcopy,
)
# Alias for what ``AudioSource._prepare_for_reading`` returns: either a string
# (e.g. a path) or a file-like object opened in binary mode.
PathOrFilelike = Union[str, BytesIO, FileIO]
[docs]
@dataclass
class AudioSource:
    """
    AudioSource represents audio data that can be retrieved from somewhere.
    """

    type: str
    """
    The type of audio source. Supported types are:
    - 'file' (supports most standard audio encodings, possibly multi-channel)
    - 'command' [unix pipe] (supports most standard audio encodings, possibly multi-channel)
    - 'url' (any URL type that is supported by "smart_open" library, e.g. http/https/s3/gcp/azure/etc.)
    - 'memory' (any format, read from a binary string attached to 'source' member of AudioSource)
    - 'shar' (indicates a placeholder that will be filled later when using Lhotse Shar data format)
    """

    channels: List[int]
    """
    A list of integer channel IDs available in this AudioSource.
    """

    source: Union[str, bytes]
    """
    The actual source to read from. The contents depend on the ``type`` field,
    but in general it can be a path, a URL, or the encoded binary data itself.
    """

    video: Optional[VideoInfo] = None
    """
    Optional information about the video contained in this source, if any.
    """

    @property
    def has_video(self) -> bool:
        """Whether video information is attached to this source."""
        return self.video is not None

    def load_audio(
        self,
        offset: Seconds = 0.0,
        duration: Optional[Seconds] = None,
        force_opus_sampling_rate: Optional[int] = None,
    ) -> np.ndarray:
        """
        Load the AudioSource (from files, commands, or URLs) with soundfile,
        accounting for many audio formats and multi-channel inputs.
        Returns numpy array with shapes: (n_samples,) for single-channel,
        (n_channels, n_samples) for multi-channel.

        Note: The elements in the returned array are in the range [-1.0, 1.0]
        and are of dtype `np.float32`.

        :param offset: where to start reading, in seconds.
        :param duration: how much audio to read, in seconds (``None`` places no limit).
        :param force_opus_sampling_rate: This parameter is only used when we detect an OPUS file.
            It will tell ffmpeg to resample OPUS to this sampling rate.
        :raises DurationMismatchError: when less audio is available than ``duration``
            (beyond the configured tolerance).
        """
        source = self._prepare_for_reading(offset=offset, duration=duration)

        samples, sampling_rate = read_audio(
            source,
            offset=offset,
            duration=duration,
            force_opus_sampling_rate=force_opus_sampling_rate,
        )

        # explicit sanity check for duration as soundfile does not complain here
        if duration is not None:
            num_samples = samples.shape[0] if samples.ndim == 1 else samples.shape[1]
            available_duration = num_samples / sampling_rate
            # The tolerance absorbs small float/rounding differences between
            # the requested and the decoded duration.
            tolerance = get_audio_duration_mismatch_tolerance()
            if available_duration < duration - tolerance:
                raise DurationMismatchError(
                    f"Requested more audio ({duration}s) than available ({available_duration}s)"
                )

        return samples.astype(np.float32)

    def load_video(
        self,
        offset: Seconds = 0.0,
        duration: Optional[Seconds] = None,
        with_audio: bool = True,
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """
        Decode the video (and optionally the accompanying audio) with torchaudio's
        ``StreamReader``, reading in chunks of roughly one second.

        :param offset: position to seek to before decoding, in seconds.
        :param duration: how much to decode, in seconds (``None`` decodes until the end).
        :param with_audio: whether to also decode the audio stream, if there is one.
        :return: a tuple of ``(video, audio)``; video is ``T x C x H x W`` and audio is
            ``C x T``. Audio is ``None`` when ``with_audio`` is False, when the file has
            no audio stream, or when no video chunk could be decoded.
        :raises VideoLoadingError: when anything goes wrong while reading
            (the original exception is chained as the cause).
        """
        import torchaudio

        try:
            # Open the video file for reading.
            stream = torchaudio.io.StreamReader(self.source)

            # Collect the information about available video and audio streams.
            audio_streams = {}
            video_streams = {}
            for stream_idx in range(stream.num_src_streams):
                info = stream.get_src_stream_info(stream_idx)
                if info.media_type == "video":
                    video_streams[stream_idx] = info
                elif info.media_type == "audio":
                    audio_streams[stream_idx] = info
                else:
                    raise RuntimeError(f"Unexpected media_type: {info}")

            assert (
                len(video_streams) != 0
            ), "The file does not seem to have any video streams."
            assert (
                len(video_streams) < 2
            ), f"Lhotse currently does not support more than one video stream in a file (found {len(video_streams)})."
            assert len(audio_streams) < 2, (
                f"Lhotse currently does not support more than one audio stream in a file (found {len(audio_streams)})."
                f" Note: it's different than multi-channel which is generally supported."
            )

            # Add an ffmpeg output video stream to perform reading in chunks.
            ((video_stream_idx, video_stream),) = list(video_streams.items())
            frames_per_chunk = round(video_stream.frame_rate)
            video_chunk_duration = frames_per_chunk * self.video.frame_length
            stream.add_basic_video_stream(
                frames_per_chunk,
                stream_index=video_stream_idx,
                height=self.video.height,  # re-scale if requested via overriding self.video.height
                width=self.video.width,  # re-scale if requested via overriding self.video.width
            )

            # Audio is decoded only when requested AND actually present; every later
            # audio-related step is gated on the same flag so that audio-less files
            # yield ``None`` audio instead of failing.
            has_audio = with_audio and len(audio_streams) > 0
            if has_audio:
                ((audio_stream_idx, audio_stream),) = list(audio_streams.items())
                samples_per_chunk = round(
                    audio_stream.sample_rate * video_chunk_duration
                )
                stream.add_basic_audio_stream(
                    samples_per_chunk,
                    stream_index=audio_stream_idx,
                )

            stream.seek(offset)

            video_chunks = []
            audio_chunks = []
            decoded_duration = 0.0
            for chunk in stream.stream():
                if duration is not None and decoded_duration >= duration:
                    break

                video_chunk = chunk[0]
                chunk_size = video_chunk.size(0)
                current_chunk_duration = chunk_size / video_stream.frame_rate

                # The last chunk may overshoot the requested duration; in that case
                # keep only the part that is still missing.
                overshoots = (
                    duration is not None
                    and decoded_duration + current_chunk_duration > duration
                )
                remaining = duration - decoded_duration if overshoots else None

                if overshoots:
                    keep_frames = compute_num_samples(
                        remaining, video_stream.frame_rate
                    )
                    video_chunk = video_chunk[:keep_frames]
                video_chunks.append(video_chunk)

                if has_audio:
                    audio_chunk = chunk[1]
                    if overshoots:
                        keep_samples = compute_num_samples(
                            remaining, audio_stream.sample_rate
                        )
                        audio_chunk = audio_chunk[:keep_samples]
                    # Transposed so that chunks can be concatenated along time as C x T.
                    audio_chunks.append(audio_chunk.T)

                decoded_duration += current_chunk_duration

            if not video_chunks:
                return (
                    torch.zeros(
                        0, 3, video_stream.height, video_stream.width, dtype=torch.uint8
                    ),
                    None,
                )

            output_video = torch.cat(video_chunks, dim=0)  # T x C x H x W
            output_audio = None
            if has_audio:
                output_audio = torch.cat(audio_chunks, dim=1)  # C x T

            return output_video, output_audio
        except Exception as e:
            source_desc = "memory" if isinstance(self.source, bytes) else self.source
            raise VideoLoadingError(
                f"Reading video from '{source_desc}' failed. "
                f"Details: {type(e)}: {str(e)}"
            ) from e

    def with_video_resolution(self, width: int, height: int) -> "AudioSource":
        """
        Return a copy of this source whose video info has the given resolution.

        Note: reads ``self.video`` unconditionally, so the source must carry video info.
        """
        return fastcopy(self, video=self.video.copy_with(width=width, height=height))

    def with_path_prefix(self, path: Pathlike) -> "AudioSource":
        """
        Return a copy with ``path`` prepended to ``source``.
        Sources of a type other than 'file' are returned unchanged.
        """
        if self.type != "file":
            return self
        return fastcopy(self, source=str(Path(path) / self.source))

    def to_dict(self) -> dict:
        """Convert to a dict using ``asdict_nonull``."""
        return asdict_nonull(self)

    @staticmethod
    def from_dict(data) -> "AudioSource":
        """Build an AudioSource by passing the items of ``data`` as keyword arguments."""
        return AudioSource(**data)

    def __repr__(self):
        # Binary (in-memory) sources are not printed verbatim.
        return (
            f"AudioSource(type='{self.type}', channels={self.channels}, "
            f"source='{self.source if isinstance(self.source, str) else '<binary-data>'}')"
        )

    def _prepare_for_reading(
        self, offset: Seconds, duration: Optional[Seconds]
    ) -> PathOrFilelike:
        """
        Validates `self.type` and prepares the actual source for audio reading.
        Returns either a path or a file-like object opened in binary mode,
        that can be handled by :func:`lhotse.audio.backend.read_audio`.

        ``offset`` and ``duration`` are only used to decide whether to warn about
        the overhead of reading a subset from a 'command' or 'url' source.
        """
        assert self.type in (
            "file",
            "command",
            "url",
            "memory",
            "shar",
        ), f"Unexpected AudioSource type: '{self.type}'"

        source = self.source

        if self.type == "command":
            if (offset != 0.0 or duration is not None) and not AudioCache.enabled():
                warnings.warn(
                    "You requested a subset of a recording that is read from disk via a bash command. "
                    "Expect large I/O overhead if you are going to read many chunks like these, "
                    "since every time we will read the whole file rather than its subset. "
                    "You can use `lhotse.set_caching_enabled(True)` to mitigate the overhead."
                )

            # Let's assume 'self.source' is a pipe-command with unchangeable file,
            # never a microphone-stream or a live-stream.
            audio_bytes = AudioCache.try_cache(self.source)
            if not audio_bytes:
                # NOTE(security): the command string is executed through the shell
                # as-is, so 'command' sources must only come from trusted manifests.
                audio_bytes = run(self.source, shell=True, stdout=PIPE).stdout
                AudioCache.add_to_cache(self.source, audio_bytes)

            source = BytesIO(audio_bytes)

        elif self.type == "url":
            # Parenthesised like the 'command' branch: warn only when a subset is
            # requested AND caching is disabled.
            if (offset != 0.0 or duration is not None) and not AudioCache.enabled():
                warnings.warn(
                    "You requested a subset of a recording that is read from URL. "
                    "Expect large I/O overhead if you are going to read many chunks like these, "
                    "since every time we will download the whole file rather than its subset. "
                    "You can use `lhotse.set_caching_enabled(True)` to mitigate the overhead."
                )

            # Let's assume 'self.source' is url to unchangeable file,
            # never a microphone-stream or a live-stream.
            audio_bytes = AudioCache.try_cache(self.source)
            if not audio_bytes:
                with SmartOpen.open(self.source, "rb") as f:
                    audio_bytes = f.read()
                AudioCache.add_to_cache(self.source, audio_bytes)

            source = BytesIO(audio_bytes)

        elif self.type == "memory":
            assert isinstance(self.source, bytes), (
                "Corrupted manifest: specified AudioSource type is 'memory', "
                f"but 'self.source' attribute is not of type 'bytes' (found: '{type(self.source).__name__}')."
            )
            source = BytesIO(self.source)

        elif self.type == "shar":
            raise RuntimeError(
                "Inconsistent state: found an AudioSource with Lhotse Shar placeholder "
                "that was not filled during deserialization."
            )

        return source