From e6780097072af141a1365d262442f11b88324b82 Mon Sep 17 00:00:00 2001
From: Jonatas Grosman
Date: Thu, 1 Feb 2024 12:06:53 -0300
Subject: [PATCH] [feat] add some query utils

---
Revised after export: the query_util.py and test_query_util.py hunks differ
from commit e678009, so their index hashes are stale.

 findpapers/searchers/rxiv_searcher.py |  14 +--
 findpapers/utils/query_util.py        | 210 +++++++++++++++++++++++++-
 tests/unit/test_query_util.py         | 103 ++++++++++++++
 3 files changed, 317 insertions(+), 10 deletions(-)
 create mode 100644 tests/unit/test_query_util.py

diff --git a/findpapers/searchers/rxiv_searcher.py b/findpapers/searchers/rxiv_searcher.py
index 7fc57ef..1118907 100644
--- a/findpapers/searchers/rxiv_searcher.py
+++ b/findpapers/searchers/rxiv_searcher.py
@@ -44,15 +44,11 @@ def _get_search_urls(search: Search, database: str) -> List[str]:
         raise ValueError('NOT connectors aren\'t supported')
 
     # Parentheses are used for URL splitting purposes and only 1-level grouping is supported with an OR connector between the groups
-    current_level = 0
-    for character in search.query:
-        if character == '(':
-            current_level += 1
-        elif character == ')':
-            current_level -= 1
-
-        if current_level > 1:
-            raise ValueError('Max 1-level parentheses grouping exceeded')
+
+    max_group_level = query_util.get_max_group_level(search.query)
+
+    if max_group_level > 1:
+        raise ValueError('Max 1-level parentheses grouping exceeded')
 
     if ') AND (' in search.query:
         raise ValueError('Only the OR connector can be used between the groups')
diff --git a/findpapers/utils/query_util.py b/findpapers/utils/query_util.py
index dd14f32..6ae95f8 100644
--- a/findpapers/utils/query_util.py
+++ b/findpapers/utils/query_util.py
@@ -75,4 +75,212 @@ def apply_on_each_term(query: str, function: Callable) -> str:
         else:
             final_query += character
 
-    return final_query
\ No newline at end of file
+    return final_query
+
+
+def get_max_group_level(query: str) -> int:
+    """
+    Get the max group level of a query
+
+    A group is delimited by parentheses, and its level is how deeply it is
+    nested (0 means that the query has no groups). Parentheses found inside a
+    term (i.e., between square brackets) are term text, not groups, so they
+    aren't counted
+
+    Parameters
+    ----------
+    query : str
+        A search query
+
+    Returns
+    -------
+    int
+        The max group level of the query
+    """
+
+    current_level = 0
+    max_level = 0
+    inside_term = False
+
+    for character in query:
+
+        if inside_term:  # everything up to the closing bracket is term text
+            inside_term = character != ']'
+        elif character == '[':
+            inside_term = True
+        elif character == '(':
+            current_level += 1
+            max_level = max(max_level, current_level)
+        elif character == ')':
+            current_level -= 1
+
+    return max_level
+
+
+def _read_term(query_iterator) -> str:
+    """
+    Consume a term from a query iterator, up to the term closing bracket
+
+    Parameters
+    ----------
+    query_iterator : Iterator[str]
+        An iterator over the query characters, positioned right after the
+        term opening bracket
+
+    Returns
+    -------
+    str
+        The term text, without the brackets
+
+    Raises
+    ------
+    ValueError
+        If the query ends before the term closing bracket
+    """
+
+    term = ""
+
+    for character in query_iterator:
+        if character == ']':
+            return term
+        term += character
+
+    raise ValueError('Missing term closing bracket')
+
+
+def _read_group(query_iterator) -> str:
+    """
+    Consume a group from a query iterator, up to the group closing parenthesis
+
+    Parameters
+    ----------
+    query_iterator : Iterator[str]
+        An iterator over the query characters, positioned right after the
+        group opening parenthesis
+
+    Returns
+    -------
+    str
+        The group subquery, without the enclosing parentheses
+
+    Raises
+    ------
+    ValueError
+        If the query ends before the group (or one of its terms) is closed
+    """
+
+    subquery = ""
+    group_level = 1
+    inside_term = False
+
+    for character in query_iterator:
+
+        if inside_term:  # parentheses inside a term don't open or close groups
+            inside_term = character != ']'
+        elif character == '[':
+            inside_term = True
+        elif character == '(':  # has a nested group
+            group_level += 1
+        elif character == ')':
+            group_level -= 1
+            if group_level == 0:  # end of the group
+                return subquery
+
+        subquery += character
+
+    if inside_term:
+        raise ValueError('Missing term closing bracket')
+
+    raise ValueError('Unbalanced parentheses')
+
+
+def get_query_tree(query: str, parent: dict = None) -> dict:
+    """
+    Get the tree of a query
+
+    Given the following query:
+    [term A] OR [term B] AND ([term C] OR [term D] OR [term E] OR ([term F] AND [term G] AND NOT [term H])) AND NOT [term I]
+
+    The following tree will be returned:
+    {"node_type": "root", "children" : [
+        {"node_type": "term", "value": "term A"},
+        {"node_type": "connector", "value": "OR"},
+        {"node_type": "term", "value": "term B"},
+        {"node_type": "connector", "value": "AND"},
+        {"node_type": "group", "children": [
+            {"node_type": "term", "value": "term C"},
+            {"node_type": "connector", "value": "OR"},
+            {"node_type": "term", "value": "term D"},
+            {"node_type": "connector", "value": "OR"},
+            {"node_type": "term", "value": "term E"},
+            {"node_type": "connector", "value": "OR"},
+            {"node_type": "group", "children": [
+                {"node_type": "term", "value": "term F"},
+                {"node_type": "connector", "value": "AND"},
+                {"node_type": "term", "value": "term G"},
+                {"node_type": "connector", "value": "AND NOT"},
+                {"node_type": "term", "value": "term H"},
+            ]}
+        ]},
+        {"node_type": "connector", "value": "AND NOT"},
+        {"node_type": "term", "value": "term I"}
+    ]}
+
+    The text found between terms and groups is stripped and stored as a
+    connector node. Whitespace-only text doesn't produce a connector node
+    (e.g., the padding in "( [term A] )"), and any text left after the last
+    term or group is discarded
+
+    Parameters
+    ----------
+    query : str
+        A search query
+    parent : dict, optional
+        The node that receives the parsed children, by default None
+        (a new root node is created)
+
+    Returns
+    -------
+    dict
+        The query tree (the given parent node, when one is provided)
+
+    Raises
+    ------
+    ValueError
+        If the parentheses are unbalanced or a term isn't closed
+    """
+
+    if parent is None:
+        parent = {"node_type": "root", "children": []}
+
+    children = parent["children"]
+    query_iterator = iter(query)
+    current_connector = ""
+
+    for current_character in query_iterator:
+
+        if current_character == ')':  # there is no open group to be closed
+            raise ValueError('Unbalanced parentheses')
+
+        if current_character not in ('(', '['):  # is part of a connector
+            current_connector += current_character
+            continue
+
+        # a term or a group is starting, so the pending connector is complete
+        connector = current_connector.strip()
+        current_connector = ""
+
+        if connector:  # whitespace-only text isn't a connector
+            children.append({"node_type": "connector", "value": connector})
+
+        if current_character == '[':  # is a beginning of a term
+            term = _read_term(query_iterator)
+            children.append({"node_type": "term", "value": term})
+
+        else:  # is a beginning of a group
+            subquery = _read_group(query_iterator)
+            group_node = {"node_type": "group", "children": []}
+            children.append(group_node)
+            get_query_tree(subquery, group_node)
+
+    return parent
diff --git a/tests/unit/test_query_util.py b/tests/unit/test_query_util.py
new file mode 100644
index 0000000..b2436be
--- /dev/null
+++ b/tests/unit/test_query_util.py
@@ -0,0 +1,103 @@
+import pytest
+import findpapers.utils.query_util as query_util
+
+
+@pytest.mark.parametrize('query, tree', [
+    ("[term A] OR [term B]",
+        {"node_type": "root", "children" : [
+            {"node_type": "term", "value": "term A"},
+            {"node_type": "connector", "value": "OR"},
+            {"node_type": "term", "value": "term B"},
+        ]}
+    ),
+    ("[term A] AND [term B]",
+        {"node_type": "root", "children" : [
+            {"node_type": "term", "value": "term A"},
+            {"node_type": "connector", "value": "AND"},
+            {"node_type": "term", "value": "term B"},
+        ]}
+    ),
+    ("[term A] AND NOT [term B]",
+        {"node_type": "root", "children" : [
+            {"node_type": "term", "value": "term A"},
+            {"node_type": "connector", "value": "AND NOT"},
+            {"node_type": "term", "value": "term B"},
+        ]}
+    ),
+    ("[term A] AND NOT [term B] OR ([term C] AND [term D])",
+        {"node_type": "root", "children" : [
+            {"node_type": "term", "value": "term A"},
+            {"node_type": "connector", "value": "AND NOT"},
+            {"node_type": "term", "value": "term B"},
+            {"node_type": "connector", "value": "OR"},
+            {"node_type": "group", "children": [
+                {"node_type": "term", "value": "term C"},
+                {"node_type": "connector", "value": "AND"},
+                {"node_type": "term", "value": "term D"},
+            ]}
+        ]}
+    ),
+    ("[term A] OR [term B] AND ([term C] OR [term D] OR [term E] OR ([term F] AND [term G] AND NOT [term H])) AND NOT [term I]",
+        {"node_type": "root", "children" : [
+            {"node_type": "term", "value": "term A"},
+            {"node_type": "connector", "value": "OR"},
+            {"node_type": "term", "value": "term B"},
+            {"node_type": "connector", "value": "AND"},
+            {"node_type": "group", "children": [
+                {"node_type": "term", "value": "term C"},
+                {"node_type": "connector", "value": "OR"},
+                {"node_type": "term", "value": "term D"},
+                {"node_type": "connector", "value": "OR"},
+                {"node_type": "term", "value": "term E"},
+                {"node_type": "connector", "value": "OR"},
+                {"node_type": "group", "children": [
+                    {"node_type": "term", "value": "term F"},
+                    {"node_type": "connector", "value": "AND"},
+                    {"node_type": "term", "value": "term G"},
+                    {"node_type": "connector", "value": "AND NOT"},
+                    {"node_type": "term", "value": "term H"},
+                ]}
+            ]},
+            {"node_type": "connector", "value": "AND NOT"},
+            {"node_type": "term", "value": "term I"}
+        ]}
+    ),
+    ("( [term A] OR [term B (C)] )",
+        {"node_type": "root", "children" : [
+            {"node_type": "group", "children": [
+                {"node_type": "term", "value": "term A"},
+                {"node_type": "connector", "value": "OR"},
+                {"node_type": "term", "value": "term B (C)"},
+            ]}
+        ]}
+    ),
+])
+def test_get_query_tree(query: str, tree: dict):
+
+    query_tree = query_util.get_query_tree(query)
+
+    assert query_tree == tree
+
+@pytest.mark.parametrize('query', [
+    "([term A] OR [term B]",
+    "[term A] OR [term B])",
+    "[term A] OR [term B",
+    "([term A] OR [term B)",
+])
+def test_get_query_tree_with_invalid_query(query: str):
+
+    with pytest.raises(ValueError):
+        query_util.get_query_tree(query)
+
+@pytest.mark.parametrize('query, max_group_level', [
+    ("[term A] OR [term B] OR [term C] OR [term D]", 0),
+    ("[term A] OR [term B] AND ([term C] OR [term D])", 1),
+    ("[term A] OR [term B] AND ([term C] OR [term D] OR [term E] OR ([term F] AND [term G] AND NOT [term H])) AND NOT [term I]", 2),
+    ("[term A] OR [term B] AND ([term C] OR [term D] AND ([term E] OR [term F] OR ([term G] AND [term H]))) AND term I", 3),
+    ("[term (A)] OR ([term B] AND [term C (D)])", 1)
+])
+def test_get_max_group_level(query: str, max_group_level: int):
+
+    query_max_group_level = query_util.get_max_group_level(query)
+
+    assert query_max_group_level == max_group_level
\ No newline at end of file