Skip to content

Commit

Permalink
TEZ-4569: SCATTER_GATHER + BROADCAST hangs on DAG Recovery (#361) (Sh…
Browse files Browse the repository at this point in the history
…ohei Okumiya reviewed by Laszlo Bodor)
  • Loading branch information
okumin authored Dec 23, 2024
1 parent 9efa6f1 commit 44c4f1e
Show file tree
Hide file tree
Showing 2 changed files with 554 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2750,7 +2750,7 @@ private VertexState setupVertex() {
: rootInputDescriptors.values()) {
if (input.getControllerDescriptor() != null &&
input.getControllerDescriptor().getClassName() != null) {
if (inputsWithInitializers == null) {
if (!hasInputInitializers()) {
inputsWithInitializers = Sets.newHashSet();
}
inputsWithInitializers.add(input.getName());
Expand All @@ -2771,7 +2771,7 @@ private VertexState setupVertex() {
}
}

if (hasBipartite && inputsWithInitializers != null) {
if (hasBipartite && hasInputInitializers()) {
LOG.error("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
return finished(VertexState.FAILED);
}
Expand Down Expand Up @@ -2819,6 +2819,22 @@ private VertexState setupVertex() {
return VertexState.INITED;
}

/**
 * Reports whether the set of inputs with initializers has been created for this vertex.
 *
 * @return {@code true} when {@code inputsWithInitializers} is non-null, {@code false} otherwise
 */
private boolean hasInputInitializers() {
  final boolean initializerSetCreated = inputsWithInitializers != null;
  return initializerSetCreated;
}

/**
 * Tells whether this vertex is driven by {@code RootInputVertexManager}, either because the
 * vertex plan names it explicitly or because it is chosen implicitly.
 *
 * RootInputVertexManager can start tasks even if some parent vertices are not fully initialized.
 *
 * @return {@code true} when the configured or implicitly selected vertex manager is
 *         RootInputVertexManager
 */
private boolean usesRootInputVertexManager() {
  if (!vertexPlan.hasVertexManagerPlugin()) {
    // No plugin is configured in the plan, so RootInputVertexManager is used implicitly
    // whenever input initializers exist. See VertexImpl#assignVertexManager
    return hasInputInitializers();
  }

  // A plugin is configured explicitly: compare its class name with RootInputVertexManager's.
  final VertexManagerPluginDescriptor configuredPlugin =
      DagTypeConverters.convertVertexManagerPluginDescriptorFromDAGPlan(
          vertexPlan.getVertexManagerPlugin());
  final String configuredClassName = configuredPlugin.getClassName();
  return configuredClassName.equals(RootInputVertexManager.class.getName());
}

private boolean isVertexInitSkippedInParentVertices() {
for (Map.Entry<Vertex, Edge> entry : sourceVertices.entrySet()) {
if(!(((VertexImpl) entry.getKey()).isVertexInitSkipped())) {
Expand All @@ -2828,24 +2844,36 @@ private boolean isVertexInitSkippedInParentVertices() {
return true;
}

private void assignVertexManager() throws TezException {
// Decides whether the initializing stage of this vertex can be skipped, based on recoveryData.
private boolean canSkipInitialization() {
// Conditions for skipping the initializing stage:
// - VertexInputInitializerEvent is seen
// - VertexReconfigureDoneEvent is seen
// - Reason to check whether VertexManager has complete its responsibility
// - VertexInitializedEvent is seen
// - VertexConfigurationDoneEvent is seen
// - Reason: to check whether the VertexManager has completed its responsibility.
// The VertexManager is involved in input initialization (the InputInitializer generates events
// and sends them to the VertexManager, which processes them and sends results back to the Vertex),
// Input initializer will affect on the VertexManager and we couldn't skip the initializing step if
// so the InputInitializer affects the VertexManager and the initializing step cannot be skipped if
// the VertexManager has not completed its responsibility.
// - Why using VertexReconfigureDoneEvent
// - VertexReconfigureDoneEvent represent the case that user use API reconfigureVertex
// VertexReconfigureDoneEvent will be logged
// - Why VertexConfigurationDoneEvent is used:
// - It represents the case where the user calls the reconfigureVertex API;
// a VertexConfigurationDoneEvent will then be logged.
// - TaskStartEvent is seen in that vertex or setVertexParallelism is called
// - All the parent vertices have skipped initializing stage while recovering
if (recoveryData != null && recoveryData.shouldSkipInit()
&& (recoveryData.isVertexTasksStarted() ||
recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled())
&& isVertexInitSkippedInParentVertices()) {
// - Or RootInputVertexManager is used, which can start without waiting for parent vertices
// Without recovery data, initialization is never skipped.
if (recoveryData == null) {
return false;
}
// The recovery data itself must indicate that init may be skipped.
if (!recoveryData.shouldSkipInit()) {
return false;
}
// Neither tasks were started nor setParallelism was called: do not skip.
if (!recoveryData.isVertexTasksStarted()
&& !recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled()) {
return false;
}
// Finally, either every parent vertex skipped its init, or RootInputVertexManager is in use.
return isVertexInitSkippedInParentVertices() || usesRootInputVertexManager();
}

private void assignVertexManager() throws TezException {
if (canSkipInitialization()) {
// Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt
VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData.getVertexConfigurationDoneEvent();
if (LOG.isInfoEnabled()) {
Expand Down Expand Up @@ -2909,7 +2937,7 @@ && isVertexInitSkippedInParentVertices()) {
// If there is a one to one edge then we use the InputReadyVertexManager
// If there is a scatter-gather edge then we use the ShuffleVertexManager
// Else we use the default ImmediateStartVertexManager
if (inputsWithInitializers != null) {
if (hasInputInitializers()) {
LOG.info("Setting vertexManager to RootInputVertexManager for "
+ logIdentifier);
vertexManager = new VertexManager(RootInputVertexManager
Expand Down Expand Up @@ -3084,7 +3112,7 @@ private VertexState handleInitEvent(VertexImpl vertex) {
LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
+ " to set #tasks for the vertex " + vertex.getLogIdentifier());

if (vertex.inputsWithInitializers != null) {
if (vertex.hasInputInitializers()) {
if (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit()) {
LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
try {
Expand Down Expand Up @@ -3123,8 +3151,7 @@ private VertexState handleInitEvent(VertexImpl vertex) {
LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + vertex.logIdentifier);
vertex.createTasks();
// this block may return VertexState.INITIALIZING
if (vertex.inputsWithInitializers != null &&
(vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) {
if (vertex.hasInputInitializers() && (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) {
LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
try {
vertex.setupInputInitializerManager();
Expand Down
Loading

0 comments on commit 44c4f1e

Please sign in to comment.