Skip to content

Commit

Permalink
ParallelBlockCompressedOutputStream was implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
SilinPavel committed Jun 22, 2017
1 parent cc538e2 commit c4c6ec7
Show file tree
Hide file tree
Showing 7 changed files with 616 additions and 236 deletions.
39 changes: 28 additions & 11 deletions src/main/java/htsjdk/samtools/BAMFileWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
*/
package htsjdk.samtools;

import htsjdk.samtools.util.AbstractBlockCompressedOutputStream;
import htsjdk.samtools.util.BinaryCodec;
import htsjdk.samtools.util.BlockCompressedOutputStream;
import htsjdk.samtools.util.ParallelBlockCompressedOutputStream;
import htsjdk.samtools.util.RuntimeIOException;
import htsjdk.samtools.util.zip.DeflaterFactory;

Expand All @@ -42,35 +44,50 @@ class BAMFileWriter extends SAMFileWriterImpl {

private final BinaryCodec outputBinaryCodec;
private BAMRecordCodec bamRecordCodec = null;
private final BlockCompressedOutputStream blockCompressedOutputStream;
private final AbstractBlockCompressedOutputStream blockCompressedOutputStream;
private BAMIndexer bamIndexer = null;

protected BAMFileWriter(final File path) {
blockCompressedOutputStream = new BlockCompressedOutputStream(path);
/**
 * Prepares to write a BAM file to the given path using the default compression level.
 *
 * When Defaults.ZIP_THREADS > 0 and no index will be created, block compression is
 * delegated to a ParallelBlockCompressedOutputStream; otherwise the single-threaded
 * BlockCompressedOutputStream is used. NOTE(review): the parallel stream is bypassed
 * whenever an index is requested — presumably indexing relies on the synchronous
 * file-pointer behavior of the single-threaded stream; confirm.
 *
 * @param path        file to write
 * @param createIndex true if a BAM index will be created while writing
 */
protected BAMFileWriter(final File path, final boolean createIndex) {
    blockCompressedOutputStream = Defaults.ZIP_THREADS > 0 && !createIndex ?
            new ParallelBlockCompressedOutputStream(path) :
            new BlockCompressedOutputStream(path);

    outputBinaryCodec = new BinaryCodec(new DataOutputStream(blockCompressedOutputStream));
    outputBinaryCodec.setOutputFileName(path.getAbsolutePath());
}

protected BAMFileWriter(final File path, final int compressionLevel) {
blockCompressedOutputStream = new BlockCompressedOutputStream(path, compressionLevel);
/**
 * Prepares to write a BAM file to the given path at an explicit compression level.
 * Chooses a parallel BGZF writer when Defaults.ZIP_THREADS > 0 and no index is
 * requested; otherwise uses the single-threaded writer.
 *
 * @param path             file to write
 * @param compressionLevel gzip compression level passed through to the BGZF stream
 * @param createIndex      true if a BAM index will be created while writing
 */
protected BAMFileWriter(final File path, final int compressionLevel, final boolean createIndex) {
    blockCompressedOutputStream = Defaults.ZIP_THREADS > 0 && !createIndex ?
            new ParallelBlockCompressedOutputStream(path, compressionLevel) :
            new BlockCompressedOutputStream(path, compressionLevel);

    outputBinaryCodec = new BinaryCodec(new DataOutputStream(blockCompressedOutputStream));
    outputBinaryCodec.setOutputFileName(path.getAbsolutePath());
}

protected BAMFileWriter(final OutputStream os, final File file) {
blockCompressedOutputStream = new BlockCompressedOutputStream(os, file);
/**
 * Prepares to write BAM to an already-open stream using the default compression level.
 * Chooses a parallel BGZF writer when Defaults.ZIP_THREADS > 0 and no index is
 * requested; otherwise uses the single-threaded writer.
 *
 * @param os          stream to write to
 * @param file        may be null; used only for error messages / file naming
 * @param createIndex true if a BAM index will be created while writing
 */
protected BAMFileWriter(final OutputStream os, final File file, final boolean createIndex) {
    blockCompressedOutputStream = Defaults.ZIP_THREADS > 0 && !createIndex ?
            new ParallelBlockCompressedOutputStream(os, file) :
            new BlockCompressedOutputStream(os, file);

    outputBinaryCodec = new BinaryCodec(new DataOutputStream(blockCompressedOutputStream));
    outputBinaryCodec.setOutputFileName(getPathString(file));
}

protected BAMFileWriter(final OutputStream os, final File file, final int compressionLevel) {
blockCompressedOutputStream = new BlockCompressedOutputStream(os, file, compressionLevel);
/**
 * Prepares to write BAM to an already-open stream at an explicit compression level.
 * Chooses a parallel BGZF writer when Defaults.ZIP_THREADS > 0 and no index is
 * requested; otherwise uses the single-threaded writer.
 *
 * @param os               stream to write to
 * @param file             may be null; used only for error messages / file naming
 * @param compressionLevel gzip compression level passed through to the BGZF stream
 * @param createIndex      true if a BAM index will be created while writing
 */
protected BAMFileWriter(final OutputStream os, final File file, final int compressionLevel, final boolean createIndex) {
    blockCompressedOutputStream = Defaults.ZIP_THREADS > 0 && !createIndex ?
            new ParallelBlockCompressedOutputStream(os, file, compressionLevel) :
            new BlockCompressedOutputStream(os, file, compressionLevel);

    outputBinaryCodec = new BinaryCodec(new DataOutputStream(blockCompressedOutputStream));
    outputBinaryCodec.setOutputFileName(getPathString(file));
}

protected BAMFileWriter(final OutputStream os, final File file, final int compressionLevel, final DeflaterFactory deflaterFactory) {
blockCompressedOutputStream = new BlockCompressedOutputStream(os, file, compressionLevel, deflaterFactory);
/**
 * Prepares to write BAM to an already-open stream with an explicit compression level
 * and deflater factory. Chooses a parallel BGZF writer when Defaults.ZIP_THREADS > 0
 * and no index is requested; otherwise uses the single-threaded writer.
 *
 * @param os               stream to write to
 * @param file             may be null; used only for error messages / file naming
 * @param compressionLevel gzip compression level passed through to the BGZF stream
 * @param deflaterFactory  factory that supplies the deflater(s) used for block compression
 * @param createIndex      true if a BAM index will be created while writing
 */
protected BAMFileWriter(final OutputStream os, final File file, final int compressionLevel,
                        final DeflaterFactory deflaterFactory, final boolean createIndex) {
    blockCompressedOutputStream = Defaults.ZIP_THREADS > 0 && !createIndex ?
            new ParallelBlockCompressedOutputStream(os, file, compressionLevel, deflaterFactory) :
            new BlockCompressedOutputStream(os, file, compressionLevel, deflaterFactory);
    outputBinaryCodec = new BinaryCodec(new DataOutputStream(blockCompressedOutputStream));
    outputBinaryCodec.setOutputFileName(getPathString(file));
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/htsjdk/samtools/Defaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
public class Defaults {
private static Log log = Log.getInstance(Defaults.class);

/** Should BAM index files be created when writing out coordinate sorted BAM files? Default = false. */
public static final boolean CREATE_INDEX;

Expand Down Expand Up @@ -84,6 +84,7 @@ public class Defaults {
*/
public static final boolean SRA_LIBRARIES_DOWNLOAD;

public static final int ZIP_THREADS;

static {
CREATE_INDEX = getBooleanProperty("create_index", false);
Expand All @@ -104,6 +105,7 @@ public class Defaults {
CUSTOM_READER_FACTORY = getStringProperty("custom_reader", "");
SAM_FLAG_FIELD_FORMAT = SamFlagField.valueOf(getStringProperty("sam_flag_field_format", SamFlagField.DECIMAL.name()));
SRA_LIBRARIES_DOWNLOAD = getBooleanProperty("sra_libraries_download", false);
ZIP_THREADS = getIntProperty("zip_threads", 0);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/htsjdk/samtools/SAMFileWriterFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public SAMFileWriter makeBAMWriter(final SAMFileHeader header, final boolean pre
}
OutputStream os = IOUtil.maybeBufferOutputStream(new FileOutputStream(outputFile, false), bufferSize);
if (createMd5File) os = new Md5CalculatingOutputStream(os, new File(outputFile.getAbsolutePath() + ".md5"));
final BAMFileWriter ret = new BAMFileWriter(os, outputFile, compressionLevel, deflaterFactory);
final BAMFileWriter ret = new BAMFileWriter(os, outputFile, compressionLevel, deflaterFactory, createIndex);
final boolean createIndex = this.createIndex && IOUtil.isRegularPath(outputFile);
if (this.createIndex && !createIndex) {
log.warn("Cannot create index for BAM because output file is not a regular file: " + outputFile.getAbsolutePath());
Expand Down Expand Up @@ -347,7 +347,7 @@ public SAMFileWriter makeSAMWriter(final SAMFileHeader header, final boolean pre
*/

public SAMFileWriter makeBAMWriter(final SAMFileHeader header, final boolean presorted, final OutputStream stream) {
return initWriter(header, presorted, new BAMFileWriter(stream, null, this.getCompressionLevel(), this.deflaterFactory));
return initWriter(header, presorted, new BAMFileWriter(stream, null, this.getCompressionLevel(), this.deflaterFactory, false));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
* The MIT License
*
* Copyright (c) 2010 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package htsjdk.samtools.util;

import htsjdk.samtools.util.zip.DeflaterFactory;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Deflater;

/**
 * Abstract base class for writers of files that are a series of gzip blocks (BGZF format).
 * The caller just treats it as an OutputStream, and under the covers a gzip block is written
 * when the amount of uncompressed as-yet-unwritten bytes reaches a threshold.
 *
 * Concrete subclasses supply the compression strategy by implementing {@link #deflateBlock()}
 * (e.g. single-threaded vs. multi-threaded deflation); this class owns the uncompressed
 * buffer, the BGZF block framing, and stream lifecycle (flush/close/termination check).
 *
 * The advantage of BGZF over conventional gzip is that BGZF allows for seeking without having to scan through
 * the entire file up to the position being sought.
 *
 * Note that the flush() method should not be called by client
 * unless you know what you're doing, because it forces a gzip block to be written even if the
 * number of buffered bytes has not reached threshold. close(), on the other hand, must be called
 * when done writing in order to force the last gzip block to be written.
 *
 * c.f. http://samtools.sourceforge.net/SAM1.pdf for details of BGZF file format.
 */
public abstract class AbstractBlockCompressedOutputStream extends OutputStream implements LocationAware {

    /** Factory used to create deflaters when none is specified in a constructor. */
    protected static DeflaterFactory defaultDeflaterFactory = new DeflaterFactory();
    /** Compression level used when none is specified in a constructor. */
    protected static int defaultCompressionLevel = BlockCompressedStreamConstants.DEFAULT_COMPRESSION_LEVEL;

    /** Codec wrapping the underlying output; all BGZF block bytes go through it. */
    protected final BinaryCodec codec;
    /** Buffer of not-yet-compressed bytes; a block is deflated when it fills. */
    protected final byte[] uncompressedBuffer = new byte[BlockCompressedStreamConstants.DEFAULT_UNCOMPRESSED_BLOCK_SIZE];
    /** Number of valid bytes currently held in uncompressedBuffer. */
    protected int numUncompressedBytes = 0;
    /** Compressed-stream byte offset of the block currently being filled. */
    protected long mBlockAddress = 0;
    /** Output file, or null when the destination is not a regular file (e.g. a pipe or stream). */
    protected File file = null;

    // Really a local variable, but allocate once to reduce GC burden.
    protected final byte[] singleByteArray = new byte[1];


    /**
     * Sets the default {@link DeflaterFactory} that will be used for all instances unless specified otherwise in the constructor.
     * If this method is not called the default is a factory that will create the JDK {@link Deflater}.
     * @param deflaterFactory non-null default factory.
     */
    public static void setDefaultDeflaterFactory(final DeflaterFactory deflaterFactory) {
        if (deflaterFactory == null) {
            throw new IllegalArgumentException("null deflaterFactory");
        }
        defaultDeflaterFactory = deflaterFactory;
    }

    /** Returns the default {@link DeflaterFactory} used when a constructor does not specify one. */
    public static DeflaterFactory getDefaultDeflaterFactory() {
        return defaultDeflaterFactory;
    }

    /**
     * Sets the GZip compression level for subsequent BlockCompressedOutputStream object creation
     * that do not specify the compression level.
     * @param compressionLevel must lie between {@link Deflater#NO_COMPRESSION} (0) and
     *                         {@link Deflater#BEST_COMPRESSION} (9), inclusive
     * @throws IllegalArgumentException if the level is outside that range
     */
    public static void setDefaultCompressionLevel(final int compressionLevel) {
        if (compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) {
            throw new IllegalArgumentException("Invalid compression level: " + compressionLevel);
        }
        defaultCompressionLevel = compressionLevel;
    }

    /** Returns the compression level used when a constructor does not specify one. */
    public static int getDefaultCompressionLevel() {
        return defaultCompressionLevel;
    }

    /**
     * Prepare to compress at the given compression level
     * @param file file to output
     */
    public AbstractBlockCompressedOutputStream(final File file) {
        this.file = file;
        codec = new BinaryCodec(file, true);
    }

    /**
     * Prepare to write BGZF to an already-open stream.
     * @param os   stream to write compressed blocks to
     * @param file may be null; used for error messages and the end-of-file termination check
     */
    public AbstractBlockCompressedOutputStream(final OutputStream os, final File file) {
        this.file = file;
        codec = new BinaryCodec(os);
        if (file != null) {
            codec.setOutputFileName(file.getAbsolutePath());
        }
    }

    /**
     *
     * @param location May be null. Used for error messages, and for checking file termination.
     * @param output May or not already be a BlockCompressedOutputStream.
     * @return A BlockCompressedOutputStream, either by wrapping the given OutputStream, or by casting if it already
     *         is a BCOS.
     */
    public static BlockCompressedOutputStream maybeBgzfWrapOutputStream(final File location, OutputStream output) {
        if (!(output instanceof BlockCompressedOutputStream)) {
            return new BlockCompressedOutputStream(output, location);
        } else {
            return (BlockCompressedOutputStream)output;
        }
    }

    /**
     * Writes b.length bytes from the specified byte array to this output stream. The general contract for write(b)
     * is that it should have exactly the same effect as the call write(b, 0, b.length).
     * @param bytes the data
     */
    @Override
    public void write(final byte[] bytes) throws IOException {
        write(bytes, 0, bytes.length);
    }

    /**
     * Writes len bytes from the specified byte array starting at offset off to this output stream. The general
     * contract for write(b, off, len) is that some of the bytes in the array b are written to the output stream in order;
     * element b[off] is the first byte written and b[off+len-1] is the last byte written by this operation.
     *
     * @param bytes the data
     * @param startIndex the start offset in the data
     * @param numBytes the number of bytes to write
     */
    @Override
    public void write(final byte[] bytes, int startIndex, int numBytes) throws IOException {
        assert(numUncompressedBytes < uncompressedBuffer.length);
        while (numBytes > 0) {
            // Copy as much as fits into the remaining buffer space, then deflate when full.
            final int bytesToWrite = Math.min(uncompressedBuffer.length - numUncompressedBytes, numBytes);
            System.arraycopy(bytes, startIndex, uncompressedBuffer, numUncompressedBytes, bytesToWrite);
            numUncompressedBytes += bytesToWrite;
            startIndex += bytesToWrite;
            numBytes -= bytesToWrite;
            assert(numBytes >= 0);
            if (numUncompressedBytes == uncompressedBuffer.length) {
                deflateBlock();
            }
        }
    }

    /**
     * WARNING: flush() affects the output format, because it causes the current contents of uncompressedBuffer
     * to be compressed and written, even if it isn't full. Unless you know what you're doing, don't call flush().
     * Instead, call close(), which will flush any unwritten data before closing the underlying stream.
     *
     */
    @Override
    public void flush() throws IOException {
        // A single deflateBlock() may not drain the buffer (it can carry excess bytes
        // forward when the compressed result would exceed the block size), so loop.
        while (numUncompressedBytes > 0) {
            deflateBlock();
        }
        codec.getOutputStream().flush();
    }

    /**
     * close() must be called in order to flush any remaining buffered bytes. An unclosed file will likely be
     * defective.
     *
     */
    @Override
    public void close() throws IOException {
        flush();
        // The BGZF spec requires an empty gzip block as an end-of-file terminator.
        codec.writeBytes(BlockCompressedStreamConstants.EMPTY_GZIP_BLOCK);
        codec.close();
        // Can't re-open something that is not a regular file, e.g. a named pipe or an output stream
        if (this.file == null || !this.file.isFile()) return;
        if (BlockCompressedInputStream.checkTermination(this.file) !=
                BlockCompressedInputStream.FileTermination.HAS_TERMINATOR_BLOCK) {
            throw new IOException("Terminator block not found after closing BGZF file " + this.file);
        }
    }

    /**
     * Writes the specified byte to this output stream. The general contract for write is that one byte is written
     * to the output stream. The byte to be written is the eight low-order bits of the argument b.
     * The 24 high-order bits of b are ignored.
     * @param bite the byte to write; only the low-order 8 bits are used
     * @throws IOException if the underlying stream cannot be written
     */
    @Override
    public void write(final int bite) throws IOException {
        singleByteArray[0] = (byte)bite;
        write(singleByteArray);
    }

    /** Encode virtual file pointer
     * Upper 48 bits is the byte offset into the compressed stream of a block.
     * Lower 16 bits is the byte offset into the uncompressed stream inside the block.
     */
    public long getFilePointer(){
        return BlockCompressedFilePointerUtil.makeFilePointer(mBlockAddress, numUncompressedBytes);
    }

    @Override
    public long getPosition() {
        return getFilePointer();
    }

    /**
     * Attempt to write the data in uncompressedBuffer to the underlying file in a gzip block.
     * If the entire uncompressedBuffer does not fit in the maximum allowed size, reduce the amount
     * of data to be compressed, and slide the excess down in uncompressedBuffer so it can be picked
     * up in the next deflate event.
     * @return size of gzip block that was written.
     */
    protected abstract int deflateBlock();

    /**
     * Writes the entire gzip block, assuming the compressed data is stored in uncompressedBuffer
     * @param compressedBuffer buffer holding the deflated payload
     * @param compressedSize   number of valid bytes in compressedBuffer
     * @param uncompressedSize size of the payload before compression (ISIZE gzip trailer field)
     * @param crc              CRC32 of the uncompressed payload (gzip trailer field)
     * @return size of gzip block that was written.
     */
    protected int writeGzipBlock(final byte[] compressedBuffer, final int compressedSize, final int uncompressedSize, final long crc) {
        // Init gzip header
        codec.writeByte(BlockCompressedStreamConstants.GZIP_ID1);
        codec.writeByte(BlockCompressedStreamConstants.GZIP_ID2);
        codec.writeByte(BlockCompressedStreamConstants.GZIP_CM_DEFLATE);
        codec.writeByte(BlockCompressedStreamConstants.GZIP_FLG);
        codec.writeInt(0); // Modification time
        codec.writeByte(BlockCompressedStreamConstants.GZIP_XFL);
        codec.writeByte(BlockCompressedStreamConstants.GZIP_OS_UNKNOWN);
        codec.writeShort(BlockCompressedStreamConstants.GZIP_XLEN);
        codec.writeByte(BlockCompressedStreamConstants.BGZF_ID1);
        codec.writeByte(BlockCompressedStreamConstants.BGZF_ID2);
        codec.writeShort(BlockCompressedStreamConstants.BGZF_LEN);
        final int totalBlockSize = compressedSize + BlockCompressedStreamConstants.BLOCK_HEADER_LENGTH +
                BlockCompressedStreamConstants.BLOCK_FOOTER_LENGTH;

        // Per the BGZF spec, BSIZE is stored as (total block size - 1).
        codec.writeShort((short)(totalBlockSize - 1));
        codec.writeBytes(compressedBuffer, 0, compressedSize);
        codec.writeInt((int)crc);
        codec.writeInt(uncompressedSize);
        return totalBlockSize;
    }
}
4 changes: 2 additions & 2 deletions src/main/java/htsjdk/samtools/util/AsyncBufferedIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ private void backgroundRun() {
}
}
/**
* Block of records from the underlying iterator
* Block of records from the underlying iterator
*/
private static class IteratorBuffer<U> implements Iterator<U> {
private final Throwable exception;
private final Iterator<U> it;
public IteratorBuffer(Iterable<U> it) {
this.it = it != null ? it.iterator() : null;;
this.it = it != null ? it.iterator() : null;
this.exception = null;
}

Expand Down
Loading

0 comments on commit c4c6ec7

Please sign in to comment.