Skip to content

Commit

Permalink
[FLINK-36623] Improve logging in DefaultStateTransitionManager (apach…
Browse files Browse the repository at this point in the history
…e#25610)

* The JobId is available in AdaptiveScheduler states
* The jobId is added to logs in DefaultStateTransitionManager
* Improved logging in DefaultStateTransitionManager
  • Loading branch information
ztison authored Nov 5, 2024
1 parent 0b64b57 commit 21eea0d
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,11 @@ private JobSchedulingPlan determineParallelism(
"Not enough resources available for scheduling."));
}

/** Returns the {@link JobID} of the job, as reported by {@code jobInfo}. */
@Override
public JobID getJobId() {
return jobInfo.getJobId();
}

@Override
public ArchivedExecutionGraph getArchivedExecutionGraph(
JobStatus jobStatus, @Nullable Throwable cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -109,13 +110,13 @@ public class DefaultStateTransitionManager implements StateTransitionManager {

@Override
public void onChange() {
LOG.debug("OnChange event received in phase {}.", getPhase());
LOG.debug("OnChange event received in phase {} for job {}.", getPhase(), getJobId());
phase.onChange();
}

@Override
public void onTrigger() {
LOG.debug("OnTrigger event received in phase {}.", getPhase());
LOG.debug("OnTrigger event received in phase {} for job {}.", getPhase(), getJobId());
phase.onTrigger();
}

Expand Down Expand Up @@ -157,7 +158,7 @@ private void progressToPhase(Phase newPhase) {
Preconditions.checkState(
!(phase instanceof Transitioning),
"The state transition operation has already been triggered.");
LOG.info("Transitioning from {} to {}.", phase, newPhase);
LOG.info("Transitioning from {} to {}, job {}.", phase, newPhase, getJobId());
phase = newPhase;
}

Expand All @@ -172,12 +173,17 @@ private void runIfPhase(Phase expectedPhase, Runnable callback) {
callback.run();
} else {
LOG.debug(
"Ignoring scheduled action because expected phase {} is not the actual phase {}.",
"Ignoring scheduled action because expected phase {} is not the actual phase {}, job {}.",
expectedPhase,
getPhase());
getPhase(),
getJobId());
}
}

/**
* Returns the {@link JobID} supplied by the transition context. Used to tag this manager's log
* messages with the job they belong to.
*/
private JobID getJobId() {
return transitionContext.getJobId();
}

/**
* A phase in the state machine of the {@link DefaultStateTransitionManager}. Each phase is
* responsible for a specific part of the state transition process.
Expand Down Expand Up @@ -233,6 +239,10 @@ void onTrigger() {}
public String toString() {
return getClass().getSimpleName();
}

/** Returns the {@link JobID} provided by this phase's {@code context}. */
JobID getJobId() {
return context.getJobId();
}
}

/**
Expand Down Expand Up @@ -350,11 +360,14 @@ private void scheduleTransitionEvaluation() {

private void transitionToSubSequentStateForDesiredResources() {
if (hasDesiredResources()) {
LOG.info("Desired resources are met, transitioning to the subsequent state.");
LOG.info(
"Desired resources are met, transitioning to the subsequent state, job {}.",
getJobId());
context().triggerTransitionToSubsequentState();
} else {
LOG.debug(
"Desired resources are not met, skipping the transition to the subsequent state.");
"Desired resources are not met, skipping the transition to the subsequent state, job {}.",
getJobId());
}
}
}
Expand All @@ -373,16 +386,28 @@ private Stabilized(
Temporal firstChangeEventTimestamp,
Duration maxTriggerDelay) {
super(clock, context);
this.scheduleRelativelyTo(this::onTrigger, firstChangeEventTimestamp, maxTriggerDelay);
this.scheduleRelativelyTo(
() -> {
LOG.info(
"Scheduled onTrigger event fired in Stabilized phase, job {}.",
getJobId());
onTrigger();
},
firstChangeEventTimestamp,
maxTriggerDelay);
}

@Override
void onTrigger() {
if (hasSufficientResources()) {
LOG.info("Sufficient resources are met, progressing to subsequent state.");
LOG.info(
"Sufficient resources are met, progressing to subsequent state, job {}.",
getJobId());
context().triggerTransitionToSubsequentState();
} else {
LOG.debug("Sufficient resources are not met, progressing to idling.");
LOG.debug(
"Sufficient resources are not met, progressing to idling, job {}.",
getJobId());
context().progressToIdling();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;

Expand Down Expand Up @@ -56,13 +57,16 @@ public ArchivedExecutionGraph getJob() {
return archivedExecutionGraph;
}

/** Returns the {@link JobID} read from the {@code archivedExecutionGraph} held by this state. */
@Override
public JobID getJobId() {
return archivedExecutionGraph.getJobID();
}

@Override
public void handleGlobalFailure(
Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
logger.debug(
"Ignore global failure because we already finished the job {}.",
archivedExecutionGraph.getJobID(),
cause);
"Ignore global failure because we already finished the job {}.", getJobId(), cause);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.util.function.FunctionWithException;
Expand Down Expand Up @@ -51,6 +52,15 @@ default void onLeave(Class<? extends State> newState) {}
*/
void suspend(Throwable cause);

/**
* Gets the {@link JobID} of the job. Implementations should avoid using the {@link
* State#getJob()} method, as it may create the {@link ArchivedExecutionGraph}, which is
* expensive.
*
* @return the {@link JobID} of the job
*/
JobID getJobId();

/**
* Gets the current {@link JobStatus}. The returned job status will remain unchanged at least
* until the scheduler transitions to a different state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobID;

import java.time.Duration;
import java.util.concurrent.ScheduledFuture;

Expand Down Expand Up @@ -68,5 +70,12 @@ interface Context {
* @return a ScheduledFuture representing pending completion of the operation.
*/
ScheduledFuture<?> scheduleOperation(Runnable callback, Duration delay);

/**
* Gets the {@link JobID} of the job.
*
* @return the {@link JobID} of the job
*/
JobID getJobId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ ExecutionGraph getExecutionGraph() {
return executionGraph;
}

JobID getJobId() {
@Override
public JobID getJobId() {
return executionGraph.getJobID();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;

Expand Down Expand Up @@ -53,6 +54,11 @@ public void suspend(Throwable cause) {
context.goToFinished(context.getArchivedExecutionGraph(JobStatus.SUSPENDED, cause));
}

/** Returns the {@link JobID} obtained directly from the {@code context}. */
@Override
public JobID getJobId() {
return context.getJobId();
}

@Override
public ArchivedExecutionGraph getJob() {
return context.getArchivedExecutionGraph(getJobStatus(), null);
Expand All @@ -72,6 +78,13 @@ public Logger getLogger() {
/** Context of the {@link StateWithoutExecutionGraph} state. */
interface Context extends StateTransitions.ToFinished {

/**
* Gets the {@link JobID} of the job.
*
* @return the {@link JobID} of the job
*/
JobID getJobId();

/**
* Creates the {@link ArchivedExecutionGraph} for the given job status and cause. Cause can
* be null if there is no failure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Idling;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -464,6 +465,7 @@ private static class TestingStateTransitionManagerContext
// Instant.MIN makes debugging easier because timestamps become human-readable
private final Instant initializationTime = Instant.MIN;
private Duration elapsedTime = Duration.ZERO;
private final JobID jobId = new JobID();

// ///////////////////////////////////////////////
// Context creation
Expand Down Expand Up @@ -537,6 +539,11 @@ public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration delay) {
return scheduledTask;
}

/** Returns the {@link JobID} stored in this testing context's {@code jobId} field. */
@Override
public JobID getJobId() {
return jobId;
}

// ///////////////////////////////////////////////
// Test instance creation
// ///////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,11 @@ public void cancel() {}
@Override
public void suspend(Throwable cause) {}

// Returns null instead of a real job ID.
// NOTE(review): presumably fine for this test stub; confirm no caller dereferences the result.
@Override
public JobID getJobId() {
return null;
}

@Override
public JobStatus getJobStatus() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
Expand All @@ -38,6 +39,7 @@ class MockStateWithoutExecutionGraphContext
new StateValidator<>("Finished");

private boolean hasStateTransition = false;
private final JobID jobId = new JobID();

public void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) {
finishedStateValidator.expectInput(asserter);
Expand All @@ -49,6 +51,11 @@ public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
registerStateTransition();
}

/** Returns the {@link JobID} stored in this mock context's {@code jobId} field. */
@Override
public JobID getJobId() {
return jobId;
}

@Override
public ArchivedExecutionGraph getArchivedExecutionGraph(
JobStatus jobStatus, @Nullable Throwable cause) {
Expand Down

0 comments on commit 21eea0d

Please sign in to comment.