Skip to content

Commit

Permalink
TEZ-4061: InputAttemptIdentifier and CompositeInputAttemptIdentifier …
Browse files Browse the repository at this point in the history
…cannot be compared for equality (#326) (Seonggon Namgung reviewed by Laszlo Bodor)
  • Loading branch information
ngsg authored Dec 25, 2024
1 parent d39603c commit c77d37e
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tez.runtime.library.common;

import com.google.common.collect.Range;
import org.apache.hadoop.classification.InterfaceAudience.Private;

/**
Expand Down Expand Up @@ -50,6 +51,14 @@ public InputAttemptIdentifier expand(int inputIdentifierOffset) {
return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId());
}

/**
 * Checks whether the given identifier falls inside the span of inputs covered by this
 * composite identifier.
 *
 * <p>The span starts at this identifier's input index and covers
 * {@code inputIdentifierCount} consecutive indices (end exclusive). The attempt
 * numbers must also be equal.
 *
 * @param thatInputAttemptIdentifier the identifier to test for inclusion
 * @return true if the input index lies within the span and the attempt number matches
 */
public boolean includes(InputAttemptIdentifier thatInputAttemptIdentifier) {
  int firstInput = super.getInputIdentifier();
  Range<Integer> coveredInputs =
      Range.closedOpen(firstInput, firstInput + inputIdentifierCount);

  if (!coveredInputs.contains(thatInputAttemptIdentifier.getInputIdentifier())) {
    return false;
  }
  return super.getAttemptNumber() == thatInputAttemptIdentifier.getAttemptNumber();
}

// PathComponent & shared do not need to be part of the hashCode and equals computation.
@Override
public int hashCode() {
Expand All @@ -63,6 +72,6 @@ public boolean equals(Object obj) {

// NOTE(review): this is a diff view, so both versions of the return statement are shown.
// The first return appears to be the pre-change line and the second its replacement,
// which appends ", count=" followed by inputIdentifierCount to the parent's string form.
@Override
public String toString() {
return super.toString();
return super.toString() + ", count=" + inputIdentifierCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ public boolean canRetrieveInputInChunks() {
(fetchTypeInfo == SPILL_INFO.FINAL_UPDATE.ordinal());
}

/**
 * Tells whether this identifier covers the supplied one, i.e. whether both refer to
 * the same input index and the same attempt number.
 * Used when obsoleting InputAttemptIdentifiers that include one reported by a FetchFailure.
 *
 * @param thatInputAttemptIdentifier the identifier to test for inclusion
 * @return true when input index and attempt number both match, false otherwise
 */
public boolean includes(InputAttemptIdentifier thatInputAttemptIdentifier) {
  if (this.inputIdentifier != thatInputAttemptIdentifier.getInputIdentifier()) {
    return false;
  }
  return this.attemptNumber == thatInputAttemptIdentifier.getAttemptNumber();
}

// PathComponent & shared do not need to be part of the hashCode and equals computation.
@Override
public int hashCode() {
Expand Down Expand Up @@ -139,6 +151,6 @@ public boolean equals(Object obj) {
public String toString() {
// Renders inputIdentifier, attemptNumber, pathComponent, fetchTypeInfo (labelled
// "spillType") and spillEventId (labelled "spillId") inside square brackets.
// NOTE(review): this is a diff view. The last two continuation lines appear to be the
// old and new versions of the same line; they differ only in the spacing before "]".
return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
+ ", attemptNumber=" + attemptNumber + ", pathComponent="
+ pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId +"]";
+ pathComponent + ", spillType=" + fetchTypeInfo + ", spillId=" + spillEventId + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ private void processShufflePayload(DataMovementEventPayloadProto shufflePayload,

/**
 * Handles an InputFailedEvent: builds the identifier of the failed source attempt from
 * the event's target index and version, logs it together with the source vertex name,
 * and passes it to {@code shuffleManager.obsoleteKnownInput}.
 *
 * @param ife the event describing the failed input
 */
private void processInputFailedEvent(InputFailedEvent ife) {
  final InputAttemptIdentifier obsoleteSource =
      new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());

  LOG.info("Marking obsolete input: {} {}", inputContext.getSourceVertexName(), obsoleteSource);
  shuffleManager.obsoleteKnownInput(obsoleteSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,9 @@ Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
} else {
alreadyCompleted = completedInputSet.get(input.getInputIdentifier());
}

// Avoid adding attempts which have already completed or have been marked as OBSOLETE
if (alreadyCompleted || obsoletedInputs.contains(input)) {
if (alreadyCompleted || isObsoleteInputAttemptIdentifier(input)) {
inputIter.remove();
continue;
}
Expand Down Expand Up @@ -949,10 +950,14 @@ public void fetchFailed(String host,
// TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
// For now, reporting immediately.
InputAttemptIdentifier srcAttemptIdentifier = inputAttemptFetchFailure.getInputAttemptIdentifier();
if (isObsoleteInputAttemptIdentifier(srcAttemptIdentifier)) {
LOG.info("Do not report obsolete input: " + srcAttemptIdentifier);
return;
}
LOG.info(
"{}: Fetch failed for src: {} InputIdentifier: {}, connectFailed: {}, "
"{}: Fetch failed for InputIdentifier: {}, connectFailed: {}, "
+ "local fetch: {}, remote fetch failure reported as local failure: {})",
sourceDestNameTrimmed, srcAttemptIdentifier, srcAttemptIdentifier, connectFailed,
sourceDestNameTrimmed, srcAttemptIdentifier, connectFailed,
inputAttemptFetchFailure.isLocalFetch(), inputAttemptFetchFailure.isDiskErrorAtSource());
failedShufflesCounter.increment(1);
inputContext.notifyProgress();
Expand Down Expand Up @@ -984,6 +989,22 @@ public void fetchFailed(String host,
}
}
}

/**
 * Tells whether the given input covers any entry recorded in {@code obsoletedInputs}.
 *
 * @param input the identifier to check; may be null
 * @return true if {@code input.includes(...)} holds for at least one obsoleted entry;
 *         false otherwise, or when input is null
 */
private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) {
  if (input != null) {
    for (InputAttemptIdentifier obsoleted : obsoletedInputs) {
      if (input.includes(obsoleted)) {
        return true;
      }
    }
  }
  return false;
}

/////////////////// End of Methods from FetcherCallbackHandler

public void shutdown() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,19 @@ private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
} else {
isInputFinished = isInputFinished(id.getInputIdentifier());
}
return !obsoleteInputs.contains(id) && !isInputFinished;
return !isObsoleteInputAttemptIdentifier(id) && !isInputFinished;
}

/**
 * Tells whether the given input covers any entry recorded in {@code obsoleteInputs}.
 *
 * @param input the identifier to check
 * @return true if {@code input.includes(...)} holds for at least one obsolete entry
 */
private boolean isObsoleteInputAttemptIdentifier(InputAttemptIdentifier input) {
  for (Iterator<InputAttemptIdentifier> it = obsoleteInputs.iterator(); it.hasNext();) {
    if (input.includes(it.next())) {
      return true;
    }
  }
  return false;
}

public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,23 @@ public void testInputAttemptIdentifier() {
Assert.assertTrue(set.add(i4));
}

/**
 * Verifies includes() for a plain identifier (exact input/attempt match only) and for a
 * composite identifier that starts at input 1 with a count of 2.
 */
@Test(timeout = 5000)
public void testInputAttemptIdentifierIncludes() {
  // Plain identifiers: <input index, attempt number>.
  InputAttemptIdentifier input0Attempt0 = new InputAttemptIdentifier(0, 0);
  InputAttemptIdentifier input1Attempt0 = new InputAttemptIdentifier(1, 0);
  InputAttemptIdentifier input2Attempt0 = new InputAttemptIdentifier(2, 0);
  InputAttemptIdentifier input3Attempt0 = new InputAttemptIdentifier(3, 0);
  InputAttemptIdentifier input1Attempt1 = new InputAttemptIdentifier(1, 1);
  // Composite identifier: starts at input 1, attempt 0, count 2.
  CompositeInputAttemptIdentifier inputs1And2Attempt0 =
      new CompositeInputAttemptIdentifier(1, 0, null, 2);

  // A plain identifier includes only an identical input/attempt pair.
  Assert.assertTrue(input1Attempt0.includes(input1Attempt0));
  Assert.assertFalse(input1Attempt0.includes(input2Attempt0));
  Assert.assertFalse(input1Attempt0.includes(input1Attempt1));

  // The composite includes inputs 1 and 2 of attempt 0, and nothing else.
  Assert.assertFalse(inputs1And2Attempt0.includes(input0Attempt0));
  Assert.assertTrue(inputs1And2Attempt0.includes(input1Attempt0));
  Assert.assertTrue(inputs1And2Attempt0.includes(input2Attempt0));
  Assert.assertFalse(inputs1And2Attempt0.includes(input3Attempt0));
  Assert.assertFalse(inputs1And2Attempt0.includes(input1Attempt1));
}
}

0 comments on commit c77d37e

Please sign in to comment.