Skip to content

Commit

Permalink
Added -Dsanitycheck.dumpIterators option that dumps out the records f…
Browse files Browse the repository at this point in the history
…or almost all the assembly iterators.

Useful for isolating where non-determinism gets introduced
  • Loading branch information
Daniel Cameron committed Feb 4, 2022
1 parent d718998 commit 92cb449
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 14 deletions.
39 changes: 39 additions & 0 deletions scripts/dev/diff_debugspammingiterator.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/bin/bash
#
# Diffs the DebugSpammingIterator (-Dsanitycheck.dumpIterators) log files of
# two runs to locate the first pipeline stage at which the record streams
# diverge — i.e. where non-determinism is introduced.
#
# Usage: diff_debugspammingiterator.sh <run1_suffix> <run2_suffix>
#   Compares DebugSpammingIterator<run1_suffix>.Worker*.log against the
#   matching DebugSpammingIterator<run2_suffix>.Worker*.log files.
#
for f1 in "DebugSpammingIterator$1".Worker*.log ; do
	# Corresponding log file from the second run
	f2=${f1/tor$1/tor$2}
	# Only compare the common prefix of the two logs so a truncated or
	# still-being-written log does not produce spurious differences.
	lines1=$(wc -l < "$f1")
	lines2=$(wc -l < "$f2")
	lines=$lines2
	if [[ $lines1 -lt $lines2 ]] ; then
		lines=$lines1
	fi
	# Each filter string identifies one instrumented iterator stage.
	for filter in \
		"SAMEvidenceSource.iterator().rawiterator" \
		"SAMEvidenceSource.shouldFilterPreTransform" \
		"SAMEvidenceSource.transform" \
		"SAMEvidenceSource.iterator(QueryInterval[])" \
		"SAMEvidenceSource.shouldFilter(SAMRecord)" \
		"SAMEvidenceSource.shouldFilter(DirectedEvidence)" \
		"asEvidence" \
		"SAMEvidenceSource.iterator(QueryInterval[]).filter" \
		"mergedIterator" \
		"PositionalAssembler.evidenceIt" \
		"PositionalAssembler.SupportNodeIterator" \
		"PositionalAssembler.AggregateNodeIterator" \
		"PositionalAssembler.PathNodeIterator" \
		"AssemblyEvidenceSource.assembler" \
	; do
		# Report whether this stage's records match between the two runs.
		(diff --speed-large-files -q \
			<(head -n "$lines" "$f1" | grep -F "$filter") \
			<(head -n "$lines" "$f2" | grep -F "$filter") >/dev/null \
			&& echo "---------------IDENTICAL $filter $f1") || \
			echo "----------NOT--IDENTICAL $filter $f1"
		# Show the first couple of differing lines for context.
		diff --speed-large-files \
			<(head -n "$lines" "$f1" | grep -F "$filter") \
			<(head -n "$lines" "$f2" | grep -F "$filter") | head -2
	done
done


10 changes: 8 additions & 2 deletions src/main/java/au/edu/wehi/idsv/AssemblyEvidenceSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import au.edu.wehi.idsv.sam.SAMFileUtil;
import au.edu.wehi.idsv.sam.SAMRecordUtil;
import au.edu.wehi.idsv.sam.SamTags;
import au.edu.wehi.idsv.util.DebugSpammingIterator;
import au.edu.wehi.idsv.util.FileHelper;
import au.edu.wehi.idsv.visualisation.AssemblyTelemetry;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.MoreExecutors;
import gridss.SoftClipsToSplitReads;
import gridss.cmdline.CommandLineProgramHelper;
Expand Down Expand Up @@ -233,9 +235,13 @@ private void assembleChunk(SAMFileWriter writer, SAMFileWriter filteredWriter, i
try (CloseableIterator<DirectedEvidence> input = mergedIterator(source, expanded, EvidenceSortOrder.SAMRecordStartPosition)) {
Iterator<DirectedEvidence> throttledIt = throttled(input, downsampledRegions);
Iterator<DirectedEvidence> errorCorrectedIt = errorCorrected(throttledIt);
PositionalAssembler assembler = new PositionalAssembler(getContext(), AssemblyEvidenceSource.this, assemblyNameGenerator, errorCorrectedIt, direction, excludedRegions, safetyRegions);
PositionalAssembler positionalAssembler = new PositionalAssembler(getContext(), AssemblyEvidenceSource.this, assemblyNameGenerator, errorCorrectedIt, direction, excludedRegions, safetyRegions);
if (telemetry != null) {
assembler.setTelemetry(telemetry.getTelemetry(chunkNumber, direction));
positionalAssembler.setTelemetry(telemetry.getTelemetry(chunkNumber, direction));
}
Iterator<SAMRecord> assembler = positionalAssembler;
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
assembler = Iterators.peekingIterator(new DebugSpammingIterator<>(assembler, "AssemblyEvidenceSource.assembler"));
}
while (assembler.hasNext()) {
SAMRecord asm = assembler.next();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/au/edu/wehi/idsv/Defaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class Defaults {
public static final boolean SANITY_CHECK_EVIDENCE_TRACKER;
public static final boolean SANITY_CHECK_CLIQUE;
public static final boolean SANITY_CHECK_ITERATORS;
public static final boolean SANITY_CHECK_DUMP_ITERATORS;
public static final boolean SANITY_CHECK_MEMOIZATION;
public static final boolean SANITY_CHECK_MEMOIZATION_ALL_OPERATIONS;
public static final boolean SINGLE_THREAD_LIBSSW;
Expand All @@ -23,6 +24,7 @@ public class Defaults {
SANITY_CHECK_EVIDENCE_TRACKER = Boolean.valueOf(System.getProperty("sanitycheck.evidencetracker", "false"));
SANITY_CHECK_CLIQUE = Boolean.valueOf(System.getProperty("sanitycheck.clique", "false"));
SANITY_CHECK_ITERATORS = Boolean.valueOf(System.getProperty("sanitycheck.iterators", "false"));
SANITY_CHECK_DUMP_ITERATORS = Boolean.valueOf(System.getProperty("sanitycheck.dumpIterators", "false"));
SANITY_CHECK_MEMOIZATION = Boolean.valueOf(System.getProperty("sanitycheck.memoization", "false"));
SANITY_CHECK_MEMOIZATION_ALL_OPERATIONS = Boolean.valueOf(System.getProperty("sanitycheck.memoization.alloperations", "false"));
SINGLE_THREAD_LIBSSW = Boolean.valueOf(System.getProperty("sswjni.sync", "false"));
Expand Down
34 changes: 32 additions & 2 deletions src/main/java/au/edu/wehi/idsv/SAMEvidenceSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,15 @@ public CloseableIterator<DirectedEvidence> iterator(final QueryInterval[] interv
// ignore blacklisted regions
IntervalBed queryInterval = new IntervalBed(getContext().getLinear(), expandedIntervals);
queryInterval.remove(getBlacklistedRegions());
SAMRecordIterator it = tryOpenReader(reader, queryInterval.asQueryInterval());
CloseableIterator it = tryOpenReader(reader, queryInterval.asQueryInterval());
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
it = new AutoClosingIterator<>(new DebugSpammingIterator<>(it, "SAMEvidenceSource.iterator().rawiterator"));
}
Iterator<DirectedEvidence> eit = asEvidence(it, eso);
eit = Iterators.filter(eit, e -> QueryIntervalUtil.overlaps(intervals, e.getBreakendSummary()));
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
eit = new AutoClosingIterator<>(new DebugSpammingIterator<>(eit, "SAMEvidenceSource.iterator(QueryInterval[]).filter"));
}
return new AutoClosingIterator<>(eit, reader, it);
}
/**
Expand Down Expand Up @@ -253,6 +259,9 @@ public CloseableIterator<DirectedEvidence> iterator(EvidenceSortOrder eso) {
SAMRecordIterator it = reader.iterator();
it.assertSorted(SortOrder.coordinate);
Iterator<DirectedEvidence> eit = asEvidence(it, eso);
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
eit = new AutoClosingIterator<>(new DebugSpammingIterator<>(eit, "SAMEvidenceSource.iterator()"));
}
return new AutoClosingIterator<>(eit, reader, it);
}
protected SamReader getReader() {
Expand All @@ -276,10 +285,22 @@ public void assertPreprocessingComplete() {
private Iterator<DirectedEvidence> asEvidence(Iterator<SAMRecord> it, EvidenceSortOrder eso) {
it = new BufferedIterator<>(it, 2); // TODO: remove when https://github.com/samtools/htsjdk/issues/760 is resolved
it = Iterators.filter(it, r -> !shouldFilterPreTransform(r));
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
it = new AutoClosingIterator<>(new DebugSpammingIterator<>(it, "SAMEvidenceSource.shouldFilterPreTransform"));
}
it = Iterators.transform(it, r -> transform(r));
it = Iterators.filter(it, r -> !shouldFilter(r));
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
it = new AutoClosingIterator<>(new DebugSpammingIterator<>(it, "SAMEvidenceSource.transform"));
}
it = Iterators.filter(it, r -> !shouldFilter(r));
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
it = new AutoClosingIterator<>(new DebugSpammingIterator<>(it, "SAMEvidenceSource.shouldFilter(SAMRecord)"));
}
Iterator<DirectedEvidence> eit = new DirectedEvidenceIterator(it, this, minIndelSize());
eit = Iterators.filter(eit, e -> !shouldFilter(e));
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
it = new AutoClosingIterator<>(new DebugSpammingIterator<>(it, "SAMEvidenceSource.shouldFilter(DirectedEvidence)"));
}
switch (eso) {
case SAMRecordStartPosition:
// already sorted by coordinate
Expand All @@ -295,6 +316,9 @@ private Iterator<DirectedEvidence> asEvidence(Iterator<SAMRecord> it, EvidenceSo
default:
throw new IllegalArgumentException("Sort order must be specified");
}
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
eit = new DebugSpammingIterator<>(eit, "asEvidence");
}
return eit;
}
private static float average(byte[] values) {
Expand Down Expand Up @@ -552,6 +576,9 @@ public static CloseableIterator<DirectedEvidence> mergedIterator(List<SAMEvidenc
toMerge.add(it);
}
CloseableIterator<DirectedEvidence> merged = new AutoClosingMergedIterator<DirectedEvidence>(toMerge, eso == EvidenceSortOrder.EvidenceStartPosition ? DirectedEvidenceOrder.ByNatural : DirectedEvidenceOrder.BySAMStart);
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
merged = new AutoClosingIterator<>(new DebugSpammingIterator<>(merged, "mergedIterator"));
}
return merged;
}
public static CloseableIterator<DirectedEvidence> mergedIterator(final List<SAMEvidenceSource> source, final QueryInterval[] intervals, EvidenceSortOrder eso) {
Expand All @@ -561,6 +588,9 @@ public static CloseableIterator<DirectedEvidence> mergedIterator(final List<SAME
toMerge.add(it);
}
CloseableIterator<DirectedEvidence> merged = new AutoClosingMergedIterator<DirectedEvidence>(toMerge, eso == EvidenceSortOrder.EvidenceStartPosition ? DirectedEvidenceOrder.ByNatural : DirectedEvidenceOrder.BySAMStart);
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
merged = new AutoClosingIterator<>(new DebugSpammingIterator<>(merged, "mergedIterator"));
}
return merged;
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
import au.edu.wehi.idsv.debruijn.ReadErrorCorrector;
import au.edu.wehi.idsv.picard.ReferenceLookup;
import au.edu.wehi.idsv.sam.SamTags;
import au.edu.wehi.idsv.util.FileHelper;
import au.edu.wehi.idsv.util.FilenameUtil;
import au.edu.wehi.idsv.util.IntervalUtil;
import au.edu.wehi.idsv.util.*;
import au.edu.wehi.idsv.visualisation.AssemblyTelemetry.AssemblyChunkTelemetry;
import au.edu.wehi.idsv.visualisation.PositionalDeBruijnGraphTracker;
import com.google.common.collect.*;
Expand Down Expand Up @@ -317,7 +315,7 @@ private NonReferenceContigAssembler createAssembler(Set<DirectedEvidence> preloa
int referenceIndex = inputIterator.peek().getBreakendSummary().referenceIndex;
int firstPosition = inputIterator.peek().getBreakendSummary().start;
currentContig = context.getDictionary().getSequence(referenceIndex).getSequenceName();
ReferenceIndexIterator evidenceIt = new ReferenceIndexIterator(inputIterator, referenceIndex);
PeekingIterator<DirectedEvidence> evidenceIt = new ReferenceIndexIterator(inputIterator, referenceIndex);
evidenceTracker = new EvidenceTracker();
VisualisationConfiguration vis = context.getConfig().getVisualisation();
if (vis.evidenceTracker) {
Expand All @@ -326,25 +324,39 @@ private NonReferenceContigAssembler createAssembler(Set<DirectedEvidence> preloa
File file = new File(vis.directory, filename);
evidenceTracker.setDebugFileOutput(file);
}
SupportNodeIterator supportIt = new SupportNodeIterator(k, evidenceIt, Math.max(2 * source.getMaxReadLength(), source.getMaxConcordantFragmentSize()), evidenceTracker, ap.includePairAnchors, ap.pairAnchorMismatchIgnoreEndBases);
AggregateNodeIterator agIt = new AggregateNodeIterator(supportIt);
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
evidenceIt = Iterators.peekingIterator(new DebugSpammingIterator<>(it, "PositionalAssembler.evidenceIt"));
}
SupportNodeIterator supportIt_raw = new SupportNodeIterator(k, evidenceIt, Math.max(2 * source.getMaxReadLength(), source.getMaxConcordantFragmentSize()), evidenceTracker, ap.includePairAnchors, ap.pairAnchorMismatchIgnoreEndBases);
PeekingIterator<KmerSupportNode> supportIt = supportIt_raw;
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
supportIt = Iterators.peekingIterator(new DebugSpammingIterator<>(supportIt, "PositionalAssembler.SupportNodeIterator"));
}
AggregateNodeIterator agIt_raw = new AggregateNodeIterator(supportIt);
PeekingIterator<KmerNode> agIt = agIt_raw;
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
agIt = Iterators.peekingIterator(new DebugSpammingIterator<>(agIt, "PositionalAssembler.AggregateNodeIterator"));
}
Iterator<KmerNode> knIt = agIt;
if (Defaults.SANITY_CHECK_ASSEMBLY_GRAPH) {
knIt = evidenceTracker.new AggregateNodeAssertionInterceptor(knIt);
}
PathNodeIterator pathNodeIt = new PathNodeIterator(knIt, maxPathLength, k);
PathNodeIterator pathNodeIt = new PathNodeIterator(knIt, maxPathLength, k);
Iterator<KmerPathNode> pnIt = pathNodeIt;
if (Defaults.SANITY_CHECK_ASSEMBLY_GRAPH) {
pnIt = evidenceTracker.new PathNodeAssertionInterceptor(pnIt, "PathNodeIterator");
}
if (Defaults.SANITY_CHECK_DUMP_ITERATORS) {
pnIt = Iterators.peekingIterator(new DebugSpammingIterator<>(pnIt, "PositionalAssembler.PathNodeIterator"));
}
currentAssembler = new NonReferenceContigAssembler(pnIt, referenceIndex, maxEvidenceSupportIntervalWidth, anchorAssemblyLength, k, source, assemblyNameGenerator, evidenceTracker, currentContig, BreakendDirection.Forward, excludedRegions, safetyRegions);
if (vis.assemblyProgress) {
String filename = String.format("positional-%s_%d-%s.csv", context.getDictionary().getSequence(referenceIndex).getSequenceName(), firstPosition, direction);
filename = FilenameUtil.stripInvalidFilenameCharacters(filename);
File file = new File(vis.directory, filename);
PositionalDeBruijnGraphTracker exportTracker;
try {
exportTracker = new PositionalDeBruijnGraphTracker(file, supportIt, agIt, pathNodeIt, null, evidenceTracker, currentAssembler);
exportTracker = new PositionalDeBruijnGraphTracker(file, supportIt_raw, agIt_raw, pathNodeIt, null, evidenceTracker, currentAssembler);
exportTracker.writeHeader();
currentAssembler.setExportTracker(exportTracker);
} catch (IOException e) {
Expand Down
57 changes: 57 additions & 0 deletions src/main/java/au/edu/wehi/idsv/util/DebugSpammingIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package au.edu.wehi.idsv.util;

import htsjdk.samtools.util.RuntimeIOException;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

public class DebugSpammingIterator<T> implements Iterator<T> {
private static final int FLUSH_INTERVAL = 1024;
private static final HashMap<String, PrintWriter> out = new HashMap<>();
private final Iterator<T> underlying;
private final long maxRecords;
private final String id;
private long records = 0;
public DebugSpammingIterator(Iterator<T> underlying, String id) {
this(underlying, id, 1024 * 1024 * 1024);
}
public DebugSpammingIterator(Iterator<T> underlying, String id, long maxRecords) {
this.underlying = underlying;
this.id = id;
this.maxRecords = maxRecords;
}
@Override
public boolean hasNext() {
return underlying.hasNext();
}

@Override
public T next() {
T r = underlying.next();
String threadName = Thread.currentThread().getName();
records++;
if (records < maxRecords) {
if (!out.containsKey(threadName)) {
int i = 1;
while (new File("DebugSpammingIterator" + i + "." + threadName + ".log").exists()) {
i++;
}
try {
out.put(threadName, new PrintWriter(new File("DebugSpammingIterator" + i + "." + threadName + ".log")));
} catch (FileNotFoundException e) {
throw new RuntimeIOException(e);
}
}
String msg = String.format("%s\t%s\t%s\n", Thread.currentThread().getName(), id, r);
out.get(threadName).write(msg);
} else if (records == maxRecords || records % FLUSH_INTERVAL == 0) {
synchronized (out) {
out.get(threadName).flush();
}
}
return r;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public abstract class DensityThrottlingIterator<T> implements PeekingIterator<T>
/**
* @param it iterator to filter. Cannot contain null elements
* @param windowSize Size of window to track density over
* @param acceptAllCount number of record in window before throttling starts
* @param maxCount maximum average number of records in window
*/
public DensityThrottlingIterator(Iterator<T> it, int windowSize, double acceptDensity, double targetDensity) {
this.underlying = it;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,40 @@ public void should_use_exponential_backoff_to_throttle_above_unconditional_accep
List<Integer> result = Lists.newArrayList(new IntDensityThrottlingIterator(input.iterator(), 4, 2.0, 4.0));
assertEquals(2048, result.size(), 64);
}
/**
 * Simple immutable test fixture: a value paired with the position used by
 * the throttling window.
 */
public class IntRecord {
	// Payload carried by the record.
	public final int value;
	// Position reported to DensityThrottlingIterator.getPosition().
	public final long position;
	public IntRecord(int recordValue, long recordPosition) {
		this.position = recordPosition;
		this.value = recordValue;
	}
}
/**
 * DensityThrottlingIterator over IntRecord test fixtures.
 */
public class IntRecordRecordDensityThrottlingIterator extends DensityThrottlingIterator<IntRecord> {
	public IntRecordRecordDensityThrottlingIterator(Iterator<IntRecord> it, int windowSize, double acceptDensity, double targetDensity) {
		super(it, windowSize, acceptDensity, targetDensity);
	}

	/** No record is ever exempt from throttling in these tests. */
	@Override
	protected boolean excludedFromThrottling(IntRecord record) {
		return false;
	}

	/** The throttling window position comes straight from the record. */
	@Override
	protected long getPosition(IntRecord record) {
		return record.position;
	}
}
@Test
public void should_be_deterministic() {
	// Build a fixed input stream: 1024 distinct values, each repeated at
	// positions 0..127, giving a high-density stream that forces throttling.
	List<IntRecord> input = new ArrayList<IntRecord>();
	for (int i = 0; i < 1024; i++) {
		final int value = ~i & 0xFF;
		for (long position = 0; position < 128; position++) {
			input.add(new IntRecord(value, position));
		}
	}
	// Throttle the same input twice with identically-configured iterators;
	// a deterministic implementation must select exactly the same records.
	// (IntRecord does not override equals, so this list comparison checks
	// that the very same element instances were selected.)
	List<IntRecord> first = Lists.newArrayList(new IntRecordRecordDensityThrottlingIterator(input.iterator(), 4, 2.0, 4.0));
	List<IntRecord> second = Lists.newArrayList(new IntRecordRecordDensityThrottlingIterator(input.iterator(), 4, 2.0, 4.0));

	assertEquals(first, second);
}
}

0 comments on commit 92cb449

Please sign in to comment.