Source code for lhotse.shar.writers.array

import codecs
import json
from io import BytesIO
from typing import List, Literal, Optional, Union

import numpy as np

from lhotse import Features
from lhotse.array import Array, TemporalArray
from lhotse.shar.utils import to_shar_placeholder
from lhotse.shar.writers.tar import TarWriter
from lhotse.utils import is_module_available


class ArrayTarWriter:
    """
    ArrayTarWriter writes numpy arrays or PyTorch tensors into a tar archive
    that is automatically sharded.

    NOTE(review): :meth:`write` is annotated for ``np.ndarray`` and reads
    ``value.dtype``; whether tensors can be passed directly should be confirmed.

    For floating point tensors, we support the option to use `lilcom` compression.
    Note that `lilcom` is only suitable for log-space features such as log-Mel
    filter banks.

    Every call to :meth:`write` stores two tar entries: the binary array
    (``<key>.npy`` or ``<key>.llc``) followed by its manifest (``<key>.json``).

    Example::

        >>> with ArrayTarWriter("some_dir/fbank.%06d.tar", shard_size=100, compression="lilcom") as w:
        ...     w.write("fbank1", fbank1_array, fbank1_manifest)
        ...     w.write("fbank2", fbank2_array, fbank2_manifest)  # etc.

    It would create files such as ``some_dir/fbank.000000.tar``,
    ``some_dir/fbank.000001.tar``, etc.

    The starting shard offset can be set using ``shard_offset`` parameter.
    The writer starts from 0 by default.

    It's also possible to use ``ArrayTarWriter`` with automatic sharding disabled::

        >>> with ArrayTarWriter("some_dir/fbank.tar", shard_size=None, compression="numpy") as w:
        ...     w.write("fbank1", fbank1_array, fbank1_manifest)
        ...     w.write("fbank2", fbank2_array, fbank2_manifest)  # etc.

    See also: :class:`~lhotse.shar.writers.tar.TarWriter`,
    :class:`~lhotse.shar.writers.audio.AudioTarWriter`
    """

    def __init__(
        self,
        pattern: str,
        shard_size: Optional[int] = 1000,
        compression: Literal["numpy", "lilcom"] = "numpy",
        lilcom_tick_power: int = -5,
        shard_offset: int = 0,
    ):
        """
        Create the writer and the underlying :class:`TarWriter`.

        :param pattern: output path (pattern) forwarded to ``TarWriter``.
        :param shard_size: shard size forwarded to ``TarWriter``;
            ``None`` disables automatic sharding.
        :param compression: ``"lilcom"`` stores arrays as ``.llc`` entries
            compressed with lilcom; any other value takes the ``np.save``
            path and stores ``.npy`` entries.
        :param lilcom_tick_power: passed as ``tick_power`` to ``lilcom.compress``.
        :param shard_offset: forwarded to ``TarWriter`` as ``shard_offset``.
        :raises ImportError: when ``compression="lilcom"`` is requested but
            the ``lilcom`` module is not available.
        """
        # Fail early, at construction time, rather than on the first write().
        if compression == "lilcom" and not is_module_available("lilcom"):
            raise ImportError(
                "ArrayTarWriter with lilcom compression requires the 'lilcom' module. "
                "Install it or use compression='numpy'."
            )
        self.compression = compression
        self.tar_writer = TarWriter(pattern, shard_size, shard_offset=shard_offset)
        self.lilcom_tick_power = lilcom_tick_power

    def __enter__(self) -> "ArrayTarWriter":
        """Enter the underlying tar writer's context and return this writer."""
        self.tar_writer.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the writer on context exit; exceptions are not suppressed."""
        self.close()

    def close(self) -> None:
        """Close the underlying tar writer."""
        self.tar_writer.close()

    @property
    def output_paths(self) -> List[str]:
        """The output paths reported by the underlying tar writer."""
        return self.tar_writer.output_paths

    def write_placeholder(self, key: str) -> None:
        """
        Write a pair of empty entries, ``<key>.nodata`` and ``<key>.nometa``,
        in place of the array and manifest entries produced by :meth:`write`.

        :param key: the entry name (without extension).
        """
        self.tar_writer.write(f"{key}.nodata", BytesIO())
        # Mirrors write(): the second entry of the pair is written with count=False
        # (presumably so that a pair counts as a single item -- TODO confirm in TarWriter).
        self.tar_writer.write(f"{key}.nometa", BytesIO(), count=False)

    def write(
        self,
        key: str,
        value: np.ndarray,
        manifest: Union[Features, Array, TemporalArray],
    ) -> None:
        """
        Write an array and its manifest as two consecutive tar entries.

        :param key: the entry name (without extension).
        :param value: the array to store; must have a floating-point dtype
            when lilcom compression is used.
        :param manifest: the manifest describing ``value``; it is converted
            with ``to_shar_placeholder`` and stored as JSON in ``<key>.json``.
        :raises AssertionError: when lilcom compression is used with
            a non-floating-point array.
        """
        # Write binary data
        if self.compression == "lilcom":
            # Explicit check instead of an ``assert`` statement so that it is not
            # stripped under ``python -O``; the exception type is kept for
            # backward compatibility.
            if not np.issubdtype(value.dtype, np.floating):
                raise AssertionError(
                    "Lilcom compression supports only floating-point arrays."
                )
            # Imported lazily: lilcom is an optional dependency.
            import lilcom

            payload = BytesIO(
                lilcom.compress(value, tick_power=self.lilcom_tick_power)
            )
            ext = ".llc"
        else:
            payload = BytesIO()
            np.save(payload, value, allow_pickle=False)
            # Rewind so the stream is handed over positioned at its start,
            # like the lilcom and manifest streams.
            payload.seek(0)
            ext = ".npy"
        self.tar_writer.write(key + ext, payload)

        # Write text manifest afterwards
        manifest = to_shar_placeholder(manifest)
        # One JSON document followed by a newline, UTF-8 encoded.
        json_stream = BytesIO((json.dumps(manifest.to_dict()) + "\n").encode("utf-8"))
        self.tar_writer.write(f"{key}.json", json_stream, count=False)