diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java index 30295bd399..e07e687664 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java @@ -18,6 +18,7 @@ package org.apache.tez.runtime.library.common; +import com.google.common.collect.Range; import org.apache.hadoop.classification.InterfaceAudience.Private; /** @@ -50,6 +51,14 @@ public InputAttemptIdentifier expand(int inputIdentifierOffset) { return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId()); } + public boolean includes(InputAttemptIdentifier thatInputAttemptIdentifier) { + Range<Integer> inputRange = + Range.closedOpen(super.getInputIdentifier(), super.getInputIdentifier() + inputIdentifierCount); + + return inputRange.contains(thatInputAttemptIdentifier.getInputIdentifier()) && + super.getAttemptNumber() == thatInputAttemptIdentifier.getAttemptNumber(); + } + // PathComponent & shared does not need to be part of the hashCode and equals computation. 
@Override public int hashCode() { @@ -63,6 +72,6 @@ public boolean equals(Object obj) { @Override public String toString() { - return super.toString(); + return super.toString() + ", count=" + inputIdentifierCount; } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java index 16172e1daf..d1d5aeda1a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/InputAttemptIdentifier.java @@ -108,6 +108,18 @@ public boolean canRetrieveInputInChunks() { (fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal()); } + /** + * Checks whether this InputAttemptIdentifier includes the given InputAttemptIdentifier. + * It is used when we obsolete InputAttemptIdentifiers that include a FetchFailure reported one. + * + * @param thatInputAttemptIdentifier The InputAttemptIdentifier to check for inclusion. + * @return True if the current identifier includes the given one, false otherwise. + */ + public boolean includes(InputAttemptIdentifier thatInputAttemptIdentifier) { + return this.inputIdentifier == thatInputAttemptIdentifier.getInputIdentifier() && + this.attemptNumber == thatInputAttemptIdentifier.getAttemptNumber(); + } + // PathComponent & shared does not need to be part of the hashCode and equals computation. 
@Override public int hashCode() { @@ -139,6 +151,6 @@ public boolean equals(Object obj) { public String toString() { return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier + ", attemptNumber=" + attemptNumber + ", pathComponent=" - + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId +"]"; + + pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId + "]"; } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index 4f42f57a1e..56b8cd4a08 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -282,6 +282,7 @@ private void processShufflePayload(DataMovementEventPayloadProto shufflePayload, private void processInputFailedEvent(InputFailedEvent ife) { InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion()); + LOG.info("Marking obsolete input: {} {}", inputContext.getSourceVertexName(), srcAttemptIdentifier); shuffleManager.obsoleteKnownInput(srcAttemptIdentifier); } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 769ac68f7e..646194c6d7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -583,8 +583,9 @@ Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { } else { 
alreadyCompleted = completedInputSet.get(input.getInputIdentifier()); } + // Avoid adding attempts which have already completed or have been marked as OBSOLETE - if (alreadyCompleted || obsoletedInputs.contains(input)) { + if (alreadyCompleted || isObsoleteInputAttemptIdentifier(input)) { inputIter.remove(); continue; } @@ -949,10 +950,14 @@ public void fetchFailed(String host, // TODO NEWTEZ. Implement logic to report fetch failures after a threshold. // For now, reporting immediately. InputAttemptIdentifier srcAttemptIdentifier = inputAttemptFetchFailure.getInputAttemptIdentifier(); + if (isObsoleteInputAttemptIdentifier(srcAttemptIdentifier)) { + LOG.info("Do not report obsolete input: {}", srcAttemptIdentifier); + return; + } LOG.info( - "{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, " + "{}: Fetch failed for InputIdentifier: {}, connectFailed: {}, " + "local fetch: {}, remote fetch failure reported as local failure: {})", - sourceDestNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed, + sourceDestNameTrimmed, srcAttemptIdentifier, connectFailed, inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource()); failedShufflesCounter.increment(1); inputContext.notifyProgress(); @@ -984,6 +989,22 @@ public void fetchFailed(String host, } } } + + private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) { + if (input == null) { + return false; + } + InputAttemptIdentifier obsoleteInput; + Iterator<InputAttemptIdentifier> obsoleteInputsIter = obsoletedInputs.iterator(); + while (obsoleteInputsIter.hasNext()) { + obsoleteInput = obsoleteInputsIter.next(); + if (input.includes(obsoleteInput)) { + return true; + } + } + return false; + } + /////////////////// End of Methods from FetcherCallbackHandler public void shutdown() throws InterruptedException { diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index f68ab948ba..3fc7d63059 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -1175,7 +1175,19 @@ private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) { } else { isInputFinished = isInputFinished(id.getInputIdentifier()); } - return !obsoleteInputs.contains(id) && !isInputFinished; + return !isObsoleteInputAttemptIdentifier(id) && !isInputFinished; + } + + private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) { + InputAttemptIdentifier obsoleteInput; + Iterator<InputAttemptIdentifier> obsoleteInputsIter = obsoleteInputs.iterator(); + while (obsoleteInputsIter.hasNext()) { + obsoleteInput = obsoleteInputsIter.next(); + if (input.includes(obsoleteInput)) { + return true; + } + } + return false; } public synchronized List getMapsForHost(MapHost host) { diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java index 6b82a9d27d..5eb3b5030a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestInputIdentifiers.java @@ -41,4 +41,23 @@ public void testInputAttemptIdentifier() { Assert.assertTrue(set.add(i4)); } + @Test(timeout = 5000) + public void testInputAttemptIdentifierIncludes() { + InputAttemptIdentifier inputData0Attempt0 = new InputAttemptIdentifier(0, 0); + InputAttemptIdentifier inputData1Attempt0 = new InputAttemptIdentifier(1, 0); + InputAttemptIdentifier inputData2Attempt0 = new InputAttemptIdentifier(2, 0); + 
InputAttemptIdentifier inputData3Attempt0 = new InputAttemptIdentifier(3, 0); + InputAttemptIdentifier inputData1Attempt1 = new InputAttemptIdentifier(1, 1); + CompositeInputAttemptIdentifier inputData12Attempt0 = new CompositeInputAttemptIdentifier(1, 0, null, 2); + + Assert.assertTrue(inputData1Attempt0.includes(inputData1Attempt0)); + Assert.assertFalse(inputData1Attempt0.includes(inputData2Attempt0)); + Assert.assertFalse(inputData1Attempt0.includes(inputData1Attempt1)); + + Assert.assertFalse(inputData12Attempt0.includes(inputData0Attempt0)); + Assert.assertTrue(inputData12Attempt0.includes(inputData1Attempt0)); + Assert.assertTrue(inputData12Attempt0.includes(inputData2Attempt0)); + Assert.assertFalse(inputData12Attempt0.includes(inputData3Attempt0)); + Assert.assertFalse(inputData12Attempt0.includes(inputData1Attempt1)); + } }