Source code for deeppavlov.dataset_readers.morphotagging_dataset_reader

# Copyright 2018 Neural Networks and Deep Learning lab, MIPT
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from logging import getLogger
from pathlib import Path
from typing import Dict, List, Union, Tuple, Optional

from deeppavlov.core.common.registry import register
from import DatasetReader
from import download_decompress, mark_done


log = getLogger(__name__)

[docs]def get_language(filepath: str) -> str: """Extracts language from typical UD filename """ return filepath.split("-")[0]
[docs]def read_infile(infile: Union[Path, str], from_words=False, word_column: int = WORD_COLUMN, pos_column: int = POS_COLUMN, tag_column: int = TAG_COLUMN, max_sents: int = -1, read_only_words: bool = False) -> List[Tuple[List, Union[List, None]]]: """Reads input file in CONLL-U format Args: infile: a path to a file word_column: column containing words (default=1) pos_column: column containing part-of-speech labels (default=3) tag_column: column containing fine-grained tags (default=5) max_sents: maximal number of sents to read read_only_words: whether to read only words Returns: a list of sentences. Each item contains a word sequence and a tag sequence, which is ``None`` in case ``read_only_words = True`` """ answer, curr_word_sent, curr_tag_sent = [], [], [] if from_words: word_column, read_only_words = 0, True if infile is not sys.stdin: fin = open(infile, "r", encoding="utf8") else: fin = sys.stdin for line in fin: line = line.strip() if line.startswith("#"): continue if line == "": if len(curr_word_sent) > 0: if read_only_words: curr_tag_sent = None answer.append((curr_word_sent, curr_tag_sent)) curr_tag_sent, curr_word_sent = [], [] if len(answer) == max_sents: break continue splitted = line.split("\t") index = splitted[0] if not from_words and not index.isdigit(): continue curr_word_sent.append(splitted[word_column]) if not read_only_words: pos, tag = splitted[pos_column], splitted[tag_column] tag = pos if tag == "_" else "{},{}".format(pos, tag) curr_tag_sent.append(tag) if len(curr_word_sent) > 0: if read_only_words: curr_tag_sent = None answer.append((curr_word_sent, curr_tag_sent)) if infile is not sys.stdin: fin.close() return answer
[docs]@register('morphotagger_dataset_reader') class MorphotaggerDatasetReader(DatasetReader): """Class to read training datasets in UD format""" URL = ''
[docs] def read(self, data_path: Union[List, str], language: Optional[None] = None, data_types: Optional[List[str]] = None, **kwargs) -> Dict[str, List]: """Reads UD dataset from data_path. Args: data_path: can be either 1. a directory containing files. The file for data_type 'mode' is then data_path / {language}-ud-{mode}.conllu 2. a list of files, containing the same number of items as data_types language: a language to detect filename when it is not given data_types: which dataset parts among 'train', 'dev', 'test' are returned Returns: a dictionary containing dataset fragments (see ``read_infile``) for given data types """ if data_types is None: data_types = ["train", "dev"] elif isinstance(data_types, str): data_types = list(data_types) for data_type in data_types: if data_type not in ["train", "dev", "test"]: raise ValueError("Unknown data_type: {}, only train, dev and test " "datatypes are allowed".format(data_type)) if isinstance(data_path, str): data_path = Path(data_path) if isinstance(data_path, Path): if data_path.exists(): is_file = data_path.is_file() else: is_file = (len(data_types) == 1) if is_file: # path to a single file data_path, reserve_data_path = [data_path], None else: # path to data directory if language is None: raise ValueError("You must implicitly provide language " "when providing data directory as source") reserve_data_path = data_path data_path = [data_path / "{}-ud-{}.conllu".format(language, mode) for mode in data_types] reserve_data_path = [ reserve_data_path / language / "{}-ud-{}.conllu".format(language, mode) for mode in data_types] else: data_path = [Path(data_path) for data_path in data_path] reserve_data_path = None if len(data_path) != len(data_types): raise ValueError("The number of input files in data_path and data types " "in data_types must be equal") has_missing_files = any(not filepath.exists() for filepath in data_path) if has_missing_files and reserve_data_path is not None: has_missing_files = any(not filepath.exists() for filepath in reserve_data_path) if not has_missing_files: data_path = reserve_data_path if has_missing_files: # Files are downloaded from the Web repository dir_path = data_path[0].parent language = language or get_language(data_path[0].parts[-1]) url = self.URL + "{}.tar.gz".format(language)'[downloading data from {} to {}]'.format(url, dir_path)) dir_path.mkdir(exist_ok=True, parents=True) download_decompress(url, dir_path) mark_done(dir_path) data = {} for mode, filepath in zip(data_types, data_path): if mode == "dev": mode = "valid" # if mode == "test": # kwargs["read_only_words"] = True data[mode] = read_infile(filepath, **kwargs) return data