Source code for deeppavlov.models.morpho_tagger.network

# Copyright 2017 Neural Networks and Deep Learning lab, MIPT
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Union, Tuple, Iterable

import keras.layers as kl
import keras.optimizers as ko
import keras.regularizers as kreg
from keras import Model

from deeppavlov.core.common.registry import register
from deeppavlov.core.common.log import get_logger
from deeppavlov.core.models.keras_model import KerasWrapper
from deeppavlov.core.data.vocab import DefaultVocabulary
from .common_tagger import *  # provides np (numpy), kb (keras.backend) and to_one_hot used below
from .cells import Highway

log = get_logger(__name__)

MAX_WORD_LENGTH = 30
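# Words are clipped/padded to MAX_WORD_LENGTH characters; the BEGIN and END
# markers added in _make_sent_vector account for the "+2" in the input width.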

class CharacterTagger:
    """A class for a character-based neural morphological tagger.

    Parameters:
        symbols: character vocabulary
        tags: morphological tags vocabulary
        word_rnn: the type of character-level network (only ``cnn`` is implemented)
        char_embeddings_size: the size of character embeddings
        char_conv_layers: the number of convolutional layers on the character level
        char_window_size: the width of the convolutional filter(s).
            It can be a list if several parallel filters are applied, for example, [2, 3, 4, 5].
        char_filters: the number of convolutional filters for each window width.
            It can be a number, a list (when there are several windows of different widths
            on a single convolutional layer), a list of lists (when there is more than one
            convolutional layer), or **None**. If **None**, a layer with window width **width**
            contains min(**char_filter_multiple** * **width**, 200) filters.
        char_filter_multiple: the ratio between the number of filters and the window width
        char_highway_layers: the number of highway layers on the character level
        conv_dropout: the dropout rate between convolutional layers
        highway_dropout: the dropout rate between highway layers
        intermediate_dropout: the dropout rate between convolutional and highway layers
            on the character level
        lstm_dropout: the dropout rate in the word-level LSTM
        word_vectorizers: parameters of additional word-level vectorizers; for each
            vectorizer a pair of the vectorizer dimension and the dimension of the
            corresponding word embedding is stored
        word_lstm_layers: the number of word-level LSTM layers
        word_lstm_units: the hidden dimensions of word-level LSTMs
        word_dropout: the dropout rate before the word level (applied to word embeddings)
        regularizer: the l2 regularization parameter
        verbose: the level of verbosity
    """
    def __init__(self,
                 symbols: DefaultVocabulary,
                 tags: DefaultVocabulary,
                 word_rnn: str = "cnn",
                 char_embeddings_size: int = 16,
                 char_conv_layers: int = 1,
                 char_window_size: Union[int, List[int]] = 5,
                 char_filters: Union[int, List[int]] = None,
                 char_filter_multiple: int = 25,
                 char_highway_layers: int = 1,
                 conv_dropout: float = 0.0,
                 highway_dropout: float = 0.0,
                 intermediate_dropout: float = 0.0,
                 lstm_dropout: float = 0.0,
                 word_vectorizers: List[Tuple[int, int]] = None,
                 word_lstm_layers: int = 1,
                 word_lstm_units: Union[int, List[int]] = 128,
                 word_dropout: float = 0.0,
                 regularizer: float = None,
                 verbose: int = 1):
        self.symbols = symbols
        self.tags = tags
        self.word_rnn = word_rnn
        self.char_embeddings_size = char_embeddings_size
        self.char_conv_layers = char_conv_layers
        self.char_window_size = char_window_size
        self.char_filters = char_filters
        self.char_filter_multiple = char_filter_multiple
        self.char_highway_layers = char_highway_layers
        self.conv_dropout = conv_dropout
        self.highway_dropout = highway_dropout
        self.intermediate_dropout = intermediate_dropout
        self.lstm_dropout = lstm_dropout
        self.word_dropout = word_dropout
        self.word_vectorizers = word_vectorizers  # a list of additional vectorizer dimensions
        self.word_lstm_layers = word_lstm_layers
        self.word_lstm_units = word_lstm_units
        self.regularizer = regularizer
        self.verbose = verbose
        self._initialize()
        self.build()

    def _initialize(self):
        if isinstance(self.char_window_size, int):
            self.char_window_size = [self.char_window_size]
        if self.char_filters is None or isinstance(self.char_filters, int):
            self.char_filters = [self.char_filters] * len(self.char_window_size)
        if len(self.char_window_size) != len(self.char_filters):
            raise ValueError("There should be the same number of window sizes and filter sizes")
        if isinstance(self.word_lstm_units, int):
            self.word_lstm_units = [self.word_lstm_units] * self.word_lstm_layers
        if len(self.word_lstm_units) != self.word_lstm_layers:
            raise ValueError("There should be the same number of LSTM layer units and LSTM layers")
        if self.word_vectorizers is None:
            self.word_vectorizers = []
        if self.regularizer is not None:
            self.regularizer = kreg.l2(self.regularizer)
        if self.verbose > 0:
            log.info("{} symbols, {} tags in CharacterTagger".format(
                self.symbols_number_, self.tags_number_))

    @property
    def symbols_number_(self) -> int:
        """Character vocabulary size"""
        return len(self.symbols)

    @property
    def tags_number_(self) -> int:
        """Tag vocabulary size"""
        return len(self.tags)
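    # A worked example of the `_initialize` broadcasting rules (a sketch; the
    # numbers follow directly from the code above):
    #   char_window_size=[2, 3, 4], char_filters=None, char_filter_multiple=25
    #     -> char_filters becomes [None, None, None]; at build time each None
    #        is resolved to min(25 * width, 200), i.e. 50, 75 and 100 filters.
    #   word_lstm_layers=2, word_lstm_units=128
    #     -> word_lstm_units becomes [128, 128], one entry per LSTM layer.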
    def build(self):
        """Builds the network using Keras."""
        word_inputs = kl.Input(shape=(None, MAX_WORD_LENGTH+2), dtype="int32")
        inputs = [word_inputs]
        word_outputs = self._build_word_cnn(word_inputs)
        if len(self.word_vectorizers) > 0:
            additional_word_inputs = [kl.Input(shape=(None, input_dim), dtype="float32")
                                      for input_dim, dense_dim in self.word_vectorizers]
            inputs.extend(additional_word_inputs)
            additional_word_embeddings = [kl.Dense(dense_dim)(additional_word_inputs[i])
                                          for i, (_, dense_dim) in enumerate(self.word_vectorizers)]
            word_outputs = kl.Concatenate()([word_outputs] + additional_word_embeddings)
        outputs, lstm_outputs = self._build_basic_network(word_outputs)
        compile_args = {"optimizer": ko.nadam(lr=0.002, clipnorm=5.0),
                        "loss": "categorical_crossentropy", "metrics": ["accuracy"]}
        self.model_ = Model(inputs, outputs)
        self.model_.compile(**compile_args)
        if self.verbose > 0:
            self.model_.summary(print_fn=log.info)
        return self
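    # Input/output contract of the compiled model and the shape flow through
    # the character-level CNN built below (a sketch derived from the code;
    # B = batch, W = words, C = MAX_WORD_LENGTH + 2):
    #   (B, W, C) int32 character indices
    #     -> one-hot           -> (B, W, C, symbols_number_)
    #     -> Dense embedding   -> (B, W, C, char_embeddings_size)
    #     -> Conv2D per window -> (B, W, C, filters), concatenated over windows
    #     -> max over axis -2  -> (B, W, char_output_dim_), one vector per word
    #     -> highway + BiLSTM  -> (B, W, tags_number_) tag probabilities.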
    def _build_word_cnn(self, inputs):
        """Builds the character-level convolutional network that produces word embeddings."""
        inputs = kl.Lambda(kb.one_hot, arguments={"num_classes": self.symbols_number_},
                           output_shape=lambda x: tuple(x) + (self.symbols_number_,))(inputs)
        char_embeddings = kl.Dense(self.char_embeddings_size, use_bias=False)(inputs)
        conv_outputs = []
        self.char_output_dim_ = 0
        for window_size, filters_number in zip(self.char_window_size, self.char_filters):
            curr_output = char_embeddings
            curr_filters_number = (min(self.char_filter_multiple * window_size, 200)
                                   if filters_number is None else filters_number)
            for _ in range(self.char_conv_layers - 1):
                curr_output = kl.Conv2D(curr_filters_number, (1, window_size),
                                        padding="same", activation="relu",
                                        data_format="channels_last")(curr_output)
                if self.conv_dropout > 0.0:
                    curr_output = kl.Dropout(self.conv_dropout)(curr_output)
            curr_output = kl.Conv2D(curr_filters_number, (1, window_size),
                                    padding="same", activation="relu",
                                    data_format="channels_last")(curr_output)
            conv_outputs.append(curr_output)
            self.char_output_dim_ += curr_filters_number
        if len(conv_outputs) > 1:
            conv_output = kl.Concatenate(axis=-1)(conv_outputs)
        else:
            conv_output = conv_outputs[0]
        highway_input = kl.Lambda(kb.max, arguments={"axis": -2})(conv_output)
        if self.intermediate_dropout > 0.0:
            highway_input = kl.Dropout(self.intermediate_dropout)(highway_input)
        for _ in range(self.char_highway_layers - 1):
            highway_input = Highway(activation="relu")(highway_input)
            if self.highway_dropout > 0.0:
                highway_input = kl.Dropout(self.highway_dropout)(highway_input)
        highway_output = Highway(activation="relu")(highway_input)
        return highway_output

    def _build_basic_network(self, word_outputs):
        """Creates the basic network architecture, transforming word embeddings
        to intermediate outputs.
        """
        if self.word_dropout > 0.0:
            lstm_outputs = kl.Dropout(self.word_dropout)(word_outputs)
        else:
            lstm_outputs = word_outputs
        for j in range(self.word_lstm_layers - 1):
            lstm_outputs = kl.Bidirectional(
                kl.LSTM(self.word_lstm_units[j], return_sequences=True,
                        dropout=self.lstm_dropout))(lstm_outputs)
        lstm_outputs = kl.Bidirectional(
            kl.LSTM(self.word_lstm_units[-1], return_sequences=True,
                    dropout=self.lstm_dropout))(lstm_outputs)
        pre_outputs = kl.TimeDistributed(
            kl.Dense(self.tags_number_, activation="softmax",
                     activity_regularizer=self.regularizer), name="p")(lstm_outputs)
        return pre_outputs, lstm_outputs

    def _transform_batch(self, data, labels=None, transform_to_one_hot=True):
        data, additional_data = data[0], data[1:]
        L = max(len(x) for x in data)
        X = np.array([self._make_sent_vector(x, L) for x in data])
        X = [X] + [np.array(x) for x in additional_data]
        if labels is not None:
            Y = np.array([self._make_tags_vector(y, L) for y in labels])
            if transform_to_one_hot:
                Y = to_one_hot(Y, len(self.tags))
            return X, Y
        else:
            return X
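    # Batch format expected by `_transform_batch` (derived from the code above):
    #   data[0]  : a list of sentences, each a list of words;
    #   data[1:] : one array-like per additional word vectorizer.
    # All sentences are padded to the longest one in the batch (bucket width L);
    # labels, when given, are padded the same way and optionally one-hot encoded.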
    def train_on_batch(self, data: List[Iterable], labels: Iterable[list]) -> None:
        """Trains the model on a single batch.

        Args:
            data: a batch of word sequences
            labels: a batch of correct tag sequences
        """
        X, Y = self._transform_batch(data, labels)
        self.model_.train_on_batch(X, Y)
    def predict_on_batch(self, data: Union[list, tuple],
                         return_indexes: bool = False) -> List[List[str]]:
        """Makes predictions on a single batch.

        Args:
            data: a batch of word sequences together with additional inputs
            return_indexes: whether to return tag indexes in the vocabulary
                instead of the tags themselves

        Returns:
            a batch of label sequences
        """
        X = self._transform_batch(data)
        objects_number, lengths = len(X[0]), [len(elem) for elem in data[0]]
        Y = self.model_.predict_on_batch(X)
        labels = np.argmax(Y, axis=-1)
        answer: List[List[str]] = [None] * objects_number
        for i, (elem, length) in enumerate(zip(labels, lengths)):
            elem = elem[:length]
            answer[i] = elem if return_indexes else self.tags.idxs2toks(elem)
        return answer
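    # A minimal usage sketch (hypothetical `symbols_vocab` / `tags_vocab`
    # DefaultVocabulary instances and example tags; in a real pipeline both
    # vocabularies are built from the config):
    #
    #   tagger = CharacterTagger(symbols_vocab, tags_vocab, char_window_size=[3, 5])
    #   batch = ([["we", "live", "here"], ["she", "reads"]],)  # words only, no extra vectorizers
    #   tagger.train_on_batch(batch, [["PRON", "VERB", "ADV"], ["PRON", "VERB"]])
    #   tags = tagger.predict_on_batch(batch)  # e.g. [["PRON", "VERB", "ADV"], ["PRON", "VERB"]]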
    def _make_sent_vector(self, sent: List, bucket_length: int = None) -> np.ndarray:
        """Transforms a sentence to a Numpy array, which will be the network input.

        Args:
            sent: an input sentence
            bucket_length: the width of the bucket

        Returns:
            A 2d array; answer[i][j] contains the index of the j-th character
            in the i-th word of the input sentence.
        """
        bucket_length = bucket_length or len(sent)
        answer = np.zeros(shape=(bucket_length, MAX_WORD_LENGTH+2), dtype=np.int32)
        for i, word in enumerate(sent):
            # BEGIN/END/PAD are looked up in the character vocabulary, since the
            # array is one-hot encoded over `symbols_number_` classes downstream
            answer[i, 0] = self.symbols.tok2idx("BEGIN")
            m = min(len(word), MAX_WORD_LENGTH)
            for j, x in enumerate(word[-m:]):  # overlong words keep their last MAX_WORD_LENGTH characters
                answer[i, j+1] = self.symbols.tok2idx(x)
            answer[i, m+1] = self.symbols.tok2idx("END")
            answer[i, m+2:] = self.symbols.tok2idx("PAD")
        return answer

    def _make_tags_vector(self, tags, bucket_length=None) -> np.ndarray:
        """Transforms a sentence of tags to a Numpy array, which will be the network target.

        Args:
            tags: an input sentence of tags
            bucket_length: the width of the bucket

        Returns:
            A 1d array; answer[i] contains the index of the i-th tag of the
            input sentence.
        """
        bucket_length = bucket_length or len(tags)
        answer = np.zeros(shape=(bucket_length,), dtype=np.int32)
        for i, tag in enumerate(tags):
            answer[i] = self.tags.tok2idx(tag)
        return answer
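    # Row layout produced by `_make_sent_vector` for one word (a sketch with
    # hypothetical symbol indices; B/E/P stand for the BEGIN/END/PAD indices):
    #   word "cat", MAX_WORD_LENGTH = 30 -> row of length 32:
    #   [B, idx('c'), idx('a'), idx('t'), E, P, P, ..., P]
    # Clipping keeps the last characters of overlong words, i.e. the suffix,
    # which is the most informative part of a word for morphology.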
    def save(self, outfile) -> None:
        """Saves model weights to a file.

        Args:
            outfile: a file to save model weights to
                (other model components should be given in the config)
        """
        self.model_.save_weights(outfile)
    def load(self, infile) -> None:
        """Loads model weights from a file.

        Args:
            infile: a file to load model weights from
        """
        self.model_.load_weights(infile)
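    # Note on persistence (derived from the two methods above): only HDF5
    # weights are written/read; the architecture itself is rebuilt from the
    # constructor parameters, so `load` must be called on a CharacterTagger
    # created with the same configuration.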
@register("morpho_tagger")
class MorphoTagger(KerasWrapper):
    """A wrapper over :class:`CharacterTagger`.

    It is inherited from :class:`~deeppavlov.core.models.keras_model.KerasWrapper`
    and accepts the initialization parameters of :class:`CharacterTagger`.
    """
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(CharacterTagger, *args, **kwargs)
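# The @register decorator makes the class addressable from JSON pipeline
# configs under the name "morpho_tagger"; KerasWrapper then instantiates
# CharacterTagger with the parameters taken from the config. A hedged sketch
# of such a config entry (keys mirror the constructor parameters above;
# values are purely illustrative, not a shipped config):
#
#   {"name": "morpho_tagger", "char_window_size": [1, 2, 3, 4, 5],
#    "word_lstm_units": 128, "char_embeddings_size": 32}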