Source code for deeppavlov.core.models.torch_model

# Copyright 2017 Neural Networks and Deep Learning lab, MIPT
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
from logging import getLogger
from pathlib import Path
from typing import Optional, Union

import torch

from deeppavlov.core.common.errors import ConfigError
from deeppavlov.core.models.nn_model import NNModel

log = getLogger(__name__)


class TorchModel(NNModel):
    """Class implements torch model's main methods.

    Args:
        model: torch.nn.Module-based neural network model
        device: device to use
        optimizer: name of `torch.optim` optimizer
        optimizer_parameters: dictionary with optimizer parameters
        learning_rate_drop_patience: how many validations with no improvements to wait
        learning_rate_drop_div: the divider of the learning rate after `learning_rate_drop_patience`
            unsuccessful validations
        load_before_drop: whether to load best model before dropping learning rate or not
        min_learning_rate: min value of learning rate if learning rate decay is used
        args:
        kwargs: dictionary with other model parameters

    Attributes:
        device: `cpu` or `cuda` device to use
        opt: dictionary with all model parameters
        model: torch model
        epochs_done: number of epochs that were done
        optimizer: `torch.optim` instance
        learning_rate_drop_patience: how many validations with no improvements to wait
        learning_rate_drop_div: the divider of the learning rate after `learning_rate_drop_patience`
            unsuccessful validations
        load_before_drop: whether to load best model before dropping learning rate or not
        min_learning_rate: min value of learning rate if learning rate decay is used
        clip_norm: clip gradients by norm coefficient
    """

    def __init__(self, model: torch.nn.Module,
                 device: Union[torch.device, str] = "cuda",
                 optimizer: str = "AdamW",
                 optimizer_parameters: Optional[dict] = None,
                 learning_rate_drop_patience: Optional[int] = None,
                 learning_rate_drop_div: Optional[float] = None,
                 load_before_drop: bool = True,
                 min_learning_rate: float = 1e-07,
                 clip_norm: Optional[float] = None,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model = model
        self.device = self._init_device(device)
        self.model.to(self.device)

        if self.device.type == "cuda" and torch.cuda.device_count() > 1:
            self.model = torch.nn.DataParallel(self.model)

        if optimizer_parameters is None:
            optimizer_parameters = {"lr": 0.01}
        self.optimizer = getattr(torch.optim, optimizer)(self.model.parameters(), **optimizer_parameters)

        self.epochs_done = 0
        self.learning_rate_drop_patience = learning_rate_drop_patience
        self.learning_rate_drop_div = learning_rate_drop_div
        self.load_before_drop = load_before_drop
        self.min_learning_rate = min_learning_rate
        self.clip_norm = clip_norm

        self.load()
        # we need to switch to eval mode here because by default it's in `train` mode.
        # But in case of `interact/build_model` usage, we need to have the model in eval mode.
        self.model.eval()
        log.debug(f"Model was successfully initialized! Model summary:\n {self.model}")

    def _init_device(self, device: Union[torch.device, str]) -> torch.device:
        if device == "gpu":
            device = "cuda"
        if isinstance(device, str):
            device = torch.device(device)
        if device.type == "cuda" and not torch.cuda.is_available():
            log.warning(f"Unable to place component {self.__class__.__name__} on GPU, "
                        "since no CUDA GPUs are available. Using CPU.")
            device = torch.device("cpu")
        return device

    @property
    def is_data_parallel(self) -> bool:
        return isinstance(self.model, torch.nn.DataParallel)

    def load(self, fname: Optional[str] = None, *args, **kwargs) -> None:
        """Load model weights and optimizer state from `fname` (or from `self.load_path` if `fname`
        is not given) into `self.model` and `self.optimizer`. If the checkpoint does not exist,
        initialize the model from scratch.

        Args:
            fname: string path to checkpoint
            *args:
            **kwargs:

        Returns:
            None
        """
        if fname is not None:
            self.load_path = fname

        if self.load_path:
            log.debug(f"Load path {self.load_path} is given.")
            if isinstance(self.load_path, Path) and not self.load_path.parent.is_dir():
                raise ConfigError("Provided load path is incorrect!")

            weights_path = Path(self.load_path.resolve())
            weights_path = weights_path.with_suffix(".pth.tar")
            if weights_path.exists():
                log.debug(f"Load path {weights_path} exists.")
                log.debug(f"Initializing `{self.__class__.__name__}` from saved.")

                # now load the weights and optimizer state from the checkpoint
                log.debug(f"Loading weights from {weights_path}.")
                checkpoint = torch.load(weights_path, map_location=self.device)
                model_state = checkpoint["model_state_dict"]
                optimizer_state = checkpoint["optimizer_state_dict"]

                # load a multi-gpu model on a single device
                if all(key.startswith("module.") for key in model_state.keys()):
                    model_state = {key.replace("module.", "", 1): val for key, val in model_state.items()}

                if self.is_data_parallel:
                    self.model.module.load_state_dict(model_state)
                else:
                    self.model.load_state_dict(model_state)
                try:
                    # TODO: remove this try-except after hf models deep update
                    self.optimizer.load_state_dict(optimizer_state)
                except ValueError as e:
                    log.error(f"Failed to load optimizer state due to {repr(e)}")
                self.epochs_done = checkpoint.get("epochs_done", 0)
            else:
                log.warning(f"Init from scratch. Load path {weights_path} does not exist.")
        else:
            log.warning(f"Init from scratch. Load path {self.load_path} is not provided.")
        self.model.to(self.device)

    def save(self, fname: Optional[str] = None, *args, **kwargs) -> None:
        """Save torch model to `fname` (or to `self.save_path` if `fname` is not given).
        The checkpoint includes `model_state_dict`, `optimizer_state_dict`, and `epochs_done`
        (number of training epochs).

        Args:
            fname: string path to checkpoint
            *args:
            **kwargs:

        Returns:
            None
        """
        if fname is None:
            fname = self.save_path

        if not fname.parent.is_dir():
            raise ConfigError("Provided save path is incorrect!")

        weights_path = Path(fname).with_suffix(".pth.tar")
        log.info(f"Saving model to {weights_path}.")
        # move the model to `cpu` before saving to provide consistency
        if self.is_data_parallel:
            model_state_dict = self.model.module.cpu().state_dict()
        else:
            model_state_dict = self.model.cpu().state_dict()
        torch.save({
            "model_state_dict": model_state_dict,
            "optimizer_state_dict": self.optimizer.state_dict(),
            "epochs_done": self.epochs_done
        }, weights_path)
        # return it back to device (necessary if it was on `cuda`)
        self.model.to(self.device)

    def process_event(self, event_name: str, data: dict) -> None:
        """Process event. After an epoch, increase `self.epochs_done`. After a validation,
        if `self.learning_rate_drop_patience` is given, divide the learning rate by
        `self.learning_rate_drop_div` (but not below `self.min_learning_rate`).

        Args:
            event_name: whether the event is sent after an epoch or a batch.
                Set of values: ``"after_epoch", "after_batch"``
            data: event data (dictionary)

        Returns:
            None
        """
        if event_name == "after_epoch":
            self.epochs_done += 1

        if event_name == "after_validation" and 'impatience' in data and self.learning_rate_drop_patience:
            if data['impatience'] == self.learning_rate_drop_patience:
                log.info(f"----------Current LR is decreased in {self.learning_rate_drop_div} times----------")
                if self.load_before_drop:
                    self.load(self.save_path)
                    self.model.eval()
                for param_group in self.optimizer.param_groups:
                    param_group['lr'] = max(param_group['lr'] / self.learning_rate_drop_div,
                                            self.min_learning_rate)

    @abstractmethod
    def train_on_batch(self, x: list, y: list):
        pass

    def _make_step(self, loss: torch.Tensor) -> None:
        loss.backward()
        if self.clip_norm is not None:
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.clip_norm)
        self.optimizer.step()
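
For illustration only, a minimal sketch of how this base class is typically subclassed. The example is not part of the module above: `ToyTorchClassifier`, its linear network, and its loss function are assumed names chosen for the sketch. The subclass supplies the network, implements `train_on_batch` using `_make_step`, and defines inference in `__call__`, while device placement, checkpointing, and learning-rate drops are inherited from `TorchModel`.

# Hypothetical usage sketch (not part of deeppavlov.core.models.torch_model):
# a subclass provides the network and the per-batch training/inference logic.

import numpy as np
import torch


class ToyTorchClassifier(TorchModel):
    """Illustrative subclass: a single linear layer trained with cross-entropy."""

    def __init__(self, n_features: int, n_classes: int, **kwargs):
        network = torch.nn.Linear(n_features, n_classes)
        self.loss_fn = torch.nn.CrossEntropyLoss()
        # TorchModel.__init__ moves the network to the device, builds the optimizer,
        # calls self.load() and switches the model to eval mode.
        super().__init__(model=network, **kwargs)

    def train_on_batch(self, x: list, y: list) -> float:
        self.model.train()
        self.optimizer.zero_grad()
        features = torch.tensor(np.array(x), dtype=torch.float32, device=self.device)
        labels = torch.tensor(np.array(y), dtype=torch.long, device=self.device)
        loss = self.loss_fn(self.model(features), labels)
        # backward pass, optional gradient clipping and optimizer step
        self._make_step(loss)
        return loss.item()

    def __call__(self, x: list) -> list:
        self.model.eval()
        with torch.no_grad():
            features = torch.tensor(np.array(x), dtype=torch.float32, device=self.device)
            return self.model(features).argmax(dim=-1).cpu().tolist()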