Source code for lhotse.recipes.ami

"""
The data preparation recipe for the AMI Meeting Corpus.

NOTE on data splits and references:

- The official AMI documentation (http://groups.inf.ed.ac.uk/ami/corpus/datasets.shtml) recommends
three different data partitions: scenario-only, full-corpus, and full-corpus-asr, based on the
task that the data is used for. We provide an argument `partition` which specifies which
partition is to be used.

- We use the latest version of the official annotations: ami_public_manual_1.6.2. This differs from
the Kaldi s5 and s5b recipes which use 1.6.1 (known to have alignment and annotation issues). We
get word-level annotations with time-marks and combine adjacent words into one segment if: (i) they
belong to the same speaker, and (ii) there is no pause between the words. (These supervisions can
later be modified to get larger super-segments based on the task)

NOTE on mic settings: AMI comes with 4 different microphone settings:

- ihm (individual headset microphone)
- sdm (single distant microphone)
- ihm-mix (mix-headset sum)
- mdm (multiple distant microphone)

These can be specified using the `mic` argument.
"""

import itertools
import logging
import os
import urllib.request
import xml.etree.ElementTree as ET
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, NamedTuple, Optional, Tuple, Union

from tqdm.auto import tqdm

from lhotse import validate_recordings_and_supervisions
from lhotse.audio import AudioSource, Recording, RecordingSet
from lhotse.qa import fix_manifests
from lhotse.recipes.utils import normalize_text_ami
from lhotse.supervision import AlignmentItem, SupervisionSegment, SupervisionSet
from lhotse.utils import Pathlike, Seconds, add_durations, resumable_download

# fmt: off
MEETINGS = {
    'EN2001': ['EN2001a', 'EN2001b', 'EN2001d', 'EN2001e'],
    'EN2002': ['EN2002a', 'EN2002b', 'EN2002c', 'EN2002d'],
    'EN2003': ['EN2003a'],
    'EN2004': ['EN2004a'],
    'EN2005': ['EN2005a'],
    'EN2006': ['EN2006a','EN2006b'],
    'EN2009': ['EN2009b','EN2009c','EN2009d'],
    'ES2002': ['ES2002a','ES2002b','ES2002c','ES2002d'],
    'ES2003': ['ES2003a','ES2003b','ES2003c','ES2003d'],
    'ES2004': ['ES2004a','ES2004b','ES2004c','ES2004d'],
    'ES2005': ['ES2005a','ES2005b','ES2005c','ES2005d'],
    'ES2006': ['ES2006a','ES2006b','ES2006c','ES2006d'],
    'ES2007': ['ES2007a','ES2007b','ES2007c','ES2007d'],
    'ES2008': ['ES2008a','ES2008b','ES2008c','ES2008d'],
    'ES2009': ['ES2009a','ES2009b','ES2009c','ES2009d'],
    'ES2010': ['ES2010a','ES2010b','ES2010c','ES2010d'],
    'ES2011': ['ES2011a','ES2011b','ES2011c','ES2011d'],
    'ES2012': ['ES2012a','ES2012b','ES2012c','ES2012d'],
    'ES2013': ['ES2013a','ES2013b','ES2013c','ES2013d'],
    'ES2014': ['ES2014a','ES2014b','ES2014c','ES2014d'],
    'ES2015': ['ES2015a','ES2015b','ES2015c','ES2015d'],
    'ES2016': ['ES2016a','ES2016b','ES2016c','ES2016d'],
    'IB4001': ['IB4001'],
    'IB4002': ['IB4002'],
    'IB4003': ['IB4003'],
    'IB4004': ['IB4004'],
    'IB4005': ['IB4005'],
    'IB4010': ['IB4010'],
    'IB4011': ['IB4011'],
    'IN1001': ['IN1001'],
    'IN1002': ['IN1002'],
    'IN1005': ['IN1005'],
    'IN1007': ['IN1007'],
    'IN1008': ['IN1008'],
    'IN1009': ['IN1009'],
    'IN1012': ['IN1012'],
    'IN1013': ['IN1013'],
    'IN1014': ['IN1014'],
    'IN1016': ['IN1016'],
    'IS1000': ['IS1000a','IS1000b','IS1000c','IS1000d'],
    'IS1001': ['IS1001a','IS1001b','IS1001c','IS1001d'],
    'IS1002': ['IS1002b','IS1002c','IS1002d'],
    'IS1003': ['IS1003a','IS1003b','IS1003c','IS1003d'],
    'IS1004': ['IS1004a','IS1004b','IS1004c','IS1004d'],
    'IS1005': ['IS1005a','IS1005b','IS1005c'],
    'IS1006': ['IS1006a','IS1006b','IS1006c','IS1006d'],
    'IS1007': ['IS1007a','IS1007b','IS1007c','IS1007d'],
    'IS1008': ['IS1008a','IS1008b','IS1008c','IS1008d'],
    'IS1009': ['IS1009a','IS1009b','IS1009c','IS1009d'],
    'TS3003': ['TS3003a','TS3003b','TS3003c','TS3003d'],
    'TS3004': ['TS3004a','TS3004b','TS3004c','TS3004d'],
    'TS3005': ['TS3005a','TS3005b','TS3005c','TS3005d'],
    'TS3006': ['TS3006a','TS3006b','TS3006c','TS3006d'],
    'TS3007': ['TS3007a','TS3007b','TS3007c','TS3007d'],
    'TS3008': ['TS3008a','TS3008b','TS3008c','TS3008d'],
    'TS3009': ['TS3009a','TS3009b','TS3009c','TS3009d'],
    'TS3010': ['TS3010a','TS3010b','TS3010c','TS3010d'],
    'TS3011': ['TS3011a','TS3011b','TS3011c','TS3011d'],
    'TS3012': ['TS3012a','TS3012b','TS3012c','TS3012d'],
}

PARTITIONS = {
    'scenario-only': {
        'train': [meeting for session in [
                'ES2002','ES2005','ES2006','ES2007','ES2008','ES2009','ES2010','ES2012','ES2013',
                'ES2015','ES2016','IS1000','IS1001','IS1002','IS1003','IS1004','IS1005','IS1006',
                'IS1007','TS3005','TS3008','TS3009','TS3010','TS3011','TS3012'
            ] for meeting in MEETINGS[session] if meeting not in ['IS1002a','IS1005d']],
        'dev': [meeting for session in [
                'ES2003','ES2011','IS1008','TS3004','TS3006'
            ] for meeting in MEETINGS[session]],
        'test': [meeting for session in [
                'ES2004','ES2014','IS1009','TS3003','TS3007'
            ] for meeting in MEETINGS[session]]
    },
    'full-corpus': {
        'train': [meeting for session in [
                'ES2002','ES2005','ES2006','ES2007','ES2008','ES2009','ES2010','ES2012','ES2013',
                'ES2015','ES2016','IS1000','IS1001','IS1002','IS1003','IS1004','IS1005','IS1006',
                'IS1007','TS3005','TS3008','TS3009','TS3010','TS3011','TS3012','EN2001','EN2003',
                'EN2004','EN2005','EN2006','EN2009','IN1001','IN1002','IN1005','IN1007','IN1008',
                'IN1009','IN1012','IN1013','IN1014','IN1016'
            ] for meeting in MEETINGS[session]],
        'dev': [meeting for session in [
                'ES2003','ES2011','IS1008','TS3004','TS3006','IB4001','IB4002','IB4003','IB4004',
                'IB4010','IB4011'
            ] for meeting in MEETINGS[session]],
        'test': [meeting for session in [
                'ES2004','ES2014','IS1009','TS3003','TS3007','EN2002'
            ] for meeting in MEETINGS[session]]
    },
    'full-corpus-asr': {
        'train': [meeting for session in [
                'ES2002','ES2005','ES2006','ES2007','ES2008','ES2009','ES2010','ES2012','ES2013',
                'ES2015','ES2016','IS1000','IS1001','IS1002','IS1003','IS1004','IS1005','IS1006',
                'IS1007','TS3005','TS3008','TS3009','TS3010','TS3011','TS3012','EN2001','EN2003',
                'EN2004','EN2005','EN2006','EN2009','IN1001','IN1002','IN1005','IN1007','IN1008',
                'IN1009','IN1012','IN1013','IN1014','IN1016','ES2014','TS3007','ES2003','TS3006'
            ] for meeting in MEETINGS[session]],
        'dev': [meeting for session in [
                'ES2011','IS1008','TS3004','IB4001','IB4002','IB4003','IB4004','IB4010','IB4011'
            ] for meeting in MEETINGS[session]],
        'test': [meeting for session in [
                'ES2004','IS1009','TS3003','EN2002'
            ] for meeting in MEETINGS[session]]
    }
}

MICS = ['ihm','ihm-mix','sdm','mdm','mdm8-bf']
MDM_ARRAYS = ['Array1','Array2']
MDM_CHANNELS = ['01','02','03','04','05','06','07','08']
# fmt: on


def download_audio(
    target_dir: Path,
    force_download: Optional[bool] = False,
    url: Optional[str] = "http://groups.inf.ed.ac.uk/ami",
    mic: Optional[str] = "ihm",
) -> None:
    # Audios
    for item in tqdm(
        itertools.chain.from_iterable(MEETINGS.values()),
        desc="Downloading AMI meetings",
    ):
        if mic == "ihm":
            headset_num = 5 if item in ("EN2001a", "EN2001d", "EN2001e") else 4
            for m in range(headset_num):
                wav_name = f"{item}.Headset-{m}.wav"
                wav_url = f"{url}/AMICorpusMirror/amicorpus/{item}/audio/{wav_name}"
                wav_dir = target_dir / "wav_db" / item / "audio"
                wav_dir.mkdir(parents=True, exist_ok=True)
                wav_path = wav_dir / wav_name
                resumable_download(
                    wav_url,
                    filename=wav_path,
                    force_download=force_download,
                )
        elif mic == "ihm-mix":
            wav_name = f"{item}.Mix-Headset.wav"
            wav_url = f"{url}/AMICorpusMirror/amicorpus/{item}/audio/{wav_name}"
            wav_dir = target_dir / "wav_db" / item / "audio"
            wav_dir.mkdir(parents=True, exist_ok=True)
            wav_path = wav_dir / wav_name
            resumable_download(
                wav_url, filename=wav_path, force_download=force_download
            )
        elif mic == "sdm":
            wav_name = f"{item}.Array1-01.wav"
            wav_url = f"{url}/AMICorpusMirror/amicorpus/{item}/audio/{wav_name}"
            wav_dir = target_dir / "wav_db" / item / "audio"
            wav_dir.mkdir(parents=True, exist_ok=True)
            wav_path = wav_dir / wav_name
            resumable_download(
                wav_url,
                filename=wav_path,
                force_download=force_download,
                missing_ok=True,
            )
        elif mic == "mdm":
            for array in MDM_ARRAYS:
                for channel in MDM_CHANNELS:
                    wav_name = f"{item}.{array}-{channel}.wav"
                    wav_url = f"{url}/AMICorpusMirror/amicorpus/{item}/audio/{wav_name}"
                    wav_dir = target_dir / "wav_db" / item / "audio"
                    wav_dir.mkdir(parents=True, exist_ok=True)
                    wav_path = wav_dir / wav_name
                    resumable_download(
                        wav_url,
                        filename=wav_path,
                        force_download=force_download,
                        missing_ok=True,
                    )
        elif mic == "mdm8-bf":
            wav_name = f"{item}_MDM8.wav"
            wav_url = f"{url}/AMICorpusMirror/amicorpus/beamformed/{item}/{wav_name}"
            wav_dir = target_dir / "wav_db" / item / "audio"
            wav_dir.mkdir(parents=True, exist_ok=True)
            wav_path = wav_dir / wav_name
            resumable_download(
                wav_url, filename=wav_path, force_download=force_download
            )


[docs] def download_ami( target_dir: Pathlike = ".", annotations: Optional[Pathlike] = None, force_download: Optional[bool] = False, url: Optional[str] = "http://groups.inf.ed.ac.uk/ami", mic: Optional[str] = "ihm", ) -> Path: """ Download AMI audio and annotations for provided microphone setting. Example usage: 1. Download AMI data for IHM mic setting: >>> download_ami(mic='ihm') 2. Download AMI data for IHM-mix mic setting, and use existing annotations: >>> download_ami(mic='ihm-mix', annotations='/path/to/existing/annotations.zip') :param target_dir: Pathlike, the path to store the data. :param annotations: Pathlike (default = None), path to save annotations zip file :param force_download: bool (default = False), if True, download even if file is present. :param url: str (default = 'http://groups.inf.ed.ac.uk/ami'), AMI download URL. :param mic: str {'ihm','ihm-mix','sdm','mdm','mdm8-bf'}, type of mic setting. :return: the path to downloaded and extracted directory with data. """ target_dir = Path(target_dir) annotations = ( target_dir / "ami_public_manual_1.6.2.zip" if not annotations else annotations ) # Audio download_audio(target_dir, force_download, url, mic) # Annotations logging.info("Downloading AMI annotations") if annotations.exists(): logging.info(f"Skip downloading annotations as they exist in: {annotations}") return target_dir annotations_url = f"{url}/AMICorpusAnnotations/ami_public_manual_1.6.2.zip" resumable_download(annotations_url, annotations, force_download=force_download) return target_dir
class AmiSegmentAnnotation(NamedTuple): text: str speaker: str gender: str start_time: Seconds end_time: Seconds words: List[AlignmentItem] def parse_ami_annotations( annotations_dir: Pathlike, normalize: str = "upper", max_words_per_segment: Optional[int] = None, merge_consecutive: bool = False, ) -> Dict[str, List[SupervisionSegment]]: # Extract if zipped file if str(annotations_dir).endswith(".zip"): import zipfile with zipfile.ZipFile(annotations_dir) as z: z.extractall(path=annotations_dir.parent) annotations_dir = annotations_dir.parent # First we get global speaker ids and channels global_spk_id = {} channel_id = {} with open(annotations_dir / "corpusResources" / "meetings.xml") as f: tree = ET.parse(f) for meeting in tree.getroot(): meet_id = meeting.attrib["observation"] for speaker in meeting: local_id = (meet_id, speaker.attrib["nxt_agent"]) global_spk_id[local_id] = speaker.attrib["global_name"] channel_id[local_id] = int(speaker.attrib["channel"]) # Get the speaker segment times from the segments file segments = {} for file in (annotations_dir / "segments").iterdir(): meet_id, local_spkid, _ = file.stem.split(".") if (meet_id, local_spkid) not in global_spk_id: logging.warning( f"No speaker {meet_id}.{local_spkid} found! Skipping" " annotation." ) continue spk = global_spk_id[(meet_id, local_spkid)] channel = channel_id[(meet_id, local_spkid)] key = (meet_id, spk, channel) segments[key] = [] with open(file) as f: tree = ET.parse(f) for seg in tree.getroot(): if seg.tag != "segment": continue start_time = float(seg.attrib["transcriber_start"]) end_time = float(seg.attrib["transcriber_end"]) segments[key].append((start_time, end_time)) # Now we go through each speaker's word-level annotations and store them words = {} for file in (annotations_dir / "words").iterdir(): meet_id, local_spkid, _ = file.stem.split(".") if (meet_id, local_spkid) not in global_spk_id: continue spk = global_spk_id[(meet_id, local_spkid)] channel = channel_id[(meet_id, local_spkid)] key = (meet_id, spk, channel) if key not in segments: continue words[key] = [] with open(file) as f: tree = ET.parse(f) for word in tree.getroot(): if word.tag != "w" or "starttime" not in word.attrib: continue start_time = float(word.attrib["starttime"]) end_time = float(word.attrib["endtime"]) words[key].append((start_time, end_time, word.text)) # Now we create segment-level annotations by combining the word-level # annotations with the speaker segment times. We also normalize the text # and break-up long segments (if requested). annotations = defaultdict(list) for key, segs in segments.items(): # Get the words for this speaker spk_words = words[key] # Now iterate over the speaker segments and create segment annotations for seg_start, seg_end in segs: seg_words = list( filter(lambda w: w[0] >= seg_start and w[1] <= seg_end, spk_words) ) subsegments = split_segment( seg_words, max_words_per_segment, merge_consecutive ) for subseg in subsegments: start = subseg[0][0] end = subseg[-1][1] word_alignments = [] for w in subseg: w_start = max(start, round(w[0], ndigits=4)) w_end = min(end, round(w[1], ndigits=4)) w_dur = add_durations(w_end, -w_start, sampling_rate=16000) w_symbol = normalize_text_ami(w[2], normalize=normalize) if len(w_symbol) == 0: continue if w_dur <= 0: logging.warning( f"Segment {key[0]}.{key[1]}.{key[2]} at time {start}-{end} " f"has a word with zero or negative duration. Skipping." ) continue word_alignments.append( AlignmentItem(start=w_start, duration=w_dur, symbol=w_symbol) ) text = " ".join(w.symbol for w in word_alignments) annotations[key].append( AmiSegmentAnnotation( text=text, speaker=key[1], gender=key[1][0], start_time=start, end_time=end, words=word_alignments, ) ) return annotations def split_segment( words: List[Tuple[float, float, str]], max_words_per_segment: Optional[int] = None, merge_consecutive: bool = False, ) -> List[List[Tuple[float, float, str]]]: """ Given a list of words, return a list of segments (each segment is a list of words) where each segment has at most max_words_per_segment words. If merge_consecutive is True, then consecutive segments with less than max_words_per_segment words will be merged together. """ def split_(sequence, sep): chunk = [] for val in sequence: if val[-1] == sep: if len(chunk) > 0: yield chunk chunk = [] else: chunk.append(val) if len(chunk) > 0: yield chunk def split_on_fullstop_(sequence): subsegs = list(split_(sequence, ".")) if len(subsegs) < 2: return subsegs # Set a large default value for max_words_per_segment if not provided max_segment_length = max_words_per_segment if max_words_per_segment else 100000 if merge_consecutive: # Merge consecutive subsegments if their length is less than max_words_per_segment merged_subsegs = [subsegs[0]] for subseg in subsegs[1:]: if ( merged_subsegs[-1][-1][1] == subseg[0][0] and len(merged_subsegs[-1]) + len(subseg) <= max_segment_length ): merged_subsegs[-1].extend(subseg) else: merged_subsegs.append(subseg) subsegs = merged_subsegs return subsegs def split_on_comma_(segment): # This function smartly splits a segment on commas such that the number of words # in each subsegment is as close to max_words_per_segment as possible. # First we create subsegments by splitting on commas subsegs = list(split_(segment, ",")) if len(subsegs) < 2: return subsegs # Now we merge subsegments while ensuring that the number of words in each # subsegment is less than max_words_per_segment merged_subsegs = [subsegs[0]] for subseg in subsegs[1:]: if len(merged_subsegs[-1]) + len(subseg) <= max_words_per_segment: merged_subsegs[-1].extend(subseg) else: merged_subsegs.append(subseg) return merged_subsegs # First we split the list based on full-stops. subsegments = list(split_on_fullstop_(words)) if max_words_per_segment is not None: # Now we split each subsegment based on commas to get at most max_words_per_segment # words per subsegment. subsegments = [ list(split_on_comma_(subseg)) if len(subseg) > max_words_per_segment else [subseg] for subseg in subsegments ] # flatten the list of lists subsegments = [item for sublist in subsegments for item in sublist] # Filter out empty subsegments subsegments = list(filter(lambda s: len(s) > 0, subsegments)) return subsegments # IHM and MDM audio requires grouping multiple channels of AudioSource into # one Recording. def prepare_audio_grouped( audio_paths: List[Pathlike], ) -> RecordingSet: import soundfile as sf # Group together multiple channels from the same session. # We will use that to create a Recording with multiple sources (channels). from cytoolz import groupby channel_wavs = groupby(lambda p: p.parts[-3], audio_paths) recordings = [] for session_name, channel_paths in tqdm( channel_wavs.items(), desc="Processing audio files" ): audio_sf = sf.SoundFile(str(channel_paths[0])) sources = [] all_mono = True for idx, audio_path in enumerate(sorted(channel_paths)): audio = sf.SoundFile(str(audio_path)) if audio.channels > 1: logging.warning( f"Skipping recording {session_name} since it has a stereo" " channel" ) all_mono = False break sources.append( AudioSource(type="file", channels=[idx], source=str(audio_path)) ) if not all_mono: continue recordings.append( Recording( id=session_name, sources=sources, sampling_rate=audio_sf.samplerate, num_samples=audio_sf.frames, duration=audio_sf.frames / audio_sf.samplerate, ) ) return RecordingSet.from_recordings(recordings) # SDM, IHM-Mix, and mdm8-bf settings do not require any grouping def prepare_audio_single( audio_paths: List[Pathlike], mic: Optional[str] = "ihm-mix", ) -> RecordingSet: import soundfile as sf recordings = [] for audio_path in tqdm(audio_paths, desc="Processing audio files"): session_name = ( audio_path.parts[-3] if mic != "mdm8-bf" else audio_path.parts[-2] ) audio_sf = sf.SoundFile(str(audio_path)) recordings.append( Recording( id=session_name, sources=[ AudioSource( type="file", channels=list(range(audio_sf.channels)), source=str(audio_path), ) ], sampling_rate=audio_sf.samplerate, num_samples=audio_sf.frames, duration=audio_sf.frames / audio_sf.samplerate, ) ) return RecordingSet.from_recordings(recordings) # For IHM mic, each headphone will have its own annotations, while for other mics # all sources have the same annotation def prepare_supervision_ihm( audio: RecordingSet, annotations: Dict[str, List[AmiSegmentAnnotation]] ) -> SupervisionSet: # Create a mapping from a tuple of (session_id, channel) to the list of annotations. # This way we can map the supervisions to the right channels in a multi-channel recording. annotation_by_id_and_channel = { (key[0], key[2]): annotations[key] for key in annotations } segments = [] for recording in tqdm(audio, desc="Preparing supervisions"): # AMI IHM can have multiple audio sources for each recording for source in recording.sources: # For each source, "channels" will always be a one-element list (channel,) = source.channels annotation = annotation_by_id_and_channel.get((recording.id, channel)) if annotation is None: logging.warning( f"No annotation found for recording {recording.id} " f"(file {source.source})" ) continue for seg_idx, seg_info in enumerate(annotation): duration = add_durations( seg_info.end_time, -seg_info.start_time, sampling_rate=16000 ) # Some annotations in IHM setting exceed audio duration, so we # ignore such segments if seg_info.end_time > recording.duration: logging.warning( f"Segment {recording.id}-{channel}-{seg_idx} exceeds " "recording duration. Not adding to supervisions." ) continue if duration > 0: segments.append( SupervisionSegment( id=f"{recording.id}-{channel}-{seg_idx}", recording_id=recording.id, start=round(seg_info.start_time, ndigits=4), duration=duration, channel=channel, language="English", speaker=seg_info.speaker, gender=seg_info.gender, text=seg_info.text, alignment={"word": seg_info.words}, ) ) return SupervisionSet.from_segments(segments) def prepare_supervision_other( audio: RecordingSet, annotations: Dict[str, List[AmiSegmentAnnotation]] ) -> SupervisionSet: annotation_by_id = defaultdict(list) for key, value in annotations.items(): annotation_by_id[key[0]].extend(value) segments = [] for recording in tqdm(audio, desc="Preparing supervisions"): annotation = annotation_by_id.get(recording.id) # In these mic settings, all sources (1 for ihm-mix, sdm, and mdm8-bf and 16 for mdm) # will share supervision. if annotation is None: logging.warning(f"No annotation found for recording {recording.id}") continue if any(len(source.channels) > 1 for source in recording.sources): logging.warning( f"More than 1 channels in recording {recording.id}. " "Skipping this recording." ) continue for seg_idx, seg_info in enumerate(annotation): duration = seg_info.end_time - seg_info.start_time if duration > 0: segments.append( SupervisionSegment( id=f"{recording.id}-{seg_idx}", recording_id=recording.id, start=seg_info.start_time, duration=duration, channel=recording.channel_ids, language="English", speaker=seg_info.speaker, gender=seg_info.gender, text=seg_info.text, alignment={"word": seg_info.words}, ) ) return SupervisionSet.from_segments(segments)
[docs] def prepare_ami( data_dir: Pathlike, annotations_dir: Optional[Pathlike] = None, output_dir: Optional[Pathlike] = None, mic: Optional[str] = "ihm", partition: Optional[str] = "full-corpus", normalize_text: str = "kaldi", max_words_per_segment: Optional[int] = None, merge_consecutive: bool = False, ) -> Dict[str, Dict[str, Union[RecordingSet, SupervisionSet]]]: """ Returns the manifests which consist of the Recordings and Supervisions :param data_dir: Pathlike, the path of the data dir. :param annotations: Pathlike, the path of the annotations dir or zip file. :param output_dir: Pathlike, the path where to write the manifests. :param mic: str {'ihm','ihm-mix','sdm','mdm','mdm8-bf'}, type of mic to use. :param partition: str {'full-corpus','full-corpus-asr','scenario-only'}, AMI official data split :param normalize_text: str {'none', 'upper', 'kaldi'} normalization of text :param max_words_per_segment: int, maximum number of words per segment. If not None, we will split longer segments similar to Kaldi's data prep scripts, i.e., split on full-stop and comma. :param merge_consecutive: bool, if True, merge consecutive segments split on full-stop. We will only merge segments if the number of words in the merged segment is less than max_words_per_segment. :return: a Dict whose key is ('train', 'dev', 'eval'), and the values are dicts of manifests under keys 'recordings' and 'supervisions'. Example usage: 1. Prepare IHM-Mix data for ASR: >>> manifests = prepare_ami('/path/to/ami-corpus', mic='ihm-mix', partition='full-corpus-asr') 2. Prepare SDM data: >>> manifests = prepare_ami('/path/to/ami-corpus', mic='sdm', partition='full-corpus') """ data_dir = Path(data_dir) assert data_dir.is_dir(), f"No such directory: {data_dir}" assert mic in MICS, f"Mic {mic} not supported" assert partition in PARTITIONS, f"Partition {partition} not supported" if output_dir is not None: output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) logging.info("Parsing AMI annotations") if not annotations_dir: if (data_dir / "ami_public_manual_1.6.2").is_dir(): annotations_dir = data_dir / "ami_public_manual_1.6.2" elif (data_dir / "ami_public_manual_1.6.2.zip").is_file(): annotations_dir = data_dir / "ami_public_manual_1.6.2.zip" else: raise ValueError( "No annotations directory specified and no zip file found in" f" {data_dir}" ) else: annotations_dir = Path(annotations_dir) # Prepare annotations which is a list of segment-level transcriptions annotations = parse_ami_annotations( annotations_dir, normalize=normalize_text, max_words_per_segment=max_words_per_segment, merge_consecutive=merge_consecutive, ) # Audio logging.info("Preparing recording manifests") wav_dir = data_dir if mic in ["ihm", "mdm"]: audio_paths = ( wav_dir.rglob("*Headset-?.wav") if mic == "ihm" else wav_dir.rglob("*Array?-0?.wav") ) audio = prepare_audio_grouped(list(audio_paths)) elif mic in ["ihm-mix", "sdm", "mdm8-bf"]: if mic == "ihm-mix": audio_paths = wav_dir.rglob("*Mix-Headset.wav") elif mic == "sdm": audio_paths = wav_dir.rglob("*Array1-01.wav") elif mic == "mdm8-bf": audio_paths = wav_dir.rglob("*MDM8.wav") audio = prepare_audio_single(list(audio_paths), mic) # Supervisions logging.info("Preparing supervision manifests") supervision = ( prepare_supervision_ihm(audio, annotations) if mic == "ihm" else prepare_supervision_other(audio, annotations) ) manifests = defaultdict(dict) dataset_parts = PARTITIONS[partition] for part in ["train", "dev", "test"]: # Get recordings for current data split audio_part = audio.filter(lambda x: x.id in dataset_parts[part]) supervision_part = supervision.filter( lambda x: x.recording_id in dataset_parts[part] ) audio_part, supervision_part = fix_manifests(audio_part, supervision_part) validate_recordings_and_supervisions(audio_part, supervision_part) # Write to output directory if a path is provided if output_dir is not None: audio_part.to_file(output_dir / f"ami-{mic}_recordings_{part}.jsonl.gz") supervision_part.to_file( output_dir / f"ami-{mic}_supervisions_{part}.jsonl.gz" ) # Combine all manifests into one dictionary manifests[part] = { "recordings": audio_part, "supervisions": supervision_part, } return dict(manifests)