# Copyright 2017 Neural Networks and Deep Learning lab, MIPT
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from logging import getLogger
from typing import Tuple, List, Optional, Union, Any, Set

from bs4 import BeautifulSoup
from whapi import search, get_html

from deeppavlov.core.commands.utils import expand_path
from deeppavlov.core.common.file import read_json
from deeppavlov.core.models.component import Component
from deeppavlov.core.models.serializable import Serializable
from deeppavlov.models.entity_extraction.entity_linking import EntityLinker
from deeppavlov.models.kbqa.rel_ranking_infer import RelRankerInfer
from deeppavlov.models.kbqa.template_matcher import TemplateMatcher

log = getLogger(__name__)

[docs]class QueryGeneratorBase(Component, Serializable): """ This class takes as input entity substrings, defines the template of the query and fills the slots of the template with candidate entities and relations. """
[docs] def __init__(self, template_matcher: TemplateMatcher, entity_linker: EntityLinker, rel_ranker: RelRankerInfer, load_path: str, rank_rels_filename_1: str, rank_rels_filename_2: str, sparql_queries_filename: str, wiki_parser=None, entities_to_leave: int = 5, rels_to_leave: int = 7, syntax_structure_known: bool = False, use_wp_api_requester: bool = False, use_el_api_requester: bool = False, use_alt_templates: bool = True, use_add_templates: bool = False, *args, **kwargs) -> None: """ Args: template_matcher: component deeppavlov.models.kbqa.template_matcher entity_linker: component deeppavlov.models.entity_extraction.entity_linking for linking of entities rel_ranker: component deeppavlov.models.kbqa.rel_ranking_infer load_path: path to folder with wikidata files rank_rels_filename_1: file with list of rels for first rels in questions with ranking rank_rels_filename_2: file with list of rels for second rels in questions with ranking sparql_queries_filename: file with sparql query templates wiki_parser: component deeppavlov.models.kbqa.wiki_parser entities_to_leave: how many entities to leave after entity linking rels_to_leave: how many relations to leave after relation ranking syntax_structure_known: if syntax tree parser was used to define query template type use_wp_api_requester: whether deeppavlov.models.api_requester.api_requester component will be used for Wiki Parser use_el_api_requester: whether deeppavlov.models.api_requester.api_requester component will be used for Entity Linking use_alt_templates: whether to use alternative templates if no answer was found for default query template """ super().__init__(save_path=None, load_path=load_path) self.template_matcher = template_matcher self.entity_linker = entity_linker self.wiki_parser = wiki_parser self.rel_ranker = rel_ranker self.rank_rels_filename_1 = rank_rels_filename_1 self.rank_rels_filename_2 = rank_rels_filename_2 self.rank_list_0 = [] self.rank_list_1 = [] self.entities_to_leave = entities_to_leave self.rels_to_leave = rels_to_leave self.syntax_structure_known = syntax_structure_known self.use_wp_api_requester = use_wp_api_requester self.use_el_api_requester = use_el_api_requester self.use_alt_templates = use_alt_templates self.use_add_templates = use_add_templates self.sparql_queries_filename = sparql_queries_filename self.load()
def load(self) -> None: with open(self.load_path / self.rank_rels_filename_1, 'r') as fl1: lines = fl1.readlines() self.rank_list_0 = [line.split('\t')[0] for line in lines] with open(self.load_path / self.rank_rels_filename_2, 'r') as fl2: lines = fl2.readlines() self.rank_list_1 = [line.split('\t')[0] for line in lines] self.template_queries = read_json(str(expand_path(self.sparql_queries_filename))) def save(self) -> None: pass def find_candidate_answers(self, question: str, question_sanitized: str, template_types: Union[List[str], str], entities_from_ner: List[str], entity_tags: List[str], answer_types: Set[str]) -> Tuple[Union[Union[List[List[Union[str, float]]], List[Any]], Any], Union[str, Any], Union[List[Any], Any]]: candidate_outputs = [] self.template_nums = template_types replace_tokens = [(' - ', '-'), (' .', ''), ('{', ''), ('}', ''), (' ', ' '), ('"', "'"), ('(', ''), (')', ''), ('–', '-')] for old, new in replace_tokens: question = question.replace(old, new) entities_from_template, types_from_template, rels_from_template, rel_dirs_from_template, query_type_template, \ entity_types, template_answer, answer_types, template_found = self.template_matcher(question_sanitized, entities_from_ner) self.template_nums = [query_type_template] templates_nums = [] log.debug( f"question: {question} entities_from_template {entities_from_template} template_type {self.template_nums} " f"types from template {types_from_template} rels_from_template {rels_from_template}") if entities_from_template or types_from_template: if rels_from_template[0][0] == "PHOW": how_to_content = self.find_answer_wikihow(entities_from_template[0]) candidate_outputs = [["PHOW", how_to_content, 1.0]] else: entity_ids = self.get_entity_ids(entities_from_template, entity_tags, question) log.debug(f"entities_from_template {entities_from_template}") log.debug(f"entity_types {entity_types}") log.debug(f"types_from_template {types_from_template}") log.debug(f"rels_from_template {rels_from_template}") log.debug(f"entity_ids {entity_ids}") candidate_outputs, templates_nums = \ self.sparql_template_parser(question_sanitized, entity_ids, [], answer_types, rels_from_template, rel_dirs_from_template) if not candidate_outputs and entities_from_ner: log.debug(f"(__call__)entities_from_ner: {entities_from_ner}") entity_ids = self.get_entity_ids(entities_from_ner, entity_tags, question) log.debug(f"(__call__)entity_ids: {entity_ids}") self.template_nums = template_types log.debug(f"(__call__)self.template_nums: {self.template_nums}") if not self.syntax_structure_known: entity_ids = entity_ids[:3] candidate_outputs, templates_nums = self.sparql_template_parser(question_sanitized, entity_ids, [], answer_types) return candidate_outputs, template_answer, templates_nums def get_entity_ids(self, entities: List[str], tags: List[str], question: str) -> List[List[str]]: entity_ids = [] el_output = [] try: el_output = self.entity_linker([entities], [tags], [[question]], [None], [None]) except json.decoder.JSONDecodeError: log.warning("not received output from entity linking") if el_output: if self.use_el_api_requester: el_output = el_output[0] if el_output: if isinstance(el_output[0], dict): entity_ids = [entity_info.get("entity_ids", []) for entity_info in el_output] if isinstance(el_output[0], list): entity_ids, *_ = el_output if not self.use_el_api_requester and entity_ids: entity_ids = entity_ids[0] return entity_ids def sparql_template_parser(self, question: str, entity_ids: List[List[str]], type_ids: List[List[str]], answer_types: List[str], rels_from_template: Optional[List[Tuple[str]]] = None, rel_dirs_from_template: Optional[List[str]] = None) -> Tuple[Union[None, List[Any]], List[Any]]: candidate_outputs = [] log.debug(f"use alternative templates {self.use_alt_templates}") log.debug(f"(find_candidate_answers)self.template_nums: {self.template_nums}") templates = [] templates_nums = [] for template_num in self.template_nums: for num, template in self.template_queries.items(): if (num == template_num and self.syntax_structure_known) or \ (template["template_num"] == template_num and not self.syntax_structure_known): templates.append(template) templates_nums.append(num) new_templates = [] new_templates_nums = [] for template, template_num in zip(templates, templates_nums): if (not self.syntax_structure_known and [len(entity_ids), len(type_ids)] == template[ "entities_and_types_num"]) or self.syntax_structure_known: new_templates.append(template) new_templates_nums.append(template_num) templates = new_templates templates_nums = new_templates_nums templates_string = '\n'.join([template["query_template"] for template in templates]) log.debug(f"{templates_string}") if not templates: return candidate_outputs, [] if rels_from_template is not None: query_template = {} for template in templates: if template["rel_dirs"] == rel_dirs_from_template: query_template = template if query_template: entities_and_types_select = query_template["entities_and_types_select"] candidate_outputs = self.query_parser(question, query_template, entities_and_types_select, entity_ids, type_ids, answer_types, rels_from_template) else: for template in templates: entities_and_types_select = template["entities_and_types_select"] candidate_outputs = self.query_parser(question, template, entities_and_types_select, entity_ids, type_ids, answer_types, rels_from_template) if self.use_add_templates: additional_templates = template.get("additional_templates", []) templates_nums += additional_templates for add_template_num in additional_templates: candidate_outputs += self.query_parser(question, self.template_queries[add_template_num], entities_and_types_select, entity_ids, type_ids, answer_types, rels_from_template) if candidate_outputs: templates_nums = list(set(templates_nums)) return candidate_outputs, templates_nums if not candidate_outputs and self.use_alt_templates: alternative_templates = templates[0]["alternative_templates"] for template_num, entities_and_types_select in alternative_templates: candidate_outputs = self.query_parser(question, self.template_queries[template_num], entities_and_types_select, entity_ids, type_ids, answer_types, rels_from_template) templates_nums.append(template_num) if candidate_outputs: templates_nums = list(set(templates_nums)) return candidate_outputs, templates_nums log.debug("candidate_rels_and_answers:\n" + '\n'.join([str(output) for output in candidate_outputs[:5]])) templates_nums = list(set(templates_nums)) return candidate_outputs, templates_nums def find_top_rels(self, question: str, entity_ids: List[List[str]], triplet_info: Tuple) -> List[Tuple[str, Any]]: ex_rels = [] direction, source, rel_type = triplet_info if source == "wiki": queries_list = list({(entity, direction, rel_type) for entity_id in entity_ids for entity in entity_id[:self.entities_to_leave]}) parser_info_list = ["find_rels" for i in range(len(queries_list))] try: ex_rels = self.wiki_parser(parser_info_list, queries_list) except json.decoder.JSONDecodeError: log.warning("find_top_rels, not received output from wiki parser") if self.use_wp_api_requester and ex_rels: ex_rels = [rel[0] for rel in ex_rels] ex_rels = list(set(ex_rels)) ex_rels = [rel.split('/')[-1] for rel in ex_rels] elif source == "rank_list_1": ex_rels = self.rank_list_0 elif source == "rank_list_2": ex_rels = self.rank_list_1 rels_with_scores = [] ex_rels = [rel for rel in ex_rels if rel.startswith("P")] if ex_rels: rels_with_scores = self.rel_ranker.rank_rels(question, ex_rels) return rels_with_scores[:self.rels_to_leave] def find_answer_wikihow(self, howto_sentence: str) -> str: tags = [] search_results = search(howto_sentence, 5) if search_results: article_id = search_results[0]["article_id"] html = get_html(article_id) page = BeautifulSoup(html, 'lxml') tags = list(page.find_all(['p'])) if tags: howto_content = f"{tags[0].text.strip()}@en" else: howto_content = "Not Found" return howto_content def query_parser(self, question, query_template, entities_and_types_select, entity_ids, type_ids, answer_types, rels_from_template): raise NotImplementedError