diff --git a/scripts/dev/diff_debugspammingiterator.sh b/scripts/dev/diff_debugspammingiterator.sh
new file mode 100644
index 00000000..043a4b0c
--- /dev/null
+++ b/scripts/dev/diff_debugspammingiterator.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+
+for f1 in DebugSpammingIterator"$1".Worker*.log ; do
+	f2="${f1/tor$1/tor$2}"
+	# only compare the lines present in both logs so that a truncated log still matches
+	lines1=$(( $(wc -l < "$f1") ))
+	lines2=$(( $(wc -l < "$f2") ))
+	lines=$lines2
+	if [[ $lines1 -lt $lines2 ]] ; then
+		lines=$lines1
+	fi
+	for filter in \
+		"SAMEvidenceSource.iterator().rawiterator" \
+		"SAMEvidenceSource.shouldFilterPreTransform" \
+		"SAMEvidenceSource.transform" \
+		"SAMEvidenceSource.iterator(QueryInterval[])" \
+		"SAMEvidenceSource.shouldFilter(SAMRecord)" \
+		"SAMEvidenceSource.shouldFilter(DirectedEvidence)" \
+		"asEvidence" \
+		"SAMEvidenceSource.iterator(QueryInterval[]).filter" \
+		"mergedIterator" \
+		"PositionalAssembler.evidenceIt" \
+		"PositionalAssembler.SupportNodeIterator" \
+		"PositionalAssembler.AggregateNodeIterator" \
+		"PositionalAssembler.PathNodeIterator" \
+		"AssemblyEvidenceSource.assembler" \
+		; do
+		(diff --speed-large-files -q \
+			<(head -n "$lines" "$f1" | grep -F "$filter") \
+			<(head -n "$lines" "$f2" | grep -F "$filter") >/dev/null \
+			&& echo "---------------IDENTICAL $filter $f1") || \
+			echo "----------NOT--IDENTICAL $filter $f1"
+		diff --speed-large-files \
+			<(head -n "$lines" "$f1" | grep -F "$filter") \
+			<(head -n "$lines" "$f2" | grep -F "$filter") | head -n 2
+	done
+done
+
+
diff --git a/src/main/java/au/edu/wehi/idsv/AssemblyEvidenceSource.java b/src/main/java/au/edu/wehi/idsv/AssemblyEvidenceSource.java
index 8065f923..75639cce 100644
--- a/src/main/java/au/edu/wehi/idsv/AssemblyEvidenceSource.java
+++ b/src/main/java/au/edu/wehi/idsv/AssemblyEvidenceSource.java
@@ -7,9 +7,11 @@
 import au.edu.wehi.idsv.sam.SAMFileUtil;
 import au.edu.wehi.idsv.sam.SAMRecordUtil;
 import au.edu.wehi.idsv.sam.SamTags;
+import au.edu.wehi.idsv.util.DebugSpammingIterator;
 import au.edu.wehi.idsv.util.FileHelper;
 import au.edu.wehi.idsv.visualisation.AssemblyTelemetry;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.MoreExecutors;
 import gridss.SoftClipsToSplitReads;
 import gridss.cmdline.CommandLineProgramHelper;
@@ -233,9 +235,13 @@ private void assembleChunk(SAMFileWriter writer, SAMFileWriter filteredWriter, i
 			try (CloseableIterator input = mergedIterator(source, expanded, EvidenceSortOrder.SAMRecordStartPosition)) {
 				Iterator throttledIt = throttled(input, downsampledRegions);
 				Iterator errorCorrectedIt = errorCorrected(throttledIt);
-				PositionalAssembler assembler = new PositionalAssembler(getContext(), AssemblyEvidenceSource.this, assemblyNameGenerator, errorCorrectedIt, direction, excludedRegions, safetyRegions);
+				PositionalAssembler positionalAssembler = new PositionalAssembler(getContext(), AssemblyEvidenceSource.this, assemblyNameGenerator, errorCorrectedIt, direction, excludedRegions, safetyRegions);
 				if (telemetry != null) {
-					assembler.setTelemetry(telemetry.getTelemetry(chunkNumber, direction));
+					positionalAssembler.setTelemetry(telemetry.getTelemetry(chunkNumber, direction));
+				}
+				Iterator assembler = positionalAssembler;
+				if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+					assembler = Iterators.peekingIterator(new DebugSpammingIterator<>(assembler, "AssemblyEvidenceSource.assembler"));
 				}
 				while (assembler.hasNext()) {
 					SAMRecord asm = assembler.next();
diff --git a/src/main/java/au/edu/wehi/idsv/Defaults.java b/src/main/java/au/edu/wehi/idsv/Defaults.java
index 15d9a649..4f3d26fe 100644
--- a/src/main/java/au/edu/wehi/idsv/Defaults.java
+++ b/src/main/java/au/edu/wehi/idsv/Defaults.java
@@ -7,6 +7,7 @@ public class Defaults {
 	public static final boolean SANITY_CHECK_EVIDENCE_TRACKER;
 	public static final boolean SANITY_CHECK_CLIQUE;
 	public static final boolean SANITY_CHECK_ITERATORS;
+	public static final boolean SANITY_CHECK_DUMP_ITERATORS;
 	public static final boolean SANITY_CHECK_MEMOIZATION;
 	public static final boolean SANITY_CHECK_MEMOIZATION_ALL_OPERATIONS;
 	public static final boolean SINGLE_THREAD_LIBSSW;
@@ -23,6 +24,7 @@ public class Defaults {
 		SANITY_CHECK_EVIDENCE_TRACKER = Boolean.valueOf(System.getProperty("sanitycheck.evidencetracker", "false"));
 		SANITY_CHECK_CLIQUE = Boolean.valueOf(System.getProperty("sanitycheck.clique", "false"));
 		SANITY_CHECK_ITERATORS = Boolean.valueOf(System.getProperty("sanitycheck.iterators", "false"));
+		SANITY_CHECK_DUMP_ITERATORS = Boolean.valueOf(System.getProperty("sanitycheck.dumpIterators", "false"));
 		SANITY_CHECK_MEMOIZATION = Boolean.valueOf(System.getProperty("sanitycheck.memoization", "false"));
 		SANITY_CHECK_MEMOIZATION_ALL_OPERATIONS = Boolean.valueOf(System.getProperty("sanitycheck.memoization.alloperations", "false"));
 		SINGLE_THREAD_LIBSSW = Boolean.valueOf(System.getProperty("sswjni.sync", "false"));
diff --git a/src/main/java/au/edu/wehi/idsv/SAMEvidenceSource.java b/src/main/java/au/edu/wehi/idsv/SAMEvidenceSource.java
index 30fab7ac..0d613db3 100644
--- a/src/main/java/au/edu/wehi/idsv/SAMEvidenceSource.java
+++ b/src/main/java/au/edu/wehi/idsv/SAMEvidenceSource.java
@@ -219,9 +219,15 @@ public CloseableIterator iterator(final QueryInterval[] interv
 		// ignore blacklisted regions
 		IntervalBed queryInterval = new IntervalBed(getContext().getLinear(), expandedIntervals);
 		queryInterval.remove(getBlacklistedRegions());
-		SAMRecordIterator it = tryOpenReader(reader, queryInterval.asQueryInterval());
+		CloseableIterator it = tryOpenReader(reader, queryInterval.asQueryInterval());
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			it = new AutoClosingIterator<>(new DebugSpammingIterator<>(it, "SAMEvidenceSource.iterator().rawiterator"));
+		}
 		Iterator eit = asEvidence(it, eso);
 		eit = Iterators.filter(eit, e -> QueryIntervalUtil.overlaps(intervals, e.getBreakendSummary()));
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			eit = new AutoClosingIterator<>(new DebugSpammingIterator<>(eit, "SAMEvidenceSource.iterator(QueryInterval[]).filter"));
+		}
 		return new AutoClosingIterator<>(eit, reader, it);
 	}
 	/**
@@ -253,6 +259,9 @@ public CloseableIterator iterator(EvidenceSortOrder eso) {
 		SAMRecordIterator it = reader.iterator();
 		it.assertSorted(SortOrder.coordinate);
 		Iterator eit = asEvidence(it, eso);
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			eit = new AutoClosingIterator<>(new DebugSpammingIterator<>(eit, "SAMEvidenceSource.iterator()"));
+		}
 		return new AutoClosingIterator<>(eit, reader, it);
 	}
 	protected SamReader getReader() {
@@ -276,10 +285,22 @@ public void assertPreprocessingComplete() {
 	private Iterator asEvidence(Iterator it, EvidenceSortOrder eso) {
 		it = new BufferedIterator<>(it, 2); // TODO: remove when https://github.com/samtools/htsjdk/issues/760 is resolved
 		it = Iterators.filter(it, r -> !shouldFilterPreTransform(r));
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			it = new AutoClosingIterator<>(new DebugSpammingIterator<>(it, "SAMEvidenceSource.shouldFilterPreTransform"));
+		}
 		it = Iterators.transform(it, r -> transform(r));
-		it = Iterators.filter(it, r -> !shouldFilter(r));
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			it = new AutoClosingIterator<>(new DebugSpammingIterator<>(it, "SAMEvidenceSource.transform"));
+		}
+		it = Iterators.filter(it, r -> !shouldFilter(r));
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			it = new AutoClosingIterator<>(new DebugSpammingIterator<>(it, "SAMEvidenceSource.shouldFilter(SAMRecord)"));
+		}
 		Iterator eit = new DirectedEvidenceIterator(it, this, minIndelSize());
 		eit = Iterators.filter(eit, e -> !shouldFilter(e));
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			eit = new AutoClosingIterator<>(new DebugSpammingIterator<>(eit, "SAMEvidenceSource.shouldFilter(DirectedEvidence)"));
+		}
 		switch (eso) {
 			case SAMRecordStartPosition:
 				// already sorted by coordinate
@@ -295,6 +316,9 @@ private Iterator asEvidence(Iterator it, EvidenceSo
 			default:
 				throw new IllegalArgumentException("Sort order must be specified");
 		}
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			eit = new DebugSpammingIterator<>(eit, "asEvidence");
+		}
 		return eit;
 	}
 	private static float average(byte[] values) {
@@ -552,6 +576,9 @@ public static CloseableIterator mergedIterator(List merged = new AutoClosingMergedIterator(toMerge, eso == EvidenceSortOrder.EvidenceStartPosition ? DirectedEvidenceOrder.ByNatural : DirectedEvidenceOrder.BySAMStart);
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			merged = new AutoClosingIterator<>(new DebugSpammingIterator<>(merged, "mergedIterator"));
+		}
 		return merged;
 	}
 	public static CloseableIterator mergedIterator(final List source, final QueryInterval[] intervals, EvidenceSortOrder eso) {
@@ -561,6 +588,9 @@ public static CloseableIterator mergedIterator(final List merged = new AutoClosingMergedIterator(toMerge, eso == EvidenceSortOrder.EvidenceStartPosition ? DirectedEvidenceOrder.ByNatural : DirectedEvidenceOrder.BySAMStart);
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			merged = new AutoClosingIterator<>(new DebugSpammingIterator<>(merged, "mergedIterator"));
+		}
 		return merged;
 	}
 	/**
diff --git a/src/main/java/au/edu/wehi/idsv/debruijn/positional/PositionalAssembler.java b/src/main/java/au/edu/wehi/idsv/debruijn/positional/PositionalAssembler.java
index 17da8e95..55ead989 100644
--- a/src/main/java/au/edu/wehi/idsv/debruijn/positional/PositionalAssembler.java
+++ b/src/main/java/au/edu/wehi/idsv/debruijn/positional/PositionalAssembler.java
@@ -8,9 +8,7 @@
 import au.edu.wehi.idsv.debruijn.ReadErrorCorrector;
 import au.edu.wehi.idsv.picard.ReferenceLookup;
 import au.edu.wehi.idsv.sam.SamTags;
-import au.edu.wehi.idsv.util.FileHelper;
-import au.edu.wehi.idsv.util.FilenameUtil;
-import au.edu.wehi.idsv.util.IntervalUtil;
+import au.edu.wehi.idsv.util.*;
 import au.edu.wehi.idsv.visualisation.AssemblyTelemetry.AssemblyChunkTelemetry;
 import au.edu.wehi.idsv.visualisation.PositionalDeBruijnGraphTracker;
 import com.google.common.collect.*;
@@ -317,7 +315,7 @@ private NonReferenceContigAssembler createAssembler(Set preloa
 		int referenceIndex = inputIterator.peek().getBreakendSummary().referenceIndex;
 		int firstPosition = inputIterator.peek().getBreakendSummary().start;
 		currentContig = context.getDictionary().getSequence(referenceIndex).getSequenceName();
-		ReferenceIndexIterator evidenceIt = new ReferenceIndexIterator(inputIterator, referenceIndex);
+		PeekingIterator evidenceIt = new ReferenceIndexIterator(inputIterator, referenceIndex);
 		evidenceTracker = new EvidenceTracker();
 		VisualisationConfiguration vis = context.getConfig().getVisualisation();
 		if (vis.evidenceTracker) {
@@ -326,17 +324,31 @@ private NonReferenceContigAssembler createAssembler(Set preloa
 			File file = new File(vis.directory, filename);
 			evidenceTracker.setDebugFileOutput(file);
 		}
-		SupportNodeIterator supportIt = new SupportNodeIterator(k, evidenceIt, Math.max(2 * source.getMaxReadLength(), source.getMaxConcordantFragmentSize()), evidenceTracker, ap.includePairAnchors, ap.pairAnchorMismatchIgnoreEndBases);
-		AggregateNodeIterator agIt = new AggregateNodeIterator(supportIt);
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			evidenceIt = Iterators.peekingIterator(new DebugSpammingIterator<>(evidenceIt, "PositionalAssembler.evidenceIt"));
+		}
+		SupportNodeIterator supportIt_raw = new SupportNodeIterator(k, evidenceIt, Math.max(2 * source.getMaxReadLength(), source.getMaxConcordantFragmentSize()), evidenceTracker, ap.includePairAnchors, ap.pairAnchorMismatchIgnoreEndBases);
+		PeekingIterator supportIt = supportIt_raw;
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			supportIt = Iterators.peekingIterator(new DebugSpammingIterator<>(supportIt, "PositionalAssembler.SupportNodeIterator"));
+		}
+		AggregateNodeIterator agIt_raw = new AggregateNodeIterator(supportIt);
+		PeekingIterator agIt = agIt_raw;
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			agIt = Iterators.peekingIterator(new DebugSpammingIterator<>(agIt, "PositionalAssembler.AggregateNodeIterator"));
+		}
 		Iterator knIt = agIt;
 		if (Defaults.SANITY_CHECK_ASSEMBLY_GRAPH) {
 			knIt = evidenceTracker.new AggregateNodeAssertionInterceptor(knIt);
 		}
-        PathNodeIterator pathNodeIt = new PathNodeIterator(knIt, maxPathLength, k);
+		PathNodeIterator pathNodeIt = new PathNodeIterator(knIt, maxPathLength, k);
 		Iterator pnIt = pathNodeIt;
 		if (Defaults.SANITY_CHECK_ASSEMBLY_GRAPH) {
 			pnIt = evidenceTracker.new PathNodeAssertionInterceptor(pnIt, "PathNodeIterator");
 		}
+		if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
+			pnIt = Iterators.peekingIterator(new DebugSpammingIterator<>(pnIt, "PositionalAssembler.PathNodeIterator"));
+		}
 		currentAssembler = new NonReferenceContigAssembler(pnIt, referenceIndex, maxEvidenceSupportIntervalWidth, anchorAssemblyLength, k, source, assemblyNameGenerator, evidenceTracker, currentContig, BreakendDirection.Forward, excludedRegions, safetyRegions);
 		if (vis.assemblyProgress) {
 			String filename = String.format("positional-%s_%d-%s.csv", context.getDictionary().getSequence(referenceIndex).getSequenceName(), firstPosition, direction);
@@ -344,7 +356,7 @@ private NonReferenceContigAssembler createAssembler(Set preloa
 			File file = new File(vis.directory, filename);
 			PositionalDeBruijnGraphTracker exportTracker;
 			try {
-				exportTracker = new PositionalDeBruijnGraphTracker(file, supportIt, agIt, pathNodeIt, null, evidenceTracker, currentAssembler);
+				exportTracker = new PositionalDeBruijnGraphTracker(file, supportIt_raw, agIt_raw, pathNodeIt, null, evidenceTracker, currentAssembler);
 				exportTracker.writeHeader();
 				currentAssembler.setExportTracker(exportTracker);
 			} catch (IOException e) {
diff --git a/src/main/java/au/edu/wehi/idsv/util/DebugSpammingIterator.java b/src/main/java/au/edu/wehi/idsv/util/DebugSpammingIterator.java
new file mode 100644
index 00000000..923fe794
--- /dev/null
+++ b/src/main/java/au/edu/wehi/idsv/util/DebugSpammingIterator.java
@@ -0,0 +1,89 @@
+package au.edu.wehi.idsv.util;
+
+import htsjdk.samtools.util.RuntimeIOException;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Pass-through iterator for debugging: every record returned by next() is also
+ * written, together with the current thread name and this iterator's id, to a
+ * per-thread log file named DebugSpammingIterator{n}.{thread name}.log, where n
+ * is the first positive integer for which no such file exists yet.
+ */
+public class DebugSpammingIterator<T> implements Iterator<T> {
+	private static final int FLUSH_INTERVAL = 1024;
+	// Shared by every instance on every thread, so the map itself must be thread-safe.
+	private static final Map<String, PrintWriter> out = new ConcurrentHashMap<>();
+	private final Iterator<T> underlying;
+	private final long maxRecords;
+	private final String id;
+	private long records = 0;
+	/**
+	 * @param underlying iterator whose records are logged
+	 * @param id label written on every log line of this iterator
+	 */
+	public DebugSpammingIterator(Iterator<T> underlying, String id) {
+		this(underlying, id, 1024 * 1024 * 1024);
+	}
+	/**
+	 * @param underlying iterator whose records are logged
+	 * @param id label written on every log line of this iterator
+	 * @param maxRecords records are logged only while fewer than this many have been returned
+	 */
+	public DebugSpammingIterator(Iterator<T> underlying, String id, long maxRecords) {
+		this.underlying = underlying;
+		this.id = id;
+		this.maxRecords = maxRecords;
+	}
+	@Override
+	public boolean hasNext() {
+		boolean hasNext = underlying.hasNext();
+		if (!hasNext) {
+			// Writers are never closed, so flush on exhaustion to keep the tail of the log.
+			flush(Thread.currentThread().getName());
+		}
+		return hasNext;
+	}
+
+	@Override
+	public T next() {
+		T r = underlying.next();
+		String threadName = Thread.currentThread().getName();
+		records++;
+		if (records < maxRecords) {
+			PrintWriter writer = out.computeIfAbsent(threadName, DebugSpammingIterator::openLog);
+			writer.write(String.format("%s\t%s\t%s\n", threadName, id, r));
+			if (records % FLUSH_INTERVAL == 0) {
+				writer.flush();
+			}
+		} else if (records == maxRecords) {
+			// Logging stops here for this iterator: push out whatever is still buffered.
+			flush(threadName);
+		}
+		return r;
+	}
+	/** Flushes the log of the given thread, if one has been opened. */
+	private static void flush(String threadName) {
+		PrintWriter writer = out.get(threadName);
+		if (writer != null) {
+			writer.flush();
+		}
+	}
+	/** Opens a new, not yet existing log file for the given thread. */
+	private static PrintWriter openLog(String threadName) {
+		int i = 1;
+		while (new File("DebugSpammingIterator" + i + "." + threadName + ".log").exists()) {
+			i++;
+		}
+		try {
+			return new PrintWriter(new File("DebugSpammingIterator" + i + "." + threadName + ".log"));
+		} catch (FileNotFoundException e) {
+			throw new RuntimeIOException(e);
+		}
+	}
+}
diff --git a/src/main/java/au/edu/wehi/idsv/util/DensityThrottlingIterator.java b/src/main/java/au/edu/wehi/idsv/util/DensityThrottlingIterator.java
index 3278c8a7..9d9c2e21 100644
--- a/src/main/java/au/edu/wehi/idsv/util/DensityThrottlingIterator.java
+++ b/src/main/java/au/edu/wehi/idsv/util/DensityThrottlingIterator.java
@@ -35,8 +35,6 @@ public abstract class DensityThrottlingIterator implements PeekingIterator
 	/**
 	 * @param it iterator to filter. Cannot contain null elements
 	 * @param windowSize Size of window to track density over
-	 * @param acceptAllCount number of record in window before throttling starts
-	 * @param maxCount maximum average number of records in window
 	 */
 	public DensityThrottlingIterator(Iterator it, int windowSize, double acceptDensity, double targetDensity) {
 		this.underlying = it;
diff --git a/src/test/java/au/edu/wehi/idsv/util/DensityThrottlingIteratorTest.java b/src/test/java/au/edu/wehi/idsv/util/DensityThrottlingIteratorTest.java
index 1cba66ea..1adf3436 100644
--- a/src/test/java/au/edu/wehi/idsv/util/DensityThrottlingIteratorTest.java
+++ b/src/test/java/au/edu/wehi/idsv/util/DensityThrottlingIteratorTest.java
@@ -45,4 +45,40 @@ public void should_use_exponential_backoff_to_throttle_above_unconditional_accep
 		List result = Lists.newArrayList(new IntDensityThrottlingIterator(input.iterator(), 4, 2.0, 4.0));
 		assertEquals(2048, result.size(), 64);
 	}
+	public class IntRecord {
+		public final int value;
+		public final long position;
+		public IntRecord(int value, long position) {
+			this.value = value;
+			this.position = position;
+		}
+	}
+	public class IntRecordRecordDensityThrottlingIterator extends DensityThrottlingIterator {
+		public IntRecordRecordDensityThrottlingIterator(Iterator it, int windowSize, double acceptDensity, double targetDensity) {
+			super(it, windowSize, acceptDensity, targetDensity);
+		}
+		@Override
+		protected long getPosition(IntRecord record) {
+			return record.position;
+		}
+
+		@Override
+		protected boolean excludedFromThrottling(IntRecord record) {
+			return false;
+		}
+	}
+	@Test
+	public void should_be_deterministic() {
+		List input = new ArrayList();
+		for (int i = 0; i < 1024; i++) {
+			int value = ~i & 0xFF;
+			for (long pos = 0; pos < 128; pos++) {
+				input.add(new IntRecord(value, pos));
+			}
+		}
+		List result1 = Lists.newArrayList(new IntRecordRecordDensityThrottlingIterator(input.iterator(), 4, 2.0, 4.0));
+		List result2 = Lists.newArrayList(new IntRecordRecordDensityThrottlingIterator(input.iterator(), 4, 2.0, 4.0));
+
+		assertEquals(result1, result2);
+	}
 }