Source code for lhotse.recipes.chime6

"""
The CHiME-6 dataset is a collection of over 50 hours of conversational speech recordings
collected from twenty real dinner parties that have taken place in real homes. The
recordings have been made using multiple 4-channel microphone arrays and have been
fully transcribed.

The dataset features:

- simultaneous recordings from multiple microphone arrays;
- real conversation, i.e. talkers speaking in a relaxed and unscripted fashion;
- a range of room acoustics from 20 different homes each with two or three separate
  recording areas;
- real domestic noise backgrounds, e.g., kitchen appliances, air conditioning,
  movement, etc.

Fully-transcribed utterances are provided in continuous audio with ground truth speaker
labels and start/end time annotations for segmentation.

The dataset was used for the 5th and 6th CHiME Speech Separation and Recognition
Challenges. Further information and an open-source baseline speech recognition
system are available online (http://spandh.dcs.shef.ac.uk/chime_challenge/chime2018).

NOTE: The CHiME-5 and CHiME-6 datasets contain the same recordings; the only
difference is that CHiME-6 additionally provides software to perform array
synchronization. We expect that users have already downloaded the CHiME-5 dataset
from here: https://licensing.sheffield.ac.uk/product/chime5

NOTE: Users can additionally perform array synchronization as described here:
https://github.com/kaldi-asr/kaldi/blob/master/egs/chime6/s5_track1/local/generate_chime6_data.sh
We also provide this option in the `prepare_chime6` function.
"""

import itertools
import json
import logging
import os
import shutil
import subprocess
import tempfile
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait
from datetime import datetime as dt
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union

import numpy as np
from tqdm import tqdm

from lhotse import fix_manifests, validate_recordings_and_supervisions
from lhotse.audio import AudioSource, Recording, RecordingSet
from lhotse.recipes.utils import TimeFormatConverter, normalize_text_chime6
from lhotse.supervision import SupervisionSegment, SupervisionSet
from lhotse.utils import Pathlike, add_durations, resumable_download

# fmt: off
DATASET_PARTS = {
    "train": [
        "S03", "S04", "S05", "S06", "S07", "S08", "S12", "S13",
        "S16", "S17", "S18", "S19", "S20", "S22", "S23", "S24",
    ],
    "dev": ["S02", "S09"],
    "eval": ["S01", "S21"],
}

DATASET_PARTS_CHIME7 = {
    "train": [
        "S03", "S04", "S05", "S06", "S07", "S08", "S12", "S13",
        "S16", "S17", "S18", "S22", "S23", "S24",
    ],
    "dev": ["S02", "S09"],
    "eval": ["S01", "S19", "S20", "S21"],
}

# fmt: on
CHIME6_AUDIO_EDITS_JSON = "https://raw.githubusercontent.com/chimechallenge/chime6-synchronisation/master/chime6_audio_edits.json"
CHIME6_MD5SUM_FILE = "https://raw.githubusercontent.com/chimechallenge/chime6-synchronisation/master/audio_md5sums.txt"
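
# For reference, the audio edits JSON downloaded above is expected to look roughly
# as follows. The structure is inferred from how it is consumed by
# `Chime6ArraySynchronizer` below; the numbers are illustrative, not real values:
#
#     {
#       "S02": {
#         "U01": {
#           # frame-drop edits: 1-based [in_from, in_to, out_from] sample triples
#           "edits": [[1, 480000, 1], [480101, 960000, 480001]],
#           "speed": 0.9999915,  # linear clock-drift fit (a list for multi-segment fits)
#           "padding": 1024      # samples to pad (>0) or trim (<0)
#         },
#         "P05": {"speed": 1.0, "padding": 0}
#       },
#       ...
#     }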


def download_chime6(
    target_dir: Pathlike = ".",
) -> Path:
    """
    Download the original dataset. This cannot be done automatically because of the
    license agreement. Please visit the following URL and download the dataset
    manually: https://licensing.sheffield.ac.uk/product/chime5

    :param target_dir: Pathlike, the path of the dir to store the dataset.
    :return: the path to the downloaded and extracted directory with data.
    """
    print("We cannot download the CHiME-6 dataset automatically.")
    print("Please visit the following URL and download the dataset manually:")
    print("https://licensing.sheffield.ac.uk/product/chime5")
    print("Then, please extract the tar files to the following directory:")
    print(f"{target_dir}")
    return Path(target_dir)


def prepare_chime6(
    corpus_dir: Pathlike,
    output_dir: Optional[Pathlike] = None,
    dataset_parts: Optional[Union[str, Sequence[str]]] = "all",
    mic: str = "mdm",
    use_reference_array: bool = False,
    perform_array_sync: bool = False,
    verify_md5_checksums: bool = False,
    num_jobs: int = 1,
    num_threads_per_job: int = 1,
    sox_path: Pathlike = "/usr/bin/sox",
    normalize_text: str = "kaldi",
    use_chime7_split: bool = False,
) -> Dict[str, Dict[str, Union[RecordingSet, SupervisionSet]]]:
    """
    Returns the manifests which consist of the Recordings and Supervisions.

    :param corpus_dir: Pathlike, the path of the data dir, either the original CHiME-5
        data or the synchronized CHiME-6 data. If the former, `perform_array_sync`
        must be True.
    :param output_dir: Pathlike, the path where to write the manifests.
    :param mic: str, the microphone type to use, choose from "ihm" (close-talk) or
        "mdm" (multi-microphone array) settings. For MDM, there are 6 array devices
        with 4 channels each, so the resulting recordings will have 24 channels.
    :param use_reference_array: bool, if True, use the reference array for the MDM
        setting. Only the supervision segments have the reference array information in
        the `channel` field. The recordings will still have all the channels in the
        array. Note that the train set does not have the reference array information.
    :param perform_array_sync: bool, if True, perform array synchronization based on:
        https://github.com/chimechallenge/chime6-synchronisation
    :param num_jobs: int, the number of jobs to run in parallel for array
        synchronization.
    :param num_threads_per_job: int, number of threads to use per job for clock drift
        correction. Large values may require more memory, so we recommend using a job
        scheduler.
    :param sox_path: Pathlike, the path to the sox v14.4.2 binary. Note that different
        versions of sox may produce different results.
    :param normalize_text: str, the text normalization method, choose from "none",
        "upper", "kaldi". The "kaldi" method is the same as Kaldi's text normalization
        method for CHiME-6.
    :param verify_md5_checksums: bool, if True, verify the md5 checksums of the audio
        files. Note that this step is slow, so we recommend only doing it once. It can
        be sped up by using the `num_jobs` argument.
    :param use_chime7_split: bool, if True, use the new split for the CHiME-7
        challenge.
    :return: a Dict whose key is the dataset part ("train", "dev" and "eval"), and
        whose value is a Dict with the keys 'recordings' and 'supervisions'.

    NOTE: If `perform_array_sync` is True, the synchronized data will be written to
    `output_dir`/CHiME6. This may take a long time and the output will occupy
    approximately 160G of storage. We will also create a temporary directory for
    processing, so the required storage in total will be approximately 300G.
    """
    import soundfile as sf

    assert mic in ["ihm", "mdm"], "mic must be either 'ihm' or 'mdm'."

    if output_dir:
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

    if "all" in dataset_parts:
        dataset_parts = list(DATASET_PARTS.keys())
    elif isinstance(dataset_parts, str):
        dataset_parts = [dataset_parts]

    assert set(dataset_parts).issubset(
        set(DATASET_PARTS.keys())
    ), f"dataset_parts must be one of {list(DATASET_PARTS.keys())}. Found {dataset_parts}"

    sessions = list(
        itertools.chain.from_iterable([DATASET_PARTS[part] for part in dataset_parts])
    )

    if perform_array_sync:
        if not output_dir:
            raise ValueError(
                "If `perform_array_sync` is True, `output_dir` must be specified."
            )
        chime6_dir = output_dir / "CHiME6"
        chime6_dir.mkdir(parents=True, exist_ok=True)

        # Create directories for train, dev, and eval audio and transcriptions
        for part in dataset_parts:
            (chime6_dir / "audio" / part).mkdir(parents=True, exist_ok=True)
            (chime6_dir / "transcriptions" / part).mkdir(parents=True, exist_ok=True)

        # Check sox version
        sox_version = (
            subprocess.check_output([sox_path, "--version"])
            .decode("utf-8")
            .strip()
            .split(" ")[-1]
        )
        assert sox_version == "v14.4.2", (
            "The sox version must be 14.4.2. "
            "Please download the sox v14.4.2 binary from "
            "https://sourceforge.net/projects/sox/files/sox/14.4.2/ "
            "and specify the path to the binary with the `sox_path` argument. "
            "You can also install it in a Conda environment with the following "
            "command: conda install -c conda-forge sox=14.4.2"
        )

        chime6_array_synchronizer = Chime6ArraySynchronizer(
            corpus_dir=corpus_dir,
            output_dir=chime6_dir,
            sox_path=sox_path,
            num_workers=num_threads_per_job,
        )
        num_jobs = min(num_jobs, len(sessions))  # since there are 20 sessions
        logging.info(
            f"Performing array synchronization with {num_jobs} jobs. This may "
            "take a long time."
        )
        with ProcessPoolExecutor(max_workers=num_jobs) as ex:
            futures = [
                ex.submit(
                    chime6_array_synchronizer.synchronize_session,
                    session=session,
                )
                for session in sessions
            ]
            _ = wait(futures)
    else:
        chime6_dir = Path(corpus_dir)

    # Verify MD5 checksums for all audio files if requested
    if verify_md5_checksums:
        if _verify_md5_checksums(chime6_dir, num_workers=num_jobs, sessions=sessions):
            print("MD5 checksums verified. All OK.")
        else:
            raise RuntimeError(
                "MD5 checksums do not match. Please prepare the array-synchronized "
                "CHiME-6 dataset again."
            )

    # Reference array is only applicable for the MDM setting
    use_reference_array = use_reference_array and mic == "mdm"

    manifests = defaultdict(dict)
    for part in dataset_parts:
        recordings = []
        supervisions = []

        # Since CHiME-7 uses a different split, we need to change the sessions.
        # Also, if the session is S19 or S20, we will look for its audio and
        # transcriptions in the train set, since it was originally in train.
        if use_chime7_split:
            DATASET_PARTS[part] = DATASET_PARTS_CHIME7[part]

        # First we create the recordings
        if mic == "ihm":
            global_spk_channel_map = {}
            for session in DATASET_PARTS[part]:
                part_ = (
                    "train" if use_chime7_split and session in ["S19", "S20"] else part
                )
                audio_paths = [
                    p for p in (chime6_dir / "audio" / part_).rglob(f"{session}_P*.wav")
                ]
                if len(audio_paths) == 0:
                    raise FileNotFoundError(
                        f"No audio found for session {session} in {part_} set."
                    )
                sources = []
                # NOTE: Each headset microphone is binaural
                for idx, audio_path in enumerate(audio_paths):
                    sources.append(
                        AudioSource(
                            type="file",
                            channels=[2 * idx, 2 * idx + 1],
                            source=str(audio_path),
                        )
                    )
                    spk_id = audio_path.stem.split("_")[1]
                    global_spk_channel_map[(session, spk_id)] = [2 * idx, 2 * idx + 1]
                audio_sf = sf.SoundFile(str(audio_paths[0]))
                recordings.append(
                    Recording(
                        id=session,
                        sources=sources,
                        sampling_rate=int(audio_sf.samplerate),
                        num_samples=audio_sf.frames,
                        duration=audio_sf.frames / audio_sf.samplerate,
                    )
                )
        else:
            for session in DATASET_PARTS[part]:
                part_ = (
                    "train" if use_chime7_split and session in ["S19", "S20"] else part
                )
                audio_paths = [
                    p for p in (chime6_dir / "audio" / part_).rglob(f"{session}_U*.wav")
                ]
                sources = []
                for idx, audio_path in enumerate(sorted(audio_paths)):
                    sources.append(
                        AudioSource(type="file", channels=[idx], source=str(audio_path))
                    )
                audio_sf = sf.SoundFile(str(audio_paths[0]))
                recordings.append(
                    Recording(
                        id=session,
                        sources=sources,
                        sampling_rate=int(audio_sf.samplerate),
                        num_samples=audio_sf.frames,
                        duration=audio_sf.frames / audio_sf.samplerate,
                    )
                )

        recordings = RecordingSet.from_recordings(recordings)

        def _get_channel(spk_id, session, ref=None):
            if mic == "ihm":
                return global_spk_channel_map[(session, spk_id)]
            else:
                recording = recordings[session]
                return (
                    list(range(recording.num_channels))
                    if not ref
                    else [i for i, s in enumerate(recording.sources) if ref in s.source]
                )

        # Then we create the supervisions
        for session in DATASET_PARTS[part]:
            part_ = "train" if use_chime7_split and session in ["S19", "S20"] else part
            with open(chime6_dir / "transcriptions" / part_ / f"{session}.json") as f:
                transcript = json.load(f)
                for idx, segment in enumerate(transcript):
                    spk_id = segment["speaker"]
                    channel = _get_channel(
                        spk_id,
                        session,
                        ref=segment["ref"]
                        if use_reference_array and part != "train"
                        else None,
                    )
                    start = TimeFormatConverter.hms_to_seconds(segment["start_time"])
                    end = TimeFormatConverter.hms_to_seconds(segment["end_time"])
                    if start >= end:  # some segments may have negative duration
                        continue
                    supervisions.append(
                        SupervisionSegment(
                            id=f"{session}-{idx}",
                            recording_id=session,
                            start=start,
                            duration=add_durations(end, -start, sampling_rate=16000),
                            channel=channel,
                            text=normalize_text_chime6(
                                segment["words"], normalize=normalize_text
                            ),
                            language="English",
                            speaker=spk_id,
                            custom={
                                "location": segment["location"],
                            }
                            if part != "train" and "location" in segment
                            else None,
                        )
                    )
        supervisions = SupervisionSet.from_segments(supervisions)

        # Fix and validate manifests
        recording_set, supervision_set = fix_manifests(
            recordings=recordings, supervisions=supervisions
        )
        validate_recordings_and_supervisions(recording_set, supervision_set)

        if output_dir is not None:
            mic_affix = f"{mic}-ref" if use_reference_array else mic
            supervision_set.to_file(
                output_dir / f"chime6-{mic_affix}_supervisions_{part}.jsonl.gz"
            )
            recording_set.to_file(
                output_dir / f"chime6-{mic}_recordings_{part}.jsonl.gz"
            )

        manifests[part] = {
            "recordings": recording_set,
            "supervisions": supervision_set,
        }
    return manifests


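# A minimal sketch of consuming the returned manifests, e.g. to build a CutSet
# for training (``CutSet.from_manifests`` and ``trim_to_supervisions`` are part
# of the lhotse API; the "train" key assumes that part was prepared):
#
#     from lhotse import CutSet
#
#     cuts = CutSet.from_manifests(
#         recordings=manifests["train"]["recordings"],
#         supervisions=manifests["train"]["supervisions"],
#     )
#     cuts = cuts.trim_to_supervisions()  # one cut per annotated utterance
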
def _verify_md5_checksums(
    corpus_dir: Pathlike, num_workers: int = 1, sessions: Optional[List[str]] = None
) -> bool:
    import hashlib

    # First download the checksum file and read it into a dictionary
    temp_dir = Path(tempfile.mkdtemp())
    checksum_file = temp_dir / "md5sums.txt"
    resumable_download(
        CHIME6_MD5SUM_FILE, str(checksum_file), desc="Downloading checksum file"
    )
    checksums = {}
    with open(checksum_file, "r") as f:
        for line in f:
            checksum, filename = line.strip().split(" ", maxsplit=1)
            checksums[Path(filename).stem] = checksum

    # Now verify the checksums
    def _verify_checksum(file: Pathlike) -> bool:
        checksum = hashlib.md5(open(str(file), "rb").read()).hexdigest()
        filename = str(file.stem)
        if filename in checksums and checksum != checksums[filename]:
            return False
        return True

    all_files = list(corpus_dir.rglob("*.wav"))
    if sessions is not None:
        all_files = [f for f in all_files if f.stem.split("_")[0] in sessions]

    print(f"Verifying checksums with {num_workers} workers...")
    with ThreadPoolExecutor(max_workers=num_workers) as ex:
        results = list(tqdm(ex.map(_verify_checksum, all_files), total=len(all_files)))

    return all(results)


# The following class is based on functions from:
# https://github.com/chimechallenge/chime6-synchronisation
# We have made 2 changes to get some speed-up:
# 1. We combine all channels in an array for applying frame-drop correction.
# 2. We apply multi-threading (with `num_workers` threads) for clock drift correction.
class Chime6ArraySynchronizer:
    """
    Class for synchronizing CHiME-6 array recordings.
    """

    def __init__(
        self,
        corpus_dir: Pathlike,
        output_dir: Pathlike,
        sox_path: Pathlike = "sox",
        num_workers: int = 1,
    ) -> None:
        self.corpus_dir = Path(corpus_dir)
        self.output_dir = Path(output_dir)
        self.sox_path = Path(sox_path)
        self.num_workers = num_workers

        # Create output directory
        self.output_dir.mkdir(exist_ok=True, parents=True)

        # Download the audio edits JSON file
        audio_edits_json = self.output_dir / "audio_edits.json"
        resumable_download(CHIME6_AUDIO_EDITS_JSON, str(audio_edits_json))
        with open(audio_edits_json) as f:
            self.audio_edits = dict(json.load(f))

        # Check sox path
        if not str(self.sox_path).endswith("sox"):
            self.sox_path = self.sox_path / "sox"

    def synchronize_session(self, session: str) -> None:
        """
        Synchronize a single CHiME-6 session.
        """
        temp_dir = Path(
            tempfile.mkdtemp(prefix=f"chime6_{session}_", dir=self.output_dir)
        )
        if session not in self.audio_edits:
            logging.warning(f"No audio edits found for session {session}")
            return
        session_audio_edits = self.audio_edits[session]

        print(f"Correcting {session} for frame drops...")
        self.correct_frame_drops(temp_dir, session, frame_drops=session_audio_edits)

        print(f"Correcting {session} for clock drift...")
        self.correct_clock_drift(
            temp_dir,
            session,
            linear_fit=session_audio_edits,
            num_threads=self.num_workers,
        )

        print(f"Adjusting timestamps in {session} JSON files...")
        self.adjust_json_timestamps(session, linear_fit=session_audio_edits)

        # Clean up
        shutil.rmtree(temp_dir)
        return

    def correct_frame_drops(
        self,
        output_dir: Pathlike,
        session: str,
        frame_drops: Optional[Dict[str, Any]] = None,
    ) -> None:
        # For binaural recordings, we just create symbolic links to the original files
        session_binaural_wavs = sorted(
            (self.corpus_dir / "audio").rglob(f"{session}_P*.wav")
        )
        for wav in session_binaural_wavs:
            wav_relative_path = wav.relative_to(self.corpus_dir)
            wav_output_path = output_dir / wav_relative_path
            wav_output_path.parent.mkdir(exist_ok=True, parents=True)
            os.symlink(wav, wav_output_path)

        # For array recordings, we need to apply the edits. We first group the channels
        # by their corresponding array. For example, the wav names are like:
        # S02_U01.CH2.wav
        session_array_wavs = sorted(
            (self.corpus_dir / "audio").rglob(f"{session}_U*.wav")
        )
        array_wavs = defaultdict(list)
        for wav in session_array_wavs:
            array_id = wav.stem.split(".")[0].split("_")[-1]
            array_wavs[array_id].append(wav)

        # Then we apply the edits to each array
        for array_id, wavs in array_wavs.items():
            if array_id not in frame_drops:
                logging.warning(
                    f"Array {array_id} in session {session} has no frame-drop information."
                )
                continue
            in_wavs, out_wavs = [], []
            for wav in wavs:
                wav_relative_path = wav.relative_to(self.corpus_dir)
                wav_output_path = output_dir / wav_relative_path
                in_wavs.append(wav)
                out_wavs.append(wav_output_path)
            # Apply the edits to the wavs
            self._apply_edits_to_wav(in_wavs, out_wavs, frame_drops[array_id]["edits"])
        return

    def _apply_edits_to_wav(
        self,
        in_wavs: List[Pathlike],
        out_wavs: List[Pathlike],
        edits: List[List[int]],
    ) -> None:
        import soundfile as sf

        x = np.concatenate(
            [Recording.from_file(wav).load_audio() for wav in in_wavs], axis=0
        )
        # Pre-allocate space for the edited signal
        max_space = edits[-1][2] + edits[-1][1] - edits[-1][0]
        x_new = np.zeros(shape=(x.shape[0], max_space), dtype=x.dtype)
        length_x = x.shape[1]
        for edit in edits:
            in_from = edit[0]
            in_to = min(edit[1], length_x)
            out_from = edit[2]
            out_to = out_from + in_to - in_from
            if in_from > length_x:
                break
            x_new[:, out_from - 1 : out_to] = x[:, in_from - 1 : in_to]
        # Trim off excess
        x_new = x_new[:, 0:out_to]
        # Write to file
        for i, wav in enumerate(out_wavs):
            sf.write(
                file=str(wav),
                data=np.expand_dims(x_new[i], axis=1),
                samplerate=16000,
                format="WAV",
            )
        return

    def correct_clock_drift(
        self,
        corpus_dir: Pathlike,
        session: str,
        linear_fit: Optional[Dict[str, Any]] = None,
        num_threads: int = 1,
    ) -> None:
        session_wavs = sorted((corpus_dir / "audio").rglob(f"{session}_*.wav"))
        with ThreadPoolExecutor(max_workers=num_threads) as ex:
            futures = []
            for wav in session_wavs:
                wav_relative_path = wav.relative_to(corpus_dir)
                wav_output_path = self.output_dir / wav_relative_path
                # Get binaural mic id or array id
                mic_id = wav.name.split(".")[0].split("_")[-1]
                if mic_id not in linear_fit:
                    logging.warning(
                        f"Channel {mic_id} in session {session} has no clock drift "
                        "information."
                    )
                    continue
                futures.append(
                    ex.submit(
                        self._apply_clock_drift_correction,
                        wav,
                        wav_output_path,
                        linear_fit[mic_id],
                    )
                )
            for future in tqdm(futures):
                future.result()
        return

    def _apply_clock_drift_correction(
        self,
        in_wav: Pathlike,
        out_wav: Pathlike,
        linear_fit: Dict[str, Union[int, float, List]],
    ) -> None:
        speeds = linear_fit["speed"]
        padding = linear_fit["padding"]
        filename = in_wav.stem
        sox_cmd = str(self.sox_path)
        in_wav = str(in_wav)
        out_wav = str(out_wav)

        if isinstance(speeds, list):
            # Multi-segment fit - only needed for S01_U02 and S01_U05
            starts = padding
            ends = padding[1:] + [-1]  # -1 means end of signal
            command_concat = [sox_cmd]
            samples_to_lose = 0
            tmpdir = tempfile.mkdtemp(dir=self.output_dir)
            for seg, (start, end, speed) in enumerate(zip(starts, ends, speeds)):
                of1 = tmpdir + "/" + filename + "." + str(seg) + ".wav"
                of2 = tmpdir + "/" + filename + ".x" + str(seg) + ".wav"
                command1 = [sox_cmd, "-D", "-R", in_wav, of1]
                if seg == 0:
                    # 'start' has a different meaning for the first segment:
                    # it acts like padding does in the simple one-piece case.
                    if start < 0:
                        start = -start
                        trim = ["trim", f"{start}s"]
                    else:
                        trim = ["pad", f"{start}s", "0s", "trim", "0s"]
                else:
                    start += samples_to_lose  # may need to truncate some samples
                    trim = ["trim", f"{int(start)}s"]
                command1 += trim
                if end > 0:
                    # Segment of given duration
                    duration = end - start
                    command1 += [f"{duration}s"]
                    if speed < 0:
                        # Negative speed means backward in time, so we effectively
                        # have to remove some samples. This happens in S01_U05.
                        samples_to_lose = -duration / speed
                    else:
                        samples_to_lose = 0
                command2 = [sox_cmd, "-D", "-R", of1, of2, "speed", str(speed)]
                subprocess.call(command1)
                subprocess.call(command2)
                command_concat.append(of2)
            command_concat.append(out_wav)
            subprocess.call(command_concat)
            # Clean up
            shutil.rmtree(tmpdir)
        else:
            # The -R suppresses dithering so that the command produces identical
            # results each time
            command = [sox_cmd, "-D", "-R", in_wav, out_wav, "speed", str(speeds)]
            if padding > 0:
                # Change speed and pad
                command += ["pad", f"{padding}s", "0s"]
            else:
                # Change speed and trim
                command += ["trim", f"{-padding}s"]
            # Note: the order of speed vs trim/pad makes a difference. Sox actually
            # applies the speed transform after the padding, but speed is so close
            # to 1 and the padding so short that it will come out about the same
            # either way around.
            logging.info(f"Running command: {' '.join(command)}")
            subprocess.call(command)
        return

    def adjust_json_timestamps(
        self, session: str, linear_fit: Optional[Dict[str, Any]] = None
    ) -> None:
        in_json = next((self.corpus_dir / "transcriptions").rglob(f"{session}.json"))
        relative_path = in_json.relative_to(self.corpus_dir)
        out_json = self.output_dir / relative_path
        corrected_utts = []
        with open(in_json, "r") as fin, open(out_json, "w") as fout:
            data = json.load(fin)
            for segment in data:
                if "speaker" not in segment:
                    continue
                pid = segment["speaker"]
                speed = linear_fit[pid]["speed"]
                padding = linear_fit[pid]["padding"]
                delta_t = padding / 16000  # convert to seconds
                start_time = (
                    TimeFormatConverter.hms_to_seconds(
                        segment["start_time"]["original"]
                    )
                    / speed
                    + delta_t
                )
                end_time = (
                    TimeFormatConverter.hms_to_seconds(segment["end_time"]["original"])
                    / speed
                    + delta_t
                )
                segment["start_time"] = TimeFormatConverter.seconds_to_hms(start_time)
                segment["end_time"] = TimeFormatConverter.seconds_to_hms(end_time)
                corrected_utts.append(segment)
            json.dump(corrected_utts, fout, indent=2)
        return
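

# For reference, in the simple single-fit case handled by
# `_apply_clock_drift_correction` above, the constructed command is equivalent
# to a shell invocation like the following (values are illustrative):
#
#     sox -D -R S02_U01.CH1.wav S02_U01.CH1.sync.wav speed 0.9999915 pad 1024s 0s
#
# where -D disables automatic dithering and -R makes sox's random numbers
# repeatable, so repeated runs produce bit-identical output.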