Skip to content

Commit

Permalink
EVA-3669 New job for deprecating SVE variants by reading accessions o…
Browse files Browse the repository at this point in the history
…f variants from file (#463)

* new job for deprecating ss variants - reading ids from file
  • Loading branch information
nitin-ebi authored Oct 21, 2024
1 parent 9296d13 commit dadf741
Show file tree
Hide file tree
Showing 14 changed files with 679 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2022 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.ac.ebi.eva.accession.deprecate.batch.io;

import com.mongodb.BasicDBObject;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import uk.ac.ebi.eva.accession.core.model.eva.SubmittedVariantEntity;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Read all SubmittedVariants for a given assembly whose ids are given in the input file
*/
public class SubmittedVariantsFileReader implements ItemStreamReader<SubmittedVariantEntity> {

private static final Logger logger = LoggerFactory.getLogger(SubmittedVariantsFileReader.class);

private static final String ASSEMBLY_FIELD = "seq";
private static final String ACCESSION_FIELD = "accession";

private BufferedReader reader;
private boolean endOfFile = false;
private String assembly;
private String variantIdFile;
private MongoCursor<Document> evaCursor;
private MongoConverter converter;
private MongoTemplate mongoTemplate;
private int chunkSize;

public SubmittedVariantsFileReader(String assembly, String variantIdFile, MongoTemplate mongoTemplate, int chunkSize) {
this.assembly = assembly;
this.variantIdFile = variantIdFile;
this.mongoTemplate = mongoTemplate;
this.chunkSize = chunkSize;
}

@Override
public SubmittedVariantEntity read() {
if (evaCursor == null || !evaCursor.hasNext()) {
if (endOfFile) {
return null;
}

loadNextBatchAndQuery();

if (evaCursor == null || !evaCursor.hasNext()) {
return null;
}
}

Document nextElement = evaCursor.next();
return getSubmittedVariantEntity(nextElement);
}

private SubmittedVariantEntity getSubmittedVariantEntity(Document document) {
return converter.read(SubmittedVariantEntity.class, new BasicDBObject(document));
}

@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
try {
reader = new BufferedReader(new FileReader(variantIdFile));
} catch (IOException e) {
throw new ItemStreamException("Failed to open the file (" + variantIdFile + ") with variant IDs", e);
}
initializeReader();
}

public void initializeReader() {
converter = mongoTemplate.getConverter();
loadNextBatchAndQuery();
}

private void loadNextBatchAndQuery() {
List<Long> variantIds = new ArrayList<>();
String line;

try {
while (variantIds.size() < chunkSize && (line = reader.readLine()) != null) {
variantIds.add(Long.parseLong(line.trim()));
}
if (variantIds.isEmpty()) {
endOfFile = true;
return;
}
} catch (IOException e) {
throw new ItemStreamException("Error reading variant IDs from file", e);
}

Bson query = Filters.and(Filters.in(ACCESSION_FIELD, variantIds), Filters.eq(ASSEMBLY_FIELD, assembly));
logger.info("Issuing find in EVA collection for a batch of IDs: {}", query);
FindIterable<Document> submittedVariantsEVA = getSubmittedVariants(query, SubmittedVariantEntity.class);
evaCursor = submittedVariantsEVA.iterator();
}

private FindIterable<Document> getSubmittedVariants(Bson query, Class<?> entityClass) {
return mongoTemplate.getCollection(mongoTemplate.getCollectionName(entityClass))
.find(query)
.noCursorTimeout(true)
.batchSize(chunkSize);
}

@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {

}

@Override
public void close() throws ItemStreamException {
try {
if (evaCursor != null) {
evaCursor.close();
}
if (reader != null) {
reader.close();
}
} catch (IOException e) {
throw new ItemStreamException("Exception while closing resources", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ public class BeanNames {

public static final String STUDY_SUMITTED_VARIANTS_READER = "STUDY_SUMITTED_VARIANTS_READER";

public static final String SUBMITTED_VARIANTS_FILE_READER = "SUBMITTED_VARIANTS_FILE_READER";

public static final String STUDY_DEPRECATION_WRITER = "STUDY_DEPRECATION_WRITER";

public static final String DEPRECATE_STUDY_SUBMITTED_VARIANTS_STEP = "DEPRECATE_STUDY_SUBMITTED_VARIANTS_STEP";

public static final String DEPRECATE_STUDY_SUBMITTED_VARIANTS_JOB = "DEPRECATE_STUDY_SUBMITTED_VARIANTS_JOB";

public static final String DEPRECATE_SUBMITTED_VARIANTS_FROM_FILE_STEP = "DEPRECATE_SUBMITTED_VARIANTS_FROM_FILE_STEP";

public static final String DEPRECATE_SUBMITTED_VARIANTS_FROM_FILE_JOB = "DEPRECATE_SUBMITTED_VARIANTS_FROM_FILE_JOB";

public static final String DEPRECATION_PROGRESS_LISTENER = "DEPRECATION_PROGRESS_LISTENER";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2024 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.ac.ebi.eva.accession.deprecate.configuration.batch.io;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.mongodb.core.MongoTemplate;
import uk.ac.ebi.eva.accession.core.configuration.nonhuman.MongoConfiguration;
import uk.ac.ebi.eva.accession.deprecate.batch.io.SubmittedVariantsFileReader;
import uk.ac.ebi.eva.accession.deprecate.configuration.BeanNames;
import uk.ac.ebi.eva.accession.deprecate.parameters.InputParameters;

@Configuration
@Import({MongoConfiguration.class})
public class StudySubmittedVariantsFileReaderConfiguration {

@Bean(BeanNames.SUBMITTED_VARIANTS_FILE_READER)
@StepScope
SubmittedVariantsFileReader submittedVariantsFileReader(MongoTemplate mongoTemplate, InputParameters parameters) {
return new SubmittedVariantsFileReader(parameters.getAssemblyAccession(), parameters.getVariantIdFile(),
mongoTemplate, parameters.getChunkSize());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2024 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.ac.ebi.eva.accession.deprecate.configuration.batch.jobs;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.ac.ebi.eva.accession.deprecate.configuration.BeanNames;
import uk.ac.ebi.eva.commons.batch.configuration.SpringBoot1CompatibilityConfiguration;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

@Configuration
@EnableBatchProcessing
public class DeprecateSubmittedVariantsFromFileJobConfiguration {

@Autowired
@Qualifier(BeanNames.DEPRECATE_SUBMITTED_VARIANTS_FROM_FILE_STEP)
private Step deprecateSubmittedVariantsFromFileStep;

@Bean(BeanNames.DEPRECATE_SUBMITTED_VARIANTS_FROM_FILE_JOB)
public Job deprecateStudySubmittedVariantsFromFileJob(JobBuilderFactory jobBuilderFactory) {
return jobBuilderFactory.get(BeanNames.DEPRECATE_SUBMITTED_VARIANTS_FROM_FILE_JOB)
.incrementer(new RunIdIncrementer())
.start(deprecateSubmittedVariantsFromFileStep)
.build();
}

@Bean
public BatchConfigurer configurer(DataSource dataSource, EntityManagerFactory entityManagerFactory)
throws Exception {
return SpringBoot1CompatibilityConfiguration.getSpringBoot1CompatibleBatchConfigurer(dataSource,
entityManagerFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2024 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.ac.ebi.eva.accession.deprecate.configuration.batch.steps;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.ac.ebi.eva.accession.core.model.eva.SubmittedVariantEntity;
import uk.ac.ebi.eva.accession.deprecate.configuration.BeanNames;

@Configuration
@EnableBatchProcessing
public class DeprecateSubmittedVariantsFromFileStepConfiguration {

@Autowired
@Qualifier(BeanNames.SUBMITTED_VARIANTS_FILE_READER)
private ItemStreamReader<SubmittedVariantEntity> submittedVariantsFileReader;

@Autowired
@Qualifier(BeanNames.STUDY_DEPRECATION_WRITER)
private ItemWriter<SubmittedVariantEntity> submittedVariantDeprecationWriter;

@Autowired
@Qualifier(BeanNames.DEPRECATION_PROGRESS_LISTENER)
private StepExecutionListener progressListener;

@Bean(BeanNames.DEPRECATE_SUBMITTED_VARIANTS_FROM_FILE_STEP)
public Step deprecateSubmittedVariantsFromFileStep(StepBuilderFactory stepBuilderFactory,
SimpleCompletionPolicy chunkSizeCompletionPolicy) {
TaskletStep step = stepBuilderFactory.get(BeanNames.DEPRECATE_SUBMITTED_VARIANTS_FROM_FILE_STEP)
.<SubmittedVariantEntity, SubmittedVariantEntity>chunk(chunkSizeCompletionPolicy)
.reader(submittedVariantsFileReader)
.writer(submittedVariantDeprecationWriter)
.listener(progressListener)
.build();
return step;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class InputParameters {

private String deprecationReason;

private String variantIdFile;

public int getChunkSize() {
return chunkSize;
}
Expand Down Expand Up @@ -66,4 +68,12 @@ public String getDeprecationReason() {
public void setDeprecationReason(String deprecationReason) {
this.deprecationReason = deprecationReason;
}

public String getVariantIdFile() {
return variantIdFile;
}

public void setVariantIdFile(String variantIdFile) {
this.variantIdFile = variantIdFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ public static void populateTestDB(MongoTemplate mongoTemplate) {
mongoTemplate.save(rs2, mongoTemplate.getCollectionName(ClusteredVariantEntity.class));
}

public static void populateTestDBForFile(MongoTemplate mongoTemplate) {
// rs1 -> ss1,ss2
// rs2 -> ss3,ss4
ss1 = createSS(STUDY1, 5L, 1L, 100L, "C", "T");
ss2 = createSS(STUDY1, 6L, 1L, 100L, "C", "A");
ss3 = createSS(STUDY2, 7L, 5L, 102L, "T", "G");
ss4 = createSS(STUDY2, 8L, 5L, 102L, "T", "A");

rs1 = createRS(ss1);
mongoTemplate.save(rs1, mongoTemplate.getCollectionName(DbsnpClusteredVariantEntity.class));

mongoTemplate.insert(Arrays.asList(ss1, ss2, ss3, ss4), SubmittedVariantEntity.class);
rs2 = createRS(ss3);
mongoTemplate.save(rs2, mongoTemplate.getCollectionName(ClusteredVariantEntity.class));
}

public static void assertPostDeprecationDatabaseState(MongoTemplate mongoTemplate) {
// ss4 was not deprecated and still remains
assertEquals(1, mongoTemplate.findAll(SubmittedVariantEntity.class).size());
Expand Down
Loading

0 comments on commit dadf741

Please sign in to comment.