Source code for deeppavlov.skills.rasa_skill.rasa_skill

# Copyright 2019 Neural Networks and Deep Learning lab, MIPT
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import uuid
from functools import reduce
from pathlib import Path
from typing import Tuple, Optional, List

from rasa.cli.utils import get_validated_path
from rasa.constants import DEFAULT_MODELS_PATH
from rasa.core.agent import Agent
from rasa.core.channels import CollectingOutputChannel
from rasa.core.channels import UserMessage
from rasa.model import get_model

from deeppavlov.core.common.registry import register
from deeppavlov.core.models.component import Component

logger = logging.getLogger(__name__)

[docs]@register("rasa_skill") class RASASkill(Component): """RASASkill lets you to wrap RASA Agent as a Skill within DeepPavlov environment. The component requires path to your RASA models (folder with timestamped tar.gz archieves) as you use in command `rasa run -m models --enable-api --log-file out.log` """ def __init__(self, path_to_models: str, **kwargs) -> None: """ Constructs RASA Agent as a DeepPavlov skill: read model folder, initialize rasa.core.agent.Agent and wrap it's interfaces Args: path_to_models: string path to folder with RASA models """ # we need absolute path (expanded for user home and resolved if it relative path): self.path_to_models = Path(path_to_models).expanduser().resolve() model = get_validated_path(self.path_to_models, "model", DEFAULT_MODELS_PATH) model_path = get_model(model) if not model_path: # can not laod model path raise Exception("can not load model path: %s" % model) self._agent = Agent.load(model_path) self.ioloop = asyncio.new_event_loop()"path to RASA models is: `{self.path_to_models}`") def __call__(self, utterances_batch: List[str], states_batch: Optional[List] = None) -> Tuple[List[str], List[float], list]: """Returns skill inference result. Returns batches of skill inference results, estimated confidence levels and up to date states corresponding to incoming utterance batch. Args: utterances_batch: A batch of utterances of str type. states_batch: A batch of arbitrary typed states for each utterance. Returns: response: A batch of arbitrary typed skill inference results. confidence: A batch of float typed confidence levels for each of skill inference result. output_states_batch: A batch of arbitrary typed states for each utterance. """ user_ids, output_states_batch = self._handle_user_identification(utterances_batch, states_batch) ################################################################################# # RASA use asyncio for handling messages and handle_text is async function, # so we need to instantiate event loop # futures = [rasa_confident_response_decorator(self._agent, utt, sender_id=uid) for utt, uid in futures = [self.rasa_confident_response_decorator(self._agent, utt, sender_id=uid) for utt, uid in zip(utterances_batch, user_ids)] asyncio.set_event_loop(self.ioloop) results = self.ioloop.run_until_complete(asyncio.gather(*futures)) responses_batch, confidences_batch = zip(*results) return responses_batch, confidences_batch, output_states_batch
[docs] async def rasa_confident_response_decorator(self, rasa_agent, text_message, sender_id): """ Args: rasa_agent: rasa.core.agent.Agent instance text_message: str with utterance from user sender_id: id of the user Returns: None or tuple with str and float, where first element is a message and second is confidence """ resp = await self.rasa_handle_text_verbosely(rasa_agent, text_message, sender_id) if resp: responses, confidences, actions = resp else: logger.warning("Null response from RASA Skill") return None # for adaptation to deep pavlov arch we need to merge multi-messages into single string: texts = [each_resp['text'] for each_resp in responses if 'text' in each_resp] merged_message = "\n".join(texts) merged_confidence = reduce(lambda a, b: a * b, confidences) # TODO possibly it better to choose another function for calculation of final confidence # current realisation of confidence propagation may cause confidence decay for long actions # chains. If long chains is your case, try max(confidence) or confidence[0] return merged_message, merged_confidence
[docs] async def rasa_handle_text_verbosely(self, rasa_agent, text_message, sender_id): """ This function reimplements RASA's rasa.core.agent.Agent.handle_text method to allow to retrieve message responses with confidence estimation altogether. It reconstructs with merge RASA's methods: This required to allow RASA to output confidences with actions altogether (Out of the box RASA does not support such use case). Args: rasa_agent: rasa.core.agent.Agent instance text_message: str with utterance from user sender_id: id of the user Returns: None or tuple where first element is a list of messages dicts, the second element is a list of confidence scores for all actions (it is longer than messages list, because some actions does not produce messages) """ message = UserMessage(text_message, output_channel=None, sender_id=sender_id) processor = rasa_agent.create_processor() tracker = processor._get_tracker(message.sender_id) confidences = [] actions = [] await processor._handle_message_with_tracker(message, tracker) # save tracker state to continue conversation from this state processor._save_tracker(tracker) # here we restore some of logic in RASA management. # ###### Loop of IntraStep decisions ########################################################## # await processor._predict_and_execute_next_action(msg, tracker): # # keep taking actions decided by the policy until it chooses to 'listen' should_predict_another_action = True num_predicted_actions = 0 def is_action_limit_reached(): return (num_predicted_actions == processor.max_number_of_predictions and should_predict_another_action) # action loop. predicts actions until we hit action listen while (should_predict_another_action and processor._should_handle_message(tracker) and num_predicted_actions < processor.max_number_of_predictions): # this actually just calls the policy's method by the same name action, policy, confidence = processor.predict_next_action(tracker) confidences.append(confidence) actions.append(action) should_predict_another_action = await processor._run_action( action, tracker, message.output_channel, processor.nlg, policy, confidence ) num_predicted_actions += 1 if is_action_limit_reached(): # circuit breaker was tripped logger.warning( "Circuit breaker tripped. Stopped predicting " "more actions for sender '{}'".format(tracker.sender_id)) if processor.on_circuit_break: # call a registered callback processor.on_circuit_break(tracker, message.output_channel, processor.nlg) if isinstance(message.output_channel, CollectingOutputChannel): return message.output_channel.messages, confidences, actions else: return None
def _generate_user_id(self) -> str: """ Here you put user id generative logic if you want to implement it in the skill. Although it is better to delegate user_id generation to Agent Layer Returns: str """ return uuid.uuid1().hex def _handle_user_identification(self, utterances_batch, states_batch): """Method preprocesses states batch to guarantee that all users are identified (or identifiers are generated for all users). Args: utterances_batch: batch of utterances states_batch: batch of states Returns: """ # grasp user_ids from states batch. # We expect that skill receives None or dict of state for each utterance. # if state has user_id then skill uses it, otherwise it generates user_id and calls the # user with this name in further. # In this implementation we use current datetime for generating uniqe ids output_states_batch = [] user_ids = [] if states_batch is None: # generate states batch matching batch of utterances: states_batch = [None] * len(utterances_batch) for state in states_batch: if not state: user_id = self._generate_user_id() new_state = {'user_id': user_id} elif 'user_id' not in state: new_state = state user_id = self._generate_user_id() new_state['user_id'] = self._generate_user_id() else: new_state = state user_id = new_state['user_id'] user_ids.append(user_id) output_states_batch.append(new_state) return user_ids, output_states_batch def destroy(self): self.ioloop.close() super().destroy()