Source code for deeppavlov.models.morpho_tagger.morpho_tagger

# Copyright 2017 Neural Networks and Deep Learning lab, MIPT
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from logging import getLogger
from pathlib import Path
from typing import List, Optional, Union, Tuple

import numpy as np
import keras.layers as kl
import keras.optimizers as ko
import keras.regularizers as kreg
import keras.backend as kb
from keras import Model

from deeppavlov.core.common.registry import register
from deeppavlov.core.data.simple_vocab import SimpleVocabulary
from deeppavlov.core.models.keras_model import KerasModel
from .cells import Highway
from .common_tagger import to_one_hot

log = getLogger(__name__)

MAX_WORD_LENGTH = 30


[docs]@register("morpho_tagger") class MorphoTagger(KerasModel): """A class for character-based neural morphological tagger Parameters: symbols: character vocabulary tags: morphological tags vocabulary save_path: the path where model is saved load_path: the path from where model is loaded mode: usage mode word_rnn: the type of character-level network (only `cnn` implemented) char_embeddings_size: the size of character embeddings char_conv_layers: the number of convolutional layers on character level char_window_size: the width of convolutional filter (filters). It can be a list if several parallel filters are applied, for example, [2, 3, 4, 5]. char_filters: the number of convolutional filters for each window width. It can be a number, a list (when there are several windows of different width on a single convolution layer), a list of lists, if there are more than 1 convolution layers, or **None**. If **None**, a layer with width **width** contains min(**char_filter_multiple** * **width**, 200) filters. char_filter_multiple: the ratio between filters number and window width char_highway_layers: the number of highway layers on character level conv_dropout: the ratio of dropout between convolutional layers highway_dropout: the ratio of dropout between highway layers, intermediate_dropout: the ratio of dropout between convolutional and highway layers on character level lstm_dropout: dropout ratio in word-level LSTM word_vectorizers: list of parameters for additional word-level vectorizers, for each vectorizer it stores a pair of vectorizer dimension and the dimension of the corresponding word embedding word_lstm_layers: the number of word-level LSTM layers word_lstm_units: hidden dimensions of word-level LSTMs word_dropout: the ratio of dropout before word level (it is applied to word embeddings) regularizer: l2 regularization parameter verbose: the level of verbosity A subclass of :class:`~deeppavlov.core.models.keras_model.KerasModel` """ def __init__(self, symbols: SimpleVocabulary, tags: SimpleVocabulary, save_path: Optional[Union[str, Path]] = None, load_path: Optional[Union[str, Path]] = None, mode: str = 'infer', word_rnn: str = "cnn", char_embeddings_size: int = 16, char_conv_layers: int = 1, char_window_size: Union[int, List[int]] = 5, char_filters: Union[int, List[int]] = None, char_filter_multiple: int = 25, char_highway_layers: int = 1, conv_dropout: float = 0.0, highway_dropout: float = 0.0, intermediate_dropout: float = 0.0, lstm_dropout: float = 0.0, word_vectorizers: List[Tuple[int, int]] = None, word_lstm_layers: int = 1, word_lstm_units: Union[int, List[int]] = 128, word_dropout: float = 0.0, regularizer: float = None, verbose: int = 1, **kwargs): # Calls parent constructor. 
Results in creation of save_folder if it doesn't exist super().__init__(save_path=save_path, load_path=load_path, mode=mode) self.symbols = symbols self.tags = tags self.word_rnn = word_rnn self.char_embeddings_size = char_embeddings_size self.char_conv_layers = char_conv_layers self.char_window_size = char_window_size self.char_filters = char_filters self.char_filter_multiple = char_filter_multiple self.char_highway_layers = char_highway_layers self.conv_dropout = conv_dropout self.highway_dropout = highway_dropout self.intermediate_dropout = intermediate_dropout self.lstm_dropout = lstm_dropout self.word_dropout = word_dropout self.word_vectorizers = word_vectorizers # a list of additional vectorizer dimensions self.word_lstm_layers = word_lstm_layers self.word_lstm_units = word_lstm_units self.regularizer = regularizer self.verbose = verbose self._initialize() self.build() # Tries to load the model from model `load_path`, if it is available self.load()

    def load(self) -> None:
        """Checks the existence of the model file and loads model weights from it if it exists."""
        # Checks presence of the model file
        if self.load_path.exists():
            path = str(self.load_path.resolve())
            log.info('[loading model from {}]'.format(path))
            self.model_.load_weights(path)

    def save(self) -> None:
        """Saves model weights to the `save_path` provided in the config.

        The directory is already created by `super().__init__`, which is called
        in `__init__` of this class.
        """
        path = str(self.save_path.absolute())
        log.info('[saving model to {}]'.format(path))
        self.model_.save_weights(path)

    def _initialize(self):
        if isinstance(self.char_window_size, int):
            self.char_window_size = [self.char_window_size]
        if self.char_filters is None or isinstance(self.char_filters, int):
            self.char_filters = [self.char_filters] * len(self.char_window_size)
        if len(self.char_window_size) != len(self.char_filters):
            raise ValueError("There should be the same number of window sizes and filter sizes")
        if isinstance(self.word_lstm_units, int):
            self.word_lstm_units = [self.word_lstm_units] * self.word_lstm_layers
        if len(self.word_lstm_units) != self.word_lstm_layers:
            raise ValueError("There should be the same number of lstm layer units and lstm layers")
        if self.word_vectorizers is None:
            self.word_vectorizers = []
        if self.regularizer is not None:
            self.regularizer = kreg.l2(self.regularizer)
        if self.verbose > 0:
            log.info("{} symbols, {} tags in CharacterTagger".format(len(self.symbols), len(self.tags)))
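
    # Normalization example (added for clarity; not part of the original source):
    # with the defaults char_window_size=5, char_filters=None, word_lstm_units=128 and
    # word_lstm_layers=1, _initialize() turns these into char_window_size=[5],
    # char_filters=[None] and word_lstm_units=[128], so the build code below can
    # always iterate over lists.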

    def build(self):
        """Builds the network using Keras."""
        word_inputs = kl.Input(shape=(None, MAX_WORD_LENGTH+2), dtype="int32")
        inputs = [word_inputs]
        word_outputs = self._build_word_cnn(word_inputs)
        if len(self.word_vectorizers) > 0:
            additional_word_inputs = [kl.Input(shape=(None, input_dim), dtype="float32")
                                      for input_dim, dense_dim in self.word_vectorizers]
            inputs.extend(additional_word_inputs)
            additional_word_embeddings = [kl.Dense(dense_dim)(additional_word_inputs[i])
                                          for i, (_, dense_dim) in enumerate(self.word_vectorizers)]
            word_outputs = kl.Concatenate()([word_outputs] + additional_word_embeddings)
        outputs, lstm_outputs = self._build_basic_network(word_outputs)
        compile_args = {"optimizer": ko.nadam(lr=0.002, clipnorm=5.0),
                        "loss": "categorical_crossentropy", "metrics": ["accuracy"]}
        self.model_ = Model(inputs, outputs)
        self.model_.compile(**compile_args)
        if self.verbose > 0:
            self.model_.summary(print_fn=log.info)
        return self
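
    # Shape note (added for clarity; not part of the original source): the compiled model
    # takes an int32 tensor of character indices of shape (batch, n_words, MAX_WORD_LENGTH + 2),
    # plus one float32 tensor of shape (batch, n_words, input_dim) per entry of
    # `word_vectorizers`, and outputs per-word tag probabilities of shape
    # (batch, n_words, len(self.tags)).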

    def _build_word_cnn(self, inputs):
        """Builds the word-level network."""
        inputs = kl.Lambda(kb.one_hot, arguments={"num_classes": len(self.symbols)},
                           output_shape=lambda x: tuple(x) + (len(self.symbols),))(inputs)
        char_embeddings = kl.Dense(self.char_embeddings_size, use_bias=False)(inputs)
        conv_outputs = []
        self.char_output_dim_ = 0
        for window_size, filters_number in zip(self.char_window_size, self.char_filters):
            curr_output = char_embeddings
            curr_filters_number = (min(self.char_filter_multiple * window_size, 200)
                                   if filters_number is None else filters_number)
            for _ in range(self.char_conv_layers - 1):
                curr_output = kl.Conv2D(curr_filters_number, (1, window_size),
                                        padding="same", activation="relu",
                                        data_format="channels_last")(curr_output)
                if self.conv_dropout > 0.0:
                    curr_output = kl.Dropout(self.conv_dropout)(curr_output)
            curr_output = kl.Conv2D(curr_filters_number, (1, window_size),
                                    padding="same", activation="relu",
                                    data_format="channels_last")(curr_output)
            conv_outputs.append(curr_output)
            self.char_output_dim_ += curr_filters_number
        if len(conv_outputs) > 1:
            conv_output = kl.Concatenate(axis=-1)(conv_outputs)
        else:
            conv_output = conv_outputs[0]
        highway_input = kl.Lambda(kb.max, arguments={"axis": -2})(conv_output)
        if self.intermediate_dropout > 0.0:
            highway_input = kl.Dropout(self.intermediate_dropout)(highway_input)
        for i in range(self.char_highway_layers - 1):
            highway_input = Highway(activation="relu")(highway_input)
            if self.highway_dropout > 0.0:
                highway_input = kl.Dropout(self.highway_dropout)(highway_input)
        highway_output = Highway(activation="relu")(highway_input)
        return highway_output

    def _build_basic_network(self, word_outputs):
        """Creates the basic network architecture,
        transforming word embeddings to intermediate outputs.
        """
        if self.word_dropout > 0.0:
            lstm_outputs = kl.Dropout(self.word_dropout)(word_outputs)
        else:
            lstm_outputs = word_outputs
        for j in range(self.word_lstm_layers-1):
            lstm_outputs = kl.Bidirectional(
                kl.LSTM(self.word_lstm_units[j], return_sequences=True,
                        dropout=self.lstm_dropout))(lstm_outputs)
        lstm_outputs = kl.Bidirectional(
            kl.LSTM(self.word_lstm_units[-1], return_sequences=True,
                    dropout=self.lstm_dropout))(lstm_outputs)
        pre_outputs = kl.TimeDistributed(
            kl.Dense(len(self.tags), activation="softmax",
                     activity_regularizer=self.regularizer),
            name="p")(lstm_outputs)
        return pre_outputs, lstm_outputs

    def _transform_batch(self, data, labels=None, transform_to_one_hot=True):
        data, additional_data = data[0], data[1:]
        L = max(len(x) for x in data)
        X = np.array([self._make_sent_vector(x, L) for x in data])
        X = [X] + [np.array(x) for x in additional_data]
        if labels is not None:
            Y = np.array([self._make_tags_vector(y, L) for y in labels])
            if transform_to_one_hot:
                Y = to_one_hot(Y, len(self.tags))
            return X, Y
        else:
            return X
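
    # Shape note (added for illustration; not part of the original source): for a batch
    # of `b` sentences whose longest sentence has `L` words, _transform_batch returns
    # X = [an int array of shape (b, L, MAX_WORD_LENGTH + 2), one array per extra vectorizer]
    # and, when labels are passed, Y of shape (b, L, len(self.tags)) after one-hot encoding.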

    def train_on_batch(self, *args) -> None:
        """Trains the model on a single batch.

        Args:
            *args: the list of network inputs. The last element of `args` is the batch
                of targets, all previous elements are training data batches
        """
        # data: List[Iterable], labels: Iterable[list]
        # Args:
        #   data: a batch of word sequences
        #   labels: a batch of correct tag sequences
        *data, labels = args
        X, Y = self._transform_batch(data, labels)
        self.model_.train_on_batch(X, Y)

    def predict_on_batch(self, data: Union[List[np.ndarray], Tuple[np.ndarray]],
                         return_indexes: bool = False) -> List[List[str]]:
        """Makes predictions on a single batch.

        Args:
            data: model inputs for a single batch; data[0] contains input character
                encodings and is the only element of data for most models. Subsequent
                elements of data include the output of additional vectorizers,
                e.g., a dictionary-based one.
            return_indexes: whether to return tag indexes in the vocabulary or the tags themselves

        Returns:
            a batch of label sequences
        """
        X = self._transform_batch(data)
        objects_number, lengths = len(X[0]), [len(elem) for elem in data[0]]
        Y = self.model_.predict_on_batch(X)
        labels = np.argmax(Y, axis=-1)
        answer: List[List[str]] = [None] * objects_number
        for i, (elem, length) in enumerate(zip(labels, lengths)):
            elem = elem[:length]
            answer[i] = elem if return_indexes else self.tags.idxs2toks(elem)
        return answer
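
    # Illustrative call (added; not part of the original source): for a basic model
    # without additional vectorizers, `data` is a one-element sequence whose only item
    # is a batch of tokenized sentences, e.g.
    #     predict_on_batch(([["John", "loves", "Mary"]],))
    # which returns one list of tag strings per sentence (the exact tags depend on
    # the tag vocabulary the model was trained with).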

    def __call__(self, *x_batch, **kwargs) -> Union[List, np.ndarray]:
        """Predicts answers on batch elements.

        Args:
            x_batch: a batch to predict answers on. It can be either a single array
                for the basic model or a sequence of arrays for a complex one
                (:config:`configuration file <morpho_tagger/UD2.0/morpho_ru_syntagrus_pymorphy.json>`
                or its lemmatized version).
        """
        return self.predict_on_batch(x_batch, **kwargs)

    def _make_sent_vector(self, sent: List, bucket_length: int = None) -> np.ndarray:
        """Transforms a sentence to a Numpy array, which will be the network input.

        Args:
            sent: input sentence
            bucket_length: the width of the bucket

        Returns:
            A 2d array; answer[i][j] contains the index of the (j-1)-th letter
            of the i-th word of the input sentence (position 0 holds the BEGIN marker).
        """
        bucket_length = bucket_length or len(sent)
        answer = np.zeros(shape=(bucket_length, MAX_WORD_LENGTH+2), dtype=np.int32)
        for i, word in enumerate(sent):
            # frame the character indices of each word with BEGIN/END markers and pad the rest
            answer[i, 0] = self.tags["BEGIN"]
            m = min(len(word), MAX_WORD_LENGTH)
            for j, x in enumerate(word[-m:]):
                answer[i, j+1] = self.symbols[x]
            answer[i, m+1] = self.tags["END"]
            answer[i, m+2:] = self.tags["PAD"]
        return answer

    def _make_tags_vector(self, tags, bucket_length=None) -> np.ndarray:
        """Transforms a sentence of tags to a Numpy array, which will be the network target.

        Args:
            tags: input sentence of tags
            bucket_length: the width of the bucket

        Returns:
            A 1d array; answer[i] contains the index of the i-th tag of the input sentence.
        """
        bucket_length = bucket_length or len(tags)
        answer = np.zeros(shape=(bucket_length,), dtype=np.int32)
        for i, tag in enumerate(tags):
            answer[i] = self.tags[tag]
        return answer
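
# ---------------------------------------------------------------------------
# Minimal usage sketch (added for illustration; not part of the original module).
# It assumes that `symbols_vocab` and `tags_vocab` are SimpleVocabulary instances
# already fitted on the training characters and tags, that `train_batches` yields
# (sentences, tag_sequences) pairs of tokenized data, and that the weight path
# below is a placeholder.
#
#     tagger = MorphoTagger(symbols=symbols_vocab, tags=tags_vocab,
#                           save_path="tagger_weights.hdf5",
#                           load_path="tagger_weights.hdf5")
#     for sentences, tag_sequences in train_batches:
#         tagger.train_on_batch(sentences, tag_sequences)  # last argument is the targets
#     tagger.save()
#     predicted = tagger([["Simple", "test", "sentence"]])  # -> one tag list per sentence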