import codecs
import json
from io import BytesIO
from typing import List, Literal, Optional, Tuple, Union
import numpy as np
import torch
from lhotse import Recording
from lhotse.audio.backend import (
LibsndfileBackend,
get_current_audio_backend,
save_audio,
)
from lhotse.augmentation import get_or_create_resampler
from lhotse.shar.utils import to_shar_placeholder
from lhotse.shar.writers.tar import TarWriter
from lhotse.utils import is_torchaudio_available
[docs]
class AudioTarWriter:
"""
AudioTarWriter writes audio examples in numpy arrays or PyTorch tensors into a tar archive
that is automatically sharded.
It is different from :class:`~lhotse.shar.writers.array.ArrayTarWriter` in that it supports
audio-specific compression mechanisms, such as ``flac``, ``opus``, ``mp3``, or ``wav``.
Example::
>>> with AudioTarWriter("some_dir/audio.%06d.tar", shard_size=100, format="mp3") as w:
... w.write("audio1", audio1_array)
... w.write("audio2", audio2_array) # etc.
It would create files such as ``some_dir/audio.000000.tar``, ``some_dir/audio.000001.tar``, etc.
It's also possible to use ``AudioTarWriter`` with automatic sharding disabled::
>>> with AudioTarWriter("some_dir/audio.tar", shard_size=None, format="flac") as w:
... w.write("audio1", audio1_array)
... w.write("audio2", audio2_array) # etc.
See also: :class:`~lhotse.shar.writers.tar.TarWriter`, :class:`~lhotse.shar.writers.array.ArrayTarWriter`
"""
[docs]
def __init__(
self,
pattern: str,
shard_size: Optional[int] = 1000,
format: Literal["wav", "flac", "mp3", "opus"] = "flac",
):
self.format = format
self.tar_writer = TarWriter(pattern, shard_size)
def __enter__(self):
self.tar_writer.__enter__()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs]
def close(self):
self.tar_writer.close()
@property
def output_paths(self) -> List[str]:
return self.tar_writer.output_paths
[docs]
def write_placeholder(self, key: str) -> None:
self.tar_writer.write(f"{key}.nodata", BytesIO())
self.tar_writer.write(f"{key}.nometa", BytesIO(), count=False)
[docs]
def write(
self,
key: str,
value: np.ndarray,
sampling_rate: int,
manifest: Recording,
) -> None:
value, manifest, sampling_rate = self._maybe_resample(
value, manifest, sampling_rate
)
# Write binary data
stream = BytesIO()
save_audio(
dest=stream, src=value, sampling_rate=sampling_rate, format=self.format
)
self.tar_writer.write(f"{key}.{self.format}", stream)
# Write text manifest afterwards
manifest = to_shar_placeholder(manifest)
json_stream = BytesIO()
print(
json.dumps(manifest.to_dict()),
file=codecs.getwriter("utf-8")(json_stream),
)
json_stream.seek(0)
self.tar_writer.write(f"{key}.json", json_stream, count=False)
def _maybe_resample(
self,
audio: Union[torch.Tensor, np.ndarray],
manifest: Recording,
sampling_rate: int,
) -> Tuple[Union[np.ndarray, torch.Tensor], Recording, int]:
# Resampling is required for some versions of OPUS encoders.
# First resample the manifest which only adjusts the metadata;
# then resample the audio array to 48kHz.
OPUS_DEFAULT_SAMPLING_RATE = 48000
if (
self.format == "opus"
and is_torchaudio_available()
and not isinstance(get_current_audio_backend(), LibsndfileBackend)
and sampling_rate != OPUS_DEFAULT_SAMPLING_RATE
):
manifest = manifest.resample(OPUS_DEFAULT_SAMPLING_RATE)
audio = get_or_create_resampler(sampling_rate, OPUS_DEFAULT_SAMPLING_RATE)(
torch.as_tensor(audio)
)
return audio, manifest, OPUS_DEFAULT_SAMPLING_RATE
return audio, manifest, sampling_rate