Source code for deeppavlov.models.morpho_tagger.morpho_tagger

# Copyright 2017 Neural Networks and Deep Learning lab, MIPT
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from logging import getLogger
from pathlib import Path
from typing import List, Optional, Union, Tuple

import numpy as np
import tensorflow.keras.backend as K
from tensorflow.keras import Model
from tensorflow.keras.layers import (Input, Dense, Lambda, Concatenate, Conv2D, Dropout, LSTM, Bidirectional,
                                     TimeDistributed)
from tensorflow.keras.optimizers import Nadam
from tensorflow.keras.regularizers import l2

from deeppavlov.core.common.registry import register
from deeppavlov.core.data.simple_vocab import SimpleVocabulary
from deeppavlov.core.models.keras_model import KerasModel
from .cells import Highway
from .common_tagger import to_one_hot

log = getLogger(__name__)

MAX_WORD_LENGTH = 30


[docs]@register("morpho_tagger") class MorphoTagger(KerasModel): """A class for character-based neural morphological tagger Parameters: symbols: character vocabulary tags: morphological tags vocabulary save_path: the path where model is saved load_path: the path from where model is loaded mode: usage mode word_rnn: the type of character-level network (only `cnn` implemented) char_embeddings_size: the size of character embeddings char_conv_layers: the number of convolutional layers on character level char_window_size: the width of convolutional filter (filters). It can be a list if several parallel filters are applied, for example, [2, 3, 4, 5]. char_filters: the number of convolutional filters for each window width. It can be a number, a list (when there are several windows of different width on a single convolution layer), a list of lists, if there are more than 1 convolution layers, or **None**. If **None**, a layer with width **width** contains min(**char_filter_multiple** * **width**, 200) filters. char_filter_multiple: the ratio between filters number and window width char_highway_layers: the number of highway layers on character level conv_dropout: the ratio of dropout between convolutional layers highway_dropout: the ratio of dropout between highway layers, intermediate_dropout: the ratio of dropout between convolutional and highway layers on character level lstm_dropout: dropout ratio in word-level LSTM word_vectorizers: list of parameters for additional word-level vectorizers, for each vectorizer it stores a pair of vectorizer dimension and the dimension of the corresponding word embedding word_lstm_layers: the number of word-level LSTM layers word_lstm_units: hidden dimensions of word-level LSTMs word_dropout: the ratio of dropout before word level (it is applied to word embeddings) regularizer: l2 regularization parameter verbose: the level of verbosity A subclass of :class:`~deeppavlov.core.models.keras_model.KerasModel` """ def __init__(self, symbols: SimpleVocabulary, tags: SimpleVocabulary, save_path: Optional[Union[str, Path]] = None, load_path: Optional[Union[str, Path]] = None, mode: str = 'infer', word_rnn: str = "cnn", char_embeddings_size: int = 16, char_conv_layers: int = 1, char_window_size: Union[int, List[int]] = 5, char_filters: Union[int, List[int]] = None, char_filter_multiple: int = 25, char_highway_layers: int = 1, conv_dropout: float = 0.0, highway_dropout: float = 0.0, intermediate_dropout: float = 0.0, lstm_dropout: float = 0.0, word_vectorizers: List[Tuple[int, int]] = None, word_lstm_layers: int = 1, word_lstm_units: Union[int, List[int]] = 128, word_dropout: float = 0.0, regularizer: float = None, verbose: int = 1, **kwargs): # Calls parent constructor. 
        super().__init__(save_path=save_path, load_path=load_path, mode=mode, **kwargs)
        self.symbols = symbols
        self.tags = tags
        self.word_rnn = word_rnn
        self.char_embeddings_size = char_embeddings_size
        self.char_conv_layers = char_conv_layers
        self.char_window_size = char_window_size
        self.char_filters = char_filters
        self.char_filter_multiple = char_filter_multiple
        self.char_highway_layers = char_highway_layers
        self.conv_dropout = conv_dropout
        self.highway_dropout = highway_dropout
        self.intermediate_dropout = intermediate_dropout
        self.lstm_dropout = lstm_dropout
        self.word_dropout = word_dropout
        self.word_vectorizers = word_vectorizers  # a list of additional vectorizer dimensions
        self.word_lstm_layers = word_lstm_layers
        self.word_lstm_units = word_lstm_units
        self.regularizer = regularizer
        self.verbose = verbose
        self._initialize()
        self.model_ = None
        self.build()
        # Tries to load model weights from `load_path` if the file is available.
        self.load()
    def load(self) -> None:
        """Loads model weights from `load_path` if the file exists."""
        if self.load_path.exists():
            path = str(self.load_path.resolve())
            log.info('[loading model from {}]'.format(path))
            self.model_.load_weights(path)
    def save(self) -> None:
        """Saves model weights to the `save_path` provided in the config.

        The directory is already created by ``super().__init__``, which is
        called in ``__init__`` of this class.
        """
        path = str(self.save_path.absolute())
        log.info('[saving model to {}]'.format(path))
        self.model_.save_weights(path)
    def _initialize(self):
        if isinstance(self.char_window_size, int):
            self.char_window_size = [self.char_window_size]
        if self.char_filters is None or isinstance(self.char_filters, int):
            self.char_filters = [self.char_filters] * len(self.char_window_size)
        if len(self.char_window_size) != len(self.char_filters):
            raise ValueError("There should be the same number of window sizes and filter sizes")
        if isinstance(self.word_lstm_units, int):
            self.word_lstm_units = [self.word_lstm_units] * self.word_lstm_layers
        if len(self.word_lstm_units) != self.word_lstm_layers:
            raise ValueError("There should be the same number of lstm layer units and lstm layers")
        if self.word_vectorizers is None:
            self.word_vectorizers = []
        if self.regularizer is not None:
            self.regularizer = l2(self.regularizer)
        if self.verbose > 0:
            log.info("{} symbols, {} tags in CharacterTagger".format(len(self.symbols), len(self.tags)))
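    # Illustration (hypothetical values): with char_window_size=[2, 3, 4] and
    # char_filters=None, _initialize expands char_filters to [None, None, None],
    # so _build_word_cnn later allocates min(25 * width, 200) = 50, 75 and 100
    # filters for the three window widths respectively.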
    def build(self):
        """Builds the network using Keras."""
        word_inputs = Input(shape=(None, MAX_WORD_LENGTH+2), dtype="int32")
        inputs = [word_inputs]
        word_outputs = self._build_word_cnn(word_inputs)
        if len(self.word_vectorizers) > 0:
            # Additional dense-projected inputs (e.g. dictionary features) are
            # concatenated with the character-level word representations.
            additional_word_inputs = [Input(shape=(None, input_dim), dtype="float32")
                                      for input_dim, dense_dim in self.word_vectorizers]
            inputs.extend(additional_word_inputs)
            additional_word_embeddings = [Dense(dense_dim)(additional_word_inputs[i])
                                          for i, (_, dense_dim) in enumerate(self.word_vectorizers)]
            word_outputs = Concatenate()([word_outputs] + additional_word_embeddings)
        outputs, lstm_outputs = self._build_basic_network(word_outputs)
        compile_args = {"optimizer": Nadam(learning_rate=0.002, clipnorm=5.0),
                        "loss": "categorical_crossentropy", "metrics": ["accuracy"]}
        self.model_ = Model(inputs, outputs)
        self.model_.compile(**compile_args)
        if self.verbose > 0:
            self.model_.summary(print_fn=log.info)
        return self
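    # Shape flow of the compiled model (a sketch; batch and sentence dimensions
    # are None at graph-construction time):
    #   (batch, words, MAX_WORD_LENGTH+2) int32 character indices
    #   -> one-hot + Dense:       (batch, words, MAX_WORD_LENGTH+2, char_embeddings_size)
    #   -> Conv2D + max-pooling:  (batch, words, char_output_dim_)
    #   -> highway + BiLSTM:      (batch, words, 2 * word_lstm_units[-1])
    #   -> TimeDistributed Dense: (batch, words, len(tags)) softmax scores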
    def _build_word_cnn(self, inputs):
        """Builds the character-level convolutional network that produces word representations."""
        # One-hot encodes character indices, then embeds them with a bias-free dense layer.
        inputs = Lambda(K.one_hot, arguments={"num_classes": len(self.symbols)},
                        output_shape=lambda x: tuple(x) + (len(self.symbols),))(inputs)
        char_embeddings = Dense(self.char_embeddings_size, use_bias=False)(inputs)
        conv_outputs = []
        self.char_output_dim_ = 0
        for window_size, filters_number in zip(self.char_window_size, self.char_filters):
            curr_output = char_embeddings
            curr_filters_number = (min(self.char_filter_multiple * window_size, 200)
                                   if filters_number is None else filters_number)
            for _ in range(self.char_conv_layers - 1):
                curr_output = Conv2D(curr_filters_number, (1, window_size),
                                     padding="same", activation="relu",
                                     data_format="channels_last")(curr_output)
                if self.conv_dropout > 0.0:
                    curr_output = Dropout(self.conv_dropout)(curr_output)
            curr_output = Conv2D(curr_filters_number, (1, window_size),
                                 padding="same", activation="relu",
                                 data_format="channels_last")(curr_output)
            conv_outputs.append(curr_output)
            self.char_output_dim_ += curr_filters_number
        if len(conv_outputs) > 1:
            conv_output = Concatenate(axis=-1)(conv_outputs)
        else:
            conv_output = conv_outputs[0]
        # Max-pools over character positions, then applies the highway layers.
        highway_input = Lambda(K.max, arguments={"axis": -2})(conv_output)
        if self.intermediate_dropout > 0.0:
            highway_input = Dropout(self.intermediate_dropout)(highway_input)
        for i in range(self.char_highway_layers - 1):
            highway_input = Highway(activation="relu")(highway_input)
            if self.highway_dropout > 0.0:
                highway_input = Dropout(self.highway_dropout)(highway_input)
        highway_output = Highway(activation="relu")(highway_input)
        return highway_output

    def _build_basic_network(self, word_outputs):
        """Creates the basic network architecture, transforming word embeddings to intermediate outputs."""
        if self.word_dropout > 0.0:
            lstm_outputs = Dropout(self.word_dropout)(word_outputs)
        else:
            lstm_outputs = word_outputs
        for j in range(self.word_lstm_layers - 1):
            lstm_outputs = Bidirectional(
                LSTM(self.word_lstm_units[j], return_sequences=True,
                     dropout=self.lstm_dropout))(lstm_outputs)
        lstm_outputs = Bidirectional(
            LSTM(self.word_lstm_units[-1], return_sequences=True,
                 dropout=self.lstm_dropout))(lstm_outputs)
        pre_outputs = TimeDistributed(
            Dense(len(self.tags), activation="softmax",
                  activity_regularizer=self.regularizer), name="p")(lstm_outputs)
        return pre_outputs, lstm_outputs

    # noinspection PyPep8Naming
    def _transform_batch(self, data, labels=None, transform_to_one_hot=True):
        # Pads every sentence in the batch to the length of the longest one.
        data, additional_data = data[0], data[1:]
        L = max(len(x) for x in data)
        X = np.array([self._make_sent_vector(x, L) for x in data])
        X = [X] + [np.array(x) for x in additional_data]
        if labels is not None:
            Y = np.array([self._make_tags_vector(y, L) for y in labels])
            if transform_to_one_hot:
                Y = to_one_hot(Y, len(self.tags))
            return X, Y
        else:
            return X
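    # Worked example for _transform_batch (hypothetical sizes): for a batch of
    # two sentences with 3 and 5 words and no additional vectorizers, X is a
    # one-element list holding an int32 array of shape (2, 5, MAX_WORD_LENGTH + 2)
    # (the shorter sentence is padded to the bucket length L=5); when labels are
    # given and transform_to_one_hot is True, Y has shape (2, 5, len(self.tags)).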
    def train_on_batch(self, *args) -> None:
        """Trains the model on a single batch.

        Args:
            *args: the list of network inputs. The last element of `args` is
                the batch of targets (correct tag sequences); all previous
                elements are batches of training data (word sequences).
        """
        *data, labels = args
        # noinspection PyPep8Naming
        X, Y = self._transform_batch(data, labels)
        self.model_.train_on_batch(X, Y)
    # noinspection PyPep8Naming
    def predict_on_batch(self, data: Union[List[np.ndarray], Tuple[np.ndarray]],
                         return_indexes: bool = False) -> List[List[str]]:
        """Makes predictions on a single batch.

        Args:
            data: model inputs for a single batch. data[0] contains input
                character encodings and is the only element of data for most
                models; subsequent elements of data contain the outputs of
                additional vectorizers, e.g. a dictionary-based one.
            return_indexes: whether to return tag indexes in the vocabulary
                or the tags themselves

        Returns:
            a batch of label sequences
        """
        X = self._transform_batch(data)
        objects_number, lengths = len(X[0]), [len(elem) for elem in data[0]]
        Y = self.model_.predict_on_batch(X)
        labels = np.argmax(Y, axis=-1)
        answer: List[Optional[List[str]]] = [None] * objects_number
        for i, (elem, length) in enumerate(zip(labels, lengths)):
            elem = elem[:length]
            answer[i] = elem if return_indexes else self.tags.idxs2toks(elem)
        return answer
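    # Example (hypothetical input and tags): for data = ([["мама", "мыла", "раму"]],),
    # a trained model might return [["NOUN", "VERB", "NOUN"]] with
    # return_indexes=False, or the corresponding vocabulary indexes otherwise.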
    def __call__(self, *x_batch: np.ndarray, **kwargs) -> Union[List, np.ndarray]:
        """Predicts answers on batch elements.

        Args:
            x_batch: a batch to predict answers on. It can be either a single
                array for the basic model or a sequence of arrays for a complex
                one (:config:`configuration file <morpho_tagger/UD2.0/morpho_ru_syntagrus_pymorphy.json>`
                or its lemmatized version).
        """
        return self.predict_on_batch(x_batch, **kwargs)
    def _make_sent_vector(self, sent: List, bucket_length: int = None) -> np.ndarray:
        """Transforms a sentence into a Numpy array that serves as the network input.

        Args:
            sent: input sentence
            bucket_length: the width of the bucket

        Returns:
            A 2d array; answer[j][k] contains the index of the k-th letter
            in the j-th word of the input sentence.
        """
        bucket_length = bucket_length or len(sent)
        answer = np.zeros(shape=(bucket_length, MAX_WORD_LENGTH+2), dtype=np.int32)
        for i, word in enumerate(sent):
            # BEGIN/END/PAD are looked up in the character vocabulary, since
            # the array stores character indices.
            answer[i, 0] = self.symbols["BEGIN"]
            m = min(len(word), MAX_WORD_LENGTH)
            for j, x in enumerate(word[-m:]):
                answer[i, j+1] = self.symbols[x]
            answer[i, m+1] = self.symbols["END"]
            answer[i, m+2:] = self.symbols["PAD"]
        return answer

    def _make_tags_vector(self, tags, bucket_length=None) -> np.ndarray:
        """Transforms a sentence of tags into a Numpy array that serves as the network target.

        Args:
            tags: input sentence of tags
            bucket_length: the width of the bucket

        Returns:
            A 1d array; answer[i] contains the index of the i-th tag
            in the input sentence.
        """
        bucket_length = bucket_length or len(tags)
        answer = np.zeros(shape=(bucket_length,), dtype=np.int32)
        for i, tag in enumerate(tags):
            answer[i] = self.tags[tag]
        return answer
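
# --- Usage sketch (not part of the original module) ---
# A minimal, heavily hedged example of wiring the tagger together. It assumes
# that SimpleVocabulary accepts `special_tokens` and a writable `save_path`,
# and can be fitted on token sequences via `fit`; all names, paths and data
# below are hypothetical.
#
# if __name__ == "__main__":
#     train_sents = [["мама", "мыла", "раму"]]
#     train_tags = [["NOUN", "VERB", "NOUN"]]
#
#     symbols = SimpleVocabulary(special_tokens=("PAD", "BEGIN", "END"),
#                                save_path="symbols.dict")
#     symbols.fit([list(word) for sent in train_sents for word in sent])
#
#     tags = SimpleVocabulary(special_tokens=("PAD", "BEGIN", "END"),
#                             save_path="tags.dict")
#     tags.fit(train_tags)
#
#     tagger = MorphoTagger(symbols, tags,
#                           save_path="model.hdf5", load_path="model.hdf5")
#     for _ in range(10):  # a few training steps on the toy batch
#         tagger.train_on_batch(train_sents, train_tags)
#     print(tagger(train_sents))
#     tagger.save()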