"""
The data preparation recipe for the ICSI Meeting Corpus. It follows the Kaldi recipe
by Pawel Swietojanski:
https://git.informatik.fh-nuernberg.de/poppto72658/kaldi/-/commit/d5815d3255bb62eacf2fba6314f194fe09966453
ICSI data comprises around 72 hours of natural, meeting-style overlapped English speech
recorded at International Computer Science Institute (ICSI), Berkley.
Speech is captured using the set of parallel microphones, including close-talk headsets,
and several distant independent microhones (i.e. mics that do not form any explicitly
known geometry, see below for an example layout). Recordings are sampled at 16kHz.
The correponding paper describing the ICSI corpora is [1]
[1] A Janin, D Baron, J Edwards, D Ellis, D Gelbart, N Morgan, B Peskin,
T Pfau, E Shriberg, A Stolcke, and C Wooters, The ICSI meeting corpus.
in Proc IEEE ICASSP, 2003, pp. 364-367
ICSI data did not come with any pre-defined splits for train/valid/eval sets as it was
mostly used as a training material for NIST RT evaluations. Some portions of the unrelased ICSI
data (as a part of this corpora) can be found in, for example, NIST RT04 amd RT05 evaluation sets.
This recipe, however, to be self-contained factors out training (67.5 hours), development (2.2 hours
and evaluation (2.8 hours) sets in a way to minimise the speaker-overlap between different partitions,
and to avoid known issues with available recordings during evaluation. This recipe follows [2] where
dev and eval sets are making use of {Bmr021, Bns001} and {Bmr013, Bmr018, Bro021} meetings, respectively.
[2] S Renals and P Swietojanski, Neural networks for distant speech recognition.
in Proc IEEE HSCMA 2014 pp. 172-176. DOI:10.1109/HSCMA.2014.6843274
Below description is (mostly) copied from ICSI documentation for convenience.
=================================================================================
Simple diagram of the seating arrangement in the ICSI meeting room.
The ordering of seat numbers is as specified below, but their
alignment with microphones may not always be as precise as indicated
here. Also, the seat number only indicates where the participant
started the meeting. Since most of the microphones are wireless, they
were able to move around.
Door
1 2 3 4
-----------------------------------------------------------------------
| | | | S
| | | | c
| | | | r
9| D1 D2 | D3 PDA D4 | | e
| | | | e
| | | | n
| | | |
-----------------------------------------------------------------------
8 7 6 5
D1, D2, D3, D4 - Desktop PZM microphones
PDA - The mockup PDA with two cheap microphones
The following are the TYPICAL channel assignments, although a handful
of meetings (including Bmr003, Btr001, Btr002) differed in assignment.
The mapping from the above, to the actual waveform channels in the corpora,
and (this recipe for a signle distant mic case) is:
D1 - chanE - (this recipe: sdm3)
D2 - chanF - (this recipe: sdm4)
D3 - chan6 - (this recipe: sdm1)
D4 - chan7 - (this recipe: sdm2)
PDA left - chanC
PDA right - chanD
-----------
Note (Pawel): The mapping for headsets is being extracted from mrt files.
In cases where IHM channels are missing for some speakers in some meetings,
in this recipe we either back off to distant channel (typically D2, default)
or (optionally) skip this speaker's segments entirely from processing.
This is not the case for eval set, where all the channels come with the
expected recordings, and split is the same for all conditions (thus allowing
for direct comparisons between IHM, SDM and MDM settings).
NOTE on data: The ICSI data is freely available from the website (see `download` below)
and also as LDC corpora. The annotations that we download below are same as
LDC2004T04, but there are some differences in the audio data, specifically in the
session names. Some sessions (Bns...) are named (bns...) in the LDC corpus, and the
Mix-Headset wav files are not available from the LDC corpus. So we recommend downloading
the public version even if you have an LDC subscription. The public data also includes
annotations of roles, dialog, summary etc. but we have not included them in this recipe.
"""
import itertools
import logging
import urllib
import xml.etree.ElementTree as ET
import zipfile
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, NamedTuple, Optional, Tuple, Union
from tqdm.auto import tqdm
from lhotse import validate_recordings_and_supervisions
from lhotse.audio import AudioSource, Recording, RecordingSet
from lhotse.audio.backend import read_sph
from lhotse.qa import fix_manifests
from lhotse.recipes.utils import normalize_text_ami
from lhotse.supervision import AlignmentItem, SupervisionSegment, SupervisionSet
from lhotse.utils import Pathlike, Seconds, add_durations, resumable_download
# fmt:off
PARTITIONS = {
'train': [
"Bdb001", "Bed002", "Bed003", "Bed004", "Bed005", "Bed006", "Bed008", "Bed009",
"Bed010", "Bed011", "Bed012", "Bed013", "Bed014", "Bed015", "Bed016", "Bed017",
"Bmr001", "Bmr002", "Bmr003", "Bmr005", "Bmr006", "Bmr007", "Bmr008", "Bmr009",
"Bmr010", "Bmr011", "Bmr012", "Bmr014", "Bmr015", "Bmr016", "Bmr019", "Bmr020",
"Bmr022", "Bmr023", "Bmr024", "Bmr025", "Bmr026", "Bmr027", "Bmr028", "Bmr029",
"Bmr030", "Bmr031", "Bns002", "Bns003", "Bro003", "Bro004", "Bro005", "Bro007",
"Bro008", "Bro010", "Bro011", "Bro012", "Bro013", "Bro014", "Bro015", "Bro016",
"Bro017", "Bro018", "Bro019", "Bro022", "Bro023", "Bro024", "Bro025", "Bro026",
"Bro027", "Bro028", "Bsr001", "Btr001", "Btr002", "Buw001",
],
'dev': ["Bmr021", "Bns001"],
'test': ["Bmr013", "Bmr018", "Bro021"]
}
MIC_TO_CHANNELS = {
"ihm": ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B"],
"sdm": ["6"],
"mdm": ["E", "F", "6", "7"],
"ihm-mix": [],
}
# fmt:on
def download_audio(
target_dir: Path,
force_download: Optional[bool] = False,
url: Optional[str] = "http://https://groups.inf.ed.ac.uk/ami",
mic: Optional[str] = "ihm",
) -> None:
# Audios
for item in tqdm(
itertools.chain.from_iterable(PARTITIONS.values()),
desc="Downloading ICSI meetings",
):
if mic in ["ihm", "sdm", "mdm"]:
for channel in MIC_TO_CHANNELS[mic]:
wav_url = f"{url}/ICSIsignals/SPH/{item}/chan{channel}.sph"
wav_dir = target_dir / item
wav_dir.mkdir(parents=True, exist_ok=True)
wav_path = wav_dir / f"chan{channel}.sph"
try:
resumable_download(
wav_url, filename=wav_path, force_download=force_download
)
except urllib.error.HTTPError as e:
logging.warning(f"Skipping failed download from {wav_url}")
else:
wav_url = f"{url}/ICSIsignals/NXT/{item}.interaction.wav"
wav_dir = target_dir / item
wav_dir.mkdir(parents=True, exist_ok=True)
wav_path = wav_dir / f"Mix-Headset.wav"
resumable_download(
wav_url, filename=wav_path, force_download=force_download
)
[docs]
def download_icsi(
target_dir: Pathlike = ".",
audio_dir: Optional[Pathlike] = None,
transcripts_dir: Optional[Pathlike] = None,
force_download: Optional[bool] = False,
url: Optional[str] = "http://groups.inf.ed.ac.uk/ami",
mic: Optional[str] = "ihm",
) -> Path:
"""
Download ICSI audio and annotations for provided microphone setting.
:param target_dir: Pathlike, the path in which audio and transcripts dir are created by default.
:param audio_dir: Pathlike (default = '<target_dir>/audio'), the path to store the audio data.
:param transcripts_dir: Pathlike (default = '<target_dir>/transcripts'), path to store the transcripts data
:param force_download: bool (default = False), if True, download even if file is present.
:param url: str (default = 'http://groups.inf.ed.ac.uk/ami'), download URL.
:param mic: str {'ihm','ihm-mix','sdm','mdm'}, type of mic setting.
:return: the path to downloaded and extracted directory with data.
"""
target_dir = Path(target_dir)
audio_dir = Path(audio_dir) if audio_dir else target_dir / "speech"
transcripts_dir = (
Path(transcripts_dir) if transcripts_dir else target_dir / "transcripts"
)
# Audio
download_audio(audio_dir, force_download, url, mic)
# Annotations
logging.info("Downloading ICSI annotations")
if transcripts_dir.exists() and not force_download:
logging.info(
f"Skip downloading transcripts as they exist in: {transcripts_dir}"
)
return target_dir
# We need the MRT transcripts for the speaker-to-channel mapping. The NXT transcripts
# are used for the actual annotations (since they contain word alignments)
annotations_url_mrt = f"{url}/ICSICorpusAnnotations/ICSI_original_transcripts.zip"
annotations_url_nxt = f"{url}/ICSICorpusAnnotations/ICSI_core_NXT.zip"
resumable_download(
annotations_url_mrt,
filename=target_dir / "ICSI_original_transcripts.zip",
force_download=force_download,
)
resumable_download(
annotations_url_nxt,
filename=target_dir / "ICSI_core_NXT.zip",
force_download=force_download,
)
with zipfile.ZipFile(target_dir / "ICSI_core_NXT.zip") as z:
# Unzips transcripts to <target_dir>/'transcripts'
# zip file also contains some documentation which will be unzipped to <target_dir>
z.extractall(target_dir)
# If custom dir is passed, rename 'transcripts' dir accordingly
if transcripts_dir:
Path(target_dir / "transcripts").rename(transcripts_dir)
# From the MRT transcripts, we only need the transcripts/preambles.mrt file
with zipfile.ZipFile(target_dir / "ICSI_original_transcripts.zip") as z:
z.extract("transcripts/preambles.mrt", transcripts_dir)
return target_dir
class IcsiSegmentAnnotation(NamedTuple):
text: str
speaker: str
gender: str
start_time: Seconds
end_time: Seconds
words: List[AlignmentItem]
def parse_icsi_annotations(
transcripts_dir: Pathlike, normalize: str = "upper"
) -> Tuple[
Dict[Tuple[str, str, str], List[SupervisionSegment]], Dict[str, Dict[str, int]]
]:
annotations = defaultdict(list)
# In Lhotse, channels are integers, so we map channel ids to integers for each session
channel_to_idx_map = defaultdict(dict)
spk_to_channel_map = defaultdict(dict)
# First we get global speaker ids and channels
with open(transcripts_dir / "preambles.mrt") as f:
root = ET.parse(f).getroot() # <Meetings>
for child in root:
if child.tag == "Meeting":
meeting_id = child.attrib["Session"]
for grandchild in child:
if grandchild.tag == "Preamble":
for greatgrandchild in grandchild:
if greatgrandchild.tag == "Channels":
channel_to_idx_map[meeting_id] = {
channel.attrib["Name"]: idx
for idx, channel in enumerate(greatgrandchild)
}
elif greatgrandchild.tag == "Participants":
for speaker in greatgrandchild:
# some speakers may not have an associated channel in some meetings, so we
# assign them the SDM channel
spk_to_channel_map[meeting_id][
speaker.attrib["Name"]
] = (
speaker.attrib["Channel"]
if "Channel" in speaker.attrib
else "chan6"
)
# Get the speaker segment times from the segments file
segments = {}
for file in (transcripts_dir / "Segments").glob("*.xml"):
meet_id, local_id, _ = file.stem.split(".")
spk_segments = []
spk_id = None
with open(file) as f:
tree = ET.parse(f)
for seg in tree.getroot():
if seg.tag != "segment":
continue
if spk_id is None and "participant" in seg.attrib:
spk_id = seg.attrib["participant"]
start_time = float(seg.attrib["starttime"])
end_time = float(seg.attrib["endtime"])
spk_segments.append((start_time, end_time))
if spk_id is None or len(spk_segments) == 0:
continue
key = (meet_id, local_id)
channel = spk_to_channel_map[meet_id][spk_id]
segments[key] = (spk_id, channel, spk_segments)
# Now we go through each speaker's word-level annotations and store them
words = {}
for file in (transcripts_dir / "Words").glob("*.xml"):
meet_id, local_id, _ = file.stem.split(".")
key = (meet_id, local_id)
if key not in segments:
continue
else:
spk_id, channel, spk_segments = segments[key]
seg_words = []
combine_with_next = False
with open(file) as f:
tree = ET.parse(f)
for i, word in enumerate(tree.getroot()):
if (
word.tag != "w"
or "starttime" not in word.attrib
or word.attrib["starttime"] == ""
or "endtime" not in word.attrib
or word.attrib["endtime"] == ""
):
continue
start_time = float(word.attrib["starttime"])
end_time = float(word.attrib["endtime"])
seg_words.append((start_time, end_time, word.text))
words[key] = (spk_id, channel, seg_words)
# Now we create segment-level annotations by combining the word-level
# annotations with the speaker segment times. We also normalize the text
# (if requested). The annotations is a dict indexed by (meeting_id, spk_id, channel).
annotations = defaultdict(list)
for key, (spk_id, channel, spk_segments) in segments.items():
# Get the words for this speaker
_, _, spk_words = words[key]
new_key = (key[0], spk_id, channel)
# Now iterate over the speaker segments and create segment annotations
for seg_start, seg_end in spk_segments:
seg_words = list(
filter(lambda w: w[0] >= seg_start and w[1] <= seg_end, spk_words)
)
if len(seg_words) == 0:
continue
start = seg_words[0][0]
end = seg_words[-1][1]
word_alignments = []
for w in seg_words:
w_start = max(start, round(w[0], ndigits=4))
w_end = min(end, round(w[1], ndigits=4))
w_dur = add_durations(w_end, -w_start, sampling_rate=16000)
w_symbol = normalize_text_ami(w[2], normalize=normalize)
if len(w_symbol) == 0:
continue
if w_dur <= 0:
logging.warning(
f"Segment {key[0]}.{spk_id}.{channel} at time {start}-{end} "
f"has a word with zero or negative duration. Skipping."
)
continue
word_alignments.append(
AlignmentItem(start=w_start, duration=w_dur, symbol=w_symbol)
)
text = " ".join(w.symbol for w in word_alignments)
annotations[new_key].append(
IcsiSegmentAnnotation(
text=text,
speaker=spk_id,
gender=spk_id[0],
start_time=start,
end_time=end,
words=word_alignments,
)
)
return annotations, channel_to_idx_map
# IHM and MDM audio requires grouping multiple channels of AudioSource into
# one Recording.
def prepare_audio_grouped(
audio_paths: List[Pathlike],
channel_to_idx_map: Dict[str, Dict[str, int]] = None,
save_to_wav: bool = False,
output_dir: Pathlike = None,
) -> RecordingSet:
# Group together multiple channels from the same session.
# We will use that to create a Recording with multiple sources (channels).
import soundfile as sf
from cytoolz import groupby
channel_wavs = groupby(lambda p: p.parts[-2], audio_paths)
if channel_to_idx_map is None:
channel_to_idx_map = defaultdict(dict)
recordings = []
for session_name, channel_paths in tqdm(
channel_wavs.items(), desc="Preparing audio"
):
if session_name not in channel_to_idx_map:
channel_to_idx_map[session_name] = {
c: idx for idx, c in enumerate(["chanE", "chanF", "chan6", "chan7"])
}
audio_sf, samplerate = read_sph(channel_paths[0])
if save_to_wav:
session_dir = Path(output_dir) / "wavs" / session_name
session_dir.mkdir(parents=True, exist_ok=True)
for i, audio_path in enumerate(channel_paths):
audio, _ = read_sph(audio_path)
wav_path = session_dir / f"{audio_path.stem}.wav"
sf.write(wav_path, audio.T, samplerate)
# Replace the sph path with the wav path
channel_paths[i] = wav_path
recordings.append(
Recording(
id=session_name,
sources=[
AudioSource(
type="file",
channels=[channel_to_idx_map[session_name][audio_path.stem]],
source=str(audio_path),
)
for audio_path in sorted(channel_paths)
if audio_path.stem in channel_to_idx_map[session_name]
],
sampling_rate=samplerate,
num_samples=audio_sf.shape[1],
duration=audio_sf.shape[1] / samplerate,
)
)
return RecordingSet.from_recordings(recordings)
# SDM and IHM-Mix settings do not require any grouping
def prepare_audio_single(
audio_paths: List[Pathlike],
save_to_wav: bool = False,
output_dir: Pathlike = None,
) -> RecordingSet:
import soundfile as sf
recordings = []
for audio_path in tqdm(audio_paths, desc="Preparing audio"):
session_name = audio_path.parts[-2]
if audio_path.suffix == ".wav":
audio_sf = sf.SoundFile(audio_path)
num_frames = audio_sf.frames
num_channels = audio_sf.channels
samplerate = audio_sf.samplerate
else:
audio_sf, samplerate = read_sph(audio_path)
num_channels, num_frames = audio_sf.shape
if save_to_wav:
session_dir = Path(output_dir) / "wavs" / session_name
session_dir.mkdir(parents=True, exist_ok=True)
wav_path = session_dir / f"{audio_path.stem}.wav"
sf.write(wav_path, audio_sf.T, samplerate)
# Replace the sph path with the wav path
audio_path = wav_path
recordings.append(
Recording(
id=session_name,
sources=[
AudioSource(
type="file",
channels=list(range(num_channels)),
source=str(audio_path),
)
],
sampling_rate=samplerate,
num_samples=num_frames,
duration=num_frames / samplerate,
)
)
return RecordingSet.from_recordings(recordings)
# For IHM mic, each headphone will have its own annotations, while for other mics
# all sources have the same annotation
def prepare_supervision_ihm(
audio: RecordingSet,
annotations: Dict[str, List[IcsiSegmentAnnotation]],
channel_to_idx_map: Dict[str, Dict[str, int]],
) -> SupervisionSet:
# Create a mapping from a tuple of (session_id, channel) to the list of annotations.
# This way we can map the supervisions to the right channels in a multi-channel recording.
annotation_by_id_and_channel = {
(key[0], channel_to_idx_map[key[0]][key[2]]): annotations[key]
for key in annotations
}
segments = []
for recording in tqdm(audio, desc="Preparing supervision"):
# IHM can have multiple audio sources for each recording
for source in recording.sources:
# For each source, "channels" will always be a one-element list
(channel,) = source.channels
annotation = annotation_by_id_and_channel.get((recording.id, channel))
if annotation is None:
continue
for seg_idx, seg_info in enumerate(annotation):
duration = seg_info.end_time - seg_info.start_time
# Some annotations in IHM setting exceed audio duration, so we
# ignore such segments
if seg_info.end_time > recording.duration:
logging.warning(
f"Segment {recording.id}-{channel}-{seg_idx} exceeds "
f"recording duration. Not adding to supervisions."
)
continue
if duration > 0:
segments.append(
SupervisionSegment(
id=f"{recording.id}-{channel}-{seg_idx}",
recording_id=recording.id,
start=seg_info.start_time,
duration=duration,
channel=channel,
language="English",
speaker=seg_info.speaker,
gender=seg_info.gender,
text=seg_info.text,
alignment={"word": seg_info.words},
)
)
return SupervisionSet.from_segments(segments)
def prepare_supervision_other(
audio: RecordingSet, annotations: Dict[str, List[IcsiSegmentAnnotation]]
) -> SupervisionSet:
annotation_by_id = defaultdict(list)
for key, value in annotations.items():
annotation_by_id[key[0]].extend(value)
segments = []
for recording in tqdm(audio, desc="Preparing supervision"):
annotation = annotation_by_id.get(recording.id)
# In these mic settings, all sources will share supervision.
source = recording.sources[0]
if annotation is None:
logging.warning(f"No annotation found for recording {recording.id}")
continue
if len(source.channels) > 1:
logging.warning(
f"More than 1 channels in recording {recording.id}. "
f"Skipping this recording."
)
continue
for seg_idx, seg_info in enumerate(annotation):
duration = seg_info.end_time - seg_info.start_time
if duration > 0:
segments.append(
SupervisionSegment(
id=f"{recording.id}-{seg_idx}",
recording_id=recording.id,
start=seg_info.start_time,
duration=duration,
channel=recording.channel_ids,
language="English",
speaker=seg_info.speaker,
gender=seg_info.gender,
text=seg_info.text,
alignment={"word": seg_info.words},
)
)
return SupervisionSet.from_segments(segments)
[docs]
def prepare_icsi(
audio_dir: Pathlike,
transcripts_dir: Optional[Pathlike] = None,
output_dir: Optional[Pathlike] = None,
mic: Optional[str] = "ihm",
normalize_text: str = "kaldi",
save_to_wav: bool = False,
) -> Dict[str, Dict[str, Union[RecordingSet, SupervisionSet]]]:
"""
Returns the manifests which consist of the Recordings and Supervisions
:param audio_dir: Pathlike, the path which holds the audio data
:param transcripts_dir: Pathlike, the path which holds the transcripts data
:param output_dir: Pathlike, the path where to write the manifests - `None` means manifests aren't stored on disk.
:param mic: str {'ihm','ihm-mix','sdm','mdm'}, type of mic to use.
:param normalize_text: str {'none', 'upper', 'kaldi'} normalization of text
:param save_to_wav: bool, whether to save the sph audio to wav format
:return: a Dict whose key is ('train', 'dev', 'test'), and the values are dicts of manifests under keys
'recordings' and 'supervisions'.
"""
audio_dir = Path(audio_dir)
transcripts_dir = (
Path(transcripts_dir)
if transcripts_dir is not None
else audio_dir / "transcripts"
)
assert audio_dir.is_dir(), f"No such directory: {audio_dir}"
assert transcripts_dir.is_dir(), f"No such directory: {transcripts_dir}"
assert mic in MIC_TO_CHANNELS.keys(), f"Mic {mic} not supported"
if save_to_wav:
assert output_dir is not None, "output_dir must be specified when saving to wav"
if output_dir is not None:
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
logging.info("Parsing ICSI transcripts")
annotations, channel_to_idx_map = parse_icsi_annotations(
transcripts_dir, normalize=normalize_text
)
# Audio
logging.info("Preparing recording manifests")
channels = "".join(MIC_TO_CHANNELS[mic])
if mic == "ihm" or mic == "mdm":
audio_paths = audio_dir.rglob(f"chan[{channels}].sph")
audio = prepare_audio_grouped(
list(audio_paths),
channel_to_idx_map if mic == "ihm" else None,
save_to_wav,
output_dir,
)
elif mic == "sdm" or mic == "ihm-mix":
audio_paths = (
audio_dir.rglob(f"chan[{channels}].sph")
if len(channels)
else audio_dir.rglob("*.wav")
)
audio = prepare_audio_single(list(audio_paths), save_to_wav, output_dir)
# Supervisions
logging.info("Preparing supervision manifests")
supervision = (
prepare_supervision_ihm(audio, annotations, channel_to_idx_map)
if mic == "ihm"
else prepare_supervision_other(audio, annotations)
)
manifests = defaultdict(dict)
for part in ["train", "dev", "test"]:
# Get recordings for current data split
audio_part = audio.filter(lambda x: x.id in PARTITIONS[part])
supervision_part = supervision.filter(
lambda x: x.recording_id in PARTITIONS[part]
)
audio_part, supervision_part = fix_manifests(audio_part, supervision_part)
validate_recordings_and_supervisions(audio_part, supervision_part)
# Write to output directory if a path is provided
if output_dir is not None:
audio_part.to_file(output_dir / f"icsi-{mic}_recordings_{part}.jsonl.gz")
supervision_part.to_file(
output_dir / f"icsi-{mic}_supervisions_{part}.jsonl.gz"
)
# Combine all manifests into one dictionary
manifests[part] = {"recordings": audio_part, "supervisions": supervision_part}
return dict(manifests)