Source code for deeppavlov.models.ranking.ranking_network

from keras.layers import Input, LSTM, Embedding, GlobalMaxPooling1D, Lambda, subtract, Conv2D, Dense, Activation
from keras.layers.merge import Dot, Subtract, Add, Multiply
from keras.models import Model
from keras.layers.wrappers import Bidirectional
from keras.optimizers import Adam
from keras.initializers import glorot_uniform, Orthogonal
from keras import losses
from keras import backend as K
import tensorflow as tf
import numpy as np
from deeppavlov.core.models.tf_backend import TfModelMeta
from deeppavlov.core.common.log import get_logger
from deeppavlov.core.layers import keras_layers
from pathlib import Path
from deeppavlov.models.ranking.emb_dict import EmbDict

log = get_logger(__name__)


class RankingNetwork(metaclass=TfModelMeta):
    """Class to perform context-response matching with neural networks.

    Args:
        toks_num: A size of `tok2int` vocabulary to build embedding layer.
        chars_num: A size of `char2int` vocabulary to build character-level embedding layer.
        learning_rate: Learning rate.
        device_num: A number of a device to perform model training on if several devices are available in a system.
        seed: Random seed.
        shared_weights: Whether to use shared weights in the model to encode contexts and responses.
        triplet_mode: Whether to use a model with triplet loss.
            If ``False``, a model with crossentropy loss will be used.
        margin: A margin parameter for triplet loss. Only required if ``triplet_mode`` is set to ``True``.
        distance: Distance metric (similarity measure) to compare context and response representations in the model.
            Possible values are ``cos_similarity`` (cosine similarity), ``euclidian`` (euclidean distance),
            ``sigmoid`` (1 minus sigmoid).
        token_embeddings: Whether to use token (word) embeddings in the model.
        use_matrix: Whether to use a trainable matrix of token (word) embeddings.
        max_sequence_length: A maximum length of a sequence in tokens.
            Longer sequences will be truncated and shorter ones will be padded.
        tok_dynamic_batch: Whether to use dynamic batching. If ``True``, the maximum length of a sequence for a batch
            will be equal to the maximum length over all sequences from this batch,
            but not higher than ``max_sequence_length``.
        embedding_dim: Dimensionality of token (word) embeddings.
        char_embeddings: Whether to use character-level token (word) embeddings in the model.
        max_token_length: A maximum length of a token for representing it by a character-level embedding.
        char_dynamic_batch: Whether to use dynamic batching for character-level embeddings.
            If ``True``, the maximum length of a token for a batch will be equal to the maximum length
            over all tokens from this batch, but not higher than ``max_token_length``.
        char_emb_dim: Dimensionality of character-level embeddings.
        reccurent: A type of the RNN cell. Possible values are ``lstm`` and ``bilstm``.
        hidden_dim: Dimensionality of the hidden state of the RNN cell.
            If ``reccurent`` is ``bilstm``, the actual dimensionality is ``2 * hidden_dim``.
        max_pooling: Whether to use a max-pooling operation to get the context (response) vector representation.
            If ``False``, the last hidden state of the RNN will be used.
""" def __init__(self, toks_num: int, chars_num: int, emb_dict: EmbDict, max_sequence_length: int, max_token_length: int = None, learning_rate: float = 1e-3, device_num: int = 0, seed: int = None, shared_weights: bool = True, triplet_mode: bool = True, margin: float = 0.1, distance: str = "cos_similarity", token_embeddings: bool = True, use_matrix: bool = False, tok_dynamic_batch: bool = False, embedding_dim: int = 300, char_embeddings: bool = False, char_dynamic_batch: bool = False, char_emb_dim: int = 32, highway_on_top: bool = False, reccurent: str = "bilstm", hidden_dim: int = 300, max_pooling: bool = True): self.distance = distance self.toks_num = toks_num self.emb_dict = emb_dict self.use_matrix = use_matrix self.seed = seed self.hidden_dim = hidden_dim self.learning_rate = learning_rate self.margin = margin self.embedding_dim = embedding_dim self.device_num = device_num self.shared_weights = shared_weights self.pooling = max_pooling self.recurrent = reccurent self.token_embeddings = token_embeddings self.char_embeddings = char_embeddings self.chars_num = chars_num self.char_emb_dim = char_emb_dim self.highway_on_top = highway_on_top self.triplet_mode = triplet_mode if tok_dynamic_batch: self.max_sequence_length = None else: self.max_sequence_length = max_sequence_length if char_dynamic_batch: self.max_token_length = None else: self.max_token_length = max_token_length self.sess = self._config_session() K.set_session(self.sess) self.optimizer = Adam(lr=self.learning_rate) self.duplet = self.duplet() if self.triplet_mode: self.loss = self.triplet_loss self.obj_model = self.triplet_model() else: self.loss = losses.binary_crossentropy self.obj_model = self.duplet_model() self.obj_model.compile(loss=self.loss, optimizer=self.optimizer) self.score_model = self.duplet self.context_embedding = Model(inputs=self.duplet.inputs, outputs=self.duplet.get_layer(name="pooling").get_output_at(0)) self.response_embedding = Model(inputs=self.duplet.inputs, outputs=self.duplet.get_layer(name="pooling").get_output_at(1)) # self.score_model = Model(inputs=[self.obj_model.inputs[0], self.obj_model.inputs[1]], # outputs=self.obj_model.get_layer(name="score_model").get_output_at(0)) # self.context_embedding = Model(inputs=[self.obj_model.inputs[0], self.obj_model.inputs[1]], # outputs=self.obj_model.get_layer(name="pooling").get_output_at(0)) # self.response_embedding = Model(inputs=[self.obj_model.inputs[2], self.obj_model.inputs[3]], # outputs=self.obj_model.get_layer(name="pooling").get_output_at(1)) def _config_session(self): """ Configure session for particular device Returns: tensorflow.Session """ config = tf.ConfigProto() config.gpu_options.allow_growth = True config.gpu_options.visible_device_list = str(self.device_num) return tf.Session(config=config) def load(self, path): log.info("[initializing `{}` from saved]".format(self.__class__.__name__)) self.obj_model.load_weights(path) def save(self, path): log.info("[saving `{}`]".format(self.__class__.__name__)) self.obj_model.save_weights(path) self.context_embedding.save(str(Path(path).parent / 'sen_emb_model.h5')) def init_from_scratch(self, emb_matrix): log.info("[initializing new `{}`]".format(self.__class__.__name__)) if self.token_embeddings and not self.char_embeddings: if self.use_matrix: if self.shared_weights: self.duplet.get_layer(name="embedding").set_weights([emb_matrix]) if self.shared_weights: self.duplet.get_layer(name="embedding_a").set_weights([emb_matrix]) self.duplet.get_layer(name="embedding_b").set_weights([emb_matrix]) def 
    def embedding_layer(self):
        if self.shared_weights:
            out_a = Embedding(self.toks_num,
                              self.embedding_dim,
                              input_length=self.max_sequence_length,
                              trainable=True, name="embedding")
            return out_a, out_a
        else:
            out_a = Embedding(self.toks_num,
                              self.embedding_dim,
                              input_length=self.max_sequence_length,
                              trainable=True, name="embedding_a")
            out_b = Embedding(self.toks_num,
                              self.embedding_dim,
                              input_length=self.max_sequence_length,
                              trainable=True, name="embedding_b")
            return out_a, out_b

    def lstm_layer(self):
        """Create a LSTM layer of a model."""
        if self.pooling:
            ret_seq = True
        else:
            ret_seq = False
        ker_in = glorot_uniform(seed=self.seed)
        rec_in = Orthogonal(seed=self.seed)
        if self.shared_weights:
            if self.recurrent == "bilstm" or self.recurrent is None:
                out_a = Bidirectional(LSTM(self.hidden_dim,
                                           input_shape=(self.max_sequence_length, self.embedding_dim,),
                                           kernel_initializer=ker_in,
                                           recurrent_initializer=rec_in,
                                           return_sequences=ret_seq), merge_mode='concat')
            elif self.recurrent == "lstm":
                out_a = LSTM(self.hidden_dim,
                             input_shape=(self.max_sequence_length, self.embedding_dim,),
                             kernel_initializer=ker_in,
                             recurrent_initializer=rec_in,
                             return_sequences=ret_seq)
            return out_a, out_a
        else:
            if self.recurrent == "bilstm" or self.recurrent is None:
                out_a = Bidirectional(LSTM(self.hidden_dim,
                                           input_shape=(self.max_sequence_length, self.embedding_dim,),
                                           kernel_initializer=ker_in,
                                           recurrent_initializer=rec_in,
                                           return_sequences=ret_seq), merge_mode='concat')
                out_b = Bidirectional(LSTM(self.hidden_dim,
                                           input_shape=(self.max_sequence_length, self.embedding_dim,),
                                           kernel_initializer=ker_in,
                                           recurrent_initializer=rec_in,
                                           return_sequences=ret_seq), merge_mode='concat')
            elif self.recurrent == "lstm":
                out_a = LSTM(self.hidden_dim,
                             input_shape=(self.max_sequence_length, self.embedding_dim,),
                             kernel_initializer=ker_in,
                             recurrent_initializer=rec_in,
                             return_sequences=ret_seq)
                out_b = LSTM(self.hidden_dim,
                             input_shape=(self.max_sequence_length, self.embedding_dim,),
                             kernel_initializer=ker_in,
                             recurrent_initializer=rec_in,
                             return_sequences=ret_seq)
            return out_a, out_b

    def triplet_loss(self, y_true, y_pred):
        """Triplet loss function."""
        return K.mean(K.maximum(self.margin - y_pred, 0.), axis=-1)

    def duplet(self):
        if self.token_embeddings and not self.char_embeddings:
            if self.use_matrix:
                context = Input(shape=(self.max_sequence_length,))
                response = Input(shape=(self.max_sequence_length,))
                emb_layer_a, emb_layer_b = self.embedding_layer()
                emb_c = emb_layer_a(context)
                emb_r = emb_layer_b(response)
            else:
                context = Input(shape=(self.max_sequence_length, self.embedding_dim,))
                response = Input(shape=(self.max_sequence_length, self.embedding_dim,))
                emb_c = context
                emb_r = response
        elif not self.token_embeddings and self.char_embeddings:
            context = Input(shape=(self.max_sequence_length, self.max_token_length,))
            response = Input(shape=(self.max_sequence_length, self.max_token_length,))
            char_cnn_layer = keras_layers.char_emb_cnn_func(n_characters=self.chars_num,
                                                            char_embedding_dim=self.char_emb_dim)
            emb_c = char_cnn_layer(context)
            emb_r = char_cnn_layer(response)
        elif self.token_embeddings and self.char_embeddings:
            context = Input(shape=(self.max_sequence_length, self.max_token_length,))
            response = Input(shape=(self.max_sequence_length, self.max_token_length,))
            if self.use_matrix:
                c_tok = Lambda(lambda x: x[:, :, 0])(context)
                r_tok = Lambda(lambda x: x[:, :, 0])(response)
                emb_layer_a, emb_layer_b = self.embedding_layer()
                emb_c = emb_layer_a(c_tok)
                emb_rp = emb_layer_b(r_tok)
                c_char = Lambda(lambda x: x[:, :, 1:])(context)
                r_char = Lambda(lambda x: x[:, :, 1:])(response)
            else:
                c_tok = Lambda(lambda x: x[:, :, :self.embedding_dim])(context)
                r_tok = Lambda(lambda x: x[:, :, :self.embedding_dim])(response)
                emb_c = c_tok
                emb_rp = r_tok
                c_char = Lambda(lambda x: x[:, :, self.embedding_dim:])(context)
                r_char = Lambda(lambda x: x[:, :, self.embedding_dim:])(response)
            char_cnn_layer = keras_layers.char_emb_cnn_func(n_characters=self.chars_num,
                                                            char_embedding_dim=self.char_emb_dim)
            emb_c_char = char_cnn_layer(c_char)
            emb_r_char = char_cnn_layer(r_char)
            emb_c = Lambda(lambda x: K.concatenate(x, axis=-1))([emb_c, emb_c_char])
            emb_r = Lambda(lambda x: K.concatenate(x, axis=-1))([emb_rp, emb_r_char])

        lstm_layer_a, lstm_layer_b = self.lstm_layer()
        lstm_c = lstm_layer_a(emb_c)
        lstm_r = lstm_layer_b(emb_r)
        if self.pooling:
            pooling_layer = GlobalMaxPooling1D(name="pooling")
            lstm_c = pooling_layer(lstm_c)
            lstm_r = pooling_layer(lstm_r)
        if self.distance == "cos_similarity":
            cosine_layer = Dot(normalize=True, axes=-1, name="score_model")
            score = cosine_layer([lstm_c, lstm_r])
            score = Lambda(lambda x: 1. - x)(score)
        elif self.distance == "euclidian":
            dist_score = Lambda(lambda x: K.expand_dims(self.euclidian_dist(x)), name="score_model")
            score = dist_score([lstm_c, lstm_r])
        elif self.distance == "sigmoid":
            dist = Lambda(self.diff_mult_dist)([lstm_c, lstm_r])
            score = Dense(1, activation='sigmoid', name="score_model")(dist)
            score = Lambda(lambda x: 1. - x)(score)
        model = Model([context, response], score)
        return model

    def duplet_model(self):
        duplet = self.duplet
        c_shape = K.int_shape(duplet.inputs[0])
        r_shape = K.int_shape(duplet.inputs[1])
        c = Input(batch_shape=c_shape)
        r = Input(batch_shape=r_shape)
        score = duplet([c, r])
        score = Lambda(lambda x: 1. - x)(score)
        model = Model([c, r], score)
        return model

    def triplet_model(self):
        duplet = self.duplet
        c_shape = K.int_shape(duplet.inputs[0])
        r_shape = K.int_shape(duplet.inputs[1])
        c1 = Input(batch_shape=c_shape)
        r1 = Input(batch_shape=r_shape)
        c2 = Input(batch_shape=c_shape)
        r2 = Input(batch_shape=r_shape)
        score1 = duplet([c1, r1])
        score2 = duplet([c2, r2])
        score_diff = Subtract()([score2, score1])
        model = Model([c1, r1, c2, r2], score_diff)
        return model

    def diff_mult_dist(self, inputs):
        input1, input2 = inputs
        a = K.abs(input1 - input2)
        b = Multiply()(inputs)
        return K.concatenate([input1, input2, a, b])

    def euclidian_dist(self, x_pair):
        x1_norm = K.l2_normalize(x_pair[0], axis=1)
        x2_norm = K.l2_normalize(x_pair[1], axis=1)
        diff = x1_norm - x2_norm
        square = K.square(diff)
        square_sum = K.sum(square, axis=1)
        square_sum = K.clip(square_sum, min_value=1e-12, max_value=None)
        dist = K.sqrt(square_sum)
        return dist

    def train_on_batch(self, batch, y):
        batch = [x for el in batch for x in el]
        if self.token_embeddings and not self.char_embeddings:
            if self.use_matrix:
                self.obj_model.train_on_batch(x=[np.asarray(x) for x in batch], y=np.asarray(y))
            else:
                b = batch
                for i in range(len(b)):
                    b[i] = self.emb_dict.get_embs(b[i])
                self.obj_model.train_on_batch(x=b, y=np.asarray(y))
        elif not self.token_embeddings and self.char_embeddings:
            self.obj_model.train_on_batch(x=[np.asarray(x) for x in batch], y=np.asarray(y))
        elif self.token_embeddings and self.char_embeddings:
            if self.use_matrix:
                self.obj_model.train_on_batch(x=[np.asarray(x) for x in batch], y=np.asarray(y))
            else:
                b = [x[0] for x in batch]
                for i in range(len(b)):
                    b[i] = self.emb_dict.get_embs(b[i])
                self.obj_model.train_on_batch(x=b, y=np.asarray(y))

    def predict_score_on_batch(self, batch):
        if self.token_embeddings and not self.char_embeddings:
            if self.use_matrix:
                return self.score_model.predict_on_batch(x=batch)
            else:
                b = batch
                for i in range(len(b)):
                    b[i] = self.emb_dict.get_embs(b[i])
                return self.score_model.predict_on_batch(x=b)
        elif not self.token_embeddings and self.char_embeddings:
            return self.score_model.predict_on_batch(x=batch)
        elif self.token_embeddings and self.char_embeddings:
            if self.use_matrix:
                return self.score_model.predict_on_batch(x=batch)
            else:
                b = [batch[i][:, :, 0] for i in range(len(batch))]
                b = [np.concatenate([b[i], batch[i][:, :, 1:]], axis=2) for i in range(len(batch))]
                return self.score_model.predict_on_batch(x=b)

    def predict_embedding_on_batch(self, batch, type='context'):
        if type == 'context':
            embedding = self.context_embedding
        elif type == 'response':
            embedding = self.response_embedding
        if self.token_embeddings and not self.char_embeddings:
            if self.use_matrix:
                return embedding.predict_on_batch(x=batch)
            else:
                b = batch
                b = [self.emb_dict.get_embs(el) for el in b]
                return embedding.predict_on_batch(x=b)
        elif not self.token_embeddings and self.char_embeddings:
            return embedding.predict_on_batch(x=batch)
        elif self.token_embeddings and self.char_embeddings:
            if self.use_matrix:
                return embedding.predict_on_batch(x=batch)
            else:
                b = [self.emb_dict.get_embs(batch[i][:, :, 0]) for i in range(len(batch))]
                b = [np.concatenate([b[i], batch[i][:, :, 1:]], axis=2) for i in range(len(batch))]
                return embedding.predict_on_batch(x=b)

    def predict_embedding(self, batch, bs, type='context'):
        num_batches = len(batch[0]) // bs
        embs = []
        for i in range(num_batches):
            b = [batch[j][i * bs:(i + 1) * bs] for j in range(len(batch))]
            embs.append(self.predict_embedding_on_batch(b, type=type))
        if len(batch[0]) % bs != 0:
            b = [batch[j][num_batches * bs:] for j in range(len(batch))]
            embs.append(self.predict_embedding_on_batch(b, type=type))
        embs = np.vstack(embs)
        return embs
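
# --- Minimal usage sketch (added for illustration; not part of the original module) ---
# It shows how the class above can be instantiated and queried with token-id batches.
# All concrete values (vocabulary sizes, sequence length, random data) are placeholders,
# `emb_dict=None` is passed only because it is never touched when `use_matrix=True`,
# and GPU device 0 is assumed to be available for `_config_session`.
if __name__ == '__main__':
    toy_toks_num = 1000   # hypothetical `tok2int` vocabulary size
    toy_seq_len = 20      # hypothetical maximum sequence length in tokens

    net = RankingNetwork(toks_num=toy_toks_num,
                         chars_num=100,
                         emb_dict=None,            # unused in the use_matrix=True setting
                         max_sequence_length=toy_seq_len,
                         use_matrix=True,
                         triplet_mode=True,
                         embedding_dim=300,
                         hidden_dim=300)

    # A random matrix stands in for pretrained embeddings of shape (toks_num, embedding_dim).
    net.init_from_scratch(np.random.rand(toy_toks_num, 300).astype(np.float32))

    # Score a batch of (context, response) token-id matrices; with the default
    # cos_similarity distance, lower scores mean more similar pairs.
    contexts = np.random.randint(0, toy_toks_num, size=(4, toy_seq_len))
    responses = np.random.randint(0, toy_toks_num, size=(4, toy_seq_len))
    print(net.predict_score_on_batch([contexts, responses]))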
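
# --- Numeric illustration of the triplet objective (added for clarity; not part of the original module) ---
# `triplet_model` outputs y_pred = d(context2, negative_response) - d(context1, positive_response),
# and `triplet_loss` applies a hinge to it: mean(max(margin - y_pred, 0)). The loss vanishes once the
# negative pair is farther away than the positive pair by at least `margin`. The helper below is a
# hypothetical pure-numpy mirror of that computation, written only to make the arithmetic concrete.
def _numpy_triplet_loss(d_pos, d_neg, margin=0.1):
    """Hinge over distance differences, mirroring RankingNetwork.triplet_loss on a batch."""
    y_pred = np.asarray(d_neg) - np.asarray(d_pos)
    return np.mean(np.maximum(margin - y_pred, 0.))

# Example: the first pair already satisfies the margin, the second does not:
# _numpy_triplet_loss(d_pos=[0.2, 0.5], d_neg=[0.9, 0.52]) -> mean([0.0, 0.08]) = 0.04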