# Copyright 2017 Neural Networks and Deep Learning lab, MIPT
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from deeppavlov.core.common.errors import ConfigError
from deeppavlov.core.models.component import Component
from deeppavlov.core.models.nn_model import NNModel
[docs]class Chainer(Component):
"""
Builds an agent/component pipeline from heterogeneous components (Rule-based/ML/DL). It allows to train
and infer models in a pipeline as a whole.
"""
def __init__(self, in_x: [str, list] = None, out_params: [str, list] = None, in_y: [str, list] = None,
*args, as_component: bool = False, **kwargs):
self.pipe = []
self.train_pipe = []
if isinstance(in_x, str):
in_x = [in_x]
if isinstance(in_y, str):
in_y = [in_y]
if isinstance(out_params, str):
out_params = [out_params]
self.in_x = in_x or ['x']
self.in_y = in_y or ['y']
self.out_params = out_params or self.in_x
self.forward_map = set(self.in_x)
self.train_map = self.forward_map.union(self.in_y)
self.main = None
if as_component:
self._predict = self._predict_as_component
def append(self, component: Component, in_x: [str, list, dict]=None, out_params: [str, list]=None,
in_y: [str, list, dict]=None, main=False):
if isinstance(in_x, str):
in_x = [in_x]
if isinstance(in_y, str):
in_y = [in_y]
if isinstance(out_params, str):
out_params = [out_params]
in_x = in_x or self.in_x
if isinstance(in_x, dict):
x_keys, in_x = zip(*in_x.items())
else:
x_keys = []
out_params = out_params or in_x
if in_y is not None:
if isinstance(in_y, dict):
y_keys, in_y = zip(*in_y.items())
else:
y_keys = []
keys = x_keys + y_keys
if bool(x_keys) != bool(y_keys):
raise ConfigError('`in` and `in_y` for a component have to both be lists or dicts')
component: NNModel
main = True
assert self.train_map.issuperset(in_x+in_y), ('Arguments {} are expected but only {} are set'
.format(in_x+in_y, self.train_map))
preprocessor = Chainer(self.in_x, in_x+in_y, self.in_y)
for (t_in_x_keys, t_in_x), t_out, t_component in self.train_pipe:
if t_in_x_keys:
t_in_x = dict(zip(t_in_x_keys, t_in_x))
preprocessor.append(t_component, t_in_x, t_out)
def train_on_batch(*args, **kwargs):
preprocessed = zip(*preprocessor(*args, **kwargs))
if keys:
return component.train_on_batch(**dict(zip(keys, preprocessed)))
else:
return component.train_on_batch(*preprocessed)
self.train_on_batch = train_on_batch
self.process_event = component.process_event
if main:
self.main = component
if self.forward_map.issuperset(in_x):
self.pipe.append(((x_keys, in_x), out_params, component))
self.forward_map = self.forward_map.union(out_params)
if self.train_map.issuperset(in_x):
self.train_pipe.append(((x_keys, in_x), out_params, component))
self.train_map = self.train_map.union(out_params)
else:
raise ConfigError('Arguments {} are expected but only {} are set'.format(in_x, self.train_map))
def __call__(self, *args, **kwargs):
return self._predict(*args, **kwargs)
def _predict(self, x, y=None, to_return=None):
in_params = list(self.in_x)
if len(in_params) == 1:
args = [x]
else:
args = list(zip(*x))
if to_return is None:
to_return = self.out_params
if self.forward_map.issuperset(to_return):
pipe = self.pipe
elif y is None:
raise RuntimeError('Expected to return {} but only {} are set in memory'
.format(to_return, self.forward_map))
elif self.train_map.issuperset(to_return):
pipe = self.train_pipe
if len(self.in_y) == 1:
args.append(y)
else:
args += list(zip(*y))
in_params += self.in_y
else:
raise RuntimeError('Expected to return {} but only {} are set in memory'
.format(to_return, self.train_map))
mem = dict(zip(in_params, args))
del args, x, y
for (in_keys, in_params), out_params, component in pipe:
x = [mem[k] for k in in_params]
if in_keys:
res = component(**dict(zip(in_keys, x)))
else:
res = component(*x)
if len(out_params) == 1:
mem[out_params[0]] = res
else:
mem.update(zip(out_params, res))
res = [mem[k] for k in to_return]
if len(res) == 1:
return res[0]
return list(zip(*res))
def _predict_as_component(self, *args):
mem = dict(zip(self.in_x, args))
for (in_keys, in_params), out_params, component in self.pipe:
x = [mem[k] for k in in_params]
if in_keys:
res = component(**dict(zip(in_keys, x)))
else:
res = component(*x)
if len(out_params) == 1:
mem[out_params[0]] = res
else:
mem.update(zip(out_params, res))
res = [mem[k] for k in self.out_params]
if len(res) == 1:
res = res[0]
return res
def get_main_component(self):
return self.main or self.pipe[-1][-1]
def save(self):
self.get_main_component().save()
def load(self):
for component in self.pipe:
if inspect.ismethod(getattr(component, 'load', None)):
component.load()