Skip to content

Commit

Permalink
[ALS-4461] Allow incremental vcf loading (#73)
Browse files Browse the repository at this point in the history
* ALS-4461: Deserialize variant index from disk

* ALS-4461: Add variant index builder for VCF loading

* ALS-4461: Upgrade major version

* ALS-4461: Store variants by index instead of full variant spec. Refactoring to support incremental vcf loading

* ALS-4461: Initial commit for genomic dataset merger

* ALS-4461: Add jar with dependencies build instructions

* ALS-4461: Fix issue with hardcoded directory

* ALS-4461: Fix more issues with non-relative file paths, various refactoring

* ALS-4461: Parallelize chromosome mask merging

* ALS-4461: Updated hpds version in dockerfile

* ALS-4461: Update genomic directory on loading for variant index stores

* ALS-4461: Change type of variant index store from String (variant spec) to Integer (variant id)

* ALS-4461: Refactor duplicated variant store read/write code

* ALS-4461: Fixing thread issues at startup

* ALS-4461: Fix error handling

* ALS-4461: Clean up error handling in file backed storages

* ALS-4461: Remove IOExceptions thrown from FBBIS

* ALS-4461: Fix deserialization issue

* ALS-4461: Add comment explaining chromosome index merging

* ALS-4461: Add comments

* ALS-4461: Changes per PR

* ALS-4461: Refactor variant spec index to make testing easier

* ALS-4461: Refactor genomic dataset merger to support testing

* ALS-4461: Add validation to prevent patient id duplicates

* ALS-4461: Add main args validation

* ALS-4461: Remove unused code

* ALS-4461: Remove potential race condition
  • Loading branch information
ramari16 authored Aug 23, 2023
1 parent 0b6d03e commit a67d8fe
Show file tree
Hide file tree
Showing 35 changed files with 878 additions and 499 deletions.
4 changes: 2 additions & 2 deletions client-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
<parent>
<artifactId>pic-sure-hpds</artifactId>
<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
<version>1.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
</parent>

<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
<artifactId>client-api</artifactId>
<version>1.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>

<name>client-api</name>
<!-- FIXME change it to the project's website -->
Expand Down
10 changes: 9 additions & 1 deletion common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>pic-sure-hpds</artifactId>
<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
<version>1.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
</parent>

<artifactId>common</artifactId>
Expand All @@ -21,5 +21,13 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,44 +1,52 @@
package edu.harvard.hms.dbmi.avillach.hpds.storage;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.io.*;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.io.output.ByteArrayOutputStream;

public class FileBackedByteIndexedStorage <K, V extends Serializable> implements Serializable {
public abstract class FileBackedByteIndexedStorage <K, V extends Serializable> implements Serializable {
private static final long serialVersionUID = -7297090745384302635L;
private transient RandomAccessFile storage;
private ConcurrentHashMap<K, Long[]> index;
private File storageFile;
private boolean completed = false;
private Long maxStorageSize; //leave this in to not break serialization
protected transient RandomAccessFile storage;
protected ConcurrentHashMap<K, Long[]> index;
protected File storageFile;
protected boolean completed = false;


public FileBackedByteIndexedStorage(Class<K> keyClass, Class<V> valueClass, File storageFile) throws FileNotFoundException {
this.index = new ConcurrentHashMap<K, Long[]>();
this.storageFile = storageFile;
this.storage = new RandomAccessFile(this.storageFile, "rw");
}

public void updateStorageDirectory(File storageDirectory) {
if (!storageDirectory.isDirectory()) {
throw new IllegalArgumentException("storageDirectory is not a directory");
}
String currentStoreageFilename = storageFile.getName();
storageFile = new File(storageDirectory, currentStoreageFilename);
}

public Set<K> keys(){
return index.keySet();
}

public void put(K key, V value) throws IOException {
public void put(K key, V value) {
if(completed) {
throw new RuntimeException("A completed FileBackedByteIndexedStorage cannot be modified.");
}
Long[] recordIndex = store(value);
Long[] recordIndex;
try (ByteArrayOutputStream out = writeObject(value)) {
recordIndex = new Long[2];
synchronized (storage) {
storage.seek(storage.length());
recordIndex[0] = storage.getFilePointer();
storage.write(out.toByteArray());
recordIndex[1] = storage.getFilePointer() - recordIndex[0];
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
index.put(key, recordIndex);
}

Expand All @@ -63,60 +71,44 @@ public void complete() {
this.completed = true;
}

public boolean isComplete() {
return this.completed;
}

private Long[] store(V value) throws IOException {

ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(out));
oos.writeObject(value);
oos.flush();
oos.close();

Long[] recordIndex = new Long[2];
synchronized(storage) {
storage.seek(storage.length());
recordIndex[0] = storage.getFilePointer();
storage.write(out.toByteArray());
recordIndex[1] = storage.getFilePointer() - recordIndex[0];
// maxStorageSize = storage.getFilePointer();
}
return recordIndex;
}

public V get(K key) throws IOException {
if(this.storage==null) {
public V get(K key) {
try {
// todo: make this class immutable and remove this lock/check altogether
synchronized(this) {
this.open();
}
}
Long[] offsetsInStorage = index.get(key);
if(offsetsInStorage != null) {
Long offsetInStorage = index.get(key)[0];
int offsetLength = index.get(key)[1].intValue();
if(offsetInStorage != null && offsetLength>0) {
byte[] buffer = new byte[offsetLength];
synchronized(storage) {
storage.seek(offsetInStorage);
storage.readFully(buffer);
if(this.storage==null) {
this.open();
}
ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(buffer)));

try {
V readObject = (V) in.readObject();
}
Long[] offsetsInStorage = index.get(key);
if(offsetsInStorage != null) {
Long offsetInStorage = index.get(key)[0];
int offsetLength = index.get(key)[1].intValue();
if(offsetInStorage != null && offsetLength>0) {
byte[] buffer = new byte[offsetLength];
synchronized(storage) {
storage.seek(offsetInStorage);
storage.readFully(buffer);
}
V readObject = readObject(buffer);
return readObject;
} catch (ClassNotFoundException e) {
throw new RuntimeException("This should never happen.");
} finally {
in.close();
}else {
return null;
}
}else {
} else {
return null;
}
} else {
return null;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

protected abstract V readObject(byte[] buffer);

protected abstract ByteArrayOutputStream writeObject(V value) throws IOException;

public V getOrELse(K key, V defaultValue) {
V result = get(key);
return result == null ? defaultValue : result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package edu.harvard.hms.dbmi.avillach.hpds.storage;

import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * A {@link FileBackedByteIndexedStorage} implementation that serializes values
 * with standard Java object serialization, GZIP-compressed.
 *
 * @param <K> key type used to index stored values
 * @param <V> value type; must be serializable via java.io serialization
 */
public class FileBackedJavaIndexedStorage <K, V extends Serializable> extends FileBackedByteIndexedStorage<K, V> {
	public FileBackedJavaIndexedStorage(Class<K> keyClass, Class<V> valueClass, File storageFile) throws FileNotFoundException {
		super(keyClass, valueClass, storageFile);
	}

	/**
	 * Serializes {@code value} (Java serialization wrapped in GZIP) into an
	 * in-memory buffer. try-with-resources guarantees the streams are closed —
	 * and the GZIP trailer finished — even if serialization throws.
	 *
	 * @param value the value to serialize
	 * @return buffer holding the compressed serialized form
	 * @throws IOException if serialization fails
	 */
	@Override
	protected ByteArrayOutputStream writeObject(V value) throws IOException {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try (ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(out))) {
			oos.writeObject(value);
		}
		return out;
	}

	/**
	 * Deserializes a value previously produced by {@link #writeObject(Serializable)}.
	 * IOExceptions are rethrown unchecked because the base class contract does
	 * not declare checked exceptions for reads.
	 */
	@Override
	@SuppressWarnings("unchecked") // buffer was written from a V by writeObject
	protected V readObject(byte[] buffer) {
		try (ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(buffer)))) {
			return (V) in.readObject();
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		} catch (ClassNotFoundException e) {
			throw new RuntimeException(e);
		}
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package edu.harvard.hms.dbmi.avillach.hpds.storage;

import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * A {@link FileBackedByteIndexedStorage} implementation that serializes values
 * as GZIP-compressed JSON via Jackson. Concrete subclasses supply the value
 * type through {@link #getTypeReference()} so generic types deserialize
 * correctly despite erasure.
 *
 * @param <K> key type used to index stored values
 * @param <V> value type; must be representable as JSON
 */
public abstract class FileBackedJsonIndexStorage <K, V extends Serializable> extends FileBackedByteIndexedStorage<K, V> {
	private static final long serialVersionUID = -1086729119489479152L;

	// transient: ObjectMapper is not serializable; restored in readObject(ObjectInputStream)
	protected transient ObjectMapper objectMapper = new ObjectMapper();

	public FileBackedJsonIndexStorage(File storageFile) throws FileNotFoundException {
		// Class tokens are unused by this subclass, hence null.
		super(null, null, storageFile);
	}

	/**
	 * Serializes {@code value} as GZIP-compressed JSON into an in-memory buffer.
	 * The GZIP stream is closed explicitly (try-with-resources) so the GZIP
	 * trailer is always written, instead of relying on Jackson's
	 * AUTO_CLOSE_TARGET default to close the target stream.
	 *
	 * @param value the value to serialize
	 * @return buffer holding the compressed JSON
	 * @throws IOException if serialization fails
	 */
	@Override
	protected ByteArrayOutputStream writeObject(V value) throws IOException {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
			objectMapper.writeValue(gzip, value);
		}
		return out;
	}

	/**
	 * Deserializes a value previously produced by {@link #writeObject(Serializable)}.
	 * IOExceptions are rethrown as UncheckedIOException (a RuntimeException, so
	 * existing callers are unaffected) for consistency with FileBackedJavaIndexedStorage.
	 */
	@Override
	protected V readObject(byte[] buffer) {
		try {
			return objectMapper.readValue(new GZIPInputStream(new ByteArrayInputStream(buffer)), getTypeReference());
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}

	// Required to re-populate the transient objectMapper on Java deserialization
	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
		in.defaultReadObject();
		objectMapper = new ObjectMapper();
	}

	/**
	 * @return the Jackson type reference describing {@code V}, used for
	 *         generic-safe JSON deserialization
	 */
	public abstract TypeReference<V> getTypeReference();
}
2 changes: 1 addition & 1 deletion data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>pic-sure-hpds</artifactId>
<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
<version>1.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
</parent>

<artifactId>data</artifactId>
Expand Down
Loading

0 comments on commit a67d8fe

Please sign in to comment.