import uuid
import asyncio
import logging
from pathlib import Path
from typing import Tuple, Optional, List
from functools import reduce
from deeppavlov.core.common.registry import register
from deeppavlov.core.skill.skill import Skill
from rasa.core.agent import Agent
from rasa.core.channels import UserMessage
from rasa.core.channels import CollectingOutputChannel
from rasa.model import get_model
from rasa.cli.utils import get_validated_path
from rasa.constants import DEFAULT_MODELS_PATH
logger = logging.getLogger(__name__)
[docs]@register("rasa_skill")
class RASASkill(Skill):
"""RASASkill lets you to wrap RASA Agent as a Skill within DeepPavlov environment.
The component requires path to your RASA models (folder with timestamped tar.gz archieves)
as you use in command `rasa run -m models --enable-api --log-file out.log`
"""
def __init__(self, path_to_models: str, **kwargs) -> None:
"""
Constructs RASA Agent as a DeepPavlov skill:
read model folder,
initialize rasa.core.agent.Agent and wrap it's interfaces
Args:
path_to_models: string path to folder with RASA models
"""
# we need absolute path (expanded for user home and resolved if it relative path):
self.path_to_models = Path(path_to_models).expanduser().resolve()
model = get_validated_path(self.path_to_models, "model", DEFAULT_MODELS_PATH)
model_path = get_model(model)
if not model_path:
# can not laod model path
raise Exception("can not load model path: %s" % model)
self._agent = Agent.load(model_path)
self.ioloop = asyncio.new_event_loop()
logger.info(f"path to RASA models is: `{self.path_to_models}`")
def __call__(self, utterances_batch: List[str],
history_batch: Optional[List]=None,
states_batch: Optional[List]=None) -> Tuple[List[str], List[float], list]:
"""Returns skill inference result.
Returns batches of skill inference results, estimated confidence
levels and up to date states corresponding to incoming utterance
batch.
Args:
utterances_batch: A batch of utterances of str type.
history_batch: A batch of list typed histories for each utterance.
states_batch: A batch of arbitrary typed states for
each utterance.
Returns:
response: A batch of arbitrary typed skill inference results.
confidence: A batch of float typed confidence levels for each of
skill inference result.
output_states_batch: A batch of arbitrary typed states for
each utterance.
"""
user_ids, output_states_batch = self._handle_user_identification(utterances_batch, states_batch)
#################################################################################
# RASA use asyncio for handling messages and handle_text is async function,
# so we need to instantiate event loop
# futures = [rasa_confident_response_decorator(self._agent, utt, sender_id=uid) for utt, uid in
futures = [self.rasa_confident_response_decorator(self._agent, utt, sender_id=uid) for utt, uid in
zip(utterances_batch, user_ids)]
asyncio.set_event_loop(self.ioloop)
results = self.ioloop.run_until_complete(asyncio.gather(*futures))
responses_batch, confidences_batch = zip(*results)
return responses_batch, confidences_batch, output_states_batch
[docs] async def rasa_confident_response_decorator(self, rasa_agent, text_message, sender_id):
"""
Args:
rasa_agent: rasa.core.agent.Agent instance
text_message: str with utterance from user
sender_id: id of the user
Returns: None or tuple with str and float, where first element is a message and second is
confidence
"""
resp = await self.rasa_handle_text_verbosely(rasa_agent, text_message, sender_id)
if resp:
responses, confidences, actions = resp
else:
logger.warning("Null response from RASA Skill")
return None
# for adaptation to deep pavlov arch we need to merge multi-messages into single string:
texts = [each_resp['text'] for each_resp in responses if 'text' in each_resp]
merged_message = "\n".join(texts)
merged_confidence = reduce(lambda a, b: a * b, confidences)
# TODO possibly it better to choose another function for calculation of final confidence
# current realisation of confidence propagation may cause confidence decay for long actions
# chains. If long chains is your case, try max(confidence) or confidence[0]
return merged_message, merged_confidence
[docs] async def rasa_handle_text_verbosely(self, rasa_agent, text_message, sender_id):
"""
This function reimplements RASA's rasa.core.agent.Agent.handle_text method to allow to retrieve
message responses with confidence estimation altogether.
It reconstructs with merge RASA's methods:
https://github.com/RasaHQ/rasa_core/blob/master/rasa/core/agent.py#L401
https://github.com/RasaHQ/rasa_core/blob/master/rasa/core/agent.py#L308
https://github.com/RasaHQ/rasa/blob/master/rasa/core/processor.py#L327
This required to allow RASA to output confidences with actions altogether
(Out of the box RASA does not support such use case).
Args:
rasa_agent: rasa.core.agent.Agent instance
text_message: str with utterance from user
sender_id: id of the user
Returns: None or
tuple where first element is a list of messages dicts, the second element is a list
of confidence scores for all actions (it is longer than messages list, because some actions
does not produce messages)
"""
message = UserMessage(text_message,
output_channel=None,
sender_id=sender_id)
processor = rasa_agent.create_processor()
tracker = processor._get_tracker(message.sender_id)
confidences = []
actions = []
await processor._handle_message_with_tracker(message, tracker)
# save tracker state to continue conversation from this state
processor._save_tracker(tracker)
# here we restore some of logic in RASA management.
# ###### Loop of IntraStep decisions ##########################################################
# await processor._predict_and_execute_next_action(msg, tracker):
# https://github.com/RasaHQ/rasa/blob/master/rasa/core/processor.py#L327-L362
# keep taking actions decided by the policy until it chooses to 'listen'
should_predict_another_action = True
num_predicted_actions = 0
def is_action_limit_reached():
return (num_predicted_actions == processor.max_number_of_predictions and
should_predict_another_action)
# action loop. predicts actions until we hit action listen
while (should_predict_another_action and
processor._should_handle_message(tracker) and
num_predicted_actions < processor.max_number_of_predictions):
# this actually just calls the policy's method by the same name
action, policy, confidence = processor.predict_next_action(tracker)
confidences.append(confidence)
actions.append(action)
should_predict_another_action = await processor._run_action(
action,
tracker,
message.output_channel,
processor.nlg,
policy, confidence
)
num_predicted_actions += 1
if is_action_limit_reached():
# circuit breaker was tripped
logger.warning(
"Circuit breaker tripped. Stopped predicting "
"more actions for sender '{}'".format(tracker.sender_id))
if processor.on_circuit_break:
# call a registered callback
processor.on_circuit_break(tracker, message.output_channel, processor.nlg)
if isinstance(message.output_channel, CollectingOutputChannel):
return message.output_channel.messages, confidences, actions
else:
return None
def _generate_user_id(self) -> str:
"""
Here you put user id generative logic if you want to implement it in the skill.
Although it is better to delegate user_id generation to Agent Layer
Returns: str
"""
return uuid.uuid1().hex
def _handle_user_identification(self, utterances_batch, states_batch):
"""Method preprocesses states batch to guarantee that all users are identified (or
identifiers are generated for all users).
Args:
utterances_batch: batch of utterances
states_batch: batch of states
Returns:
"""
# grasp user_ids from states batch.
# We expect that skill receives None or dict of state for each utterance.
# if state has user_id then skill uses it, otherwise it generates user_id and calls the
# user with this name in further.
# In this implementation we use current datetime for generating uniqe ids
output_states_batch = []
user_ids = []
if states_batch is None:
# generate states batch matching batch of utterances:
states_batch = [None] * len(utterances_batch)
for state in states_batch:
if not state:
user_id = self._generate_user_id()
new_state = {'user_id': user_id}
elif 'user_id' not in state:
new_state = state
user_id = self._generate_user_id()
new_state['user_id'] = self._generate_user_id()
else:
new_state = state
user_id = new_state['user_id']
user_ids.append(user_id)
output_states_batch.append(new_state)
return user_ids, output_states_batch
def destroy(self):
self.ioloop.close()
super().destroy()