# Source code for lhotse.shar.writers.array

import codecs
import json
from io import BytesIO
from typing import List, Literal, Optional, Union

import lilcom
import numpy as np

from lhotse import Features
from lhotse.array import Array, TemporalArray
from lhotse.shar.utils import to_shar_placeholder
from lhotse.shar.writers.tar import TarWriter


class ArrayTarWriter:
    """
    ArrayTarWriter writes numpy arrays or PyTorch tensors into a tar archive
    that is automatically sharded.

    For floating point tensors, we support the option to use `lilcom` compression.
    Note that `lilcom` is only suitable for log-space features such as log-Mel filter banks.

    Every array is written together with its manifest, which is stored as a JSON
    entry next to the binary data.

    Example::

        >>> with ArrayTarWriter("some_dir/fbank.%06d.tar", shard_size=100, compression="lilcom") as w:
        ...     w.write("fbank1", fbank1_array, fbank1_manifest)
        ...     w.write("fbank2", fbank2_array, fbank2_manifest)  # etc.

    It would create files such as ``some_dir/fbank.000000.tar``, ``some_dir/fbank.000001.tar``, etc.

    It's also possible to use ``ArrayTarWriter`` with automatic sharding disabled::

        >>> with ArrayTarWriter("some_dir/fbank.tar", shard_size=None, compression="numpy") as w:
        ...     w.write("fbank1", fbank1_array, fbank1_manifest)
        ...     w.write("fbank2", fbank2_array, fbank2_manifest)  # etc.

    See also: :class:`~lhotse.shar.writers.tar.TarWriter`, :class:`~lhotse.shar.writers.audio.AudioTarWriter`
    """

    def __init__(
        self,
        pattern: str,
        shard_size: Optional[int] = 1000,
        compression: Literal["numpy", "lilcom"] = "numpy",
        lilcom_tick_power: int = -5,
    ):
        """
        :param pattern: output path (pattern) forwarded to the underlying ``TarWriter``.
        :param shard_size: shard size forwarded to the underlying ``TarWriter``;
            ``None`` disables automatic sharding (see the class docstring).
        :param compression: ``"lilcom"`` stores arrays lilcom-compressed (``.llc``);
            any other value stores them in the ``.npy`` format.
        :param lilcom_tick_power: passed as ``tick_power`` to ``lilcom.compress``.
        """
        self.compression = compression
        self.tar_writer = TarWriter(pattern, shard_size)
        self.lilcom_tick_power = lilcom_tick_power

    def __enter__(self):
        self.tar_writer.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """Close the underlying ``TarWriter``."""
        self.tar_writer.close()

    @property
    def output_paths(self) -> List[str]:
        """The output paths reported by the underlying ``TarWriter``."""
        return self.tar_writer.output_paths

    def write_placeholder(self, key: str) -> None:
        """
        Write two empty entries, ``{key}.nodata`` and ``{key}.nometa``,
        in place of the array and its manifest.
        """
        self.tar_writer.write(f"{key}.nodata", BytesIO())
        # NOTE(review): count=False presumably keeps the metadata entry from being
        # counted as a separate item in the shard -- confirm against TarWriter.
        self.tar_writer.write(f"{key}.nometa", BytesIO(), count=False)

    def write(
        self,
        key: str,
        value: np.ndarray,
        manifest: Union[Features, Array, TemporalArray],
    ) -> None:
        """
        Write an array under ``key`` followed by its manifest as ``{key}.json``.

        :param key: entry name (without extension) used inside the tar archive.
        :param value: the array to store; must have a floating-point dtype
            when ``compression == "lilcom"``.
        :param manifest: the manifest describing ``value``; it is converted with
            ``to_shar_placeholder`` before being serialized to JSON.
        :raises AssertionError: if lilcom compression is requested for
            a non-floating-point array.
        """
        # Write binary data
        if self.compression == "lilcom":
            # Raised explicitly (rather than via ``assert``) so that the check
            # is not stripped when Python runs with -O; the exception type and
            # message are the same as before.
            if not np.issubdtype(value.dtype, np.floating):
                raise AssertionError(
                    "Lilcom compression supports only floating-point arrays."
                )
            stream = BytesIO(
                lilcom.compress(value, tick_power=self.lilcom_tick_power)
            )
            ext = ".llc"
        else:
            stream = BytesIO()
            np.save(stream, value, allow_pickle=False)
            # np.save leaves the position at the end of the buffer; rewind it so
            # that the stream is handed over ready to be read from the start.
            stream.seek(0)
            ext = ".npy"
        self.tar_writer.write(key + ext, stream)

        # Write text manifest afterwards
        manifest = to_shar_placeholder(manifest)
        # One UTF-8 encoded JSON document terminated by a newline.
        payload = (json.dumps(manifest.to_dict()) + "\n").encode("utf-8")
        self.tar_writer.write(f"{key}.json", BytesIO(payload), count=False)