Source code for deeppavlov.dataset_iterators.multitask_iterator

# Copyright 2017 Neural Networks and Deep Learning lab, MIPT
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import math
import random
from logging import getLogger
from typing import Iterator, Optional, Tuple, Union

import numpy as np

from deeppavlov.core.common.errors import ConfigError
from deeppavlov.core.common.params import from_params
from deeppavlov.core.common.registry import register
from deeppavlov.core.data.data_learning_iterator import DataLearningIterator

log = getLogger(__name__)


@register('multitask_iterator')
class MultiTaskIterator:
    """
    Class that merges data from several dataset iterators. When used for batch generation, batches from
    the merged dataset iterators are united into one batch. If the sizes of the merged datasets differ,
    smaller datasets are repeated until their size becomes equal to the size of the largest dataset.

    Args:
        data: dictionary whose keys are task names and values are dictionaries with fields
            ``"train", "valid", "test"``.
        num_train_epochs: number of training epochs
        tasks: dictionary whose keys are task names and values are init params of dataset iterators.
            If a task has the key-value pair ``'use_task_defaults': False``, ``task_defaults`` is
            ignored for this task's dataset iterator.
        batch_size: batch size
        sampling_mode: sampling mode to use. It can be ``plain``, ``uniform`` or ``anneal``.
        gradient_accumulation_steps: number of gradient accumulation steps. Default is 1
        steps_per_epoch: number of steps per epoch. Necessary if ``gradient_accumulation_steps`` > 1
        iterator_class_name: name of the iterator class.
        use_label_name, seed, features: parameters for the iterator class
        one_element_tuples: if True, an x that is a tuple consisting of one element is unpacked and
            returned as that element. Default: True
        task_defaults: default task parameters.
        seed: random seed for sampling

    Attributes:
        data: dictionary of data with fields "train", "valid" and "test" (or some of them)
    """

    def __init__(
            self,
            data: dict,
            num_train_epochs: int,
            tasks: dict,
            batch_size: int = 8,
            sampling_mode: str = 'plain',
            gradient_accumulation_steps: int = 1,
            steps_per_epoch: int = 0,
            one_element_tuples: bool = True,
            task_defaults: dict = None,
            seed: int = 42,
            **kwargs
    ):
        if data.keys() != tasks.keys():
            raise ConfigError("Task names from dataset reader don't match task names from dataset iterator: "
                              f"{data.keys()} != {tasks.keys()}.")
        self.task_iterators = {}
        if task_defaults is None:
            task_defaults = dict()
        for task_name, task_params in tasks.items():
            if task_params.pop('use_task_defaults', True) is True:
                task_config = copy.deepcopy(task_defaults)
                task_config.update(task_params)
            else:
                task_config = task_params
            try:
                self.task_iterators[task_name] = from_params(task_config, data=data[task_name])
            except Exception as e:
                log.error(f'Failed to initialize dataset_iterator for "{task_name}" task. '
                          'Make sure that all parameters from `task_defaults` and task parameters are correct.')
                raise e
        self.n_tasks = len(tasks.keys())
        self.num_train_epochs = num_train_epochs
        self.steps_per_epoch = steps_per_epoch
        self.gradient_accumulation_steps = gradient_accumulation_steps
        self.epochs_done = 0
        self.steps_taken = 0
        self.task_id = None
        self.sampling_mode = sampling_mode
        self.data = {
            "train": self._extract_data_type("train"),
            "valid": self._extract_data_type("valid"),
            "test": self._extract_data_type("test"),
        }
        for mode in ["train", "valid", "test"]:
            log.info(f'For {mode}')
            for task_name in self.data[mode]:
                log.info(f'{task_name} has {len(self.data[mode][task_name])} examples')
        self.train_sizes = self._get_data_size("train")
        if steps_per_epoch == 0:
            self.steps_per_epoch = sum(self.train_sizes) // batch_size
        else:
            self.steps_per_epoch = steps_per_epoch

        def is_nan(a):
            return a != a

        for mode in ['train', 'valid', 'test']:
            for task in self.data[mode]:
                for i in range(len(self.data[mode][task]) - 1, -1, -1):
                    x = self.data[mode][task][i][0]
                    y = self.data[mode][task][i][1]
                    if is_nan(x) or any([is_nan(z) for z in x]) or is_nan(y):
                        log.info(f'NAN detected {self.data[mode][task][i - 1:i]}')
                        del self.data[mode][task][i]
                        log.info(f'NAN for mode {mode} task {task} element {i} CLEARED')
                    elif isinstance(x, tuple) and len(x) == 1 and one_element_tuples:
                        # x is a tuple consisting of one element: unpack it and keep the element itself
                        self.data[mode][task][i] = (x[0], y)
        self.max_task_data_len = dict()
        for data_type in self.data:
            sizes = self._get_data_size(data_type)
            self.max_task_data_len[data_type] = max(sizes)
        random.seed(seed)

    def _get_data_size(self, data_type):
        """Returns a list of sizes of each dataset for the given data_type: train, test or valid."""
        return [len(self.data[data_type][key]) for key in self.data[data_type]]

    def _get_probs(self, data_type):
        """Returns sampling probabilities for the supported sampling modes: plain, uniform or anneal."""
        if self.sampling_mode == 'uniform':
            sizes = [1 for _ in self._get_data_size(data_type)]  # as we sample uniformly
            s = sum(sizes)
            probs = [p / s for p in sizes]
        elif self.sampling_mode == 'plain':
            sizes = self._get_data_size(data_type)
            n_samples = sum(sizes)
            probs = [p / n_samples for p in sizes]
        elif self.sampling_mode == 'anneal':
            alpha = 1.0 - 0.8 * (self.epochs_done / self.num_train_epochs)
            annealed_sizes = [p ** alpha for p in self._get_data_size(data_type)]
            n_samples = sum(annealed_sizes)
            probs = [p / n_samples for p in annealed_sizes]
        else:
            raise ValueError(f'Unsupported sampling mode {self.sampling_mode}')
        return probs

    def _extract_data_type(self, data_type):
        """Merges data of the given data_type (e.g. train) from all task_iterators into one dict."""
        dataset_part = {}
        for task, iterator in self.task_iterators.items():
            dataset_part[task] = getattr(iterator, data_type)
        return dataset_part

    def _transform_before_yielding(self, x, y, batch_size):
        """Transforms data from the dataset before yielding."""
        if len(x) != len(y):
            raise Exception(f'x has len {len(x)} but y has len {len(y)}')
        new_x, new_y = [], []
        for i in range(batch_size):
            x_tuple = tuple([x[t_id][i] for t_id in range(self.n_tasks)])
            y_tuple = tuple([y[t_id][i] for t_id in range(self.n_tasks)])
            if self.n_tasks == 1:
                x_tuple = x_tuple[0]
                y_tuple = y_tuple[0]
            new_x.append(x_tuple)
            new_y.append(y_tuple)
        batches = (tuple(new_x), tuple(new_y))
        return batches
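
    # Note on task sampling (illustrative numbers only): `gen_batches` picks one task per training
    # step using the probabilities returned by `_get_probs`. With train sizes [900, 100], 'plain'
    # gives probs [0.9, 0.1] (proportional to size) and 'uniform' gives [0.5, 0.5], while 'anneal'
    # moves from the former towards the latter as training progresses: at epochs_done=5 of
    # num_train_epochs=10, alpha = 1.0 - 0.8 * 0.5 = 0.6, so the normalised probabilities
    # [900 ** 0.6, 100 ** 0.6] / sum are roughly [0.79, 0.21].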

    def gen_batches(self, batch_size: int, data_type: str = "train",
                    shuffle: bool = None) -> Iterator[Tuple[tuple, tuple]]:
        """
        Generates batches and expected output to train neural networks.
        If there are not enough samples from any task, the samples are padded with None.

        Args:
            batch_size: number of samples in a batch
            data_type: can be either 'train', 'test', or 'valid'
            shuffle: whether to shuffle the dataset before batching

        Yields:
            A tuple of a batch of inputs and a batch of expected outputs. Inputs and outputs are tuples.
            An element of the inputs or outputs is a tuple whose elements are the values of the merged
            tasks, in the order the tasks appear in the `tasks` argument of the `__init__` method.
        """
        max_task_data_len = self.max_task_data_len[data_type]
        log.info(f'Batch size {batch_size} with gradient accumulation steps {self.gradient_accumulation_steps}')
        log.info(f'Effective batch size {batch_size // self.gradient_accumulation_steps}')
        batch_size = batch_size // self.gradient_accumulation_steps
        if data_type == "train":
            generators = [
                SingleTaskBatchGenerator(iter_, batch_size, data_type, shuffle)
                for iter_ in self.task_iterators.values()
            ]
            # probs are only required while training
            probs = self._get_probs("train")
            for step in range(self.steps_per_epoch):
                if (self.steps_taken + 1) % self.gradient_accumulation_steps == 0 or self.task_id is None:
                    self.task_id = np.random.choice(self.n_tasks, p=probs)
                x = [[None for _ in range(batch_size)] for _ in range(self.n_tasks)]
                y = [[None for _ in range(batch_size)] for _ in range(self.n_tasks)]
                x[self.task_id], y[self.task_id] = generators[self.task_id].__next__()
                if not all([s is None for s in x[self.task_id]]):
                    batch_to_yield = self._transform_before_yielding(x, y, batch_size)
                    yield batch_to_yield
            self.epochs_done += 1
            # one additional step is taken while logging training metrics
            self.steps_taken -= 1
        else:
            eval_batch_size = 1
            x = [[None for _ in range(eval_batch_size)] for _ in range(self.n_tasks)]
            y = [[None for _ in range(eval_batch_size)] for _ in range(self.n_tasks)]
            generators = [
                SingleTaskBatchGenerator(
                    iter_, batch_size=eval_batch_size, data_type=data_type, shuffle=shuffle)
                for iter_ in self.task_iterators.values()
            ]
            for step in range(max_task_data_len):
                for task_id in range(self.n_tasks):
                    x[task_id], y[task_id] = generators[task_id].__next__()
                batches = self._transform_before_yielding(x, y, eval_batch_size)
                yield batches
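
    # Worked example for `get_instances` below: with two tasks holding 5 and 2 examples,
    # max_task_data_len is 5, so the smaller task is repeated ceil(5 / 2) = 3 times and then
    # truncated to its first 5 items, giving equally long instance lists for both tasks.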

    def get_instances(self, data_type: str = "train"):
        """
        Returns a tuple of inputs and outputs from all datasets. Lengths of inputs and outputs are equal
        to the size of the largest dataset. Smaller datasets are repeated until their sizes are equal to
        the size of the largest dataset.

        Args:
            data_type: can be either 'train', 'test', or 'valid'

        Returns:
            A tuple of all inputs for a data type and all expected outputs for a data type.
        """
        max_task_data_len = max(
            [
                len(iter_.get_instances(data_type)[0])
                for iter_ in self.task_iterators.values()
            ]
        )
        x_instances = []
        y_instances = []
        for task_name, iter_ in self.task_iterators.items():
            x, y = iter_.get_instances(data_type)
            n_repeats = math.ceil(max_task_data_len / len(x))
            x *= n_repeats
            y *= n_repeats
            x_instances.append(x[:max_task_data_len])
            y_instances.append(y[:max_task_data_len])
        error_msg = f"Lengths of x_instances {len(x_instances)} and y_instances {len(y_instances)} don't match"
        if len(x_instances) != len(y_instances):
            raise Exception(error_msg)
        instances = (tuple(zip(*x_instances)), tuple(zip(*y_instances)))
        return instances
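

# A minimal construction sketch for MultiTaskIterator (toy data only; 'data_learning_iterator'
# is assumed here to be a registered iterator class_name that `from_params` can resolve):
#
#     data = {
#         'sentiment': {'train': [('great movie', 'positive')], 'valid': [], 'test': []},
#         'intent': {'train': [('book a table', 'inform')], 'valid': [], 'test': []},
#     }
#     tasks = {
#         'sentiment': {'class_name': 'data_learning_iterator', 'seed': 42},
#         'intent': {'class_name': 'data_learning_iterator', 'seed': 42},
#     }
#     iterator = MultiTaskIterator(data=data, num_train_epochs=3, tasks=tasks,
#                                  batch_size=1, sampling_mode='plain')
#     for x_batch, y_batch in iterator.gen_batches(batch_size=1):
#         # each element of x_batch/y_batch is a per-task tuple; the slots of tasks that were
#         # not sampled for the current training step are filled with None
#         ...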


class SingleTaskBatchGenerator:
    """
    Batch generator for a single task. If there are not enough elements left in the dataset to form
    another batch, Nones are returned.

    Args:
        dataset_iterator: dataset iterator from which batches are drawn.
        batch_size: size of the batch.
        data_type: "train", "valid", or "test"
        shuffle: whether the dataset will be shuffled.
        n_batches: the number of batches that will be generated.
    """

    def __init__(
            self,
            dataset_iterator: Union[DataLearningIterator],
            batch_size: int,
            data_type: str,
            shuffle: bool,
            n_batches: Optional[int] = None,
            size_of_last_batch: Optional[int] = None,
    ):
        self.dataset_iterator = dataset_iterator
        self.batch_size = batch_size
        self.data_type = data_type
        self.shuffle = shuffle
        self.n_batches = n_batches
        self.size_of_last_batch = (
            self.batch_size if size_of_last_batch is None else size_of_last_batch)
        self.inner_batch_size = math.gcd(
            len(self.dataset_iterator.data[data_type]), batch_size
        )
        self.gen = self.dataset_iterator.gen_batches(
            self.inner_batch_size, self.data_type, self.shuffle
        )
        self.batch_count = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.n_batches is not None and self.batch_count > self.n_batches:
            raise StopIteration
        x, y = (), ()
        while len(x) < self.batch_size or len(y) < self.batch_size:
            try:
                xx, yy = next(self.gen)
                x += xx
                y += yy
            except StopIteration:
                x_nones = tuple([None for _ in range(self.batch_size)])
                y_nones = x_nones
                return x_nones, y_nones
        self.batch_count += 1
        if self.batch_count == self.n_batches:
            x = x[:self.size_of_last_batch]
            y = y[:self.size_of_last_batch]
        return x, y
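

if __name__ == '__main__':
    # Minimal usage sketch for SingleTaskBatchGenerator on toy data. It assumes that
    # DataLearningIterator accepts a dict with 'train'/'valid'/'test' lists of (x, y) pairs.
    toy_data = {'train': [(f'sentence {i}', i % 2) for i in range(10)], 'valid': [], 'test': []}
    toy_iterator = DataLearningIterator(toy_data, seed=42, shuffle=False)
    generator = SingleTaskBatchGenerator(toy_iterator, batch_size=4, data_type='train', shuffle=False)
    print(next(generator))  # xs and ys of the first 4 examples
    print(next(generator))  # xs and ys of the next 4 examples
    print(next(generator))  # only 2 examples remain, so a batch of Nones is returned instead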