Skip to content

Commit

Permalink
Merge pull request #2515 from opencb/TASK-6983-3.2.2
Browse files Browse the repository at this point in the history
 TASK-6983 - NPE at DetectIllegalConcurrentFileLoadingsMigration
  • Loading branch information
j-coll authored Oct 7, 2024
2 parents c48210d + d07a744 commit 874d220
Showing 1 changed file with 30 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opencb.opencga.app.migrations.v2.v2_12_5.storage;

import org.apache.commons.lang3.tuple.Pair;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
Expand Down Expand Up @@ -67,6 +68,8 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
if (fileSets.isEmpty()) {
logger.info("No concurrent file loadings found in study '{}'", study);
return;
} else {
logger.info("Found {} sets of files with shared samples in study '{}'", fileSets.size(), study);
}

Map<Integer, TaskMetadata> fileTasks = new HashMap<>();
Expand All @@ -85,10 +88,12 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
}
}

Set<Set<Integer>> fileSetsToInvalidate = new HashSet<>();
Set<Integer> affectedFiles = new HashSet<>();
Set<Integer> affectedSamples = new HashSet<>();
for (Set<Integer> fileSet : fileSets) {
Set<Integer> affectedFiles = new HashSet<>();
Set<Integer> affectedSamples = new HashSet<>();
Set<Integer> invalidFiles = new HashSet<>();
Set<Integer> invalidSampleIndexes = new HashSet<>();

// Check if any task from this file set overlaps in time
List<TaskMetadata> tasks = new ArrayList<>();
for (Integer fileId : fileSet) {
Expand All @@ -97,8 +102,11 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
tasks.add(task);
}
}
if (tasks.size() > 1) {
logger.info("Found {} tasks loading files {}", tasks.size(), fileSet);
if (tasks.size() <= 1) {
continue;
} else {
logger.info("--------------------");
logger.info("Found {} tasks loading files {} in study {}", tasks.size(), fileSet, study);
for (int i = 0; i < tasks.size(); i++) {
TaskMetadata task1 = tasks.get(i);
Date task1start = task1.getStatus().firstKey();
Expand All @@ -108,8 +116,7 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
Date task2start = task2.getStatus().firstKey();
Date task2end = task2.getStatus().lastKey();
if (task1start.before(task2end) && task1end.after(task2start)) {
fileSetsToInvalidate.add(fileSet);
affectedFiles.addAll(task1.getFileIds());
affectedFiles.addAll(fileSet);

List<String> task1Files = task1.getFileIds().stream().map(fileId -> "'" + metadataManager.getFileName(studyId, fileId) + "'(" + fileId + ")").collect(Collectors.toList());
List<String> task2Files = task2.getFileIds().stream().map(fileId -> "'" + metadataManager.getFileName(studyId, fileId) + "'(" + fileId + ")").collect(Collectors.toList());
Expand All @@ -131,8 +138,6 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
}
}

Set<Integer> invalidFiles = new HashSet<>();
List<Integer> invalidSampleIndexes = new ArrayList<>();
for (Integer sampleId : affectedSamples) {
String sampleName = metadataManager.getSampleName(studyId, sampleId);
SampleMetadata sampleMetadata = metadataManager.getSampleMetadata(studyId, sampleId);
Expand All @@ -145,7 +150,7 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
metadataManager.getFileName(studyId, file), file);
}
}
} else if (sampleMetadata.getSampleIndexStatus(Optional.of(sampleMetadata.getSampleIndexVersion()).orElse(-1)) == TaskMetadata.Status.READY) {
} else if (sampleMetadata.getSampleIndexStatus(Optional.ofNullable(sampleMetadata.getSampleIndexVersion()).orElse(-1)) == TaskMetadata.Status.READY) {
for (Integer fileId : sampleMetadata.getFiles()) {
if (affectedFiles.contains(fileId)) {
FileMetadata fileMetadata = metadataManager.getFileMetadata(studyId, fileId);
Expand Down Expand Up @@ -195,6 +200,8 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
}
}
} else {
logger.info("Sample '{}'({}) sample index is not in READY status. Invalidate to ensure rebuild", sampleName, sampleId);
logger.info(" - Invalidating sample index for sample '{}'({})", sampleName, sampleId);
invalidSampleIndexes.add(sampleId);
}
}
Expand All @@ -210,9 +217,19 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
invalidSamples.addAll(metadataManager.getSampleIdsFromFileId(studyId, fileId));
}

logger.info("Affected files: {}", invalidFiles);
logger.info("Affected samples: {}", invalidSamples);
logger.info("Affected sample indexes: {}", invalidSampleIndexes);
logger.info("Study '{}'", study);
List<Pair<String, Integer>> invalidFilesPairs = invalidFiles.stream()
.map(fileId -> Pair.of(metadataManager.getFileName(studyId, fileId), fileId))
.collect(Collectors.toList());
logger.info("Affected files: {}", invalidFilesPairs);
List<Pair<String, Integer>> invalidSamplesPairs = invalidSamples.stream()
.map(sampleId -> Pair.of(metadataManager.getSampleName(studyId, sampleId), sampleId))
.collect(Collectors.toList());
logger.info("Affected samples: {}", invalidSamplesPairs);
List<Pair<String, Integer>> invalidSampleIndexesPairs = invalidSampleIndexes.stream()
.map(sampleId -> Pair.of(metadataManager.getSampleName(studyId, sampleId), sampleId))
.collect(Collectors.toList());
logger.info("Affected sample indexes: {}", invalidSampleIndexesPairs);
}
} else {
ObjectMap event = new ObjectMap()
Expand Down

0 comments on commit 874d220

Please sign in to comment.