Source code for lhotse.recipes.mtedx

"""
MTEDx is a collection of transcribed and translated speech corpora:
 https://openslr.org/100

It has 8 languages:
es - 189h
fr - 189h
pt - 164h
it - 107h
ru - 53h
el - 30h
ar - 18h
de - 15h

A subset of this audio is translated and split into the following partitions:
     - train
     - dev
     - test
     - iwslt2021 sets

This recipe only prepares the ASR portion of the data.
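
A typical workflow (the paths below are hypothetical)::

    from lhotse.recipes.mtedx import download_mtedx, prepare_mtedx

    corpus_dir = download_mtedx("downloads", languages=["es", "it"])
    manifests = prepare_mtedx(corpus_dir, output_dir="manifests", languages=["es", "it"])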
"""
import logging
import re
import tarfile
import unicodedata
from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, Optional, Sequence, Union

from tqdm.auto import tqdm

from lhotse import (
    Recording,
    RecordingSet,
    SupervisionSegment,
    SupervisionSet,
    validate_recordings_and_supervisions,
)
from lhotse.qa import fix_manifests
from lhotse.utils import Pathlike, is_module_available, resumable_download, safe_extract

# Keep markings such as vowel signs, all letters, decimal numbers, and spaces
VALID_CATEGORIES = ("Mc", "Mn", "Ll", "Lm", "Lo", "Lt", "Lu", "Nd", "Zs")
KEEP_LIST = ["\u2019"]
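# For reference, unicodedata.category maps each character to one of these
# two-letter codes, e.g. category("a") == "Ll" (kept) while
# category("!") == "Po" (dropped). "\u2019" (right single quotation mark)
# falls outside VALID_CATEGORIES but is explicitly kept via KEEP_LIST.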


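# Language codes for which ASR data is available; this recipe prepares
# only the ASR portion of the corpus.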
ASR = (
    "es",
    "fr",
    "pt",
    "it",
    "ru",
    "el",
    "ar",
    "de",
)


ISOCODE2LANG = {
    "fr": "French",
    "es": "Spanish",
    "pt": "Portuguese",
    "it": "Italian",
    "ru": "Russian",
    "el": "Greek",
    "ar": "Arabic",
    "de": "German",
}

###############################################################################
#                             Download and Untar
###############################################################################
def download_mtedx(
    target_dir: Pathlike = ".",
    languages: Optional[Union[str, Sequence[str]]] = "all",
) -> Path:
    """
    Download and untar the dataset.

    :param target_dir: Pathlike, the path of the directory where the
        mtedx_corpus directory will be created and to which data will be
        downloaded.
    :param languages: A str or sequence of strings specifying which
        languages to download. The default 'all' downloads all available
        languages.
    :return: the path to the downloaded and extracted directory with data.
    """
    target_dir = Path(target_dir) / "mtedx_corpus"
    target_dir.mkdir(parents=True, exist_ok=True)

    langs_list = list(ISOCODE2LANG.keys())
    # If for some reason languages = None, assume this also means 'all'
    if isinstance(languages, str) and languages != "all":
        langs_list = [languages]
    elif isinstance(languages, (list, tuple)):
        if languages[0] != "all":
            langs_list = list(languages)

    for lang in tqdm(langs_list, "Downloading MTEDx languages"):
        tar_path = target_dir / f"{lang}-{lang}.tgz"
        completed_detector = target_dir / f".{lang}.completed"
        if completed_detector.is_file():
            logging.info(f"Skipping {lang} because {completed_detector} exists.")
            continue
        resumable_download(
            f"http://www.openslr.org/resources/100/mtedx_{lang}.tgz",
            filename=tar_path,
        )
        with tarfile.open(tar_path) as tar:
            safe_extract(tar, path=target_dir)
        completed_detector.touch()
    return target_dir
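
# A minimal usage sketch (hypothetical target path; requires network access
# to the OpenSLR mirror):
#
#     root = download_mtedx(target_dir="downloads", languages=("el", "ar"))
#     # root == Path("downloads/mtedx_corpus"), containing per-language
#     # directories such as el-el/, plus .el.completed marker files.
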
###############################################################################
#                                Prepare MTEDx
###############################################################################


def prepare_mtedx(
    corpus_dir: Pathlike,
    output_dir: Optional[Pathlike] = None,
    languages: Optional[Union[str, Sequence[str]]] = "all",
    num_jobs: int = 1,
) -> Dict[str, Dict[str, Dict[str, Union[RecordingSet, SupervisionSet]]]]:
    """
    Prepares manifests for all requested MTEDx languages.

    :param corpus_dir: Path to the root where MTEDx data was downloaded.
        It should be called mtedx_corpus.
    :param output_dir: Root directory where the manifests are stored.
    :param languages: str or str sequence specifying the languages to prepare.
        The str 'all' prepares all languages.
    :param num_jobs: Number of threads to use when preparing data.
    :return: A dict mapping language -> split -> {'recordings', 'supervisions'}.
    """
    # Resolve corpus_dir type
    if isinstance(corpus_dir, str):
        corpus_dir = Path(corpus_dir)
    # Resolve output_dir type
    if isinstance(output_dir, str):
        output_dir = Path(output_dir)

    langs_list = list(ISOCODE2LANG.keys())
    # If for some reason languages = None, assume this also means 'all'
    if isinstance(languages, str) and languages != "all":
        langs_list = [languages]
    elif isinstance(languages, (list, tuple)):
        if languages[0] != "all":
            langs_list = list(languages)

    manifests = defaultdict(dict)
    for lang in langs_list:
        corpus_dir_lang = corpus_dir / f"{lang}-{lang}"
        output_dir_lang = output_dir / f"{lang}" if output_dir is not None else None
        if corpus_dir_lang.is_dir():
            manifests[lang] = prepare_single_mtedx_language(
                corpus_dir_lang,
                output_dir_lang,
                language=lang,
                num_jobs=num_jobs,
            )

    return dict(manifests)
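
# The returned mapping is nested as manifests[lang][split][kind]. Note that
# the in-memory key for the dev data is "valid"; only the saved file names
# use "dev". E.g. (hypothetical paths):
#
#     manifests = prepare_mtedx("downloads/mtedx_corpus", "manifests", languages="es")
#     recs = manifests["es"]["train"]["recordings"]    # a RecordingSet
#     sups = manifests["es"]["valid"]["supervisions"]  # a SupervisionSet
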
###############################################################################
# All remaining functions are just helper functions, mainly for text
# normalization and parsing the vtt files that come with the mtedx corpus
###############################################################################


# Prepare data for a single language
def prepare_single_mtedx_language(
    corpus_dir: Pathlike,
    output_dir: Optional[Pathlike] = None,
    language: str = "language",
    num_jobs: int = 1,
) -> Dict[str, Dict[str, Union[RecordingSet, SupervisionSet]]]:
    """
    Prepares manifests for a single MTEDx language.

    This function works as follows:

        - First it looks for the audio directory, data/{split}/wav, where
          the .flac files are stored (split is one of train, valid, test).
        - Then, it looks for the vtt directory, data/{split}/vtt, which
          contains the segmentation and transcripts for the audio.
        - The transcripts undergo some basic text normalization.

    :param corpus_dir: Path to the root of the MTEDx download
    :param output_dir: Path where the manifests are stored as .jsonl.gz files
    :param language: The two-letter language code.
    :param num_jobs: Number of threads to use when preparing data.
    :return: A dict mapping split -> {'recordings', 'supervisions'}.
    """
    if isinstance(corpus_dir, str):
        corpus_dir = Path(corpus_dir)
    manifests = defaultdict(dict)

    with ThreadPoolExecutor(num_jobs) as ex:
        for split in ("train", "valid", "test"):
            audio_dir = corpus_dir / f"data/{split}/wav"
            recordings = RecordingSet.from_recordings(
                Recording.from_file(p) for p in audio_dir.glob("*.flac")
            )
            if len(recordings) == 0:
                logging.warning(f"No .flac files found in {audio_dir}")

            supervisions = []
            text_dir = corpus_dir / f"data/{split}/vtt"
            futures = []
            for p in text_dir.glob("*"):
                futures.append(ex.submit(_filename_to_supervisions, p, language))

            for future in tqdm(futures, desc="Processing", leave=False):
                result = future.result()
                if result is None:
                    continue
                for sup in result:
                    supervisions.append(sup)

            if len(supervisions) == 0:
                logging.warning(f"No supervisions found in {text_dir}")
            supervisions = SupervisionSet.from_segments(supervisions)

            recordings, supervisions = fix_manifests(recordings, supervisions)
            validate_recordings_and_supervisions(recordings, supervisions)

            manifests[split] = {
                "recordings": recordings,
                "supervisions": supervisions,
            }

            if output_dir is not None:
                if isinstance(output_dir, str):
                    output_dir = Path(output_dir)
                output_dir.mkdir(parents=True, exist_ok=True)
                # The 'valid' split is conventionally saved under the name 'dev'.
                save_split = "dev" if split == "valid" else split
                recordings.to_file(
                    output_dir / f"mtedx-{language}_recordings_{save_split}.jsonl.gz"
                )
                supervisions.to_file(
                    output_dir / f"mtedx-{language}_supervisions_{save_split}.jsonl.gz"
                )

    return dict(manifests)


# Function that takes in the vtt file name (Path object) and returns a
# list of SupervisionSegments
def _filename_to_supervisions(filename: Path, language: str):
    lines = filename.read_text()
    recoid = filename.stem.split(".")[0]
    supervisions = []
    for start, end, line in _parse_vtt(lines, "<noise>"):
        line_list = []
        for w in line.split():
            w_ = w.strip()
            if re.match(r"^(\([^)]*\) *)+$", w_):
                line_list.append(w_)
            elif _filter_word(w_):
                line_list.append(w_)
            else:
                line_list.append("<unk>")
        line_ = " ".join(line_list)
        # Keep lines that are a single word followed only by tags; skip any
        # other line containing angle-bracket tags such as <unk> or <noise>.
        if ("<" in line_ or ">" in line_) and not re.match(
            r"^\w+ *(<[^>]*> *)+$", line_, re.UNICODE
        ):
            continue
        line_new = line_.strip()
        supervisions.append(
            SupervisionSegment(
                id=_format_uttid(recoid, start),
                recording_id=recoid,
                start=start,
                duration=round(end - start, ndigits=8),
                channel=0,
                text=line_new,
                language=language,
                speaker=recoid,
            )
        )
    return supervisions


# Handles proper formatting (0-padding) of utterance ids
def _format_uttid(recoid, start):
    # Since each recording is a talk, normally by 1 speaker, we use the
    # recoid as the spkid as well.
    start = "{0:08d}".format(int(float(start) * 100))
    return "_".join([recoid, start])


# Filters out words from text that do not have the right unicode categories.
# This includes strange punctuation, for instance.
def _filter_word(s):
    for c in s:
        if unicodedata.category(c) not in VALID_CATEGORIES and c not in KEEP_LIST:
            return False
    return True


# This filters out individual characters in a line of text
def _filter(s):
    return unicodedata.category(s) in VALID_CATEGORIES or s in KEEP_LIST


# Convert an HH:MM:SS.mmm timestamp to seconds
def _time2sec(time):
    hr, mn, sec = time.split(":")
    return int(hr) * 3600.0 + int(mn) * 60.0 + float(sec)


# Parse the start/end times in the vtt files
def _parse_time_segment(line):
    start, end = line.split(" --> ")
    start = _time2sec(start)
    end = _time2sec(end)
    return start, end


# There are some strange space symbols that we normalize
def _normalize_space(c):
    if unicodedata.category(c) == "Zs":
        return " "
    else:
        return c


# Parse the vtt file
def _parse_vtt(lines, noise):
    # Import regex for some special unicode handling that re has issues with
    if not is_module_available("regex"):
        raise ImportError(
            "regex package not found. Please install it (pip install regex)."
        )
    else:
        import regex as re2

    noise_pattern = re2.compile(r"\([^)]*\)", re2.UNICODE)
    apostrophe_pattern = re2.compile(r"(\w)'(\w)")
    html_tags = re2.compile(r"(&[^ ;]*;)|(</?[iu]>)")

    blocks = lines.split("\n\n")
    # Enumerating from -1 so that i > 0 skips the first two blocks, which
    # hold the WEBVTT header/metadata rather than cues.
    for i, b in enumerate(blocks, -1):
        if i > 0 and b.strip() != "":
            b_lines = b.split("\n")
            start, end = _parse_time_segment(b_lines[0])
            line = " ".join(b_lines[1:])
            line_new = line
            if line.strip("- ") != "":
                line_parts = noise_pattern.sub(noise, line_new)
                line_parts = apostrophe_pattern.sub(r"\1\u2019\2", line_parts)
                line_parts = html_tags.sub("", line_parts)
                line_parts_new = []
                for lp in line_parts.split(noise):
                    line_parts_new.append(
                        "".join(filter(_filter, lp.strip().replace("-", " ")))
                    )
                joiner = " " + noise + " "
                line_new = joiner.join(line_parts_new)
                line_new = re2.sub(
                    r"\p{Zs}", lambda m: _normalize_space(m.group(0)), line_new
                )
                line_new = re2.sub(r" +", " ", line_new).strip().lower()
            yield start, end, line_new
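
# A toy illustration of the parsing above (assuming a cue block preceded by
# two header/metadata blocks, which _parse_vtt skips):
#
#     vtt = (
#         "WEBVTT\n\n"
#         "NOTE metadata\n\n"
#         "00:00:01.000 --> 00:00:03.500\n(Laughter) Hello there!\n"
#     )
#     list(_parse_vtt(vtt, "<noise>"))
#     # -> [(1.0, 3.5, "<noise> hello there")]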