Skip to content

Commit

Permalink
Merge branch 'develop' into master-dev-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
YaphetKG committed Aug 3, 2023
2 parents c73bd8b + 9629d5e commit a0a02dd
Show file tree
Hide file tree
Showing 7 changed files with 872 additions and 29 deletions.
2 changes: 0 additions & 2 deletions bin/get_dbgap_data_dicts.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ def download_dbgap_study(dbgap_accession_id, dbgap_output_dir):

# Step 2: Check to see if there's a GapExchange file in the parent folder
# and if there is, get it.

try:
ftp.cwd(study_id_path)
except error_temp as e:
Expand All @@ -84,7 +83,6 @@ def download_dbgap_study(dbgap_accession_id, dbgap_output_dir):
resp = ftp.cwd(study_id_path)
if resp[:1] == '2':
logging.info("command success")

ftp_filelist = ftp.nlst(".")
for ftp_filename in ftp_filelist:
if 'GapExchange' in ftp_filename:
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ services:
#################################################################################
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.6.1
platform: "linux/amd64"
networks:
- dug-network
environment:
Expand All @@ -75,6 +76,7 @@ services:
##
#################################################################################
redis:
platform: "linux/amd64"
image: 'bitnami/redis:5.0.8'
networks:
- dug-network
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ six==1.16.0

# Click for command line arguments
# We use Click 7.0 because that's what one of the pinned packages above use.
click~=7.0
click~=7.0
httpx>=0.24.1
98 changes: 75 additions & 23 deletions src/dug/core/async_search.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
"""Implements search methods using async interfaces"""

import logging
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_scan
Expand Down Expand Up @@ -31,17 +32,24 @@ def __init__(self, cfg: Config, indices=None):
indices = ['concepts_index', 'variables_index', 'kg_index']

self._cfg = cfg
logger.debug(f"Connecting to elasticsearch host: {self._cfg.elastic_host} at port: {self._cfg.elastic_port}")
logger.debug(f"Connecting to elasticsearch host: "
f"{self._cfg.elastic_host} at port: "
f"{self._cfg.elastic_port}")

self.indices = indices
self.hosts = [{'host': self._cfg.elastic_host, 'port': self._cfg.elastic_port}]
self.hosts = [{'host': self._cfg.elastic_host,
'port': self._cfg.elastic_port}]

logger.debug(f"Authenticating as user {self._cfg.elastic_username} to host:{self.hosts}")
logger.debug(f"Authenticating as user "
f"{self._cfg.elastic_username} "
f"to host:{self.hosts}")

self.es = AsyncElasticsearch(hosts=self.hosts,
http_auth=(self._cfg.elastic_username, self._cfg.elastic_password))
http_auth=(self._cfg.elastic_username,
self._cfg.elastic_password))

async def dump_concepts(self, index, query={}, size=None, fuzziness=1, prefix_length=3):
async def dump_concepts(self, index, query={}, size=None,
fuzziness=1, prefix_length=3):
"""
Get everything from concept index
"""
Expand Down Expand Up @@ -87,15 +95,15 @@ async def agg_data_type(self):
index="variables_index",
body=body
)
data_type_list = [data_type['key'] for data_type in results['aggregations']['data_type']['buckets']]
data_type_list = [data_type['key'] for data_type in
results['aggregations']['data_type']['buckets']]
results.update({'data type list': data_type_list})
return data_type_list

async def search_concepts(self, query, offset=0, size=None, fuzziness=1, prefix_length=3):
"""
Changed to a long boolean match query to optimize search results
"""
query = {
@staticmethod
def _build_concepts_query(query, fuzziness=1, prefix_length=3):
"Static data structure populator, pulled for easier testing"
query_object = {
"bool": {
"filter": {
"bool": {
Expand Down Expand Up @@ -196,20 +204,56 @@ async def search_concepts(self, query, offset=0, size=None, fuzziness=1, prefix_
"minimum_should_match": 1,
}
}
body = json.dumps({'query': query})
total_items = await self.es.count(body=body, index="concepts_index")
return query_object

async def search_concepts(self, query, offset=0, size=None, types=None,
                          **kwargs):
    """Search the concepts index with a long boolean match query.

    Parameters:
        query: free-text query string, passed to ``_build_concepts_query``.
        offset: starting hit offset for pagination (``from_`` in ES).
        size: maximum number of hits to return (None = ES default).
        types: optional list of concept ``type`` values. Applied as an ES
            ``post_filter``, so it narrows the returned hits but NOT the
            ``type-count`` aggregation (which is computed over all matches).
        **kwargs: forwarded to ``_build_concepts_query`` (e.g. ``fuzziness``,
            ``prefix_length``), preserving the old explicit signature.

    Returns the filter_path-trimmed search response dict, augmented with:
        - ``total_items``: match count from ``es.count``. NOTE(review): this
          count uses only the base query, not the ``types`` post_filter, so
          it can exceed the number of hits when ``types`` is given — confirm
          this is intended.
        - ``concept_types``: ``{type: doc_count}`` flattened from the
          ``type-count`` terms aggregation.
    """
    query_dict = self._build_concepts_query(query, **kwargs)
    # Unfiltered total; intentionally computed before any post_filter.
    total_items = await self.es.count(
        body={"query": query_dict},
        index="concepts_index")
    # Get aggregated counts of biolink types
    search_body = {"query": query_dict}
    search_body['aggs'] = {'type-count': {'terms': {'field': 'type'}}}
    # Add post_filter on types: post_filter runs AFTER aggregations, so
    # the type counts above stay global while hits are restricted.
    if types:
        assert isinstance(types, list)
        search_body['post_filter'] = {
            "bool": {
                "should": [
                    {'term': {'type': {'value': t}}} for t in types
                ],
                "minimum_should_match": 1
            }
        }
    search_results = await self.es.search(
        index="concepts_index",
        body=search_body,
        # 'aggregations' must be listed here or filter_path would strip it.
        filter_path=['hits.hits._id', 'hits.hits._type',
                     'hits.hits._source', 'hits.hits._score',
                     'hits.hits._explanation', 'aggregations'],
        from_=offset,
        size=size,
        explain=True
    )

    # Simplify the data structure we get from aggregations to put into the
    # return value. This should be a count of documents hit for every type
    # in the search results.
    aggregations = search_results.pop('aggregations')
    concept_types = {
        bucket['key']: bucket['doc_count'] for bucket in
        aggregations['type-count']['buckets']
    }
    search_results.update({'total_items': total_items['count']})
    search_results['concept_types'] = concept_types
    return search_results

async def search_variables(self, concept="", query="", size=None, data_type=None, offset=0, fuzziness=1,
async def search_variables(self, concept="", query="", size=None,
data_type=None, offset=0, fuzziness=1,
prefix_length=3, index=None):
"""
In variable search, the concept MUST match one of the identifiers in the list
Expand Down Expand Up @@ -343,7 +387,8 @@ async def search_variables(self, concept="", query="", size=None, data_type=None
search_results = await self.es.search(
index="variables_index",
body=body,
filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source', 'hits.hits._score'],
filter_path=['hits.hits._id', 'hits.hits._type',
'hits.hits._source', 'hits.hits._score'],
from_=offset,
size=size
)
Expand Down Expand Up @@ -383,7 +428,8 @@ async def search_variables(self, concept="", query="", size=None, data_type=None
# save document
new_results[elem_type][coll_id] = doc

# Case: collection already in dictionary for given element_type; append elem_info. Assumes no duplicate
# Case: collection already in dictionary for given
# element_type; append elem_info. Assumes no duplicate
# elements
else:
new_results[elem_type][coll_id]['elements'].append(elem_info)
Expand All @@ -400,7 +446,9 @@ async def search_variables(self, concept="", query="", size=None, data_type=None
new_results = {}
return new_results

async def search_vars_unscored(self, concept="", query="", size=None, data_type=None, offset=0, fuzziness=1,
async def search_vars_unscored(self, concept="", query="",
size=None, data_type=None,
offset=0, fuzziness=1,
prefix_length=3):
"""
In variable search, the concept MUST match one of the identifiers in the list
Expand Down Expand Up @@ -570,7 +618,9 @@ async def search_vars_unscored(self, concept="", query="", size=None, data_type=
# save document
new_results[elem_type][coll_id] = doc

# Case: collection already in dictionary for given element_type; append elem_info. Assumes no duplicate elements
# Case: collection already in dictionary for given
# element_type; append elem_info. Assumes no duplicate
# elements
else:
new_results[elem_type][coll_id]['elements'].append(elem_info)

Expand All @@ -586,7 +636,8 @@ async def search_vars_unscored(self, concept="", query="", size=None, data_type=
new_results = {}
return new_results

async def search_kg(self, unique_id, query, offset=0, size=None, fuzziness=1, prefix_length=3):
async def search_kg(self, unique_id, query, offset=0, size=None,
fuzziness=1, prefix_length=3):
"""
In knowledge graph search the concept MUST match the unique ID
The query MUST match search_targets. The updated query allows for
Expand Down Expand Up @@ -614,7 +665,8 @@ async def search_kg(self, unique_id, query, offset=0, size=None, fuzziness=1, pr
search_results = await self.es.search(
index="kg_index",
body=body,
filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source'],
filter_path=['hits.hits._id', 'hits.hits._type',
'hits.hits._source'],
from_=offset,
size=size
)
Expand Down
5 changes: 2 additions & 3 deletions src/dug/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class SearchConceptQuery(BaseModel):
index: str = "concepts_index"
offset: int = 0
size: int = 20
types: list = None

class SearchVariablesQuery(BaseModel):
query: str
Expand All @@ -40,10 +41,8 @@ class SearchKgQuery(BaseModel):
index: str = "kg_index"
size:int = 100


search = Search(Config.from_env())


@APP.on_event("shutdown")
def shutdown_event():
    # Close the module-level Search client's AsyncElasticsearch connection
    # when the FastAPI app shuts down, so no aiohttp sessions leak.
    # NOTE(review): asyncio.run() raises RuntimeError if called while an
    # event loop is already running; this relies on sync shutdown handlers
    # executing outside the app's loop — confirm against FastAPI/Starlette.
    asyncio.run(search.es.close())
Expand Down Expand Up @@ -101,4 +100,4 @@ async def search_var(search_query: SearchVariablesQuery):


if __name__ == '__main__':
uvicorn.run(APP)
uvicorn.run(APP)
34 changes: 34 additions & 0 deletions tests/integration/test_async_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"Integration tests for the async_search module"

import asyncio
from unittest import TestCase

from fastapi.testclient import TestClient
from elasticsearch.exceptions import ConnectionError
class APISearchTestCase(TestCase):
    """Integration tests for the /search API endpoint.

    Despite earlier wording, nothing is mocked here: requests go through
    the FastAPI TestClient to whatever elasticsearch instance the app is
    configured for via environment variables (see ``dug.config``), and the
    test fails with an explanatory message if that instance is unreachable.
    """

    def test_concepts_types_parameter(self):
        """POST /search with a ``types`` list and verify the post_filter.

        Expects every returned hit's ``_source.type`` to be one of the
        requested types — and (since it asserts set equality) that each
        requested type actually appears in the populated test index.
        """
        # Imported here so a missing/broken app config fails this test,
        # not module collection.
        from dug.server import APP
        client = TestClient(APP)
        types = ['anatomical entity', 'drug']
        body = {
            "index": "concepts_index",
            "query": "brain",
            "offset": 0,
            "size":20,
            "types": types
        }
        try:
            response = client.post("/search", json=body)
        except ConnectionError:
            self.fail("For the integration test, a populated elasticsearch "
                      "instance must be available and configured in the "
                      "environment variables. See dug.config for more.")
        self.assertEqual(response.status_code, 200)
        response_obj = response.json()
        # Collect the distinct concept types seen across all hits.
        response_types = set(hit['_source']['type'] for hit in
                             response_obj['result']['hits']['hits'])
        self.assertEqual(response_types, set(types))
Loading

0 comments on commit a0a02dd

Please sign in to comment.