Skip to content

Commit

Permalink
ALS-4461: Refactor genomic dataset merger to support testing
Browse files Browse the repository at this point in the history
  • Loading branch information
ramari16 committed Aug 17, 2023
1 parent 15c9f00 commit 0a5fde5
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,15 @@ public void updateStorageDirectory(File storageDirectory) {
allValues.updateStorageDirectory(storageDirectory);
}

public void write(File outputFile)
throws IOException {
FileOutputStream fos = new FileOutputStream(outputFile);
GZIPOutputStream gzos = new GZIPOutputStream(fos);
ObjectOutputStream oos = new ObjectOutputStream(gzos);
oos.writeObject(this);
oos.flush();
oos.close();
gzos.flush();
gzos.close();
fos.flush();
fos.close();
/**
 * Serializes this object to {@code outputFile} with Java object serialization,
 * compressing the serialized bytes with GZIP.
 *
 * @param outputFile file the compressed, serialized object is written to
 * @throws UncheckedIOException wrapping any {@link IOException} raised while opening,
 *         writing to, or closing the underlying streams
 */
public void write(File outputFile) {
    // Resources are closed in reverse declaration order when the try block exits.
    try (FileOutputStream fileOut = new FileOutputStream(outputFile);
         GZIPOutputStream gzipOut = new GZIPOutputStream(fileOut);
         ObjectOutputStream objectOut = new ObjectOutputStream(gzipOut)) {
        objectOut.writeObject(this);
    } catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,26 @@

public class GenomicDatasetMerger {

public static final String INFO_STORE_JAVABIN_SUFFIX = "infoStore.javabin";
public static final String VARIANT_SPEC_INDEX_FILENAME = "variantSpecIndex.javabin";
private static Logger log = LoggerFactory.getLogger(GenomicDatasetMerger.class);

private final VariantStore variantStore1;
private final VariantStore variantStore2;

private final VariantStore mergedVariantStore;

private final String genomicDirectory1;
private final String genomicDirectory2;
private final Map<String, FileBackedByteIndexedInfoStore> infoStores1;
private final Map<String, FileBackedByteIndexedInfoStore> infoStores2;

private final String outputDirectory;

public GenomicDatasetMerger(String genomicDirectory1, String genomicDirectory2, String outputDirectory) throws IOException, ClassNotFoundException, InterruptedException {
this.genomicDirectory1 = genomicDirectory1;
this.genomicDirectory2 = genomicDirectory2;
this.variantStore1 = VariantStore.readInstance(genomicDirectory1);
this.variantStore2 = VariantStore.readInstance(genomicDirectory2);
this.mergedVariantStore = new VariantStore();
private final VariantStore mergedVariantStore;

validate();
/**
 * Creates a merger over two datasets that the caller has already loaded.
 * A fresh, empty {@link VariantStore} is allocated to hold the merged result, and
 * {@code validate()} is invoked before the constructor returns.
 *
 * @param variantStore1 variant store of the first dataset
 * @param variantStore2 variant store of the second dataset
 * @param infoStores1 info stores of the first dataset
 * @param infoStores2 info stores of the second dataset
 * @param outputDirectory directory used for the merged output
 */
public GenomicDatasetMerger(
        VariantStore variantStore1,
        VariantStore variantStore2,
        Map<String, FileBackedByteIndexedInfoStore> infoStores1,
        Map<String, FileBackedByteIndexedInfoStore> infoStores2,
        String outputDirectory) {
    // First dataset.
    this.variantStore1 = variantStore1;
    this.infoStores1 = infoStores1;
    // Second dataset.
    this.variantStore2 = variantStore2;
    this.infoStores2 = infoStores2;
    // Merge target.
    this.outputDirectory = outputDirectory;
    this.mergedVariantStore = new VariantStore();
    // Runs last so every field is assigned by the time validation executes.
    validate();
}

private void validate() {
Expand All @@ -54,19 +51,6 @@ private void validate() {
}
}

/**
 * Command-line entry point: builds a merger from the given directories, runs
 * {@code merge()}, and writes the resulting variant store to the output directory.
 *
 * args[0]: directory containing genomic dataset 1
 * args[1]: directory containing genomic dataset 2
 * args[2]: output directory
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    final String targetDirectory = args[2];
    VariantStore merged = new GenomicDatasetMerger(args[0], args[1], targetDirectory).merge();
    merged.writeInstance(targetDirectory);
}

public VariantStore merge() throws IOException {
Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergedChromosomeMasks = mergeChromosomeMasks();
VariantStore mergedVariantStore = mergeVariantStore(mergedChromosomeMasks);
Expand Down Expand Up @@ -103,8 +87,8 @@ public VariantStore mergeVariantStore(Map<String, FileBackedJsonIndexStorage<Int
* @throws IOException
*/
public Map<String, FileBackedByteIndexedInfoStore> mergeVariantIndexes() throws IOException {
String[] variantIndex1 = VariantStore.loadVariantIndexFromFile(genomicDirectory1);
String[] variantIndex2 = VariantStore.loadVariantIndexFromFile(genomicDirectory2);
String[] variantIndex1 = variantStore1.getVariantSpecIndex();
String[] variantIndex2 = variantStore2.getVariantSpecIndex();

Map<String, Integer> variantSpecToIndexMap = new HashMap<>();
LinkedList<String> variantSpecList = new LinkedList<>(Arrays.asList(variantIndex1));
Expand Down Expand Up @@ -132,8 +116,6 @@ public Map<String, FileBackedByteIndexedInfoStore> mergeVariantIndexes() throws
}
}

Map<String, FileBackedByteIndexedInfoStore> infoStores1 = loadInfoStores(genomicDirectory1);
Map<String, FileBackedByteIndexedInfoStore> infoStores2 = loadInfoStores(genomicDirectory2);
Map<String, FileBackedByteIndexedInfoStore> mergedInfoStores = new HashMap<>();

if (!infoStores1.keySet().equals(infoStores2.keySet())) {
Expand All @@ -160,36 +142,12 @@ public Map<String, FileBackedByteIndexedInfoStore> mergeVariantIndexes() throws
infoStore.allValues = mergedInfoStoreValues;
FileBackedByteIndexedInfoStore mergedStore = new FileBackedByteIndexedInfoStore(new File(outputDirectory), infoStore);
mergedInfoStores.put(infoStores1Entry.getKey(), mergedStore);
mergedStore.write(new File(outputDirectory + infoStore.column_key + "_" + INFO_STORE_JAVABIN_SUFFIX));
}

mergedVariantStore.setVariantSpecIndex(variantSpecList.toArray(new String[0]));
return mergedInfoStores;
}

/**
 * Loads every serialized info store found directly inside {@code directory}.
 * Files are selected by name: only those ending in {@code INFO_STORE_JAVABIN_SUFFIX}
 * are read. Each one is GZIP-decompressed, deserialized, and pointed at the directory
 * it was loaded from via {@code updateStorageDirectory}.
 *
 * @param directory path of the dataset directory; it is concatenated directly with each
 *        file name, so it must end with a path separator for the files to resolve
 * @return info stores keyed by file name with the {@code "_" + INFO_STORE_JAVABIN_SUFFIX}
 *         part removed; empty when {@code directory} does not exist or is not a directory
 * @throws RuntimeException wrapping any IOException or ClassNotFoundException raised
 *         while reading a file
 */
private Map<String, FileBackedByteIndexedInfoStore> loadInfoStores(String directory) {
    Map<String, FileBackedByteIndexedInfoStore> storesByName = new HashMap<>();
    File sourceDirectory = new File(directory);
    if (!sourceDirectory.exists() || !sourceDirectory.isDirectory()) {
        return storesByName;
    }

    String[] storeFilenames = sourceDirectory.list((parent, name) -> name.endsWith(INFO_STORE_JAVABIN_SUFFIX));
    for (String storeFilename : storeFilenames) {
        try (FileInputStream fileIn = new FileInputStream(directory + storeFilename);
             GZIPInputStream gzipIn = new GZIPInputStream(fileIn);
             ObjectInputStream objectIn = new ObjectInputStream(gzipIn)) {
            log.info("loading " + storeFilename);
            FileBackedByteIndexedInfoStore loadedStore = (FileBackedByteIndexedInfoStore) objectIn.readObject();
            loadedStore.updateStorageDirectory(sourceDirectory);
            String storeName = storeFilename.replace("_" + INFO_STORE_JAVABIN_SUFFIX, "");
            storesByName.put(storeName, loadedStore);
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
    return storesByName;
}

/**
* Merge patient ids from both variant stores. We are simply appending patients from store 2 to patients from store 1
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package edu.harvard.hms.dbmi.avillach.hpds.etl.genotype;

import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantStore;
import edu.harvard.hms.dbmi.avillach.hpds.storage.FileBackedJsonIndexStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.GZIPInputStream;

/**
 * Command-line runner that loads two genomic datasets from disk, merges them with
 * {@link GenomicDatasetMerger}, and writes the merged variant store and info stores
 * to an output directory.
 */
public class GenomicDatasetMergerRunner {

    private static final Logger log = LoggerFactory.getLogger(GenomicDatasetMergerRunner.class);

    public static final String INFO_STORE_JAVABIN_SUFFIX = "infoStore.javabin";
    public static final String VARIANT_SPEC_INDEX_FILENAME = "variantSpecIndex.javabin";

    /**
     * Loads both datasets, runs the three merge steps, and persists the results.
     *
     * args[0]: directory containing genomic dataset 1
     * args[1]: directory containing genomic dataset 2
     * args[2]: output directory
     *
     * @throws IllegalArgumentException if fewer than three arguments are supplied
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length < 3) {
            throw new IllegalArgumentException(
                    "Expected 3 arguments: <genomic directory 1> <genomic directory 2> <output directory>");
        }
        String genomicDirectory1 = args[0];
        String genomicDirectory2 = args[1];
        String outputDirectory = args[2];

        Map<String, FileBackedByteIndexedInfoStore> infoStores1 = loadInfoStores(genomicDirectory1);
        Map<String, FileBackedByteIndexedInfoStore> infoStores2 = loadInfoStores(genomicDirectory2);

        GenomicDatasetMerger genomicDatasetMerger = new GenomicDatasetMerger(
                VariantStore.readInstance(genomicDirectory1),
                VariantStore.readInstance(genomicDirectory2),
                infoStores1,
                infoStores2,
                outputDirectory);

        // The merge steps run in this fixed order; the variant store is written only after
        // mergeVariantIndexes() has completed.
        Map<String, FileBackedJsonIndexStorage<Integer, ConcurrentHashMap<String, VariantMasks>>> mergedChromosomeMasks =
                genomicDatasetMerger.mergeChromosomeMasks();
        VariantStore mergedVariantStore = genomicDatasetMerger.mergeVariantStore(mergedChromosomeMasks);
        Map<String, FileBackedByteIndexedInfoStore> variantIndexes = genomicDatasetMerger.mergeVariantIndexes();

        mergedVariantStore.writeInstance(outputDirectory);
        for (FileBackedByteIndexedInfoStore variantIndex : variantIndexes.values()) {
            // Resolve against the directory so the path is correct with or without a trailing separator.
            String infoStoreFilename = variantIndex.column_key + "_" + INFO_STORE_JAVABIN_SUFFIX;
            variantIndex.write(new File(outputDirectory, infoStoreFilename));
        }
    }

    /**
     * Loads every serialized info store found directly inside {@code directory}.
     * Only files whose name ends in {@link #INFO_STORE_JAVABIN_SUFFIX} are read; each is
     * GZIP-decompressed, deserialized, and pointed at the directory it was loaded from.
     *
     * NOTE(review): this uses Java native deserialization, so it should only be run
     * against trusted dataset directories.
     *
     * @param directory path of the dataset directory
     * @return info stores keyed by file name with the {@code "_" + INFO_STORE_JAVABIN_SUFFIX}
     *         part removed; empty when {@code directory} is not an existing directory
     * @throws IllegalStateException if the directory contents cannot be listed
     * @throws RuntimeException wrapping any IOException or ClassNotFoundException raised
     *         while reading a file
     */
    private static Map<String, FileBackedByteIndexedInfoStore> loadInfoStores(String directory) {
        Map<String, FileBackedByteIndexedInfoStore> infoStores = new HashMap<>();
        File genomicDataDirectory = new File(directory);
        if (!genomicDataDirectory.isDirectory()) {
            return infoStores;
        }

        String[] filenames = genomicDataDirectory.list((dir, name) -> name.endsWith(INFO_STORE_JAVABIN_SUFFIX));
        if (filenames == null) {
            // File.list returns null on an I/O error; fail with context instead of an opaque NPE.
            throw new IllegalStateException("Unable to list files in " + genomicDataDirectory);
        }

        for (String filename : filenames) {
            // Resolve against the directory so a missing trailing separator does not corrupt the path.
            File infoStoreFile = new File(genomicDataDirectory, filename);
            try (FileInputStream fis = new FileInputStream(infoStoreFile);
                 GZIPInputStream gis = new GZIPInputStream(fis);
                 ObjectInputStream ois = new ObjectInputStream(gis)) {
                log.info("loading {}", filename);
                FileBackedByteIndexedInfoStore infoStore = (FileBackedByteIndexedInfoStore) ois.readObject();
                infoStore.updateStorageDirectory(genomicDataDirectory);
                infoStores.put(filename.replace("_" + INFO_STORE_JAVABIN_SUFFIX, ""), infoStore);
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException("Failed to load info store from " + infoStoreFile, e);
            }
        }
        return infoStores;
    }
}

0 comments on commit 0a5fde5

Please sign in to comment.