# Source code for deeppavlov.skills.rasa_skill.rasa_skill

import uuid
import asyncio
import logging
from pathlib import Path
from typing import Tuple, Optional, List
from functools import reduce

from deeppavlov.core.common.registry import register
from deeppavlov.core.skill.skill import Skill

from rasa.core.agent import Agent
from rasa.core.channels import UserMessage
from rasa.core.channels import CollectingOutputChannel
from rasa.model import get_model
from rasa.cli.utils import get_validated_path
from rasa.constants import DEFAULT_MODELS_PATH

logger = logging.getLogger(__name__)


@register("rasa_skill")
class RASASkill(Skill):
    """Wrap a RASA Agent as a Skill within the DeepPavlov environment.

    The component requires the path to your RASA models (a folder with
    timestamped tar.gz archives), i.e. the same folder you pass to
    ``rasa run -m models --enable-api --log-file out.log``.
    """

    def __init__(self, path_to_models: str, **kwargs) -> None:
        """Construct a RASA Agent as a DeepPavlov skill.

        Reads the model folder, initializes ``rasa.core.agent.Agent`` and
        creates a dedicated asyncio event loop used to run the agent.

        Args:
            path_to_models: string path to the folder with RASA models.
            **kwargs: accepted for configuration compatibility; not used here.

        Raises:
            Exception: if no model can be located under ``path_to_models``.
        """
        # we need an absolute path (user home expanded, relative path resolved):
        self.path_to_models = Path(path_to_models).expanduser().resolve()

        model = get_validated_path(self.path_to_models, "model", DEFAULT_MODELS_PATH)

        model_path = get_model(model)
        if not model_path:
            # the model path could not be loaded
            raise Exception("can not load model path: %s" % model)

        self._agent = Agent.load(model_path)
        # the skill owns its loop; it is closed in destroy()
        self.ioloop = asyncio.new_event_loop()
        logger.info(f"path to RASA models is: `{self.path_to_models}`")

    def __call__(self,
                 utterances_batch: List[str],
                 history_batch: Optional[List] = None,
                 states_batch: Optional[List] = None) -> Tuple[List[str], List[float], list]:
        """Return the skill inference result for a batch of utterances.

        Args:
            utterances_batch: A batch of utterances of str type.
            history_batch: A batch of list typed histories for each utterance
                (not used by this skill).
            states_batch: A batch of states (``None`` or dict) for each utterance.

        Returns:
            responses_batch: A list with one response string per utterance.
                An empty string is returned for an utterance that RASA gave
                no response to.
            confidences_batch: A list with one float confidence per response
                (``0.0`` when RASA gave no response).
            output_states_batch: A list of up-to-date states, each one carrying
                a ``user_id`` key.
        """
        user_ids, output_states_batch = self._handle_user_identification(utterances_batch,
                                                                         states_batch)

        # RASA handles messages with asyncio coroutines, so one coroutine is
        # created per utterance and all of them are run on the skill's own loop.
        futures = [self.rasa_confident_response_decorator(self._agent, utt, sender_id=uid)
                   for utt, uid in zip(utterances_batch, user_ids)]

        asyncio.set_event_loop(self.ioloop)
        results = self.ioloop.run_until_complete(asyncio.gather(*futures))

        # A coroutine may resolve to None (no response from RASA). Unpacking
        # with zip(*results) would crash on it, and on an empty batch as well,
        # so the output batches are assembled explicitly.
        responses_batch: List[str] = []
        confidences_batch: List[float] = []
        for result in results:
            if result is None:
                responses_batch.append("")
                confidences_batch.append(0.0)
            else:
                response, confidence = result
                responses_batch.append(response)
                confidences_batch.append(confidence)

        return responses_batch, confidences_batch, output_states_batch
[docs] async def rasa_confident_response_decorator(self, rasa_agent, text_message, sender_id): """ Args: rasa_agent: rasa.core.agent.Agent instance text_message: str with utterance from user sender_id: id of the user Returns: None or tuple with str and float, where first element is a message and second is confidence """ resp = await self.rasa_handle_text_verbosely(rasa_agent, text_message, sender_id) if resp: responses, confidences, actions = resp else: logger.warning("Null response from RASA Skill") return None # for adaptation to deep pavlov arch we need to merge multi-messages into single string: texts = [each_resp['text'] for each_resp in responses if 'text' in each_resp] merged_message = "\n".join(texts) merged_confidence = reduce(lambda a, b: a * b, confidences) # TODO possibly it better to choose another function for calculation of final confidence # current realisation of confidence propagation may cause confidence decay for long actions # chains. If long chains is your case, try max(confidence) or confidence[0] return merged_message, merged_confidence
[docs] async def rasa_handle_text_verbosely(self, rasa_agent, text_message, sender_id): """ This function reimplements RASA's rasa.core.agent.Agent.handle_text method to allow to retrieve message responses with confidence estimation altogether. It reconstructs with merge RASA's methods: https://github.com/RasaHQ/rasa_core/blob/master/rasa/core/agent.py#L401 https://github.com/RasaHQ/rasa_core/blob/master/rasa/core/agent.py#L308 https://github.com/RasaHQ/rasa/blob/master/rasa/core/processor.py#L327 This required to allow RASA to output confidences with actions altogether (Out of the box RASA does not support such use case). Args: rasa_agent: rasa.core.agent.Agent instance text_message: str with utterance from user sender_id: id of the user Returns: None or tuple where first element is a list of messages dicts, the second element is a list of confidence scores for all actions (it is longer than messages list, because some actions does not produce messages) """ message = UserMessage(text_message, output_channel=None, sender_id=sender_id) processor = rasa_agent.create_processor() tracker = processor._get_tracker(message.sender_id) confidences = [] actions = [] await processor._handle_message_with_tracker(message, tracker) # save tracker state to continue conversation from this state processor._save_tracker(tracker) # here we restore some of logic in RASA management. # ###### Loop of IntraStep decisions ########################################################## # await processor._predict_and_execute_next_action(msg, tracker): # https://github.com/RasaHQ/rasa/blob/master/rasa/core/processor.py#L327-L362 # keep taking actions decided by the policy until it chooses to 'listen' should_predict_another_action = True num_predicted_actions = 0 def is_action_limit_reached(): return (num_predicted_actions == processor.max_number_of_predictions and should_predict_another_action) # action loop. 
predicts actions until we hit action listen while (should_predict_another_action and processor._should_handle_message(tracker) and num_predicted_actions < processor.max_number_of_predictions): # this actually just calls the policy's method by the same name action, policy, confidence = processor.predict_next_action(tracker) confidences.append(confidence) actions.append(action) should_predict_another_action = await processor._run_action( action, tracker, message.output_channel, processor.nlg, policy, confidence ) num_predicted_actions += 1 if is_action_limit_reached(): # circuit breaker was tripped logger.warning( "Circuit breaker tripped. Stopped predicting " "more actions for sender '{}'".format(tracker.sender_id)) if processor.on_circuit_break: # call a registered callback processor.on_circuit_break(tracker, message.output_channel, processor.nlg) if isinstance(message.output_channel, CollectingOutputChannel): return message.output_channel.messages, confidences, actions else: return None
def _generate_user_id(self) -> str: """ Here you put user id generative logic if you want to implement it in the skill. Although it is better to delegate user_id generation to Agent Layer Returns: str """ return uuid.uuid1().hex def _handle_user_identification(self, utterances_batch, states_batch): """Method preprocesses states batch to guarantee that all users are identified (or identifiers are generated for all users). Args: utterances_batch: batch of utterances states_batch: batch of states Returns: """ # grasp user_ids from states batch. # We expect that skill receives None or dict of state for each utterance. # if state has user_id then skill uses it, otherwise it generates user_id and calls the # user with this name in further. # In this implementation we use current datetime for generating uniqe ids output_states_batch = [] user_ids = [] if states_batch is None: # generate states batch matching batch of utterances: states_batch = [None] * len(utterances_batch) for state in states_batch: if not state: user_id = self._generate_user_id() new_state = {'user_id': user_id} elif 'user_id' not in state: new_state = state user_id = self._generate_user_id() new_state['user_id'] = self._generate_user_id() else: new_state = state user_id = new_state['user_id'] user_ids.append(user_id) output_states_batch.append(new_state) return user_ids, output_states_batch def destroy(self): self.ioloop.close() super().destroy()