Skip to content

Commit

Permalink
Merge latest change from getVariantsResolver.ts manually
Browse files Browse the repository at this point in the history
  • Loading branch information
jennyziyi-xu committed Aug 22, 2022
1 parent c92cbdf commit 036163a
Showing 1 changed file with 123 additions and 100 deletions.
223 changes: 123 additions & 100 deletions server/src/resolvers/getVariantsResolver/getVariantsResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import logger from '../../logger';
import {
CADDAnnotationQueryResponse,
CombinedVariantQueryResponse,
GnomadAnnotationQueryResponse,
QueryInput,
SourceError,
VariantQueryDataResult,
Expand All @@ -11,8 +12,10 @@ import getLocalQuery from './adapters/localQueryAdapter';
import getRemoteTestNodeQuery from './adapters/remoteTestNodeAdapter';
import fetchCaddAnnotations from './utils/fetchCaddAnnotations';
import annotateCadd from './utils/annotateCadd';
import fetchGnomadAnnotations from './utils/fetchGnomadAnnotations';
import annotateGnomad from './utils/annotateGnomad';
import liftover from './utils/liftOver';
import { QueryResponseError } from './utils/queryResponseError';
import getG4rdNodeQuery from './adapters/g4rdAdapter';
import { timeitAsync } from '../../utils/timeit';
import { SlurmApi, Configuration } from '../../slurm';
Expand All @@ -24,117 +27,137 @@ const isVariantQuery = (
arg: VariantQueryResponse | CADDAnnotationQueryResponse
): arg is VariantQueryResponse => arg.source !== 'CADD annotations';

const resolveVariantQuery = async (args: QueryInput): Promise<CombinedVariantQueryResponse> => {
const {
input: {
sources,
variant: { assemblyId },
gene: { position },
},
} = args;

let annotationPosition = position;

// fetch data
const queries = sources.map(source => buildSourceQuery(source, args));
const settledQueries = await Promise.allSettled([...queries]);

const errors: SourceError[] = [];
const combinedResults: VariantQueryDataResult[] = [];

/* inspect variant results and combine if no errors */
settledQueries.forEach(response => {
if (
response.status === 'fulfilled' &&
isVariantQuery(response.value) &&
!response.value.error
) {
combinedResults.push(...response.value.data);
} else if (response.status === 'fulfilled' && !!response.value.error) {
const message =
process.env.NODE_ENV === 'production' && response.value.error.code === 500
? 'Something went wrong!'
: response.value.error.message;
errors.push({
source: response.value.source,
error: { ...response.value.error!, message },
});
} else if (response.status === 'rejected') {
logger.error('UNHANDLED REJECTION!');
logger.error(response.reason);
throw new Error(response.reason);
}
});

// Send dummy hello world
const [start, end] = annotationPosition.replace(/.+:/, '').split('-');
const size = +end - +start;
if (size > 600000){
const slurm = new SlurmApi(
new Configuration({
basePath: process.env.SLURM_ENDPOINT!,
})
);

const headers = {
'X-SLURM-USER-NAME': process.env.SLURM_USER!,
'X-SLURM-USER-TOKEN': process.env.SLURM_JWT!,
};
const resolveVariantQuery = timeitAsync('resolveVariantQuery')(
async (args: QueryInput): Promise<CombinedVariantQueryResponse> => {
const {
input: {
sources,
variant: { assemblyId },
gene: { position },
},
} = args;

let annotationPosition = position;

// fetch data
const queries = sources.map(source => buildSourceQuery(source, args));
const settledQueries = await Promise.allSettled([...queries]);

const errors: SourceError[] = [];
const combinedResults: VariantQueryDataResult[] = [];

/* inspect variant results and combine if no errors */
settledQueries.forEach(response => {
if (
response.status === 'fulfilled' &&
isVariantQuery(response.value) &&
!response.value.error
) {
combinedResults.push(...response.value.data);
} else if (response.status === 'fulfilled' && !!response.value.error) {
const message =
process.env.NODE_ENV === 'production' && response.value.error.code === 500
? 'Something went wrong!'
: response.value.error.message;

errors.push({
source: response.value.source,
error: { ...response.value.error!, message },
});
} else if (response.status === 'rejected') {
logger.error('UNHANDLED REJECTION!');
logger.error(response.reason);
throw new Error(response.reason);
}
});

logger.debug(`${combinedResults.length} partipants found from queries`);

// Send dummy hello world
const slurmJob = await slurm.slurmctldSubmitJob(
{
script: '#!/bin/bash\necho Hello World!',
job: {
environment: {"SBATCH_DEBUG":1},
current_working_directory: '/home/jxu',
standard_output: 'test.out',
const [start, end] = annotationPosition.replace(/.+:/, '').split('-');
const size = +end - +start;
if (size > 600000) {
const slurm = new SlurmApi(
new Configuration({
basePath: process.env.SLURM_ENDPOINT!,
})
);

const headers = {
'X-SLURM-USER-NAME': process.env.SLURM_USER!,
'X-SLURM-USER-TOKEN': process.env.SLURM_JWT!,
};

const slurmJob = await slurm.slurmctldSubmitJob(
{
script: '#!/bin/bash\necho Hello World!',
job: {
environment: { SBATCH_DEBUG: 1 },
current_working_directory: '/home/jxu',
standard_output: 'test.out',
},
},
},
{
url: `${process.env.SLURM_ENDPOINT}/slurm/v0.0.37/job/submit`,
headers,
}
);
console.log(slurmJob.data.job_id);
}
{
url: `${process.env.SLURM_ENDPOINT}/slurm/v0.0.37/job/submit`,
headers,
}
);
console.log(slurmJob.data.job_id);
}

// filter data that are not in user requested assemblyId
const dataForLiftover = combinedResults.filter(v => v.variant.assemblyId !== assemblyId);
// filter data that are already in user requested assemlbyId
let dataForAnnotation = combinedResults.filter(v => {
if (v.variant.assemblyId === assemblyId) {
v.variant.assemblyIdCurrent = assemblyId;
return true;
} else return false;
});
let unliftedVariants: VariantQueryDataResult[] = [];

// perform liftOver if needed
if (dataForLiftover.length) {
const liftoverResults = await liftover(dataForAnnotation, dataForLiftover, assemblyId);
({ unliftedVariants, dataForAnnotation, annotationPosition } = liftoverResults);
}

// Cadd annotations for data in user requested assemblyId
let data: VariantQueryDataResult[] = dataForAnnotation;

// Only perform CADD and gnomAD annotations if there are variants to annotate
if (data.length) {
try {
const { data: caddAnnotations } = await fetchCaddAnnotations(
annotationPosition,
assemblyId
);

// filter data that are not in user requested assemblyId
const dataForLiftover = combinedResults.filter(v => v.variant.assemblyId !== assemblyId);
// filter data that are already in user requested assemlbyId
let dataForAnnotation = combinedResults.filter(v => {
if (v.variant.assemblyId === assemblyId) {
v.variant.assemblyIdCurrent = assemblyId;
return true;
} else return false;
});
let unliftedVariants: VariantQueryDataResult[] = [];
data = annotateCadd(dataForAnnotation, caddAnnotations);
} catch (err) {
if (err instanceof QueryResponseError) {
const { source, ...error } = err as QueryResponseError;

// perform liftOver if needed
if (dataForLiftover.length) {
const liftoverResults = await liftover(dataForAnnotation, dataForLiftover, assemblyId);
({ unliftedVariants, dataForAnnotation, annotationPosition } = liftoverResults);
}
errors.push({ source, error });
}
}

// Cadd annotations for data in user requested assemblyId
let data: VariantQueryDataResult[] = dataForAnnotation;
const caddAnnotationsPromise = fetchCaddAnnotations(annotationPosition, assemblyId);
const settledCadd = await Promise.allSettled([caddAnnotationsPromise]);
const caddAannotations = settledCadd.find(
res => res.status === 'fulfilled' && !isVariantQuery(res.value)
) as PromiseFulfilledResult<CADDAnnotationQueryResponse>;
try {
const { data: gnomadAnnotations } = await fetchGnomadAnnotations(
assemblyId,
annotationPosition,
data
);

if (!!caddAannotations && !caddAannotations.value.error) {
data = annotateCadd(dataForAnnotation, caddAannotations.value.data);
}
data = annotateGnomad(dataForAnnotation, gnomadAnnotations);
} catch (err) {
if (err instanceof QueryResponseError) {
const { source, ...error } = err;

// gnomAD annotations TODO: gnomAD annotations for GRCh38 are not available yet.
if (assemblyId === 'GRCh37') {
data = await annotateGnomad(data ?? dataForAnnotation);
}
errors.push({ source, error });
}
}
}

// return unmapped variants if there's any
if (unliftedVariants.length) {
Expand Down

0 comments on commit 036163a

Please sign in to comment.