From cba5a0953aa6f65d5a821ce9afc5bff5abc7f4db Mon Sep 17 00:00:00 2001 From: Pavel Silin Date: Wed, 14 Jun 2017 16:44:32 +0300 Subject: [PATCH] New AsyncWriteSortingCollection was implemented --- src/main/java/htsjdk/samtools/Defaults.java | 5 +- .../util/AsyncWriteSortingCollection.java | 182 ++++++++++++++++++ .../samtools/util/SortingCollection.java | 131 ++++++++----- 3 files changed, 265 insertions(+), 53 deletions(-) create mode 100644 src/main/java/htsjdk/samtools/util/AsyncWriteSortingCollection.java diff --git a/src/main/java/htsjdk/samtools/Defaults.java b/src/main/java/htsjdk/samtools/Defaults.java index 5e3f6dab17..5b3ae0504c 100644 --- a/src/main/java/htsjdk/samtools/Defaults.java +++ b/src/main/java/htsjdk/samtools/Defaults.java @@ -15,7 +15,7 @@ */ public class Defaults { private static Log log = Log.getInstance(Defaults.class); - + /** Should BAM index files be created when writing out coordinate sorted BAM files? Default = false. */ public static final boolean CREATE_INDEX; @@ -84,6 +84,7 @@ public class Defaults { */ public static final boolean SRA_LIBRARIES_DOWNLOAD; + public static final int SORTING_COLLECTION_THREADS; static { CREATE_INDEX = getBooleanProperty("create_index", false); @@ -104,6 +105,7 @@ public class Defaults { CUSTOM_READER_FACTORY = getStringProperty("custom_reader", ""); SAM_FLAG_FIELD_FORMAT = SamFlagField.valueOf(getStringProperty("sam_flag_field_format", SamFlagField.DECIMAL.name())); SRA_LIBRARIES_DOWNLOAD = getBooleanProperty("sra_libraries_download", false); + SORTING_COLLECTION_THREADS = getIntProperty("sort_col_threads", 0); } /** @@ -126,6 +128,7 @@ public static SortedMap allDefaults(){ result.put("EBI_REFERENCE_SERVICE_URL_MASK", EBI_REFERENCE_SERVICE_URL_MASK); result.put("CUSTOM_READER_FACTORY", CUSTOM_READER_FACTORY); result.put("SAM_FLAG_FIELD_FORMAT", SAM_FLAG_FIELD_FORMAT); + result.put("SORTING_COLLECTION_THREADS", SORTING_COLLECTION_THREADS); return Collections.unmodifiableSortedMap(result); } diff --git a/src/main/java/htsjdk/samtools/util/AsyncWriteSortingCollection.java b/src/main/java/htsjdk/samtools/util/AsyncWriteSortingCollection.java new file mode 100644 index 0000000000..8713889bef --- /dev/null +++ b/src/main/java/htsjdk/samtools/util/AsyncWriteSortingCollection.java @@ -0,0 +1,182 @@ +/* + * The MIT License + * + * Copyright (c) 2009 The Broad Institute + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package htsjdk.samtools.util; + +import htsjdk.samtools.Defaults; +import htsjdk.samtools.SAMException; + +import java.io.File; +import java.lang.reflect.Array; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * This class has exactly the same API that SortingCollection, however, + * sorts and spills the data to disk in a separate ExecutorService. Identify the maximum number of records + * in memory that is guaranteed not to exceed the number of records that would have the object class SortingCollection. + * + * @author Pavel_Silin@epam.com, EPAM Systems, Inc. + */ +public class AsyncWriteSortingCollection extends SortingCollection { + + private static final ExecutorService service = Executors.newFixedThreadPool( + Defaults.SORTING_COLLECTION_THREADS, + r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + } + ); + + private final BlockingQueue instancePool; + private CompletableFuture finishFlagFuture; + + @SuppressWarnings("unchecked") + public AsyncWriteSortingCollection(final Class componentType, + final SortingCollection.Codec codec, + final Comparator comparator, final int maxRecordsInRam, + final File... tmpDir) { + super(componentType, codec, comparator, + // calculate available space for one buffer + maxRecordsInRam / (Defaults.SORTING_COLLECTION_THREADS + 1), + tmpDir + ); + + if (Defaults.SORTING_COLLECTION_THREADS <= 0) { + throw new IllegalArgumentException("JVM parameter sort_col_threads can't be <= 0"); + } + + instancePool = new LinkedBlockingQueue<>(Defaults.SORTING_COLLECTION_THREADS); + for (int i = 0; i < Defaults.SORTING_COLLECTION_THREADS; i++) { + instancePool.offer((T[]) Array.newInstance(componentType, + maxRecordsInRam / (Defaults.SORTING_COLLECTION_THREADS + 1)) + ); + } + finishFlagFuture = CompletableFuture.completedFuture(null); + } + + @Override + public void doneAdding() { + super.doneAdding(); + if (instancePool != null && !instancePool.isEmpty()) { + instancePool.clear(); + } + } + + /** + * @see SortingCollection + */ + @SuppressWarnings("unused") + public static AsyncWriteSortingCollection newInstance( + final Class componentType, + final SortingCollection.Codec codec, + final Comparator comparator, final int maxRecordsInRAM, + final File... tmpDir) { + return new AsyncWriteSortingCollection<>(componentType, codec, + comparator, maxRecordsInRAM, tmpDir); + + } + + /** + * @see SortingCollection + */ + @SuppressWarnings("unused") + public static AsyncWriteSortingCollection newInstance( + final Class componentType, + final SortingCollection.Codec codec, + final Comparator comparator, final int maxRecordsInRAM, + final Collection tmpDirs) { + return new AsyncWriteSortingCollection<>(componentType, codec, + comparator, maxRecordsInRAM, + tmpDirs.toArray(new File[tmpDirs.size()])); + } + + /** + * @see SortingCollection + */ + @SuppressWarnings("unused") + public static AsyncWriteSortingCollection newInstance( + final Class componentType, + final SortingCollection.Codec codec, + final Comparator comparator, final int maxRecordsInRAM) { + + final File tmpDir = new File(System.getProperty("java.io.tmpdir")); + return new AsyncWriteSortingCollection<>(componentType, codec, + comparator, maxRecordsInRAM, tmpDir); + } + + /** + This method is called from SortingCollection.add method to perform spill + to disk operation We override it here to put collected ramRecords to the runnable task. + The pack then will be sorted and spilled to disk in a separate thread(s). + */ + @Override + protected void performSpillToDisk() { + try { + final T[] buffRamRecords = this.ramRecords; + final int buffNumRecordsInRam = this.numRecordsInRam; + this.ramRecords = instancePool.take(); + this.numRecordsInRam = 0; + + //run task, and then it's done, put buffer in pool + CompletableFuture sortSpillTask = CompletableFuture.supplyAsync( + () -> { + Arrays.sort(buffRamRecords, 0, buffNumRecordsInRam, comparator); + spill(buffRamRecords, buffNumRecordsInRam, codec.clone()); + return buffRamRecords; + }, + service + ).thenAccept(this::returnRamRecordsToPool); + + finishFlagFuture = CompletableFuture.allOf(finishFlagFuture, sortSpillTask); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SAMException("Failed to spill to disk once the tmp file.", e); + } + } + + private void returnRamRecordsToPool(T[] t) { + try { + instancePool.put(t); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SAMException( + "Failed to put in ramRecordPool once the ramRecords array.", e + ); + } + } + + //This method can be called to the fact that a thread calling it could wait correctly completion work. + @Override + protected void finishing() { + finishFlagFuture.join(); + } +} diff --git a/src/main/java/htsjdk/samtools/util/SortingCollection.java b/src/main/java/htsjdk/samtools/util/SortingCollection.java index 69ce2556b9..4c83ba9fbd 100644 --- a/src/main/java/htsjdk/samtools/util/SortingCollection.java +++ b/src/main/java/htsjdk/samtools/util/SortingCollection.java @@ -34,14 +34,14 @@ import java.io.OutputStream; import java.io.Serializable; import java.lang.reflect.Array; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.Iterator; -import java.util.List; import java.util.NoSuchElementException; +import java.util.Queue; import java.util.TreeSet; +import java.util.concurrent.ConcurrentLinkedQueue; /** * Collection to which many records can be added. After all records are added, the collection can be @@ -63,23 +63,23 @@ public class SortingCollection implements Iterable { * read from file. */ public interface Codec extends Cloneable { + /** * Where to write encoded output * @param os */ void setOutputStream(OutputStream os); - /** * Where to read encoded input from * @param is */ void setInputStream(InputStream is); + /** * Write object to output stream * @param val what to write */ void encode(T val); - /** * Read the next record from the input stream and convert into a java object. * @return null if no more records. Should throw exception if EOF is encountered in the middle of @@ -93,42 +93,42 @@ public interface Codec extends Cloneable { * that each is reading a separate file. */ Codec clone(); - } + } /** Directories where files of sorted records go. */ - private final File[] tmpDirs; + protected final File[] tmpDirs; /** The minimum amount of space free on a temp filesystem to write a file there. */ - private final long TMP_SPACE_FREE = IOUtil.FIVE_GBS; + protected final long TMP_SPACE_FREE = IOUtil.FIVE_GBS; /** * Used to write records to file, and used as a prototype to create codecs for reading. */ - private final SortingCollection.Codec codec; + protected final SortingCollection.Codec codec; /** * For sorting, both when spilling records to file, and merge sorting. */ - private final Comparator comparator; - private final int maxRecordsInRam; - private int numRecordsInRam = 0; - private T[] ramRecords; - private boolean iterationStarted = false; - private boolean doneAdding = false; + protected final Comparator comparator; + protected final int maxRecordsInRam; + protected int numRecordsInRam = 0; + protected T[] ramRecords; + protected boolean iterationStarted = false; + protected boolean doneAdding = false; /** * Set to true when all temp files have been cleaned up */ - private boolean cleanedUp = false; + protected boolean cleanedUp = false; /** * List of files in tmpDir containing sorted records */ - private final List files = new ArrayList(); + protected final Queue files = new ConcurrentLinkedQueue<>(); - private boolean destructiveIteration = true; + protected boolean destructiveIteration = true; - private TempStreamFactory tempStreamFactory = new TempStreamFactory(); + protected TempStreamFactory tempStreamFactory = new TempStreamFactory(); /** * Prepare to accumulate records to be sorted @@ -138,7 +138,7 @@ public interface Codec extends Cloneable { * @param maxRecordsInRam how many records to accumulate before spilling to disk * @param tmpDir Where to write files of records that will not fit in RAM */ - private SortingCollection(final Class componentType, final SortingCollection.Codec codec, + protected SortingCollection(final Class componentType, final SortingCollection.Codec codec, final Comparator comparator, final int maxRecordsInRam, final File... tmpDir) { if (maxRecordsInRam <= 0) { throw new IllegalArgumentException("maxRecordsInRam must be > 0"); @@ -163,11 +163,15 @@ public void add(final T rec) { throw new IllegalStateException("Cannot add after calling iterator()"); } if (numRecordsInRam == maxRecordsInRam) { - spillToDisk(); + performSpillToDisk(); } ramRecords[numRecordsInRam++] = rec; } + void performSpillToDisk() { + defaultSpillToDisk(); + } + /** * This method can be called after caller is done adding to collection, in order to possibly free * up memory. If iterator() is called immediately after caller is done adding, this is not necessary, @@ -183,18 +187,24 @@ public void doneAdding() { doneAdding = true; + finishing(); + if (this.files.isEmpty()) { return; } if (this.numRecordsInRam > 0) { - spillToDisk(); + defaultSpillToDisk(); } // Facilitate GC this.ramRecords = null; } + void finishing() { + // hook to be overriden, do nothing here + } + /** * @return True if this collection is allowed to discard data during iteration in order to reduce memory * footprint, precluding a second iteration over the collection. @@ -214,18 +224,24 @@ public void setDestructiveIteration(boolean destructiveIteration) { /** * Sort the records in memory, write them to a file, and clear the buffer of records in memory. */ - private void spillToDisk() { + protected void defaultSpillToDisk() { + Arrays.sort(this.ramRecords, 0, this.numRecordsInRam, this.comparator); + spill(this.ramRecords, this.numRecordsInRam, this.codec); + numRecordsInRam = 0; + } + + void spill(T[] buffRamRecords, int buffNumRecordsInRam, Codec codec) { try { - Arrays.sort(this.ramRecords, 0, this.numRecordsInRam, this.comparator); final File f = newTempFile(); OutputStream os = null; try { - os = tempStreamFactory.wrapTempOutputStream(new FileOutputStream(f), Defaults.BUFFER_SIZE); - this.codec.setOutputStream(os); - for (int i = 0; i < this.numRecordsInRam; ++i) { - this.codec.encode(ramRecords[i]); + os = tempStreamFactory.wrapTempOutputStream( + new FileOutputStream(f), Defaults.BUFFER_SIZE); + codec.setOutputStream(os); + for (int i = 0; i < buffNumRecordsInRam; ++i) { + codec.encode(buffRamRecords[i]); // Facilitate GC - this.ramRecords[i] = null; + buffRamRecords[i] = null; } os.flush(); @@ -238,7 +254,6 @@ private void spillToDisk() { } } - this.numRecordsInRam = 0; this.files.add(f); } @@ -293,13 +308,18 @@ public void cleanup() { * @param maxRecordsInRAM how many records to accumulate in memory before spilling to disk * @param tmpDir Where to write files of records that will not fit in RAM */ - public static SortingCollection newInstance(final Class componentType, - final SortingCollection.Codec codec, - final Comparator comparator, - final int maxRecordsInRAM, - final File... tmpDir) { - return new SortingCollection(componentType, codec, comparator, maxRecordsInRAM, tmpDir); - + public static SortingCollection newInstance( + final Class componentType, + final SortingCollection.Codec codec, + final Comparator comparator, final int maxRecordsInRAM, + final File... tmpDir) { + if (Defaults.SORTING_COLLECTION_THREADS > 0) { + return new AsyncWriteSortingCollection<>(componentType, codec, + comparator, maxRecordsInRAM, tmpDir); + } else { + return new SortingCollection<>(componentType, codec, comparator, + maxRecordsInRAM, tmpDir); + } } /** @@ -311,17 +331,18 @@ public static SortingCollection newInstance(final Class componentType, * @param maxRecordsInRAM how many records to accumulate in memory before spilling to disk * @param tmpDirs Where to write files of records that will not fit in RAM */ - public static SortingCollection newInstance(final Class componentType, - final SortingCollection.Codec codec, - final Comparator comparator, - final int maxRecordsInRAM, - final Collection tmpDirs) { - return new SortingCollection(componentType, - codec, - comparator, - maxRecordsInRAM, - tmpDirs.toArray(new File[tmpDirs.size()])); - + public static SortingCollection newInstance( + final Class componentType, + final SortingCollection.Codec codec, + final Comparator comparator, final int maxRecordsInRAM, + final Collection tmpDirs) { + if (Defaults.SORTING_COLLECTION_THREADS > 0) { + return new AsyncWriteSortingCollection<>(componentType, codec, + comparator, maxRecordsInRAM, tmpDirs.toArray(new File[tmpDirs.size()])); + } else { + return new SortingCollection<>(componentType, codec, comparator, + maxRecordsInRAM, tmpDirs.toArray(new File[tmpDirs.size()])); + } } @@ -333,13 +354,19 @@ public static SortingCollection newInstance(final Class componentType, * @param comparator Defines output sort order * @param maxRecordsInRAM how many records to accumulate in memory before spilling to disk */ - public static SortingCollection newInstance(final Class componentType, - final SortingCollection.Codec codec, - final Comparator comparator, - final int maxRecordsInRAM) { + public static SortingCollection newInstance( + final Class componentType, + final SortingCollection.Codec codec, + final Comparator comparator, final int maxRecordsInRAM) { final File tmpDir = new File(System.getProperty("java.io.tmpdir")); - return new SortingCollection(componentType, codec, comparator, maxRecordsInRAM, tmpDir); + if (Defaults.SORTING_COLLECTION_THREADS > 0) { + return new AsyncWriteSortingCollection<>(componentType, codec, + comparator, maxRecordsInRAM, tmpDir); + } else { + return new SortingCollection<>(componentType, codec, comparator, + maxRecordsInRAM, tmpDir); + } } /**