Source code for deeppavlov.models.preprocessors.bert_preprocessor

# Copyright 2017 Neural Networks and Deep Learning lab, MIPT
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import random
from logging import getLogger
from typing import Tuple, List, Optional, Union

from bert_dp.preprocessing import convert_examples_to_features, InputExample, InputFeatures
from bert_dp.tokenization import FullTokenizer

from deeppavlov.core.commands.utils import expand_path
from deeppavlov.core.common.registry import register
from deeppavlov.core.data.utils import zero_pad
from deeppavlov.core.models.component import Component
from deeppavlov.models.preprocessors.mask import Mask

log = getLogger(__name__)


@register('bert_preprocessor')
class BertPreprocessor(Component):
    """Tokenize text into subtokens, encode subtokens with their indices, create token and segment masks.

    Check details in :func:`bert_dp.preprocessing.convert_examples_to_features` function.

    Args:
        vocab_file: path to vocabulary
        do_lower_case: set True if lowercasing is needed
        max_seq_length: max sequence length in subtokens, including [SEP] and [CLS] tokens

    Attributes:
        max_seq_length: max sequence length in subtokens, including [SEP] and [CLS] tokens
        tokenizer: instance of Bert FullTokenizer
    """

    def __init__(self,
                 vocab_file: str,
                 do_lower_case: bool = True,
                 max_seq_length: int = 512,
                 **kwargs) -> None:
        self.max_seq_length = max_seq_length
        vocab_file = str(expand_path(vocab_file))
        self.tokenizer = FullTokenizer(vocab_file=vocab_file,
                                       do_lower_case=do_lower_case)

    def __call__(self, texts_a: List[str], texts_b: Optional[List[str]] = None) -> List[InputFeatures]:
        """Call Bert :func:`bert_dp.preprocessing.convert_examples_to_features` function to tokenize and create masks.

        texts_a and texts_b are separated by the [SEP] token.

        Args:
            texts_a: list of texts
            texts_b: list of texts, could be None, e.g. for a single-sentence classification task

        Returns:
            batch of :class:`bert_dp.preprocessing.InputFeatures` with subtokens, subtoken ids, subtoken mask, segment mask.
        """
        if texts_b is None:
            texts_b = [None] * len(texts_a)
        # unique_id is not used
        examples = [InputExample(unique_id=0, text_a=text_a, text_b=text_b)
                    for text_a, text_b in zip(texts_a, texts_b)]
        return convert_examples_to_features(examples, self.max_seq_length, self.tokenizer)
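

# --- Illustrative usage sketch (not part of the original module) ---
# Shows how BertPreprocessor is typically called. The vocabulary path
# './bert_model/vocab.txt' is a placeholder assumption: any BERT vocab file
# matching the chosen model works the same way.
def _example_bert_preprocessor():
    preprocessor = BertPreprocessor(vocab_file='./bert_model/vocab.txt',
                                    do_lower_case=True,
                                    max_seq_length=64)
    # texts_a and texts_b are joined with a [SEP] token inside
    # convert_examples_to_features; texts_b may be omitted for
    # single-sentence tasks.
    features = preprocessor(texts_a=['How are you?'],
                            texts_b=['I am fine, thanks.'])
    print(len(features))  # one InputFeatures object per (text_a, text_b) pair
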

@register('bert_ner_preprocessor')
class BertNerPreprocessor(Component):
    """Takes tokens and splits them into bert subtokens, encodes subtokens with their indices.

    Creates a mask of subtokens (one for the first subtoken, zero for the others).

    If tags are provided, calculates tags for subtokens.

    Args:
        vocab_file: path to vocabulary
        do_lower_case: set True if lowercasing is needed
        max_seq_length: max sequence length in subtokens, including [SEP] and [CLS] tokens
        max_subword_length: replace a token with <unk> if its length in subtokens is larger than this
            (defaults to None, which is equal to +infinity)
        token_masking_prob: probability of masking token while training
        provide_subword_tags: output tags for subwords or for words
        subword_mask_mode: subword to select inside word tokens, can be "first" or "last" (default="first")

    Attributes:
        max_seq_length: max sequence length in subtokens, including [SEP] and [CLS] tokens
        max_subword_length: max length of a bert subtoken
        tokenizer: instance of Bert FullTokenizer
    """

    def __init__(self,
                 vocab_file: str,
                 do_lower_case: bool = False,
                 max_seq_length: int = 512,
                 max_subword_length: int = None,
                 token_masking_prob: float = 0.0,
                 provide_subword_tags: bool = False,
                 subword_mask_mode: str = "first",
                 **kwargs):
        self._re_tokenizer = re.compile(r"[\w']+|[^\w ]")
        self.provide_subword_tags = provide_subword_tags
        self.mode = kwargs.get('mode')
        self.max_seq_length = max_seq_length
        self.max_subword_length = max_subword_length
        self.subword_mask_mode = subword_mask_mode
        vocab_file = str(expand_path(vocab_file))
        self.tokenizer = FullTokenizer(vocab_file=vocab_file,
                                       do_lower_case=do_lower_case)
        self.token_masking_prob = token_masking_prob

    def __call__(self,
                 tokens: Union[List[List[str]], List[str]],
                 tags: List[List[str]] = None,
                 **kwargs):
        if isinstance(tokens[0], str):
            tokens = [re.findall(self._re_tokenizer, s) for s in tokens]
        subword_tokens, subword_tok_ids, startofword_markers, subword_tags = [], [], [], []
        for i in range(len(tokens)):
            toks = tokens[i]
            ys = ['O'] * len(toks) if tags is None else tags[i]
            assert len(toks) == len(ys), \
                f"toks({len(toks)}) should have the same length as ys({len(ys)})"
            sw_toks, sw_marker, sw_ys = \
                self._ner_bert_tokenize(toks,
                                        ys,
                                        self.tokenizer,
                                        self.max_subword_length,
                                        mode=self.mode,
                                        subword_mask_mode=self.subword_mask_mode,
                                        token_masking_prob=self.token_masking_prob)
            if self.max_seq_length is not None:
                if len(sw_toks) > self.max_seq_length:
                    raise RuntimeError(f"input sequence after bert tokenization"
                                       f" shouldn't exceed {self.max_seq_length} tokens.")
            subword_tokens.append(sw_toks)
            subword_tok_ids.append(self.tokenizer.convert_tokens_to_ids(sw_toks))
            startofword_markers.append(sw_marker)
            subword_tags.append(sw_ys)
            assert len(sw_marker) == len(sw_toks) == len(subword_tok_ids[-1]) == len(sw_ys), \
                f"length of sw_marker({len(sw_marker)}), tokens({len(sw_toks)})," \
                f" token ids({len(subword_tok_ids[-1])}) and ys({len(sw_ys)})" \
                f" for tokens = `{toks}` should match"

        subword_tok_ids = zero_pad(subword_tok_ids, dtype=int, padding=0)
        startofword_markers = zero_pad(startofword_markers, dtype=int, padding=0)
        attention_mask = Mask()(subword_tokens)

        if tags is not None:
            if self.provide_subword_tags:
                return tokens, subword_tokens, subword_tok_ids, \
                    attention_mask, startofword_markers, subword_tags
            else:
                nonmasked_tags = [[t for t in ts if t != 'X'] for ts in tags]
                for swts, swids, swms, ts in zip(subword_tokens,
                                                 subword_tok_ids,
                                                 startofword_markers,
                                                 nonmasked_tags):
                    if (len(swids) != len(swms)) or (len(ts) != sum(swms)):
                        log.warning('Not matching lengths of the tokenization!')
                        log.warning(f'Tokens len: {len(swts)}\n Tokens: {swts}')
                        log.warning(f'Markers len: {len(swms)}, sum: {sum(swms)}')
                        log.warning(f'Masks: {swms}')
                        log.warning(f'Tags len: {len(ts)}\n Tags: {ts}')
                return tokens, subword_tokens, subword_tok_ids, \
                    attention_mask, startofword_markers, nonmasked_tags
        return tokens, subword_tokens, subword_tok_ids, startofword_markers, attention_mask

    @staticmethod
    def _ner_bert_tokenize(tokens: List[str],
                           tags: List[str],
                           tokenizer: FullTokenizer,
                           max_subword_len: int = None,
                           mode: str = None,
                           subword_mask_mode: str = "first",
                           token_masking_prob: float = None) -> Tuple[List[str], List[int], List[str]]:
        do_masking = (mode == 'train') and (token_masking_prob is not None)
        do_cutting = (max_subword_len is not None)
        tokens_subword = ['[CLS]']
        startofword_markers = [0]
        tags_subword = ['X']
        for token, tag in zip(tokens, tags):
            token_marker = int(tag != 'X')
            subwords = tokenizer.tokenize(token)
            if not subwords or (do_cutting and (len(subwords) > max_subword_len)):
                tokens_subword.append('[UNK]')
                startofword_markers.append(token_marker)
                tags_subword.append(tag)
            else:
                if do_masking and (random.random() < token_masking_prob):
                    tokens_subword.extend(['[MASK]'] * len(subwords))
                else:
                    tokens_subword.extend(subwords)
                if subword_mask_mode == "last":
                    startofword_markers.extend([0] * (len(subwords) - 1) + [token_marker])
                else:
                    startofword_markers.extend([token_marker] + [0] * (len(subwords) - 1))
                tags_subword.extend([tag] + ['X'] * (len(subwords) - 1))

        tokens_subword.append('[SEP]')
        startofword_markers.append(0)
        tags_subword.append('X')
        return tokens_subword, startofword_markers, tags_subword
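

# --- Illustrative usage sketch (not part of the original module) ---
# Demonstrates the NER preprocessing flow on a single pre-tokenized sentence.
# The vocab path is again a placeholder assumption. Without tags, __call__
# returns tokens, subword tokens, subword ids, start-of-word markers and an
# attention mask.
def _example_bert_ner_preprocessor():
    ner_prep = BertNerPreprocessor(vocab_file='./bert_model/vocab.txt',
                                   do_lower_case=False,
                                   max_seq_length=128)
    tokens, sw_tokens, sw_ids, sow_markers, att_mask = \
        ner_prep([['John', 'lives', 'in', 'London']])
    # sw_tokens[0] starts with '[CLS]' and ends with '[SEP]';
    # sow_markers[0] holds 1 at the first subtoken of every word, 0 elsewhere.
    print(sw_tokens[0])
    print(sow_markers[0])
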

@register('bert_ranker_preprocessor')
class BertRankerPreprocessor(BertPreprocessor):
    """Tokenize text to sub-tokens, encode sub-tokens with their indices, create tokens and segment masks for ranking.

    Builds features for a pair of context with each of the response candidates.
    """

    def __call__(self, batch: List[List[str]]) -> List[List[InputFeatures]]:
        """Call BERT :func:`bert_dp.preprocessing.convert_examples_to_features` function to tokenize and create masks.

        Args:
            batch: list of elements where the first element represents the batch with contexts
                and the rest of elements represent response candidates batches

        Returns:
            list of feature batches with subtokens, subtoken ids, subtoken mask, segment mask.
        """

        if isinstance(batch[0], str):
            batch = [batch]

        cont_resp_pairs = []
        if len(batch[0]) == 1:
            contexts = batch[0]
            responses_empt = [None] * len(batch)
            cont_resp_pairs.append(zip(contexts, responses_empt))
        else:
            contexts = [el[0] for el in batch]
            for i in range(1, len(batch[0])):
                responses = []
                for el in batch:
                    responses.append(el[i])
                cont_resp_pairs.append(zip(contexts, responses))
        examples = []
        for s in cont_resp_pairs:
            ex = [InputExample(unique_id=0, text_a=context, text_b=response)
                  for context, response in s]
            examples.append(ex)
        features = [convert_examples_to_features(el, self.max_seq_length, self.tokenizer)
                    for el in examples]
        return features
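

# --- Illustrative usage sketch (not part of the original module) ---
# Each sample in the batch is a list [context, candidate_1, ..., candidate_n];
# the preprocessor pairs the context with every candidate, yielding one
# feature batch per candidate. The texts and the vocab path are assumptions.
def _example_bert_ranker_preprocessor():
    ranker_prep = BertRankerPreprocessor(vocab_file='./bert_model/vocab.txt',
                                         max_seq_length=64)
    batch = [['Hi, how can I help?',
              'Book a table for two.',
              'What is the weather like?']]
    feature_batches = ranker_prep(batch)
    print(len(feature_batches))  # 2 response candidates -> 2 feature batches
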

@register('bert_sep_ranker_preprocessor')
class BertSepRankerPreprocessor(BertPreprocessor):
    """Tokenize text to sub-tokens, encode sub-tokens with their indices, create tokens and segment masks for ranking.

    Builds features for a context and for each of the response candidates separately.
    """

    def __call__(self, batch: List[List[str]]) -> List[List[InputFeatures]]:
        """Call BERT :func:`bert_dp.preprocessing.convert_examples_to_features` function to tokenize and create masks.

        Args:
            batch: list of elements where the first element represents the batch with contexts
                and the rest of elements represent response candidates batches

        Returns:
            list of feature batches with subtokens, subtoken ids, subtoken mask, segment mask
            for the context and each of response candidates separately.
        """

        if isinstance(batch[0], str):
            batch = [batch]

        samples = []
        for i in range(len(batch[0])):
            s = []
            for el in batch:
                s.append(el[i])
            samples.append(s)
        s_empt = [None] * len(samples[0])
        # TODO: add unique id
        examples = []
        for s in samples:
            ex = [InputExample(unique_id=0, text_a=text_a, text_b=text_b)
                  for text_a, text_b in zip(s, s_empt)]
            examples.append(ex)
        features = [convert_examples_to_features(el, self.max_seq_length, self.tokenizer)
                    for el in examples]
        return features
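

# --- Illustrative usage sketch (not part of the original module) ---
# In contrast to BertRankerPreprocessor, the context and every response
# candidate are featurized separately (text_b stays None), which matches a
# bi-encoder ranking setup. The texts and the vocab path are assumptions.
def _example_bert_sep_ranker_preprocessor():
    sep_prep = BertSepRankerPreprocessor(vocab_file='./bert_model/vocab.txt',
                                         max_seq_length=64)
    batch = [['Hi, how can I help?',
              'Book a table for two.',
              'What is the weather like?']]
    feature_batches = sep_prep(batch)
    print(len(feature_batches))  # context + 2 candidates -> 3 feature batches
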

@register('bert_sep_ranker_predictor_preprocessor')
class BertSepRankerPredictorPreprocessor(BertSepRankerPreprocessor):
    """Tokenize text to sub-tokens, encode sub-tokens with their indices, create tokens and segment masks for ranking.

    Builds features for a context and for each of the response candidates separately.
    In addition, builds features for a response (and corresponding context) text base.

    Args:
        resps: list of strings containing the base of text responses
        resp_vecs: BERT vector representations of ``resps``; if it is ``None``, features for the response base will be built
        conts: list of strings containing the base of text contexts
        cont_vecs: BERT vector representations of ``conts``; if it is ``None``, features for the context base will be built
    """

    def __init__(self,
                 resps=None, resp_vecs=None, conts=None, cont_vecs=None,
                 **kwargs) -> None:
        super().__init__(**kwargs)
        self.resp_features = None
        self.cont_features = None
        if resps is not None and resp_vecs is None:
            log.info("Building BERT features for the response base...")
            resp_batch = [[el] for el in resps]
            self.resp_features = self(resp_batch)
        if conts is not None and cont_vecs is None:
            log.info("Building BERT features for the context base...")
            cont_batch = [[el] for el in conts]
            self.cont_features = self(cont_batch)
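

# --- Illustrative usage sketch (not part of the original module) ---
# When a response base is passed without precomputed vectors, features for it
# are built once at construction time and cached in `resp_features`.
# The responses and the vocab path are assumptions.
def _example_bert_sep_ranker_predictor_preprocessor():
    predictor_prep = BertSepRankerPredictorPreprocessor(
        vocab_file='./bert_model/vocab.txt',
        max_seq_length=64,
        resps=['Sure, for what time?', 'Sorry, we are fully booked.'])
    # resp_features is a list holding one feature batch for the whole base.
    print(predictor_prep.resp_features is not None)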