Skip to content

Commit

Permalink
Merge pull request #348 from EGA-archive/develop
Browse files Browse the repository at this point in the history
Runing results by dataset in parallel tasks
  • Loading branch information
costero-e authored Jul 6, 2024
2 parents b50b1ac + 7dbd6fb commit 84c6edb
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 37 deletions.
7 changes: 6 additions & 1 deletion beacon/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def main(path=None):
#)

beacon = web.Application(
middlewares=[web.normalize_path_middleware(), middlewares.error_middleware, cors_middleware(origins=["https://beacon-network-test.ega-archive.org", "https://beacon-network-test2.ega-archive.org", "https://beacon-network-demo.ega-archive.org","https://beacon-network-demo2.ega-archive.org", "http://localhost:3000", "http://localhost:3010", "https://beacon-network-cineca-demo.ega-archive.org", "https://cancer-beacon-demo.ega-archive.org"])]
middlewares=[web.normalize_path_middleware(), middlewares.error_middleware, cors_middleware(origins=["https://beacon-network-test.ega-archive.org", "https://beacon-network-test2.ega-archive.org", "https://beacon-network-demo.ega-archive.org","https://beacon-network-demo2.ega-archive.org", "http://localhost:3000", "http://localhost:3010", "https://beacon-network-cineca-demo.ega-archive.org", "https://beacon.ega-archive.org", "https://cancer-beacon-demo.ega-archive.org"])]
)


Expand Down Expand Up @@ -116,6 +116,11 @@ async def main(path=None):
expose_headers="*",
allow_methods=("POST", "PATCH", "GET", "OPTIONS"),
allow_headers=DEFAULT_ALLOW_HEADERS),
"https://beacon.ega-archive.org":
aiohttp_cors.ResourceOptions(allow_credentials=True,
expose_headers="*",
allow_methods=("POST", "PATCH", "GET", "OPTIONS"),
allow_headers=DEFAULT_ALLOW_HEADERS),
"http://localhost:3010":
aiohttp_cors.ResourceOptions(allow_credentials=True,
expose_headers="*",
Expand Down
2 changes: 1 addition & 1 deletion beacon/api_version.yml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
api_version: v2.0-b8fa53a
api_version: v2.0-b50b1ac
2 changes: 1 addition & 1 deletion beacon/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@
#
ontologies_folder = "ontologies"

alphanumeric_terms = ['libraryStrategy', 'molecularAttributes.geneIds', 'diseases.ageOfOnset.iso8601duration']
alphanumeric_terms = ['libraryStrategy', 'molecularAttributes.geneIds', 'diseases.ageOfOnset.iso8601duration', 'molecularAttributes.aminoacidChanges']

ontology_files={"NCIT": "http://purl.obolibrary.org/obo/NCIT.obo"}
6 changes: 3 additions & 3 deletions beacon/db/analyses.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def get_analyses(entry_id: Optional[str], qparams: RequestParams, dataset: str):
limit = 100
idq="biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_analysis_with_id(entry_id: Optional[str], qparams: RequestParams, dataset: str):
collection = 'analyses'
Expand All @@ -63,7 +63,7 @@ def get_analysis_with_id(entry_id: Optional[str], qparams: RequestParams, datase
if limit > 100 or limit == 0:
limit = 100
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_variants_of_analysis(entry_id: Optional[str], qparams: RequestParams, dataset: str):
collection = 'analyses'
Expand All @@ -86,7 +86,7 @@ def get_variants_of_analysis(entry_id: Optional[str], qparams: RequestParams, da
limit = 100
idq="caseLevelData.biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_filtering_terms_of_analyse(entry_id: Optional[str], qparams: RequestParams):
query = {'scopes': 'analysis'}
Expand Down
10 changes: 5 additions & 5 deletions beacon/db/biosamples.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def get_biosamples(entry_id: Optional[str], qparams: RequestParams, dataset: str
limit = 100
idq="id"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset


def get_biosample_with_id(entry_id: Optional[str], qparams: RequestParams, dataset: str):
Expand All @@ -64,7 +64,7 @@ def get_biosample_with_id(entry_id: Optional[str], qparams: RequestParams, datas
limit = 100
idq="id"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_variants_of_biosample(entry_id: Optional[str], qparams: RequestParams, dataset: str):
collection = 'g_variants'
Expand All @@ -82,7 +82,7 @@ def get_variants_of_biosample(entry_id: Optional[str], qparams: RequestParams, d
limit = 100
idq="caseLevelData.biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset


def get_analyses_of_biosample(entry_id: Optional[str], qparams: RequestParams, dataset: str):
Expand All @@ -102,7 +102,7 @@ def get_analyses_of_biosample(entry_id: Optional[str], qparams: RequestParams, d
limit = 100
idq="biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_runs_of_biosample(entry_id: Optional[str], qparams: RequestParams, dataset: str):
collection = 'biosamples'
Expand All @@ -120,7 +120,7 @@ def get_runs_of_biosample(entry_id: Optional[str], qparams: RequestParams, datas
limit = 100
idq="biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_filtering_terms_of_biosample(entry_id: Optional[str], qparams: RequestParams):
query = {'scopes': 'biosample'}
Expand Down
15 changes: 9 additions & 6 deletions beacon/db/g_variants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from beacon.request.model import AlphanumericFilter, Operator, RequestParams
from beacon.db import client
import yaml
import time
from aiohttp import web


Expand Down Expand Up @@ -238,6 +239,7 @@ def apply_request_parameters(query: Dict[str, List[dict]], qparams: RequestParam


def get_variants(entry_id: Optional[str], qparams: RequestParams, dataset: str):
LOG.debug(time.time())
collection = 'g_variants'
mongo_collection = client.beacon.genomicVariations
parameters_as_filters=False
Expand Down Expand Up @@ -267,7 +269,8 @@ def get_variants(entry_id: Optional[str], qparams: RequestParams, dataset: str):
datasets_dict = yaml.safe_load(datasets_file)
#LOG.debug(query)
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
LOG.debug(time.time())
return schema, count, dataset_count, docs, dataset


def get_variant_with_id(entry_id: Optional[str], qparams: RequestParams, dataset: str):
Expand All @@ -287,7 +290,7 @@ def get_variant_with_id(entry_id: Optional[str], qparams: RequestParams, dataset
limit = 100
idq="caseLevelData.biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset


def get_biosamples_of_variant(entry_id: Optional[str], qparams: RequestParams, dataset: str):
Expand Down Expand Up @@ -323,7 +326,7 @@ def get_biosamples_of_variant(entry_id: Optional[str], qparams: RequestParams, d
limit = 100
idq="id"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_runs_of_variant(entry_id: Optional[str], qparams: RequestParams, dataset: str):
collection = 'g_variants'
Expand Down Expand Up @@ -358,7 +361,7 @@ def get_runs_of_variant(entry_id: Optional[str], qparams: RequestParams, dataset
limit = 100
idq="biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset


def get_analyses_of_variant(entry_id: Optional[str], qparams: RequestParams, dataset: str):
Expand Down Expand Up @@ -394,7 +397,7 @@ def get_analyses_of_variant(entry_id: Optional[str], qparams: RequestParams, dat
limit = 100
idq="biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_filtering_terms_of_genomicvariation(entry_id: Optional[str], qparams: RequestParams):
query = {'scopes': 'genomicVariation'}
Expand Down Expand Up @@ -447,4 +450,4 @@ def get_individuals_of_variant(entry_id: Optional[str], qparams: RequestParams,
limit = 100
idq="id"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset
14 changes: 8 additions & 6 deletions beacon/db/individuals.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from beacon.db.utils import query_id, get_count, get_documents
from beacon.request.model import RequestParams
from beacon.db.g_variants import apply_request_parameters
import time
import yaml
from aiohttp import web

Expand All @@ -18,6 +19,7 @@ def include_resultset_responses(query: Dict[str, List[dict]], qparams: RequestPa
return query

def get_individuals(entry_id: Optional[str], qparams: RequestParams, dataset: str):
LOG.debug(time.time())
collection = 'individuals'
mongo_collection = client.beacon.individuals
parameters_as_filters=False
Expand Down Expand Up @@ -45,7 +47,7 @@ def get_individuals(entry_id: Optional[str], qparams: RequestParams, dataset: st
limit = 100
idq="id"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset


def get_individual_with_id(entry_id: Optional[str], qparams: RequestParams, dataset: str):
Expand All @@ -65,7 +67,7 @@ def get_individual_with_id(entry_id: Optional[str], qparams: RequestParams, data
if limit > 100 or limit == 0:
limit = 100
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset


def get_variants_of_individual(entry_id: Optional[str], qparams: RequestParams, dataset: str):
Expand All @@ -89,7 +91,7 @@ def get_variants_of_individual(entry_id: Optional[str], qparams: RequestParams,
limit = 100
idq="caseLevelData.biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset


def get_biosamples_of_individual(entry_id: Optional[str], qparams: RequestParams, dataset: str):
Expand All @@ -109,7 +111,7 @@ def get_biosamples_of_individual(entry_id: Optional[str], qparams: RequestParams
limit = 100
idq="id"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset


def get_filtering_terms_of_individual(entry_id: Optional[str], qparams: RequestParams):
Expand Down Expand Up @@ -144,7 +146,7 @@ def get_runs_of_individual(entry_id: Optional[str], qparams: RequestParams, data
limit = 100
idq="biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_analyses_of_individual(entry_id: Optional[str], qparams: RequestParams, dataset: str):
collection = 'individuals'
Expand All @@ -163,4 +165,4 @@ def get_analyses_of_individual(entry_id: Optional[str], qparams: RequestParams,
limit = 100
idq="biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset
8 changes: 4 additions & 4 deletions beacon/db/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def get_runs(entry_id: Optional[str], qparams: RequestParams, dataset: str):
limit = 100
idq="biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_run_with_id(entry_id: Optional[str], qparams: RequestParams, dataset: str):
collection = 'runs'
Expand All @@ -59,7 +59,7 @@ def get_run_with_id(entry_id: Optional[str], qparams: RequestParams, dataset: st
limit = 100
idq="biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset



Expand All @@ -83,7 +83,7 @@ def get_variants_of_run(entry_id: Optional[str], qparams: RequestParams, dataset
limit = 100
idq="caseLevelData.biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_analyses_of_run(entry_id: Optional[str], qparams: RequestParams, dataset: str):
collection = 'runs'
Expand All @@ -102,7 +102,7 @@ def get_analyses_of_run(entry_id: Optional[str], qparams: RequestParams, dataset
limit = 100
idq="biosampleId"
count, dataset_count, docs = get_docs_by_response_type(include, query, datasets_dict, dataset, limit, skip, mongo_collection, idq)
return schema, count, dataset_count, docs
return schema, count, dataset_count, docs, dataset

def get_filtering_terms_of_run(entry_id: Optional[str], qparams: RequestParams):
query = {'scopes': 'run'}
Expand Down
14 changes: 7 additions & 7 deletions beacon/request/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import yaml
import jwt
import requests
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from beacon.request import ontologies
from beacon.request.model import Granularity, RequestParams
Expand Down Expand Up @@ -59,8 +60,6 @@ async def wrapper(request: Request):
[r for r in records] if records else []
)
LOG.debug(entity_schema)
LOG.debug(response_converted)
LOG.debug(type(response_converted))
response = build_beacon_collection_response(
response_converted, count, qparams, lambda x, y: x, entity_schema
)
Expand Down Expand Up @@ -221,11 +220,12 @@ async def wrapper(request: Request):
#LOG.debug(response_datasets)
new_count=0
loop = asyncio.get_running_loop()
for dataset in response_datasets:
with ThreadPoolExecutor() as pool:
entity_schema, count, dataset_count, records = await loop.run_in_executor(pool, db_fn, entry_id, qparams, dataset)
#LOG.debug(dataset)

with ThreadPoolExecutor() as pool:
done, pending = await asyncio.wait(fs=[loop.run_in_executor(pool, db_fn, entry_id, qparams, dataset) for dataset in response_datasets],
return_when=asyncio.ALL_COMPLETED
)
for task in done:
entity_schema, count, dataset_count, records, dataset = task.result()
if dataset_count != -1:
new_count+=dataset_count
datasets_docs[dataset]=records
Expand Down
2 changes: 1 addition & 1 deletion deploy/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@
#
ontologies_folder = "ontologies"

alphanumeric_terms = ['libraryStrategy', 'molecularAttributes.geneIds', 'diseases.ageOfOnset.iso8601duration']
alphanumeric_terms = ['libraryStrategy', 'molecularAttributes.geneIds', 'diseases.ageOfOnset.iso8601duration', 'molecularAttributes.aminoacidChanges']

ontology_files={"NCIT": "http://purl.obolibrary.org/obo/NCIT.obo"}
3 changes: 1 addition & 2 deletions permissions/public_datasets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ public_datasets:
- CINECA_dataset
- AV_Dataset
- rd-connect_dataset
- coadread_tcga_pan_can_atlas_2018
- B1MG-COADREAD
- coadread_tcga_pan_can_atlas_2018

0 comments on commit 84c6edb

Please sign in to comment.