diff --git a/src/main/java/org/broadinstitute/hellbender/cmdline/argumentcollections/MarkDuplicatesSparkArgumentCollection.java b/src/main/java/org/broadinstitute/hellbender/cmdline/argumentcollections/MarkDuplicatesSparkArgumentCollection.java new file mode 100644 index 00000000000..abbe126aec3 --- /dev/null +++ b/src/main/java/org/broadinstitute/hellbender/cmdline/argumentcollections/MarkDuplicatesSparkArgumentCollection.java @@ -0,0 +1,23 @@ +package org.broadinstitute.hellbender.cmdline.argumentcollections; + +import org.broadinstitute.barclay.argparser.Argument; +import org.broadinstitute.hellbender.tools.spark.transforms.markduplicates.MarkDuplicatesSpark; +import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy; +import org.broadinstitute.hellbender.utils.read.markduplicates.OpticalDuplicateFinder; + +import java.io.Serializable; + + +/** + * An argument collection for use with tools that mark + * duplicates. + */ +public final class MarkDuplicatesSparkArgumentCollection implements Serializable { + private static final long serialVersionUID = 1L; + + @Argument(shortName = "DS", fullName = "DUPLICATE_SCORING_STRATEGY", doc = "The scoring strategy for choosing the non-duplicate among candidates.") + public MarkDuplicatesScoringStrategy duplicatesScoringStrategy = MarkDuplicatesScoringStrategy.SUM_OF_BASE_QUALITIES; + + @Argument(fullName = MarkDuplicatesSpark.DO_NOT_MARK_UNMAPPED_MATES, doc = "If this option is enabled, unmapped mates of duplicate-marked reads will not be marked as duplicates.") + public boolean dontMarkUnmappedMates = false; +} \ No newline at end of file diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKRegistrator.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKRegistrator.java index 90ad1920a00..d9d2f934801 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKRegistrator.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKRegistrator.java @@ -6,8 +6,9 @@ import htsjdk.samtools.*; import org.apache.spark.serializer.KryoRegistrator; import org.bdgenomics.adam.serialization.ADAMKryoRegistrator; +import org.broadinstitute.hellbender.tools.spark.transforms.markduplicates.MarkDuplicatesSparkUtils; import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter; -import org.broadinstitute.hellbender.utils.read.markduplicates.PairedEnds; +import org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords.*; import java.util.Collections; @@ -77,8 +78,10 @@ private void registerGATKClasses(Kryo kryo) { kryo.register(SAMFileHeader.SortOrder.class); kryo.register(SAMProgramRecord.class); kryo.register(SAMReadGroupRecord.class); - - //register to avoid writing the full name of this class over and over - kryo.register(PairedEnds.class, new FieldSerializer<>(kryo, PairedEnds.class)); + kryo.register(EmptyFragment.class, new FieldSerializer(kryo, EmptyFragment.class)); + kryo.register(Fragment.class, new FieldSerializer(kryo, Fragment.class)); + kryo.register(Pair.class, new Pair.Serializer()); + kryo.register(Passthrough.class, new FieldSerializer(kryo, Passthrough.class)); + kryo.register(MarkDuplicatesSparkUtils.IndexPair.class, new FieldSerializer(kryo, MarkDuplicatesSparkUtils.IndexPair.class)); } } diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java index dcbb5d23a18..20c0356fcb8
100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java @@ -16,7 +16,6 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction2; import org.apache.spark.broadcast.Broadcast; import org.bdgenomics.formats.avro.AlignmentRecord; @@ -27,10 +26,7 @@ import org.broadinstitute.hellbender.utils.SimpleInterval; import org.broadinstitute.hellbender.utils.gcs.BucketUtils; import org.broadinstitute.hellbender.utils.io.IOUtils; -import org.broadinstitute.hellbender.utils.read.BDGAlignmentRecordToGATKReadAdapter; -import org.broadinstitute.hellbender.utils.read.GATKRead; -import org.broadinstitute.hellbender.utils.read.ReadConstants; -import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter; +import org.broadinstitute.hellbender.utils.read.*; import org.broadinstitute.hellbender.utils.spark.SparkUtils; import org.seqdoop.hadoop_bam.AnySAMInputFormat; import org.seqdoop.hadoop_bam.BAMInputFormat; @@ -211,10 +207,10 @@ public boolean accept(Path path) { /** * Ensure reads in a pair fall in the same partition (input split), if the reads are queryname-sorted, - * so they are processed together. No shuffle is needed. + * or querygroup sorted, so they are processed together. No shuffle is needed. */ JavaRDD putPairsInSamePartition(final SAMFileHeader header, final JavaRDD reads) { - if (!header.getSortOrder().equals(SAMFileHeader.SortOrder.queryname)) { + if (!ReadUtils.isReadNameGroupedBam(header)) { return reads; } int numPartitions = reads.getNumPartitions(); diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/pipelines/BwaAndMarkDuplicatesPipelineSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/spark/pipelines/BwaAndMarkDuplicatesPipelineSpark.java index d3aa204f7b5..f9c0304ffab 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/spark/pipelines/BwaAndMarkDuplicatesPipelineSpark.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/pipelines/BwaAndMarkDuplicatesPipelineSpark.java @@ -8,6 +8,7 @@ import org.broadinstitute.barclay.argparser.CommandLineProgramProperties; import org.broadinstitute.barclay.help.DocumentedFeature; import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions; +import org.broadinstitute.hellbender.cmdline.argumentcollections.MarkDuplicatesSparkArgumentCollection; import picard.cmdline.programgroups.ReadDataManipulationProgramGroup; import org.broadinstitute.hellbender.engine.filters.ReadFilter; import org.broadinstitute.hellbender.engine.spark.GATKSparkTool; @@ -47,20 +48,20 @@ public final class BwaAndMarkDuplicatesPipelineSpark extends GATKSparkTool { @ArgumentCollection public final BwaArgumentCollection bwaArgs = new BwaArgumentCollection(); - @Argument(shortName = "DS", fullName ="duplicates_scoring_strategy", doc = "The scoring strategy for choosing the non-duplicate among candidates.") - public MarkDuplicatesScoringStrategy duplicatesScoringStrategy = MarkDuplicatesScoringStrategy.SUM_OF_BASE_QUALITIES; - @Argument(doc = "the output bam", shortName = StandardArgumentDefinitions.OUTPUT_SHORT_NAME, fullName = StandardArgumentDefinitions.OUTPUT_LONG_NAME) protected String output; + @ArgumentCollection + protected MarkDuplicatesSparkArgumentCollection 
markDuplicatesSparkArgumentCollection = new MarkDuplicatesSparkArgumentCollection(); + + @Override protected void runTool(final JavaSparkContext ctx) { try (final BwaSparkEngine bwaEngine = new BwaSparkEngine(ctx, referenceArguments.getReferenceFileName(), bwaArgs.indexImageFile, getHeaderForReads(), getReferenceSequenceDictionary())) { final ReadFilter filter = makeReadFilter(bwaEngine.getHeader()); final JavaRDD alignedReads = bwaEngine.alignPaired(getUnfilteredReads()).filter(filter::test); - final JavaRDD markedReadsWithOD = MarkDuplicatesSpark.mark(alignedReads, bwaEngine.getHeader(), duplicatesScoringStrategy, new OpticalDuplicateFinder(), getRecommendedNumReducers()); - final JavaRDD markedReads = MarkDuplicatesSpark.cleanupTemporaryAttributes(markedReadsWithOD); + final JavaRDD markedReads = MarkDuplicatesSpark.mark(alignedReads, bwaEngine.getHeader(), markDuplicatesSparkArgumentCollection.duplicatesScoringStrategy, new OpticalDuplicateFinder(), getRecommendedNumReducers(), markDuplicatesSparkArgumentCollection.dontMarkUnmappedMates); try { ReadsSparkSink.writeReads(ctx, output, referenceArguments.getReferencePath().toAbsolutePath().toUri().toString(), diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/pipelines/ReadsPipelineSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/spark/pipelines/ReadsPipelineSpark.java index b083a8bd934..b9e29b9e119 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/spark/pipelines/ReadsPipelineSpark.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/pipelines/ReadsPipelineSpark.java @@ -12,6 +12,7 @@ import org.broadinstitute.barclay.argparser.CommandLineProgramProperties; import org.broadinstitute.barclay.help.DocumentedFeature; import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions; +import org.broadinstitute.hellbender.cmdline.argumentcollections.MarkDuplicatesSparkArgumentCollection; import org.broadinstitute.hellbender.cmdline.programgroups.ShortVariantDiscoveryProgramGroup; import org.broadinstitute.hellbender.engine.ReadContextData; import org.broadinstitute.hellbender.engine.filters.ReadFilter; @@ -113,10 +114,10 @@ public class ReadsPipelineSpark extends GATKSparkTool { private JoinStrategy joinStrategy = JoinStrategy.BROADCAST; @ArgumentCollection - public final BwaArgumentCollection bwaArgs = new BwaArgumentCollection(); + protected MarkDuplicatesSparkArgumentCollection markDuplicatesSparkArgumentCollection = new MarkDuplicatesSparkArgumentCollection(); - @Argument(shortName = "DS", fullName ="duplicates-scoring-strategy", doc = "The scoring strategy for choosing the non-duplicate among candidates.") - public MarkDuplicatesScoringStrategy duplicatesScoringStrategy = MarkDuplicatesScoringStrategy.SUM_OF_BASE_QUALITIES; + @ArgumentCollection + public final BwaArgumentCollection bwaArgs = new BwaArgumentCollection(); /** * all the command line arguments for BQSR and its covariates @@ -166,8 +167,7 @@ protected void runTool(final JavaSparkContext ctx) { header = getHeaderForReads(); } - final JavaRDD markedReadsWithOD = MarkDuplicatesSpark.mark(alignedReads, header, duplicatesScoringStrategy, new OpticalDuplicateFinder(), getRecommendedNumReducers()); - final JavaRDD markedReads = MarkDuplicatesSpark.cleanupTemporaryAttributes(markedReadsWithOD); + final JavaRDD markedReads = MarkDuplicatesSpark.mark(alignedReads, header, markDuplicatesSparkArgumentCollection.duplicatesScoringStrategy, new OpticalDuplicateFinder(), getRecommendedNumReducers(), 
markDuplicatesSparkArgumentCollection.dontMarkUnmappedMates); // The markedReads have already had the WellformedReadFilter applied to them, which // is all the filtering that MarkDupes and ApplyBQSR want. BQSR itself wants additional diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java index 905a1938a9b..5af3793e4f3 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSpark.java @@ -2,6 +2,7 @@ import htsjdk.samtools.SAMFileHeader; import htsjdk.samtools.metrics.MetricsFile; +import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -11,19 +12,25 @@ import org.broadinstitute.barclay.argparser.CommandLineProgramProperties; import org.broadinstitute.barclay.help.DocumentedFeature; import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions; +import org.broadinstitute.hellbender.cmdline.argumentcollections.MarkDuplicatesSparkArgumentCollection; import org.broadinstitute.hellbender.cmdline.argumentcollections.OpticalDuplicatesArgumentCollection; -import picard.cmdline.programgroups.ReadDataManipulationProgramGroup; import org.broadinstitute.hellbender.engine.filters.ReadFilter; import org.broadinstitute.hellbender.engine.filters.ReadFilterLibrary; import org.broadinstitute.hellbender.engine.spark.GATKSparkTool; +import org.broadinstitute.hellbender.utils.Utils; import org.broadinstitute.hellbender.utils.read.GATKRead; import org.broadinstitute.hellbender.utils.read.ReadUtils; +import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter; import org.broadinstitute.hellbender.utils.read.markduplicates.DuplicationMetrics; import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy; import org.broadinstitute.hellbender.utils.read.markduplicates.OpticalDuplicateFinder; +import picard.cmdline.programgroups.ReadDataManipulationProgramGroup; +import scala.Tuple2; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; @DocumentedFeature @CommandLineProgramProperties( @@ -33,6 +40,7 @@ @BetaFeature public final class MarkDuplicatesSpark extends GATKSparkTool { private static final long serialVersionUID = 1L; + public static final String DO_NOT_MARK_UNMAPPED_MATES = "do-not-mark-unmapped-mates"; @Override public boolean requiresReads() { return true; } @@ -45,8 +53,8 @@ public final class MarkDuplicatesSpark extends GATKSparkTool { shortName = "M", fullName = "METRICS_FILE") protected String metricsFile; - @Argument(shortName = "DS", fullName = "DUPLICATE_SCORING_STRATEGY", doc = "The scoring strategy for choosing the non-duplicate among candidates.") - public MarkDuplicatesScoringStrategy duplicatesScoringStrategy = MarkDuplicatesScoringStrategy.SUM_OF_BASE_QUALITIES; + @ArgumentCollection + protected MarkDuplicatesSparkArgumentCollection markDuplicatesSparkArgumentCollection = new MarkDuplicatesSparkArgumentCollection(); @ArgumentCollection protected OpticalDuplicatesArgumentCollection opticalDuplicatesArgumentCollection = new OpticalDuplicatesArgumentCollection(); @@ -58,13 +66,69 @@ public List getDefaultReadFilters() { public 
static JavaRDD mark(final JavaRDD reads, final SAMFileHeader header, final MarkDuplicatesScoringStrategy scoringStrategy, - final OpticalDuplicateFinder opticalDuplicateFinder, final int numReducers) { + final OpticalDuplicateFinder opticalDuplicateFinder, + final int numReducers, final boolean dontMarkUnmappedMates) { + + JavaPairRDD, Integer> namesOfNonDuplicates = MarkDuplicatesSparkUtils.transformToDuplicateNames(header, scoringStrategy, opticalDuplicateFinder, reads, numReducers); + + // Here we explicitly repartition the names of the non-duplicate reads to match the partitioning of the original bam + final JavaRDD> repartitionedReadNames = namesOfNonDuplicates + .mapToPair(pair -> new Tuple2<>(pair._1.getIndex(), new Tuple2<>(pair._1.getValue(),pair._2))) + .partitionBy(new KnownIndexPartitioner(reads.getNumPartitions())) + .values(); + + // Here we combine the original bam with the repartitioned non-duplicate read names to produce our marked reads + return reads.zipPartitions(repartitionedReadNames, (readsIter, readNamesIter) -> { + final Map namesOfNonDuplicateReadsAndOpticalCounts = Utils.stream(readNamesIter).collect(Collectors.toMap(Tuple2::_1,Tuple2::_2)); + return Utils.stream(readsIter).peek(read -> { + // Handle reads that have been marked as non-duplicates (which also get tagged with optical duplicate summary statistics) + if (namesOfNonDuplicateReadsAndOpticalCounts.containsKey(read.getName())) { + read.setIsDuplicate(false); + if (!(dontMarkUnmappedMates && read.isUnmapped())) { + int dupCount = namesOfNonDuplicateReadsAndOpticalCounts.replace(read.getName(), -1); + if (dupCount > -1) { + ((SAMRecordToGATKReadAdapter) read).setTransientAttribute(MarkDuplicatesSparkUtils.OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME, dupCount); + } + } + // Mark unmapped read pairs as non-duplicates + } else if (ReadUtils.readAndMateAreUnmapped(read)) { + read.setIsDuplicate(false); + // Everything else is a duplicate + } else { + if (!(dontMarkUnmappedMates && read.isUnmapped())) { + read.setIsDuplicate(true); + } + } + }).iterator(); + }); + } + + /** + * A custom partitioner designed to cut down on Spark shuffle costs. + * It is designed such that getPartition(key) is called on a key which corresponds to the already known target partition. + * + * By storing the original partitioning for each read and passing it through the duplicate-marking process, it + * allows us to get away with just shuffling the small read name objects to the correct partition in the original bam + * while avoiding any shuffle of the larger read objects. + */ + private static class KnownIndexPartitioner extends Partitioner { + private static final long serialVersionUID = 1L; + private final int numPartitions; + + KnownIndexPartitioner(int numPartitions) { + this.numPartitions = numPartitions; + } - JavaRDD primaryReads = reads.filter(v1 -> !ReadUtils.isNonPrimary(v1)); - JavaRDD nonPrimaryReads = reads.filter(v1 -> ReadUtils.isNonPrimary(v1)); - JavaRDD primaryReadsTransformed = MarkDuplicatesSparkUtils.transformReads(header, scoringStrategy, opticalDuplicateFinder, primaryReads, numReducers); + @Override + public int numPartitions() { + return numPartitions; + } - return primaryReadsTransformed.union(nonPrimaryReads); + @Override + @SuppressWarnings("unchecked") + public int getPartition(Object key) { + return (Integer) key; + } } @Override @@ -73,27 +137,17 @@ protected void runTool(final JavaSparkContext ctx) { final OpticalDuplicateFinder finder = opticalDuplicatesArgumentCollection.READ_NAME_REGEX != null ?
new OpticalDuplicateFinder(opticalDuplicatesArgumentCollection.READ_NAME_REGEX, opticalDuplicatesArgumentCollection.OPTICAL_DUPLICATE_PIXEL_DISTANCE, null) : null; - final JavaRDD finalReadsForMetrics = mark(reads, getHeaderForReads(), duplicatesScoringStrategy, finder, getRecommendedNumReducers()); + final SAMFileHeader header = getHeaderForReads(); + final JavaRDD finalReadsForMetrics = mark(reads, header, markDuplicatesSparkArgumentCollection.duplicatesScoringStrategy, finder, getRecommendedNumReducers(), markDuplicatesSparkArgumentCollection.dontMarkUnmappedMates); if (metricsFile != null) { - final JavaPairRDD metricsByLibrary = MarkDuplicatesSparkUtils.generateMetrics(getHeaderForReads(), finalReadsForMetrics); + final JavaPairRDD metricsByLibrary = MarkDuplicatesSparkUtils.generateMetrics( + header, finalReadsForMetrics); final MetricsFile resultMetrics = getMetricsFile(); - MarkDuplicatesSparkUtils.saveMetricsRDD(resultMetrics, getHeaderForReads(), metricsByLibrary, metricsFile); + MarkDuplicatesSparkUtils.saveMetricsRDD(resultMetrics, header, metricsByLibrary, metricsFile); } - - final JavaRDD finalReads = cleanupTemporaryAttributes(finalReadsForMetrics); - writeReads(ctx, output, finalReads); + header.setSortOrder(SAMFileHeader.SortOrder.coordinate); + writeReads(ctx, output, finalReadsForMetrics, header); } - - /** - * The OD attribute was added to each read for optical dups. - * Now we have to clear it to avoid polluting the output. - */ - public static JavaRDD cleanupTemporaryAttributes(final JavaRDD reads) { - return reads.map(read -> { - read.clearAttribute(MarkDuplicatesSparkUtils.OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME); - return read; - }); - } } diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtils.java b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtils.java index e2c2e0262d4..c8d024d51de 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtils.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtils.java @@ -1,17 +1,23 @@ package org.broadinstitute.hellbender.tools.spark.transforms.markduplicates; +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.*; import htsjdk.samtools.SAMFileHeader; import htsjdk.samtools.metrics.MetricsFile; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.broadinstitute.hellbender.engine.filters.ReadFilterLibrary; import org.broadinstitute.hellbender.exceptions.GATKException; +import org.broadinstitute.hellbender.exceptions.UserException; import org.broadinstitute.hellbender.metrics.MetricsUtils; import org.broadinstitute.hellbender.utils.Utils; import org.broadinstitute.hellbender.utils.read.GATKRead; -import org.broadinstitute.hellbender.utils.read.ReadCoordinateComparator; import org.broadinstitute.hellbender.utils.read.ReadUtils; +import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter; import org.broadinstitute.hellbender.utils.read.markduplicates.*; +import org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords.*; import scala.Tuple2; import java.io.Serializable; @@ -25,76 +31,161 @@ public class MarkDuplicatesSparkUtils { // Used to set an attribute on the 
GATKRead marking this read as an optical duplicate. public static final String OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME = "OD"; + // This comparator represents the tiebreaking for PairedEnds duplicate marking. + // We compare first on score, followed by unclipped start position (which is reversed here because of the expected ordering) + private static final Comparator PAIRED_ENDS_SCORE_COMPARATOR = Comparator.comparing(PairedEnds::getScore) + .thenComparing(PairedEndsCoordinateComparator.INSTANCE.reversed()); + + /** + * Wrapper object used for storing an object and some type of index information. + * + * MarkDuplicates uses this object to remember which partition of the original bam each read came from in order to + * efficiently zip the duplicate-marked data back into the correct place without shuffling the original bam. + */ + @DefaultSerializer(FieldSerializer.class) + public static class IndexPair { + private final T value; + private final int index; + + public T getValue() { + return value; + } + + public int getIndex() { + return index; + } + + @VisibleForTesting + IndexPair(T value, int index) { + this.value = value; + this.index = index; + } + } /** * (0) filter: remove unpaired reads and reads with an unmapped mate. * (1) keyReadsByName: label each read with its read group and read name. * (2) GroupByKey: group together reads with the same group and name. - (3) keyPairedEndsWithAlignmentInfo: - (a) Sort each group of reads (see GATKOrder below). - (b) Pair consecutive reads into PairedEnds. In most cases there will only be two reads - with the same name. TODO: explain why there might be more. + (3) keyMarkDuplicatesSparkRecords with alignment info: + (a) Generate a fragment or emptyFragment from each read if it's unpaired. + (b) Pair grouped reads into MarkDuplicatesSparkRecord. In most cases there will only be two reads + with the same name. Mapped reads missing mates will be emitted as fragments; more than two reads will cause an exception. * (c) Label each read with alignment information: Library, reference index, * stranded unclipped start and reverse strand. - (d) Leftover reads are emitted, unmodified, as an unpaired end. + (d) Unmapped pairs, templates of entirely non-primary reads, etc. are passed through as unmarked reads - (4) GroupByKey: Group PairedEnds that share alignment information. These pairs + (4) GroupByKey: Group MarkDuplicatesSparkRecords that share alignment information. These pairs * are duplicates of each other. * (5) markDuplicatePairs: * (a) For each group created by (4), sort the pairs by score and mark all but the * highest scoring as duplicates. * (b) Determine which duplicates are optical duplicates and increase the overall count.
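* A worked example (hypothetical reads): two pairs A and B whose reads share a library, reference index, stranded unclipped starts, and orientations land in the same group at (4); if A has the higher score at (5a), A is emitted as the sole non-duplicate and B's reads are marked as duplicates, with A also carrying the optical-duplicate count computed at (5b).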
*/ - static JavaRDD transformReads(final SAMFileHeader header, final MarkDuplicatesScoringStrategy scoringStrategy, final OpticalDuplicateFinder finder, final JavaRDD reads, final int numReducers) { - - JavaPairRDD> keyedReads; - if (SAMFileHeader.SortOrder.queryname.equals(header.getSortOrder())) { - // reads are already sorted by name, so perform grouping within the partition (no shuffle) - keyedReads = spanReadsByKey(header, reads); - } else { - // sort by group and name (incurs a shuffle) - JavaPairRDD keyReadPairs = reads.mapToPair(read -> new Tuple2<>(ReadsKey.keyForRead(header, read), read)); - keyedReads = keyReadPairs.groupByKey(numReducers); - } + static JavaPairRDD, Integer> transformToDuplicateNames(final SAMFileHeader header, final MarkDuplicatesScoringStrategy scoringStrategy, final OpticalDuplicateFinder finder, final JavaRDD reads, final int numReducers) { + // unmapped reads are treated specially and are never marked as duplicates here + final JavaRDD mappedReads = reads.filter(ReadFilterLibrary.MAPPED::test); + + final JavaPairRDD>> keyedReads = getReadsGroupedByName(header, mappedReads, numReducers); + + // Place all the reads into a single RDD of MarkDuplicatesSparkRecord objects + final JavaPairRDD pairedEnds = keyedReads.flatMapToPair(keyedRead -> { + final List> out = Lists.newArrayList(); + final IndexPair[] hadNonPrimaryRead = {null}; + + final List> primaryReads = Utils.stream(keyedRead._2()) + ////// Making The Fragments ////// + // Make a PairedEnd object with no second read for each fragment (and an empty one for each paired read) + .peek(readWithIndex -> { + final GATKRead read = readWithIndex.getValue(); + if (!(read.isSecondaryAlignment()||read.isSupplementaryAlignment())) { + PairedEnds fragment = (ReadUtils.readHasMappedMate(read)) ? + MarkDuplicatesSparkRecord.newEmptyFragment(read, header) : + MarkDuplicatesSparkRecord.newFragment(read, header, readWithIndex.getIndex(), scoringStrategy); + + out.add(new Tuple2<>(fragment.key(), fragment)); + } else { + hadNonPrimaryRead[0] = readWithIndex; + } + }) + .filter(readWithIndex -> ReadUtils.readHasMappedMate(readWithIndex.getValue())) - JavaPairRDD> keyedPairs = keyedReads.flatMapToPair(keyedRead -> { - List> out = Lists.newArrayList(); - // Write each read out as a pair with only the first slot filled - for (GATKRead read : keyedRead._2()) { - read.setIsDuplicate(false); - final PairedEnds pair = PairedEnds.of(read); - out.add(new Tuple2<>(pair.keyForFragment(header), pair)); - } ////// Making The Paired Reads ////// // Write each paired read with a mapped mate as a pair - final List sorted = Lists.newArrayList(Iterables.filter(keyedRead._2(), read -> ReadUtils.readHasMappedMate(read))); - sorted.sort(new GATKOrder(header)); - PairedEnds pair = null; - //Records are sorted, we iterate over them and pair them up. - for (final GATKRead record : sorted) { - if (pair == null) { //first in pair - pair = PairedEnds.of(record); - } else { //second in pair - pair.and(record); - out.add(new Tuple2<>(pair.key(header), pair)); - pair = null; //back to first + .filter(indexPair -> !(indexPair.getValue().isSecondaryAlignment()||indexPair.getValue().isSupplementaryAlignment())) + .collect(Collectors.toList()); + + // MarkDuplicates can't properly handle templates with more than two primary reads + if (primaryReads.size()>2) { + throw new UserException.UnimplementedFeature(String.format("MarkDuplicatesSpark only supports singleton fragments and pairs. We found the following group with >2 primary reads: (%d reads)."
+ + " \n%s.", primaryReads.size(),primaryReads.stream().map(Object::toString).collect(Collectors.joining("\n")))); + + // If there are two primary reads in the group, pass them on as a pair + } else if (primaryReads.size()==2) { + final IndexPair firstRead = primaryReads.get(0); + final IndexPair secondRead = primaryReads.get(1); + final PairedEnds pair = MarkDuplicatesSparkRecord.newPair(firstRead.getValue(), secondRead.getValue(), header, secondRead.getIndex(), scoringStrategy); + out.add(new Tuple2<>(pair.key(), pair)); + + // If there is one paired read in the template, the bam is probably missing its mate; don't duplicate-mark it + } else if (primaryReads.size()==1) { + final IndexPair firstRead = primaryReads.get(0); + final MarkDuplicatesSparkRecord pass = MarkDuplicatesSparkRecord.getPassthrough(firstRead.getValue(), firstRead.getIndex()); + out.add(new Tuple2<>(pass.key(), pass)); + + // otherwise there are no primary reads in the group at all, so we want it to pass through unmarked + } else { + if (hadNonPrimaryRead[0] != null) { + final MarkDuplicatesSparkRecord pass = MarkDuplicatesSparkRecord.getPassthrough((GATKRead)hadNonPrimaryRead[0].getValue(), hadNonPrimaryRead[0].getIndex()); + out.add(new Tuple2<>(pass.key(), pass)); } } - if (pair != null) { //left over read - out.add(new Tuple2<>(pair.key(header), pair)); - } + return out.iterator(); - }).groupByKey(numReducers); + }); + + final JavaPairRDD> keyedPairs = pairedEnds.groupByKey(); //TODO evaluate replacing this with a smart aggregate by key. + + return markDuplicateRecords(keyedPairs, finder); + } - return markPairedEnds(keyedPairs, scoringStrategy, finder, header); + /** + * Method that ensures the reads are grouped together keyed by their readname groups, with IndexPairs representing the source partition. + * If the bam is querygrouped/queryname sorted, it calls spanReadsByKey to perform the mapping operation. + * If the bam is sorted in some other way, it performs a groupBy operation on the key. + */ + private static JavaPairRDD>> getReadsGroupedByName(SAMFileHeader header, JavaRDD reads, int numReducers) { + + final JavaPairRDD>> keyedReads; + final JavaRDD> indexedReads = reads.mapPartitionsWithIndex( + (index, iter) -> Utils.stream(iter).map(read -> { + if (!(read.getClass() == SAMRecordToGATKReadAdapter.class)) { + throw new GATKException(String.format("MarkDuplicatesSpark currently only supports SAMRecords as an underlying reads data source class, %s found instead", + read.getClass().toString())); + } + return new IndexPair<>(read, index);}).iterator(), false); + if (ReadUtils.isReadNameGroupedBam(header)) { + // reads are already grouped by name, so perform grouping within the partition (no shuffle) + keyedReads = spanReadsByKey(indexedReads); + } else { + // sort by group and name (incurs a shuffle) + JavaPairRDD> keyReadPairs = indexedReads.mapToPair(read -> new Tuple2<>(ReadsKey.keyForRead( + read.getValue()), read)); + keyedReads = keyReadPairs.groupByKey(numReducers); + } + return keyedReads; } - static JavaPairRDD> spanReadsByKey(final SAMFileHeader header, final JavaRDD reads) { - JavaPairRDD nameReadPairs = reads.mapToPair(read -> new Tuple2<>(read.getName(), read)); + /** + * Method which takes an RDD of reads that is guaranteed to have every readname group placed together on the same + * partition and maps them to a JavaPairRDD with the readname as the key.
+ */ + private static JavaPairRDD>> spanReadsByKey(final JavaRDD> reads) { + JavaPairRDD> nameReadPairs = reads.mapToPair(read -> new Tuple2<>(read.getValue().getName(), read)); return spanByKey(nameReadPairs).flatMapToPair(namedRead -> { // for each name, separate reads by key (group name) - List>> out = Lists.newArrayList(); - ListMultimap multi = LinkedListMultimap.create(); - for (GATKRead read : namedRead._2()) { - multi.put(ReadsKey.keyForRead(header, read), read); + List>>> out = Lists.newArrayList(); + ListMultimap> multi = LinkedListMultimap.create(); + for (IndexPair read : namedRead._2()) { + multi.put(ReadsKey.keyForRead(read.getValue()), read); } for (String key : multi.keySet()) { // list from Multimap is not serializable by Kryo, so put in a new array list @@ -112,8 +203,8 @@ static JavaPairRDD> spanReadsByKey(final SAMFileHeade * @param type of values * @return an RDD where the values for each key are grouped into an iterable collection */ - static JavaPairRDD> spanByKey(JavaPairRDD rdd) { - return rdd.mapPartitionsToPair(iter -> spanningIterator(iter)); + private static JavaPairRDD> spanByKey(JavaPairRDD rdd) { + return rdd.mapPartitionsToPair(MarkDuplicatesSparkUtils::spanningIterator); } /** @@ -153,71 +244,113 @@ protected Tuple2> computeNext() { }; } - static JavaRDD markPairedEnds(final JavaPairRDD> keyedPairs, - final MarkDuplicatesScoringStrategy scoringStrategy, - final OpticalDuplicateFinder finder, final SAMFileHeader header) { - return keyedPairs.flatMap(keyedPair -> { - Iterable pairedEnds = keyedPair._2(); - final ImmutableListMultimap paired = Multimaps.index(pairedEnds, pair -> pair.second() != null); - - // Each key corresponds to either fragments or paired ends, not a mixture of both. - - if (ReadsKey.isFragment(keyedPair._1())) { // fragments - return handleFragments(pairedEnds, scoringStrategy, header).iterator(); - } + /** + * Primary landing point for MarkDuplicatesSparkRecords: + * - Handles separating out hashed keys into groups by start position/readgroup + * - Further separates the MarkDuplicatesSparkRecords by their record type + * - Farms out to methods which handle each group + * - Collects the results and returns an iterator + */ + @SuppressWarnings("unchecked") + private static JavaPairRDD, Integer> markDuplicateRecords(final JavaPairRDD> keyedPairs, + final OpticalDuplicateFinder finder) { + return keyedPairs.flatMapToPair(keyedPair -> { + Iterable pairGroups = keyedPair._2(); + + final List, Integer>> nonDuplicates = Lists.newArrayList(); + + //since we grouped by a non-unique hash code for efficiency we need to regroup by the actual criteria + //todo this should use library and contig as well probably + //todo these should all be one traversal over the records + final Collection> groups = Utils.stream(pairGroups) + .collect(Collectors.groupingBy(MarkDuplicatesSparkUtils::getGroupKey)).values(); + + for (List duplicateGroup : groups) { + final Map> stratifiedByType = splitByType(duplicateGroup); + + // Each key corresponds to either fragments or paired ends, not a mixture of both.
+ final List emptyFragments = stratifiedByType.get(MarkDuplicatesSparkRecord.Type.EMPTY_FRAGMENT); + final List fragments = stratifiedByType.get(MarkDuplicatesSparkRecord.Type.FRAGMENT); + final List pairs = (List) (List)(stratifiedByType.get(MarkDuplicatesSparkRecord.Type.PAIR)); + final List passthroughs = stratifiedByType.get(MarkDuplicatesSparkRecord.Type.PASSTHROUGH); + + // empty MarkDuplicatesSparkRecords signify that a pair has a mate somewhere else + // If there are no empty-fragment placeholders at this site, compute the best-scoring fragment; otherwise every fragment here is a duplicate of a paired read + if (Utils.isNonEmpty(fragments) && !Utils.isNonEmpty(emptyFragments)) { + final Tuple2, Integer> bestFragment = handleFragments(fragments); + nonDuplicates.add(bestFragment); + } - List out = Lists.newArrayList(); + if (Utils.isNonEmpty(pairs)) { + nonDuplicates.add(handlePairs(pairs, finder)); + } - // As in Picard, unpaired ends left alone. - for (final PairedEnds pair : paired.get(false)) { - out.add(pair.first()); + if (Utils.isNonEmpty(passthroughs)) { + nonDuplicates.addAll(handlePassthroughs(passthroughs)); + } } - // Order by score using ReadCoordinateComparator for tie-breaking. - Comparator pairedEndsComparator = - Comparator.comparing(pe -> pe.score(scoringStrategy)).reversed() - .thenComparing((o1, o2) -> new ReadCoordinateComparator(header).compare(o1.first(), o2.first())); - final List scored = paired.get(true).stream().sorted(pairedEndsComparator).collect(Collectors.toList()); - - final PairedEnds best = Iterables.getFirst(scored, null); - if (best == null) { - return out.iterator(); - } + return nonDuplicates.iterator(); + }); + } - // Mark everyone who's not best as a duplicate - for (final PairedEnds pair : Iterables.skip(scored, 1)) { - pair.first().setIsDuplicate(true); - pair.second().setIsDuplicate(true); - } + // Note, this uses bitshift operators in order to perform only a single groupBy operation for all the merged data + private static long getGroupKey(MarkDuplicatesSparkRecord record) { + return record.getClass()==Passthrough.class?-1: + (((long)((PairedEnds)record).getUnclippedStartPosition()) << 32 | + ((PairedEnds)record).getFirstRefIndex() << 16 ); + //| ((PairedEnds)pe).getLibraryIndex())).values(); + } - // Now, add location information to the paired ends - for (final PairedEnds pair : scored) { - // Both elements in the pair have the same name - finder.addLocationInformation(pair.first().getName(), pair); - } + /** + * Splits MarkDuplicatesSparkRecords into groups by their type + */ + private static Map> splitByType(List duplicateGroup) { + final EnumMap> byType = new EnumMap<>(MarkDuplicatesSparkRecord.Type.class); + for (MarkDuplicatesSparkRecord pair : duplicateGroup) { + byType.compute(pair.getType(), (key, value) -> { + if (value == null) { + final ArrayList pairedEnds = new ArrayList<>(); + pairedEnds.add(pair); + return pairedEnds; + } else { + value.add(pair); + return value; + } + }); + } + return byType; + } - // This must happen last, as findOpticalDuplicates mutates the list. - // Split by orientation and count duplicates in each group separately.
- final ImmutableListMultimap groupByOrientation = Multimaps.index(scored, pe -> pe.getOrientationForOpticalDuplicates()); - final int numOpticalDuplicates; - if (groupByOrientation.containsKey(ReadEnds.FR) && groupByOrientation.containsKey(ReadEnds.RF)){ - final List peFR = new ArrayList<>(groupByOrientation.get(ReadEnds.FR)); - final List peRF = new ArrayList<>(groupByOrientation.get(ReadEnds.RF)); - numOpticalDuplicates = countOpticalDuplicates(finder, peFR) + countOpticalDuplicates(finder, peRF); - } else { - numOpticalDuplicates = countOpticalDuplicates(finder, scored); - } - best.first().setAttribute(OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME, numOpticalDuplicates); + private static List,Integer>> handlePassthroughs(List passthroughs) { + // Emit the passthrough reads as non-duplicates. + return passthroughs.stream() + .map(pair -> new Tuple2<>(new IndexPair<>(pair.getName(), pair.getPartitionIndex()), -1)) + .collect(Collectors.toList()); + } - for (final PairedEnds pair : scored) { - out.add(pair.first()); - out.add(pair.second()); - } - return out.iterator(); - }); + private static Tuple2, Integer> handlePairs(List pairs, OpticalDuplicateFinder finder) { + final MarkDuplicatesSparkRecord bestPair = pairs.stream() + .max(PAIRED_ENDS_SCORE_COMPARATOR) + .orElseThrow(() -> new GATKException.ShouldNeverReachHereException("There was no best pair because the stream was empty, but it shouldn't have been empty.")); + + // Split by orientation and count duplicates in each group separately. + final Map> groupByOrientation = pairs.stream() + .peek(pair -> finder.addLocationInformation(pair.getName(), pair))//TODO this needs me to handle the name better + .collect(Collectors.groupingBy(Pair::getOrientationForOpticalDuplicates)); + final int numOpticalDuplicates; + //todo do we not have to split the reporting of these by orientation? + if (groupByOrientation.containsKey(ReadEnds.FR) && groupByOrientation.containsKey(ReadEnds.RF)) { + final List peFR = new ArrayList<>(groupByOrientation.get(ReadEnds.FR)); + final List peRF = new ArrayList<>(groupByOrientation.get(ReadEnds.RF)); + numOpticalDuplicates = countOpticalDuplicates(finder, peFR) + countOpticalDuplicates(finder, peRF); + } else { + numOpticalDuplicates = countOpticalDuplicates(finder, pairs); + } + return (new Tuple2<>(new IndexPair<>(bestPair.getName(), bestPair.getPartitionIndex()), numOpticalDuplicates)); } - private static int countOpticalDuplicates(OpticalDuplicateFinder finder, List scored) { + private static int countOpticalDuplicates(OpticalDuplicateFinder finder, List scored) { final boolean[] opticalDuplicateFlags = finder.findOpticalDuplicates(scored); int numOpticalDuplicates = 0; for (final boolean b : opticalDuplicateFlags) { @@ -228,37 +361,17 @@ private static int countOpticalDuplicates(OpticalDuplicateFinder finder, List handleFragments(Iterable pairedEnds, final MarkDuplicatesScoringStrategy scoringStrategy, final SAMFileHeader header) { - List reads = Lists.newArrayList(); - - final Iterable transform = Iterables.transform(pairedEnds, pair -> pair.first()); - Iterable readsCopy = Iterables.transform(transform, GATKRead::copy); - final Map> byPairing = Utils.stream(readsCopy).collect(Collectors.partitioningBy( - read -> ReadUtils.readHasMappedMate(read) - )); - // Note the we emit only fragments from this mapper. - if (byPairing.get(true).isEmpty()) { - // There are no paired reads, mark all but the highest scoring fragment as duplicate. 
- Comparator fragmentsComparator = Comparator.comparing(read -> scoringStrategy.score(read)).reversed().thenComparing(new ReadCoordinateComparator(header)); - List frags = byPairing.get(false).stream().sorted(fragmentsComparator).collect(Collectors.toList()); - if (!frags.isEmpty()) { - reads.add(frags.get(0)); //highest score - just emit - for (final GATKRead record : Iterables.skip(frags, 1)) { //lower scores - mark as dups and emit - record.setIsDuplicate(true); - reads.add(record); - } - } - } else { - // There are paired ends so we mark all fragments as duplicates. - for (final GATKRead record : byPairing.get(false)) { - record.setIsDuplicate(true); - reads.add(record); - } - } - return reads; + /** + * If a site has fragments but no paired reads (empty fragments) overlapping it, select the best fragment according to PAIRED_ENDS_SCORE_COMPARATOR + */ + private static Tuple2, Integer> handleFragments(List duplicateFragmentGroup) { + return duplicateFragmentGroup.stream() + .map(f -> (Fragment)f) + .max(PAIRED_ENDS_SCORE_COMPARATOR) + .map(best -> new Tuple2<>(new IndexPair<>(best.getName(), best.getPartitionIndex()), -1)) + .orElse(null); } - static JavaPairRDD generateMetrics(final SAMFileHeader header, final JavaRDD reads) { return reads.filter(read -> !read.isSecondaryAlignment() && !read.isSupplementaryAlignment()) .mapToPair(read -> { @@ -280,9 +393,12 @@ static JavaPairRDD generateMetrics(final SAMFileHead ++metrics.READ_PAIR_DUPLICATES; } } - if (read.hasAttribute(OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME)) { + // NOTE: we use the SAMRecord transientAttribute field here specifically to prevent the already + // serialized read from being parsed again for performance reasons. + if (((SAMRecordToGATKReadAdapter) read).getTransientAttribute(OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME)!=null) { + // NOTE: there is a safety check above in getReadsGroupedByName() metrics.READ_PAIR_OPTICAL_DUPLICATES += - read.getAttributeAsInteger(OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME); + (int)((SAMRecordToGATKReadAdapter) read).getTransientAttribute(OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME); } return new Tuple2<>(library, metrics); }) @@ -331,7 +447,7 @@ public static void saveMetricsRDD(final MetricsFile final List sortedListOfLibraryNames = new ArrayList<>(Sets.union(emptyMapByLibrary.keySet(), nonEmptyMetricsByLibrary.keySet())); sortedListOfLibraryNames.sort(Utils.COMPARE_STRINGS_NULLS_FIRST); - for (final String library : sortedListOfLibraryNames){ + for (final String library : sortedListOfLibraryNames) { //if a non-empty exists, take it, otherwise take from the empties. This is done to include libraries with zero data in them. //But not all libraries are listed in the header (esp in testing data) so we union empty and non-empty final DuplicationMetrics metricsToAdd = nonEmptyMetricsByLibrary.containsKey(library) ? nonEmptyMetricsByLibrary.get(library) : emptyMapByLibrary.get(library); @@ -347,58 +463,61 @@ public static void saveMetricsRDD(final MetricsFile } /** - * GATKRead comparator that compares based on mapping position followed by SAM flags. + * Comparator for sorting PairedEnds by coordinate. Contigs are compared by + * their stored reference index, so no header is needed. + + * Uses the various other fields in a read to break ties for reads that share + * the same location. + + * Ordering is not the same as {@link htsjdk.samtools.SAMRecordCoordinateComparator}.
+ * It compares two PairedEnds objects by their first reads, first by their clipped start positions + * (they were originally matched together based on unclipped start), then by the orientation of the strand, followed by + * readname in lexicographical order. + + * NOTE: Because the original records were grouped by readname, we know that they must be unique once they hit this + * comparator and thus we don't need to worry about further tiebreaking for this method. */ - final static class GATKOrder implements Comparator, Serializable { - private static final long serialVersionUID = 1l; - private final SAMFileHeader header; - // TODO: Unify with other comparators in the codebase + public static final class PairedEndsCoordinateComparator implements Comparator, Serializable { + private static final long serialVersionUID = 1L; - public GATKOrder(final SAMFileHeader header) { - this.header = header; - } + public static final PairedEndsCoordinateComparator INSTANCE = new PairedEndsCoordinateComparator(); + private PairedEndsCoordinateComparator() { } @Override - public int compare(final GATKRead lhs, final GATKRead rhs) { - if (rhs == lhs) return 0; //shortcut - - final int res1 = Integer.compare(ReadUtils.getReferenceIndex(lhs, header), ReadUtils.getReferenceIndex(rhs, header)); - if (res1 != 0) return res1; - - final int res2 = Long.compare(lhs.getStart(), rhs.getStart()); - if (res2 != 0) return res2; - - final int res3 = Boolean.compare(lhs.isDuplicate(), rhs.isDuplicate()); - if (res3 != 0) return res3; - - final int res4 = Boolean.compare(lhs.failsVendorQualityCheck(), rhs.failsVendorQualityCheck()); - if (res4 != 0) return res4; - - final int res5 = Boolean.compare(lhs.isPaired(), rhs.isPaired()); - if (res5 != 0) return res5; - - final int res6 = Boolean.compare(lhs.isProperlyPaired(), rhs.isProperlyPaired()); - if (res6 != 0) return res6; + public int compare( PairedEnds first, PairedEnds second ) { + int result = compareCoordinates(first, second); + if ( result != 0 ) { + return result; + } - //Note: negate the result because we want first-of-pair to be before second - //ie, want 'second' to be sorted after first, so want to return -1 for (true, false) - final int res7 = -Boolean.compare(lhs.isFirstOfPair(), rhs.isFirstOfPair()); - if (res7 != 0) return res7; + //This is done to mimic SAMRecordCoordinateComparator's behavior + if (first.isR1R() != second.isR1R()) { + return first.isR1R() ? -1: 1; + } - final int res8 = Boolean.compare(lhs.isSecondaryAlignment(), rhs.isSecondaryAlignment()); - if (res8 != 0) return res8; + if ( first.getName() != null && second.getName() != null ) { + result = first.getName().compareTo(second.getName()); + } + return result; + } - final int res9 = Boolean.compare(lhs.isSupplementaryAlignment(), rhs.isSupplementaryAlignment()); - if (res9 != 0) return res9; + public static int compareCoordinates(final PairedEnds first, final PairedEnds second ) { + final int firstRefIndex = first.getFirstRefIndex(); + final int secondRefIndex = second.getFirstRefIndex(); - final int res10 = Integer.compare(lhs.getMappingQuality(), rhs.getMappingQuality()); - if (res10 != 0) return res10; + if ( firstRefIndex == -1 ) { + return (secondRefIndex == -1 ?
0 : 1); } else if ( secondRefIndex == -1 ) { return -1; } - final int res11 = Integer.compare(ReadUtils.getMateReferenceIndex(lhs, header), ReadUtils.getMateReferenceIndex(rhs, header)); - if (res11 != 0) return res11; + final int refIndexDifference = firstRefIndex - secondRefIndex; + if ( refIndexDifference != 0 ) { + return refIndexDifference; + } - final int res12 = Long.compare(lhs.getMateStart(), rhs.getMateStart()); - return res12; + return Integer.compare(first.getFirstStartPosition(), second.getFirstStartPosition()); } } } \ No newline at end of file diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/validation/CompareDuplicatesSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/spark/validation/CompareDuplicatesSpark.java index 1833962ee78..d7bbb25c37d 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/spark/validation/CompareDuplicatesSpark.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/validation/CompareDuplicatesSpark.java @@ -136,9 +136,17 @@ protected void runTool(final JavaSparkContext ctx) { Broadcast bHeader = ctx.broadcast(getHeaderForReads()); // Group the reads of each BAM by MarkDuplicates key, then pair up the reads for each BAM. - JavaPairRDD firstKeyed = firstReads.mapToPair(read -> new Tuple2<>(ReadsKey.keyForFragment(bHeader.getValue(), read), read)); - JavaPairRDD secondKeyed = secondReads.mapToPair(read -> new Tuple2<>(ReadsKey.keyForFragment(bHeader.getValue(), read), read)); - JavaPairRDD, Iterable>> cogroup = firstKeyed.cogroup(secondKeyed, getRecommendedNumReducers()); + JavaPairRDD firstKeyed = firstReads.mapToPair(read -> new Tuple2<>(ReadsKey.hashKeyForFragment( + ReadUtils.getStrandedUnclippedStart(read), + read.isReverseStrand(), + ReadUtils.getReferenceIndex(read,bHeader.getValue()), + ReadUtils.getLibrary(read, bHeader.getValue())), read)); + JavaPairRDD secondKeyed = secondReads.mapToPair(read -> new Tuple2<>(ReadsKey.hashKeyForFragment( + ReadUtils.getStrandedUnclippedStart(read), + read.isReverseStrand(), + ReadUtils.getReferenceIndex(read,bHeader.getValue()), + ReadUtils.getLibrary(read, bHeader.getValue())), read)); + JavaPairRDD, Iterable>> cogroup = firstKeyed.cogroup(secondKeyed, getRecommendedNumReducers()); // Produces an RDD of MatchTypes, e.g., EQUAL, DIFFERENT_REPRESENTATIVE_READ, etc. per MarkDuplicates key, diff --git a/src/main/java/org/broadinstitute/hellbender/utils/Utils.java b/src/main/java/org/broadinstitute/hellbender/utils/Utils.java index 45382531891..0dd111cc4b0 100644 --- a/src/main/java/org/broadinstitute/hellbender/utils/Utils.java +++ b/src/main/java/org/broadinstitute/hellbender/utils/Utils.java @@ -619,6 +619,16 @@ public static > T nonEmpty(T collection, String messa } } + /** + * Checks that a {@link Collection} is not {@code null} and that it is not empty. + * If it's non-null and non-empty it returns true + * @param collection any Collection + * @return true if the collection exists and has elements + */ + public static boolean isNonEmpty(Collection collection) { + return collection != null && !collection.isEmpty(); + } + /** * Checks that a {@link String} is not {@code null} and that it is not empty.
* If it's non-null and non-empty it returns the input, otherwise it throws an {@link IllegalArgumentException} diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/ReadUtils.java b/src/main/java/org/broadinstitute/hellbender/utils/read/ReadUtils.java index d389027a82f..7060716174d 100644 --- a/src/main/java/org/broadinstitute/hellbender/utils/read/ReadUtils.java +++ b/src/main/java/org/broadinstitute/hellbender/utils/read/ReadUtils.java @@ -386,6 +386,25 @@ public static OptionalInt getOptionalIntAttribute(final GATKRead read, final Str return obj == null ? OptionalInt.empty() : OptionalInt.of(obj); } + /** + * Helper method for interrogating if a read and its mate (if it exists) are unmapped + * @param read a read with mate information to interrogate + * @return true if this read and its mate are unmapped + */ + public static boolean readAndMateAreUnmapped(GATKRead read) { + return read.isUnmapped() && (!read.isPaired() || read.mateIsUnmapped()); + } + + /** + * Interrogates the header to determine if the bam is expected to be sorted such that reads with the same name appear in order. + * This can correspond to either a queryname sorted bam or a querygrouped bam (unordered readname groups) + * @param header header corresponding to the bam file in question + * @return true if the header has the right read name grouping + */ + public static boolean isReadNameGroupedBam(SAMFileHeader header) { + return SAMFileHeader.SortOrder.queryname.equals(header.getSortOrder()) || SAMFileHeader.GroupOrder.query.equals(header.getGroupOrder()); + } + /** * A marker to tell which end of the read has been clipped */ diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/SAMRecordToGATKReadAdapter.java b/src/main/java/org/broadinstitute/hellbender/utils/read/SAMRecordToGATKReadAdapter.java index 4e240df8a8d..e8959596f28 100644 --- a/src/main/java/org/broadinstitute/hellbender/utils/read/SAMRecordToGATKReadAdapter.java +++ b/src/main/java/org/broadinstitute/hellbender/utils/read/SAMRecordToGATKReadAdapter.java @@ -717,4 +717,26 @@ public void setHeader(SAMFileHeader header) { samRecord.setHeaderStrict(header); } + + /** + * This is used to access the transient attribute store in the underlying SAMRecord. + * + * NOTE: This is an advanced use case for SAMRecord and you should probably use getAttribute() instead + * @param key key whose value is to be retrieved + */ + public Object getTransientAttribute(Object key) { + return samRecord.getTransientAttribute(key); + } + + /** + * This is used to access the transient attribute store in the underlying SAMRecord. This is used to store temporary + * attributes that will not be serialized and that do not trigger the SAMRecord to parse the attributes if they are not needed.
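+ * For example (a hypothetical round trip mirroring how MarkDuplicatesSpark uses this store): {@code read.setTransientAttribute("OD", 3)} can later be read back with {@code (int) read.getTransientAttribute("OD")}, without the value ever being serialized or the record's attribute block being parsed.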
+ * + * NOTE: This is an advanced use case for SAMRecord and you should probably use setAttribute() instead + * @param key key under which the value will be stored + * @param value value to be stored + */ + public void setTransientAttribute(Object key, Object value) { + samRecord.setTransientAttribute(key, value); + } } diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/PairedEnds.java b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/PairedEnds.java deleted file mode 100644 index 5c33ecb32a5..00000000000 --- a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/PairedEnds.java +++ /dev/null @@ -1,130 +0,0 @@ -package org.broadinstitute.hellbender.utils.read.markduplicates; - -import htsjdk.samtools.SAMFileHeader; -import org.broadinstitute.hellbender.utils.read.GATKRead; -import org.broadinstitute.hellbender.utils.read.ReadUtils; - -/** - * Struct-like class to store information about the paired reads for mark duplicates. - */ -public class PairedEnds implements OpticalDuplicateFinder.PhysicalLocation { - private GATKRead first, second; - - // Information used to detect optical dupes - public short readGroup = -1; - public short tile = -1; - public short x = -1, y = -1; - public short libraryId = -1; - - PairedEnds(final GATKRead first) { - this.first = first; - } - - public static PairedEnds of(final GATKRead first) { - return new PairedEnds(first); - } - - public PairedEnds and(final GATKRead second) { - if (second != null && - ReadUtils.getStrandedUnclippedStart(first) > ReadUtils.getStrandedUnclippedStart(second)) { - this.second = this.first; - this.first = second; - } else { - this.second = second; - } - return this; - } - - public String key(final SAMFileHeader header) { - return ReadsKey.keyForPairedEnds(header, first, second); - } - - public String keyForFragment(final SAMFileHeader header) { - return ReadsKey.keyForFragment(header, first); - } - - public GATKRead first() { - return first; - } - - public GATKRead second() { - return second; - } - - public int score(final MarkDuplicatesScoringStrategy scoringStrategy) { - return scoringStrategy.score(first) + scoringStrategy.score(second); - } - - @Override - public short getReadGroup() { return this.readGroup; } - - @Override - public void setReadGroup(final short readGroup) { this.readGroup = readGroup; } - - @Override - public short getTile() { return this.tile; } - - @Override - public void setTile(final short tile) { this.tile = tile; } - - @Override - public short getX() { return this.x; } - - @Override - public void setX(final short x) { this.x = x; } - - @Override - public short getY() { return this.y; } - - @Override - public void setY(final short y) { this.y = y; } - - @Override - public short getLibraryId() { return this.libraryId; } - - @Override - public void setLibraryId(final short libraryId) { this.libraryId = libraryId; } - /** - * returns a deep(ish) copy of the GATK reads in the PairedEnds. - * TODO: This is only deep for the Google Model read, GATKRead copy() isn't deep for - * TODO: for the SAMRecord backed read. - * @return a new deep copy - */ - public PairedEnds copy() { - if (second == null) { - return new PairedEnds(first.copy()); - } - return new PairedEnds(first.copy()).and(second.copy()); - } - - /** - * Returns the pair orientation suitable for optical duplicates, - * which always goes by the first then the second end for the strands. - * This is based on code in MarkDuplicatesGATK and ReadEnds.getOrientationByte.
- * Returns one of {@link ReadEnds#RR}, {@link ReadEnds#RF}, {@link ReadEnds#FR}, {@link ReadEnds#FF} - */ - public byte getOrientationForOpticalDuplicates() { - final GATKRead read1; - final GATKRead read2; - if (first.isFirstOfPair()){ - read1 = first; - read2 = second; - } else { - read1 = second; - read2 = first; - } - - final boolean R1R = read1.isReverseStrand(); - final boolean R2R = read2.isReverseStrand(); - if (R1R && R2R) { - return ReadEnds.RR; - } - if (R1R) { - return ReadEnds.RF; //at this point we know for sure R2R is false - } - if (R2R) { - return ReadEnds.FR; //at this point we know for sure R1R is false - } - return ReadEnds.FF; //at this point we know for sure R1R is false and R2R is false - } -} diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/ReadsKey.java b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/ReadsKey.java index fe6004b5742..1d5fd9f93af 100644 --- a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/ReadsKey.java +++ b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/ReadsKey.java @@ -6,65 +6,49 @@ /** * Encodes a unique key for read, read pairs and fragments. Used to identify duplicates for MarkDuplicatesGATK. + * + * This class was changed to primarily operate on key hashing instead of generating long string keys as it was discovered + * that it had performance implications for serialization in MarkDuplicatesSpark. */ public final class ReadsKey { - public static final String FRAGMENT_PREFIX = "f|"; - - private static final String PAIRED_ENDS_PREFIX = "p|"; - - /** - * Makes a unique key for the fragment. - */ - public static String keyForFragment(final SAMFileHeader header, final GATKRead read) { - return FRAGMENT_PREFIX + subkeyForFragment(header, read); - } - /** - * Makes a unique key for the fragment (excluding the prefix). + * Makes a hash key for the fragment. */ - private static String subkeyForFragment(final SAMFileHeader header, final GATKRead read) { - final String library = ReadUtils.getLibrary(read, header); + public static int hashKeyForFragment(int strandedUnclippedStart, boolean reverseStrand, int referenceIndex, String library) { - return String.format( - "%s|%d|%d|%s", - library != null ? library : "-", - ReadUtils.getReferenceIndex(read, header), - ReadUtils.getStrandedUnclippedStart(read), - read.isReverseStrand() ? "r" : "f"); + int key = library != null ? library.hashCode() : 1; + key = key * 31 + referenceIndex; + key = key * 31 + strandedUnclippedStart; + return key * 31 + (reverseStrand ? 0 : 1); } /** - * Makes a unique key for the paired reads. + * Makes a hash key for the paired reads. */ - public static String keyForPairedEnds(final SAMFileHeader header, final GATKRead first, final GATKRead second) { - final String key = subkeyForFragment(header, first); + public static int hashKeyForPair(final SAMFileHeader header, final GATKRead first, final GATKRead second) { + int key = hashKeyForFragment(ReadUtils.getStrandedUnclippedStart(first), first.isReverseStrand(), + ReadUtils.getReferenceIndex(first, header), ReadUtils.getLibrary(first, header)); if (second == null) { - return PAIRED_ENDS_PREFIX + key; + return key; } - return String.format( - PAIRED_ENDS_PREFIX + "%s|%d|%d|%s", - key, - ReadUtils.getReferenceIndex(second, header), - ReadUtils.getStrandedUnclippedStart(second), - second.isReverseStrand() ? 
"r" : "f"); + key = 31 * key + ReadUtils.getReferenceIndex(second, header); + key = 31 * key + ReadUtils.getStrandedUnclippedStart(second); + return 31 * key + (second.isReverseStrand() ? 0 : 1); } /** * Makes a unique key for the read. */ - public static String keyForRead(final SAMFileHeader header, final GATKRead read) { - return String.format( - "%s|%s", - read.getReadGroup(), - read.getName()); + public static String keyForRead(final GATKRead read) { + return read.getName(); } /** - * Returns true if the key is a fragment key. + * Makes a hash key for the read. */ - public static boolean isFragment(String key) { - return key.startsWith(FRAGMENT_PREFIX); + public static int hashKeyForRead(final GATKRead read) { + return read.getName().hashCode(); } } diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/EmptyFragment.java b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/EmptyFragment.java new file mode 100644 index 00000000000..a175037a764 --- /dev/null +++ b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/EmptyFragment.java @@ -0,0 +1,74 @@ +package org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords; + +import htsjdk.samtools.SAMFileHeader; +import org.broadinstitute.hellbender.utils.read.GATKRead; +import org.broadinstitute.hellbender.utils.read.ReadUtils; +import org.broadinstitute.hellbender.utils.read.markduplicates.ReadsKey; + +/** + * Dummy class representing a mated read fragment at a particular start position to be used for accounting + * when deciding whether to duplicate unmatched fragments. + * + * This class holds onto as little information as possible in an attempt to prevent excessive serialization of + * during the processing step of MarkDuplicatesSpark + */ +public final class EmptyFragment extends PairedEnds { + protected transient int key; + + private final int firstUnclippedStartPosition; + private final int firstStartPosition; + private final short firstRefIndex; + private final boolean R1R; + + /** + * special constructor for creating empty fragments + * this only includes the necessary data to locate the read, the rest is unnecessary because it will appear in the paired bucket + * + */ + public EmptyFragment(GATKRead read, SAMFileHeader header) { + super(0, null); + + this.firstUnclippedStartPosition = ReadUtils.getStrandedUnclippedStart(read); + this.firstRefIndex = (short)ReadUtils.getReferenceIndex(read, header); + this.R1R = read.isReverseStrand(); + firstStartPosition = 0; + this.key = ReadsKey.hashKeyForFragment(firstUnclippedStartPosition, + isR1R(), + firstRefIndex, + ReadUtils.getLibrary(read, header)); + } + + @Override + public Type getType() { + return Type.EMPTY_FRAGMENT; + } + @Override + public int getScore() { + return 0; + } + @Override + // NOTE: This is transient and thus may not exist if the object gets serialized + public int key() { + return key; + } + @Override + public int getUnclippedStartPosition() { + return firstUnclippedStartPosition; + } + @Override + public int getFirstStartPosition() { + return firstStartPosition; + } + @Override + public boolean isR1R() { + return R1R; + } + @Override + public int getFirstRefIndex() { + return firstRefIndex; + } + @Override + public String toString() { + return "EmptyFragment " + firstUnclippedStartPosition; + } +} diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/Fragment.java 
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/Fragment.java b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/Fragment.java new file mode 100644 index 00000000000..34461b037b6 --- /dev/null +++ b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/Fragment.java @@ -0,0 +1,72 @@ +package org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords; + +import htsjdk.samtools.SAMFileHeader; +import org.broadinstitute.hellbender.utils.read.GATKRead; +import org.broadinstitute.hellbender.utils.read.ReadUtils; +import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy; +import org.broadinstitute.hellbender.utils.read.markduplicates.ReadsKey; + +/** + * Class representing a single read fragment at a particular start location without a mapped mate. + * + * This class holds onto as little information as possible in an attempt to prevent excessive serialization + * during the processing step of MarkDuplicatesSpark. + */ +public class Fragment extends PairedEnds { + protected transient int key; + + private final int firstStartPosition; + private final int firstUnclippedStartPosition; + private final short firstRefIndex; + private final boolean R1R; + + protected final int score; + + public Fragment(final GATKRead first, final SAMFileHeader header, int partitionIndex, MarkDuplicatesScoringStrategy scoringStrategy) { + super(partitionIndex, first.getName()); + + this.firstUnclippedStartPosition = ReadUtils.getStrandedUnclippedStart(first); + this.firstStartPosition = first.getAssignedStart(); + this.firstRefIndex = (short)ReadUtils.getReferenceIndex(first, header); + this.score = scoringStrategy.score(first); + this.R1R = first.isReverseStrand(); + this.key = ReadsKey.hashKeyForFragment(firstUnclippedStartPosition, + isR1R(), + firstRefIndex, + ReadUtils.getLibrary(first, header)); + } + + @Override + public Type getType() { + return Type.FRAGMENT; + } + @Override + // NOTE: This is transient and thus may not exist if the object gets serialized + public int key() { + return key; + } + @Override + public int getScore() { + return score; + } + @Override + public int getUnclippedStartPosition() { + return firstUnclippedStartPosition; + } + @Override + public int getFirstStartPosition() { + return firstStartPosition; + } + @Override + public boolean isR1R() { + return R1R; + } + @Override + public int getFirstRefIndex() { + return firstRefIndex; + } + @Override + public String toString() { + return name + " " + firstUnclippedStartPosition; + } +}
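Fragment and EmptyFragment both cache their key in a transient field, so the cached value survives only until the object is serialized; Kryo's FieldSerializer, like plain Java serialization, skips transient fields by default. A self-contained illustration of the caveat using a hypothetical class and plain java.io serialization:

import java.io.*;

public class TransientKeyDemo {
    static class Record implements Serializable {
        private static final long serialVersionUID = 1L;
        transient int key = 42; // analogous to the transient key fields above
    }

    public static void main(String[] args) throws Exception {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new Record()); // transient field is not written out
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            final Record roundTripped = (Record) in.readObject();
            System.out.println(roundTripped.key); // prints 0, not 42
        }
    }
}

This is why the key() implementations above are annotated as only guaranteed to be correct right after construction.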
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/MarkDuplicatesSparkRecord.java b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/MarkDuplicatesSparkRecord.java new file mode 100644 index 00000000000..0a021c56b6a --- /dev/null +++ b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/MarkDuplicatesSparkRecord.java @@ -0,0 +1,59 @@ +package org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords; + +import htsjdk.samtools.SAMFileHeader; +import org.broadinstitute.hellbender.utils.read.GATKRead; +import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy; + +/** + * A common interface for the data types that represent reads for MarkDuplicatesSpark. This is done to satisfy limitations + * in Spark, as it is often faster to perform a map over a single RDD than to split it into multiple streams. + * + * The implementing classes are specific to the needs of various code paths in MarkDuplicatesSpark. + */ +public abstract class MarkDuplicatesSparkRecord { + protected final int partitionIndex; + protected final String name; + MarkDuplicatesSparkRecord(int partitionIndex, String name) { + this.name = name; + this.partitionIndex = partitionIndex; + } + + // Required abstract methods + public abstract Type getType(); + // NOTE: these keys are typically stored in a transient field to prevent serialization for performance purposes; the result is only guaranteed to be correct right after construction + public abstract int key(); + + + // A fragment containing only one read without a mapped mate + public static PairedEnds newFragment(final GATKRead first, final SAMFileHeader header, int partitionIndex, MarkDuplicatesScoringStrategy scoringStrategy) { + return new Fragment(first, header, partitionIndex, scoringStrategy); + } + + // An optimization for reducing the serialized data passed around when indicating that there was a mapped read at a location + public static PairedEnds newEmptyFragment(GATKRead read, SAMFileHeader header) { + return new EmptyFragment(read, header); + } + + // An object representing a pair of primary and secondary reads with a particular span for duplicate marking + public static PairedEnds newPair(GATKRead first, GATKRead second, SAMFileHeader header, int partitionIndex, MarkDuplicatesScoringStrategy scoringStrategy) { + return new Pair(first, second, header, partitionIndex, scoringStrategy); + } + + // An object representing a read or group of reads that we want to pass through the tool without being duplicate marked + public static Passthrough getPassthrough(GATKRead read, int partitionIndex) { + return new Passthrough(read, partitionIndex); + } + + + public int getPartitionIndex(){ + return partitionIndex; + } + + public String getName() { + return name; + } + + public enum Type { + FRAGMENT, PAIR, PASSTHROUGH, EMPTY_FRAGMENT + } +}
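To illustrate the single-hierarchy rationale: with every record flavor in one RDD, a consumer can dispatch on getType() inside a single map rather than splitting the RDD into several filtered streams. A hedged sketch of such a consumer (a hypothetical helper, assumed to live alongside the record types above; it is not part of this change):

// Hypothetical helper in the sparkrecords package.
final class RecordScoring {
    static int scoreOrZero(final MarkDuplicatesSparkRecord record) {
        switch (record.getType()) {
            case FRAGMENT:
            case PAIR:
            case EMPTY_FRAGMENT:
                // These three types all extend PairedEnds and carry a score.
                return ((PairedEnds) record).getScore();
            case PASSTHROUGH:
            default:
                // Passthrough records are never scored or duplicate-marked.
                return 0;
        }
    }
}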
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/Pair.java b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/Pair.java new file mode 100644 index 00000000000..22a1a40072d --- /dev/null +++ b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/Pair.java @@ -0,0 +1,218 @@ +package org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import htsjdk.samtools.SAMFileHeader; +import org.broadinstitute.hellbender.utils.Utils; +import org.broadinstitute.hellbender.utils.read.GATKRead; +import org.broadinstitute.hellbender.utils.read.ReadUtils; +import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy; +import org.broadinstitute.hellbender.utils.read.markduplicates.OpticalDuplicateFinder; +import org.broadinstitute.hellbender.utils.read.markduplicates.ReadEnds; +import org.broadinstitute.hellbender.utils.read.markduplicates.ReadsKey; + +/** + * Class representing a pair of reads together with accompanying optical duplicate marking information. + * + * This class holds onto as little information as possible in an attempt to prevent excessive serialization + * during the processing step of MarkDuplicatesSpark. + */ +@DefaultSerializer(Pair.Serializer.class) +public final class Pair extends PairedEnds implements OpticalDuplicateFinder.PhysicalLocation { + protected transient int key; + + private final int firstStartPosition; + private final int firstUnclippedStartPosition; + private final short firstRefIndex; + private final boolean R1R; + + private final int secondUnclippedStartPosition; + private final short secondRefIndex; + private final boolean R2R; + private final int score; + + // Information used to detect optical dupes + private transient short readGroup = -1; + private transient short tile = -1; + private transient short x = -1; + private transient short y = -1; + private transient short libraryId = -1; + + public Pair(final GATKRead read1, final GATKRead read2, final SAMFileHeader header, int partitionIndex, MarkDuplicatesScoringStrategy scoringStrategy) { + super(partitionIndex, read1.getName()); + + final String name1 = read1.getName(); + final String name2 = read2.getName(); + Utils.validate(name1.equals(name2), () -> "Paired reads have different names\n" + name1 + "\n" + name2); + + this.score = scoringStrategy.score(read1) + scoringStrategy.score(read2); + + GATKRead first = read1; + GATKRead second; + + final int read1UnclippedStart = ReadUtils.getStrandedUnclippedStart(read1); + final int read2UnclippedStart = ReadUtils.getStrandedUnclippedStart(read2); + + if( read1UnclippedStart < read2UnclippedStart ){ + first = read1; + second = read2; + firstUnclippedStartPosition = read1UnclippedStart; + secondUnclippedStartPosition = read2UnclippedStart; + } else { + first = read2; + second = read1; + firstUnclippedStartPosition = read2UnclippedStart; + secondUnclippedStartPosition = read1UnclippedStart; + } + + firstStartPosition = first.getAssignedStart(); + firstRefIndex = (short)ReadUtils.getReferenceIndex(first, header); + secondRefIndex = (short)ReadUtils.getReferenceIndex(second, header); + + final GATKRead firstOfPair; + final GATKRead secondOfPair; + if (read1.isFirstOfPair()){ + firstOfPair = read1; + secondOfPair = read2; + } else { + firstOfPair = read2; + secondOfPair = read1; + } + + R1R = firstOfPair.isReverseStrand(); + R2R = secondOfPair.isReverseStrand(); + + this.key = ReadsKey.hashKeyForPair(header, first, second); + } + + // Constructor for serialization purposes + private Pair(Kryo kryo, Input input){ + super(input.readInt(true), input.readString()); + + // Information used to detect optical dupes + readGroup = -1; + tile = -1; + x = -1; + y = -1; + libraryId = -1; + + score = input.readInt(); + + firstStartPosition = input.readInt(); + firstUnclippedStartPosition = input.readInt(); + firstRefIndex = input.readShort(); + R1R = input.readBoolean(); + + secondUnclippedStartPosition = input.readInt(); + secondRefIndex = input.readShort(); + R2R = input.readBoolean(); + + } + + protected void serialize(Kryo kryo, Output output) { + output.writeInt(partitionIndex, true); + output.writeAscii(name); + + output.writeInt(score); + + output.writeInt(firstStartPosition); + output.writeInt(firstUnclippedStartPosition); + output.writeShort(firstRefIndex); + output.writeBoolean(R1R); + + output.writeInt(secondUnclippedStartPosition); + output.writeShort(secondRefIndex); + output.writeBoolean(R2R); + } + + @Override + public Type getType() { + return Type.PAIR; + } + @Override + // NOTE: This is transient and thus may not exist if the object gets serialized
+ public int key() { + return key; + } + @Override + public int getScore() { + return score; + } + @Override + public int getUnclippedStartPosition() { + return firstUnclippedStartPosition; + } + @Override + public int getFirstStartPosition() { + return firstStartPosition; + } + @Override + public boolean isR1R() { + return R1R; + } + @Override + public int getFirstRefIndex() { + return firstRefIndex; + } + @Override + public String toString() { + return name + " " + firstUnclippedStartPosition + " " + (secondUnclippedStartPosition == -1 ? "" : secondUnclippedStartPosition); + } + + /** + * Returns the pair orientation suitable for optical duplicates, + * which always goes by the first then the second end for the strands. + * This is based on code in MarkDuplicatesGATK and ReadEnds.getOrientationByte. + * Returns one of {@link ReadEnds#RR}, {@link ReadEnds#RF}, {@link ReadEnds#FR}, {@link ReadEnds#FF} + */ + public byte getOrientationForOpticalDuplicates() { + if (R1R && R2R) { + return ReadEnds.RR; + } + if (R1R) { + return ReadEnds.RF; //at this point we know for sure R2R is false + } + if (R2R) { + return ReadEnds.FR; //at this point we know for sure R1R is false + } + return ReadEnds.FF; //at this point we know for sure R1R is false and R2R is false + } + + // Methods for OpticalDuplicateFinder.PhysicalLocation + @Override + public short getReadGroup() { return this.readGroup; } + @Override + public void setReadGroup(final short readGroup) { this.readGroup = readGroup; } + @Override + public short getTile() { return this.tile; } + @Override + public void setTile(final short tile) { this.tile = tile; } + @Override + public short getX() { return this.x; } + @Override + public void setX(final short x) { this.x = x; } + @Override + public short getY() { return this.y; } + @Override + public void setY(final short y) { this.y = y; } + @Override + public short getLibraryId() { return this.libraryId; } + @Override + public void setLibraryId(final short libraryId) { this.libraryId = libraryId; } + + /** + * Serializer for Pair, which relies on the serialization methods implemented within the class itself. + */ + public static final class Serializer extends com.esotericsoftware.kryo.Serializer { + @Override + public void write(final Kryo kryo, final Output output, final org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords.Pair pair ) { + pair.serialize(kryo, output); + } + @Override + public org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords.Pair read(final Kryo kryo, final Input input, final Class klass ) { + return new org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords.Pair(kryo, input); + } + } +}
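The @DefaultSerializer/custom-Serializer pattern above keeps Pair's wire format down to exactly the fields it stores, rather than whatever reflection would pick up. A self-contained sketch of the same pattern on a hypothetical two-field class, using the Kryo 4-style API (all names here are illustrative, not part of this change):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoExplicitFieldsDemo {
    static final class Span {
        final int start;
        final boolean reverse;
        Span(int start, boolean reverse) { this.start = start; this.reverse = reverse; }
    }

    // Hand-written serializer: writes exactly the two fields, nothing more.
    static final class SpanSerializer extends Serializer<Span> {
        @Override
        public void write(Kryo kryo, Output output, Span span) {
            output.writeInt(span.start);
            output.writeBoolean(span.reverse);
        }
        @Override
        public Span read(Kryo kryo, Input input, Class<Span> type) {
            return new Span(input.readInt(), input.readBoolean());
        }
    }

    public static void main(String[] args) {
        final Kryo kryo = new Kryo();
        kryo.register(Span.class, new SpanSerializer());

        final Output output = new Output(64, -1);
        kryo.writeObject(output, new Span(10040, false));

        final Span copy = kryo.readObject(new Input(output.toBytes()), Span.class);
        System.out.println(copy.start + " " + copy.reverse); // 10040 false
    }
}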
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/PairedEnds.java b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/PairedEnds.java new file mode 100644 index 00000000000..3768d914c38 --- /dev/null +++ b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/PairedEnds.java @@ -0,0 +1,18 @@ +package org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords; + +/** + * Struct-like class to store information about the paired reads for mark duplicates. + */ +public abstract class PairedEnds extends MarkDuplicatesSparkRecord { + + PairedEnds(int partitionIndex, String name) { + super(partitionIndex, name); + } + + public abstract int getFirstStartPosition(); + public abstract int getUnclippedStartPosition(); + public abstract int getFirstRefIndex(); + public abstract boolean isR1R(); + public abstract int getScore(); + +} diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/Passthrough.java b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/Passthrough.java new file mode 100644 index 00000000000..ca11816be37 --- /dev/null +++ b/src/main/java/org/broadinstitute/hellbender/utils/read/markduplicates/sparkrecords/Passthrough.java @@ -0,0 +1,29 @@ +package org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords; + +import htsjdk.samtools.SAMFileHeader; +import org.broadinstitute.hellbender.utils.read.GATKRead; +import org.broadinstitute.hellbender.utils.read.markduplicates.ReadsKey; + +/** + * Dummy class used for preserving reads that need to be marked as non-duplicates even though we do not want to perform any + * processing on them (e.g. unmapped reads that we don't want to process but that must still be marked as non-duplicate). + */ +public final class Passthrough extends MarkDuplicatesSparkRecord { + private final transient int key; + + Passthrough(GATKRead read, int partitionIndex) { + super(partitionIndex, read.getName()); + + this.key = ReadsKey.hashKeyForRead(read); + } + + @Override + public Type getType() { + return Type.PASSTHROUGH; + } + @Override + // NOTE: This is transient and thus may not exist if the object gets serialized + public int key() { + return key; + } +} diff --git a/src/main/java/org/broadinstitute/hellbender/utils/test/testers/SamFileTester.java b/src/main/java/org/broadinstitute/hellbender/utils/test/testers/SamFileTester.java index 94e6ff42694..e5e6a300541 100644 --- a/src/main/java/org/broadinstitute/hellbender/utils/test/testers/SamFileTester.java +++ b/src/main/java/org/broadinstitute/hellbender/utils/test/testers/SamFileTester.java @@ -59,6 +59,10 @@ public void setOutput(final File output) { this.output = output; } + public void addArg(final String key) { + args.add(key); + } + public void addArg(final String key, final String value) { args.add(key); args.add(value); } diff --git a/src/test/java/org/broadinstitute/hellbender/tools/spark/bwa/BwaAndMarkDuplicatesPipelineSparkIntegrationTest.java b/src/test/java/org/broadinstitute/hellbender/tools/spark/bwa/BwaAndMarkDuplicatesPipelineSparkIntegrationTest.java index 9a5bd6a5fff..c49b5b40392 100644 --- a/src/test/java/org/broadinstitute/hellbender/tools/spark/bwa/BwaAndMarkDuplicatesPipelineSparkIntegrationTest.java +++ b/src/test/java/org/broadinstitute/hellbender/tools/spark/bwa/BwaAndMarkDuplicatesPipelineSparkIntegrationTest.java @@ -3,6 +3,7 @@ import org.broadinstitute.hellbender.CommandLineProgramTest; import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions; import org.broadinstitute.hellbender.tools.spark.pipelines.BwaAndMarkDuplicatesPipelineSpark; +import org.broadinstitute.hellbender.tools.spark.transforms.markduplicates.MarkDuplicatesSpark; import org.broadinstitute.hellbender.utils.test.ArgumentsBuilder; import org.broadinstitute.hellbender.utils.test.SamAssertionUtils; import org.testng.Assert; @@ -34,6 +35,7 @@ public void test() throws Exception { args.addInput(input); args.addOutput(output); args.addBooleanArgument(StandardArgumentDefinitions.DISABLE_SEQUENCE_DICT_VALIDATION_NAME, true); + args.add("--"+MarkDuplicatesSpark.DO_NOT_MARK_UNMAPPED_MATES); this.runCommandLine(args.getArgsArray()); SamAssertionUtils.assertSamsEqual(output, expectedSam);
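For end users, the new behavior is toggled by a command-line flag; the tests later in this patch spell it --do_not_mark_unmapped_mates. A hypothetical invocation, assuming the standard gatk launcher and placeholder file names:

gatk MarkDuplicatesSpark \
    -I input.bam \
    -O marked_duplicates.bam \
    --do_not_mark_unmapped_mates

Without the flag, an unmapped mate now inherits the duplicate state of its mapped partner, which is the default behavior the new integration tests below assert.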
diff --git a/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/MarkDuplicatesSparkIntegrationTest.java b/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/MarkDuplicatesSparkIntegrationTest.java index 5e9becfa506..0f83de31508 100644 --- a/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/MarkDuplicatesSparkIntegrationTest.java +++ b/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/MarkDuplicatesSparkIntegrationTest.java @@ -158,4 +158,24 @@ public void testMarkDuplicatesSparkIntegrationTestLocal( } } } + + // Tests asserting that, without the --do-not-mark-unmapped-mates argument, unmapped mates are still duplicate-marked along with their partner + @Test + public void testMappedPairAndMappedFragmentAndMatePairSecondUnmapped() { + final AbstractMarkDuplicatesTester tester = new MarkDuplicatesSparkTester(true); + tester.addMatePair(1, 10040, 10040, false, true, true, true, "76M", null, false, false, false, false, false, DEFAULT_BASE_QUALITY); // first a duplicate, + // second end unmapped + tester.addMappedPair(1, 10189, 10040, false, false, "41S35M", "65M11S", true, false, false, ELIGIBLE_BASE_QUALITY); // mapped OK + tester.addMappedFragment(1, 10040, true, DEFAULT_BASE_QUALITY); // duplicate + tester.runTest(); + } + @Test + public void testMappedPairAndMappedFragmentAndMatePairFirstUnmapped() { + final AbstractMarkDuplicatesTester tester = new MarkDuplicatesSparkTester(true); + tester.addMatePair(1, 10040, 10040, true, false, true, true, null, "76M", false, false, false, false, false, DEFAULT_BASE_QUALITY); // first a duplicate, + // first end unmapped + tester.addMappedPair(1, 10189, 10040, false, false, "41S35M", "65M11S", true, false, false, ELIGIBLE_BASE_QUALITY); // mapped OK + tester.addMappedFragment(1, 10040, true, DEFAULT_BASE_QUALITY); // duplicate + tester.runTest(); + } } diff --git a/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/ReadsPipelineSparkIntegrationTest.java b/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/ReadsPipelineSparkIntegrationTest.java index a24947d877f..d80a16e6e04 100644 --- a/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/ReadsPipelineSparkIntegrationTest.java +++ b/src/test/java/org/broadinstitute/hellbender/tools/spark/pipelines/ReadsPipelineSparkIntegrationTest.java @@ -3,6 +3,7 @@ import htsjdk.samtools.ValidationStringency; import org.broadinstitute.hellbender.CommandLineProgramTest; import org.broadinstitute.hellbender.engine.spark.datasources.ReferenceTwoBitSource; +import org.broadinstitute.hellbender.tools.spark.transforms.markduplicates.MarkDuplicatesSpark; import org.broadinstitute.hellbender.tools.walkers.haplotypecaller.HaplotypeCallerIntegrationTest; import org.broadinstitute.hellbender.utils.test.BaseTest; import org.broadinstitute.hellbender.GATKBaseTest; @@ -121,6 +122,7 @@ public void testReadsPipelineSpark(PipelineTest params) throws IOException { args.add("-R"); args.add(referenceFile.getAbsolutePath()); } + args.add("--"+ MarkDuplicatesSpark.DO_NOT_MARK_UNMAPPED_MATES); args.add("-indels"); args.add("--enable-baq"); args.add("-pairHMM"); diff --git a/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUnitTest.java
b/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUnitTest.java index 1fd40c1f42f..79c44e6db94 100644 --- a/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUnitTest.java @@ -40,7 +40,7 @@ public void markDupesTest(final String input, final long totalExpected, final lo OpticalDuplicatesArgumentCollection opticalDuplicatesArgumentCollection = new OpticalDuplicatesArgumentCollection(); final OpticalDuplicateFinder finder = opticalDuplicatesArgumentCollection.READ_NAME_REGEX != null ? new OpticalDuplicateFinder(opticalDuplicatesArgumentCollection.READ_NAME_REGEX, opticalDuplicatesArgumentCollection.OPTICAL_DUPLICATE_PIXEL_DISTANCE, null) : null; - JavaRDD markedReads = MarkDuplicatesSpark.mark(reads, header, MarkDuplicatesScoringStrategy.SUM_OF_BASE_QUALITIES, finder, 1); + JavaRDD markedReads = MarkDuplicatesSpark.mark(reads, header, MarkDuplicatesScoringStrategy.SUM_OF_BASE_QUALITIES, finder, 1, false); Assert.assertEquals(markedReads.count(), totalExpected); JavaRDD dupes = markedReads.filter(GATKRead::isDuplicate); diff --git a/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtilsUnitTest.java b/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtilsUnitTest.java index 49c597cddb3..82355121ec3 100644 --- a/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtilsUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUtilsUnitTest.java @@ -2,23 +2,28 @@ import com.google.api.client.util.Lists; import com.google.common.collect.ImmutableList; -import htsjdk.samtools.SAMFileHeader; +import htsjdk.samtools.*; +import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; import org.broadinstitute.hellbender.engine.spark.SparkContextFactory; -import org.broadinstitute.hellbender.utils.read.ArtificialReadUtils; -import org.broadinstitute.hellbender.utils.read.GATKRead; +import org.broadinstitute.hellbender.engine.spark.datasources.ReadsSparkSink; +import org.broadinstitute.hellbender.utils.read.*; +import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy; import org.broadinstitute.hellbender.utils.read.markduplicates.ReadsKey; import org.broadinstitute.hellbender.GATKBaseTest; +import org.broadinstitute.hellbender.utils.test.SamAssertionUtils; import org.testng.Assert; import org.testng.annotations.Test; import scala.Tuple2; +import scala.collection.Seq; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; public class MarkDuplicatesSparkUtilsUnitTest extends GATKBaseTest { @Test(groups = "spark") @@ -38,34 +43,6 @@ public void testSpanningIterator() { ImmutableList.of(pairIterable(1, "a"), pairIterable(2, "b"), pairIterable(1, "c"))); } - @Test(groups = "spark") - public void testSpanReadsByKeyWithAlternatingGroups() { - SAMFileHeader header = 
ArtificialReadUtils.createArtificialSamHeaderWithGroups(1, 1, 1000, 2); - GATKRead read1 = ArtificialReadUtils.createArtificialRead(header, "N", 0, 1, 20); - read1.setReadGroup(getReadGroupId(header, 0)); - GATKRead read2 = ArtificialReadUtils.createArtificialRead(header, "N", 0, 2, 20); - read2.setReadGroup(getReadGroupId(header, 1)); - GATKRead read3 = ArtificialReadUtils.createArtificialRead(header, "N", 0, 3, 20); - read3.setReadGroup(getReadGroupId(header, 0)); - GATKRead read4 = ArtificialReadUtils.createArtificialRead(header, "N", 0, 4, 20); - read4.setReadGroup(getReadGroupId(header, 1)); - - String key1 = ReadsKey.keyForRead(header, read1); - String key2 = ReadsKey.keyForRead(header, read2); - String key3 = ReadsKey.keyForRead(header, read3); - String key4 = ReadsKey.keyForRead(header, read4); - - Assert.assertEquals("ReadGroup0|N", key1); - Assert.assertEquals("ReadGroup1|N", key2); - Assert.assertEquals("ReadGroup0|N", key3); - Assert.assertEquals("ReadGroup1|N", key4); - - JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); - JavaRDD reads = ctx.parallelize(ImmutableList.of(read1, read2, read3, read4), 1); - JavaPairRDD> groupedReads = MarkDuplicatesSparkUtils.spanReadsByKey(header, reads); - Assert.assertEquals(groupedReads.collect(), - ImmutableList.of(pairIterable(key1, read1, read3), pairIterable(key2, read2, read4))); - } private String getReadGroupId(final SAMFileHeader header, final int index) { return header.getReadGroups().get(index).getReadGroupId(); diff --git a/src/test/java/org/broadinstitute/hellbender/tools/walkers/markduplicates/AbstractMarkDuplicatesCommandLineProgramTest.java b/src/test/java/org/broadinstitute/hellbender/tools/walkers/markduplicates/AbstractMarkDuplicatesCommandLineProgramTest.java index f469a099683..d7a56bd48d0 100644 --- a/src/test/java/org/broadinstitute/hellbender/tools/walkers/markduplicates/AbstractMarkDuplicatesCommandLineProgramTest.java +++ b/src/test/java/org/broadinstitute/hellbender/tools/walkers/markduplicates/AbstractMarkDuplicatesCommandLineProgramTest.java @@ -10,6 +10,8 @@ import org.broadinstitute.hellbender.CommandLineProgramTest; import org.broadinstitute.hellbender.cmdline.CommandLineProgram; import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions; +import org.broadinstitute.hellbender.tools.spark.pipelines.MarkDuplicatesSparkIntegrationTest; +import org.broadinstitute.hellbender.tools.spark.transforms.markduplicates.MarkDuplicatesSpark; import org.broadinstitute.hellbender.utils.test.ArgumentsBuilder; import org.broadinstitute.hellbender.utils.test.BaseTest; import org.broadinstitute.hellbender.utils.test.IntegrationTestSpec; @@ -316,6 +318,9 @@ public void testMappedPairAndMappedFragmentAndMatePairSecondUnmapped() { // second end unmapped tester.addMappedPair(1, 10189, 10040, false, false, "41S35M", "65M11S", true, false, false, ELIGIBLE_BASE_QUALITY); // mapped OK tester.addMappedFragment(1, 10040, true, DEFAULT_BASE_QUALITY); // duplicate + if (this instanceof MarkDuplicatesSparkIntegrationTest) { + tester.addArg("--do_not_mark_unmapped_mates"); + } tester.runTest(); } @@ -326,6 +331,9 @@ public void testMappedPairAndMappedFragmentAndMatePairFirstUnmapped() { // first end unmapped tester.addMappedPair(1, 10189, 10040, false, false, "41S35M", "65M11S", true, false, false, ELIGIBLE_BASE_QUALITY); // mapped OK tester.addMappedFragment(1, 10040, true, DEFAULT_BASE_QUALITY); // duplicate + if (this instanceof MarkDuplicatesSparkIntegrationTest) { + 
tester.addArg("--do_not_mark_unmapped_mates"); + } tester.runTest(); } diff --git a/src/test/java/org/broadinstitute/hellbender/utils/read/markduplicates/MarkDuplicatesSparkTester.java b/src/test/java/org/broadinstitute/hellbender/utils/read/markduplicates/MarkDuplicatesSparkTester.java index 1321676ca11..ad882a9ee7a 100644 --- a/src/test/java/org/broadinstitute/hellbender/utils/read/markduplicates/MarkDuplicatesSparkTester.java +++ b/src/test/java/org/broadinstitute/hellbender/utils/read/markduplicates/MarkDuplicatesSparkTester.java @@ -1,19 +1,36 @@ package org.broadinstitute.hellbender.utils.read.markduplicates; +import com.google.common.collect.Lists; import htsjdk.samtools.DuplicateScoringStrategy; import org.broadinstitute.hellbender.cmdline.CommandLineProgram; import org.broadinstitute.hellbender.tools.spark.transforms.markduplicates.MarkDuplicatesSpark; import org.broadinstitute.hellbender.utils.test.testers.AbstractMarkDuplicatesTester; +import java.util.Arrays; +import java.util.List; + /** * A tester class for {@link MarkDuplicatesSpark}. */ public final class MarkDuplicatesSparkTester extends AbstractMarkDuplicatesTester { + boolean markUnmappedReads = false; public MarkDuplicatesSparkTester() { + this(false); + } + + public MarkDuplicatesSparkTester(boolean markUnmappedReads) { super(DuplicateScoringStrategy.ScoringStrategy.TOTAL_MAPPED_REFERENCE_LENGTH); + this.markUnmappedReads = markUnmappedReads; } @Override protected CommandLineProgram getProgram() { return new MarkDuplicatesSpark(); } + + @Override + protected void addArgs() { + if (!markUnmappedReads) { + addArg("--" + MarkDuplicatesSpark.DO_NOT_MARK_UNMAPPED_MATES); + } + } } diff --git a/src/test/java/org/broadinstitute/hellbender/utils/read/markduplicates/ReadsKeyUnitTest.java b/src/test/java/org/broadinstitute/hellbender/utils/read/markduplicates/ReadsKeyUnitTest.java new file mode 100644 index 00000000000..42fbe21e31d --- /dev/null +++ b/src/test/java/org/broadinstitute/hellbender/utils/read/markduplicates/ReadsKeyUnitTest.java @@ -0,0 +1,78 @@ +package org.broadinstitute.hellbender.utils.read.markduplicates; + +import org.broadinstitute.hellbender.GATKBaseTest; +import org.testng.annotations.Test; + +import java.util.Random; + +public class ReadsKeyUnitTest extends GATKBaseTest { + + @Test (enabled = false) + // This test makes the argument that using a StringBuilder in ReadsKey is faster than an equivalent call to String.format() + /* an example of the results: + With Format: 13909867289 + With Builder: 1130077051 + With String +: 1155343164 + With Joiner: 2013227121 + */ + public void testKeyForFragmentPerformanceEvidence() throws Exception { + Random rand = new Random(); + int numTrials = 10000000; + for (int i = 0; i < numTrials; i++) { + rand.nextInt(); + } + + long startTime = System.nanoTime(); + for (int i = 0; i < numTrials; i++) { + String s = String.format( + "%s|%d|%d|%s", + rand.nextBoolean() ? "SomeLongTestLibraryName-----x" : "-", + rand.nextInt(30), + rand.nextInt(100), + rand.nextBoolean() ? "r" : "f"); + } + long endTime = System.nanoTime(); + System.out.println("With Format:\t" + (endTime - startTime)); + + startTime = System.nanoTime(); + for (int i = 0; i < numTrials; i++) { + String s = new StringBuilder().append(rand.nextBoolean() ? "SomeLongTestLibraryName-----x" : "-") + .append("|") + .append(rand.nextInt(30)) + .append("|") + .append(rand.nextInt(100)) + .append("|") + .append( rand.nextBoolean() ? 
"r" : "f") + .toString(); + + } + endTime = System.nanoTime(); + System.out.println("With Builder:\t" + (endTime - startTime)); + + startTime = System.nanoTime(); + for (int i = 0; i < numTrials; i++) { + String s = (rand.nextBoolean() ? "SomeLongTestLibraryName-----x" : "-") + + "|" + + rand.nextInt(30) + + "|" + + rand.nextInt(100) + + "|" + + (rand.nextBoolean() ? "r" : "f"); + + } + endTime = System.nanoTime(); + System.out.println("With String +:\t" + (endTime - startTime)); + + startTime = System.nanoTime(); + for (int i = 0; i < numTrials; i++) { + String s = String.join("|", rand.nextBoolean() ? "SomeLongTestLibraryName-----x" : "-", + Integer.toString(rand.nextInt(30)), + Integer.toString(rand.nextInt(100)), + rand.nextBoolean() ? "r" : "f"); + + } + endTime = System.nanoTime(); + System.out.println("With Joiner:\t" + (endTime - startTime)); + } + +} \ No newline at end of file