Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36069][runtime/rest] Extending job detail rest API to expose json stream graph #25798

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,7 @@ void testJobDetailsContainsSlotSharingGroupId() throws Exception {
Collections.singletonMap(JobStatus.RUNNING, 1L),
jobVertexDetailsInfos,
Collections.singletonMap(ExecutionState.RUNNING, 1),
new JobPlanInfo.RawJson("{\"id\":\"1234\"}"),
new JobPlanInfo.RawJson("{\"id\":\"1234\"}"));
final TestJobDetailsInfoHandler jobDetailsInfoHandler =
new TestJobDetailsInfoHandler(jobDetailsInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public interface AccessExecutionGraph extends JobStatusProvider {
*/
String getJsonPlan();

/**
 * Returns the stream graph as a JSON string.
 *
 * <p>NOTE(review): implementations may return {@code null} (archived graphs without a captured
 * stream graph) or an empty string (non-adaptive scheduling contexts) — callers should treat a
 * blank value as "no stream graph available".
 *
 * @return stream graph as a JSON string
 */
String getJsonStreamGraph();

/**
* Returns the {@link JobID} for this execution graph.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl

@Nullable private final String changelogStorageName;

@Nullable private final String jsonStreamGraph;

public ArchivedExecutionGraph(
JobID jobID,
String jobName,
Expand All @@ -132,7 +134,8 @@ public ArchivedExecutionGraph(
@Nullable String stateBackendName,
@Nullable String checkpointStorageName,
@Nullable TernaryBoolean stateChangelogEnabled,
@Nullable String changelogStorageName) {
@Nullable String changelogStorageName,
@Nullable String jsonStreamGraph) {

this.jobID = Preconditions.checkNotNull(jobID);
this.jobName = Preconditions.checkNotNull(jobName);
Expand All @@ -153,6 +156,7 @@ public ArchivedExecutionGraph(
this.checkpointStorageName = checkpointStorageName;
this.stateChangelogEnabled = stateChangelogEnabled;
this.changelogStorageName = changelogStorageName;
this.jsonStreamGraph = jsonStreamGraph;
}

// --------------------------------------------------------------------------------------------
Expand All @@ -162,6 +166,11 @@ public String getJsonPlan() {
return jsonPlan;
}

/**
 * Returns the stream graph of the archived job as a JSON string.
 *
 * @return the stream graph JSON, or {@code null} when it was not captured for this job (the
 *     backing field is {@code @Nullable}; sparse archives pass {@code null})
 */
@Override
public String getJsonStreamGraph() {
    return jsonStreamGraph;
}

@Override
public JobID getJobID() {
return jobID;
Expand Down Expand Up @@ -366,7 +375,8 @@ public static ArchivedExecutionGraph createFrom(
executionGraph.getStateBackendName().orElse(null),
executionGraph.getCheckpointStorageName().orElse(null),
executionGraph.isChangelogStateBackendEnabled(),
executionGraph.getChangelogStorageName().orElse(null));
executionGraph.getChangelogStorageName().orElse(null),
executionGraph.getJsonStreamGraph());
}

/**
Expand Down Expand Up @@ -487,6 +497,7 @@ private static ArchivedExecutionGraph createSparseArchivedExecutionGraph(
checkpointingSettings == null
? TernaryBoolean.UNDEFINED
: checkpointingSettings.isChangelogStateBackendEnabled(),
checkpointingSettings == null ? null : "Unknown");
checkpointingSettings == null ? null : "Unknown",
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,11 @@ public void setJsonPlan(String jsonPlan) {
this.jsonPlan = jsonPlan;
}

/**
 * Returns the stream graph as a JSON string.
 *
 * <p>Delegates to the execution plan scheduling context, which decides whether a stream graph is
 * available for this job; the result may be blank when none is.
 */
@Override
public String getJsonStreamGraph() {
    return executionPlanSchedulingContext.getJsonStreamGraph();
}

@Override
public String getJsonPlan() {
return jsonPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamGraph;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
Expand All @@ -34,6 +37,7 @@

import java.io.StringWriter;
import java.util.List;
import java.util.Map;

@Internal
public class JsonPlanGenerator {
Expand Down Expand Up @@ -170,4 +174,58 @@ public static String generatePlan(
throw new RuntimeException("Failed to generate plan", e);
}
}

/**
 * Serializes the given stream graph into the JSON representation exposed via the REST API.
 *
 * <p>The produced document has the shape
 * {@code {"pending_operator_count": N, "nodes": [{"id", "parallelism", "operator",
 * "description", "job_vertex_id"?, "inputs": [...]}, ...]}}.
 *
 * @param sg the (immutable view of the) stream graph to serialize
 * @param jobVertexIdMap mapping from stream node id to the job vertex it was chained into;
 *     nodes without an entry are emitted without a {@code job_vertex_id} field
 * @param pendingOperatorCount number of operators not yet translated into job vertices
 * @return the stream graph as a JSON string
 */
public static String generateJsonStreamGraph(
        ImmutableStreamGraph sg,
        Map<Integer, JobVertexID> jobVertexIdMap,
        int pendingOperatorCount) {
    try (final StringWriter jsonOut = new StringWriter(1024)) {
        // The generator must be closed (and thereby flushed) before the writer is read,
        // hence the nested try-with-resources.
        try (final JsonGenerator json = new JsonFactory().createGenerator(jsonOut)) {
            // root object
            json.writeStartObject();

            json.writeNumberField("pending_operator_count", pendingOperatorCount);

            json.writeArrayFieldStart("nodes");
            for (ImmutableStreamNode streamNode : sg.getStreamNodes()) {
                // one object per stream node
                json.writeStartObject();
                json.writeStringField("id", String.valueOf(streamNode.getId()));
                json.writeNumberField("parallelism", streamNode.getParallelism());
                json.writeStringField("operator", streamNode.getOperatorName());
                json.writeStringField("description", streamNode.getOperatorDescription());
                if (jobVertexIdMap.containsKey(streamNode.getId())) {
                    json.writeStringField(
                            "job_vertex_id", jobVertexIdMap.get(streamNode.getId()).toString());
                }

                // incoming edges of this node
                json.writeArrayFieldStart("inputs");
                int inputIndex = 0;
                for (ImmutableStreamEdge inEdge : streamNode.getInEdges()) {
                    json.writeStartObject();
                    json.writeNumberField("num", inputIndex++);
                    json.writeStringField("id", String.valueOf(inEdge.getSourceId()));
                    json.writeStringField("ship_strategy", inEdge.getPartitioner().toString());
                    json.writeStringField("exchange", inEdge.getExchangeMode().name());
                    json.writeEndObject();
                }
                json.writeEndArray();

                json.writeEndObject();
            }
            json.writeEndArray();

            // close root object
            json.writeEndObject();
        }
        return jsonOut.toString();
    } catch (Exception e) {
        throw new RuntimeException("Failed to generate json stream plan", e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -141,6 +142,11 @@ private static JobDetailsInfo createJobDetailsInfo(
executionState, jobVerticesPerState[executionState.ordinal()]);
}

JobPlanInfo.RawJson jsonStreamGraph = null;
if (!StringUtils.isNullOrWhitespaceOnly(executionGraph.getJsonStreamGraph())) {
jsonStreamGraph = new JobPlanInfo.RawJson(executionGraph.getJsonStreamGraph());
}

return new JobDetailsInfo(
executionGraph.getJobID(),
executionGraph.getJobName(),
Expand All @@ -155,7 +161,8 @@ private static JobDetailsInfo createJobDetailsInfo(
timestamps,
jobVertexInfos,
jobVerticesPerStateMap,
new JobPlanInfo.RawJson(executionGraph.getJsonPlan()));
new JobPlanInfo.RawJson(executionGraph.getJsonPlan()),
jsonStreamGraph);
}

private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class JobDetailsInfo implements ResponseBody {

public static final String FIELD_NAME_JSON_PLAN = "plan";

public static final String FIELD_NAME_JSON_STREAM_GRAPH_PLAN = "stream-graph-plan";

@JsonProperty(FIELD_NAME_JOB_ID)
@JsonSerialize(using = JobIDSerializer.class)
private final JobID jobId;
Expand Down Expand Up @@ -122,6 +124,9 @@ public class JobDetailsInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_JSON_PLAN)
private final JobPlanInfo.RawJson jsonPlan;

@JsonProperty(FIELD_NAME_JSON_STREAM_GRAPH_PLAN)
private final JobPlanInfo.RawJson jsonStreamGraphPlan;

@JsonCreator
public JobDetailsInfo(
@JsonDeserialize(using = JobIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_ID)
Expand All @@ -140,7 +145,9 @@ public JobDetailsInfo(
Collection<JobVertexDetailsInfo> jobVertexInfos,
@JsonProperty(FIELD_NAME_JOB_VERTICES_PER_STATE)
Map<ExecutionState, Integer> jobVerticesPerState,
@JsonProperty(FIELD_NAME_JSON_PLAN) JobPlanInfo.RawJson jsonPlan) {
@JsonProperty(FIELD_NAME_JSON_PLAN) JobPlanInfo.RawJson jsonPlan,
@JsonProperty(FIELD_NAME_JSON_STREAM_GRAPH_PLAN)
JobPlanInfo.RawJson jsonStreamGraphPlan) {
this.jobId = Preconditions.checkNotNull(jobId);
this.name = Preconditions.checkNotNull(name);
this.isStoppable = isStoppable;
Expand All @@ -155,6 +162,7 @@ public JobDetailsInfo(
this.jobVertexInfos = Preconditions.checkNotNull(jobVertexInfos);
this.jobVerticesPerState = Preconditions.checkNotNull(jobVerticesPerState);
this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
this.jsonStreamGraphPlan = jsonStreamGraphPlan;
}

@Override
Expand All @@ -179,7 +187,8 @@ public boolean equals(Object o) {
&& Objects.equals(timestamps, that.timestamps)
&& Objects.equals(jobVertexInfos, that.jobVertexInfos)
&& Objects.equals(jobVerticesPerState, that.jobVerticesPerState)
&& Objects.equals(jsonPlan, that.jsonPlan);
&& Objects.equals(jsonPlan, that.jsonPlan)
&& Objects.equals(jsonStreamGraphPlan, that.jsonStreamGraphPlan);
}

@Override
Expand All @@ -198,7 +207,8 @@ public int hashCode() {
timestamps,
jobVertexInfos,
jobVerticesPerState,
jsonPlan);
jsonPlan,
jsonStreamGraphPlan);
}

@JsonIgnore
Expand Down Expand Up @@ -271,6 +281,11 @@ public String getJsonPlan() {
return jsonPlan.toString();
}

/**
 * Returns the raw JSON stream graph plan as a string.
 *
 * @return the stream graph plan, or {@code null} when none was provided — unlike {@code
 *     jsonPlan}, this field is optional and the constructor does not null-check it
 */
@JsonIgnore
public String getJsonStreamGraphPlan() {
    // Bug fix: jsonStreamGraphPlan may legitimately be null (the handler passes null when the
    // execution graph has no stream graph), so an unconditional toString() would throw an NPE.
    return jsonStreamGraphPlan == null ? null : jsonStreamGraphPlan.toString();
}

// ---------------------------------------------------
// Static inner classes
// ---------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.streaming.api.graph.AdaptiveGraphManager;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
Expand Down Expand Up @@ -118,6 +119,14 @@ public int getPendingOperatorCount() {
return adaptiveGraphManager.getPendingOperatorsCount();
}

/**
 * Builds the JSON stream graph on demand from the adaptive graph manager's current state.
 *
 * <p>The stream-node-to-job-vertex mapping and the pending operator count are populated
 * incrementally as the adaptive graph manager creates job vertices, so the returned JSON
 * reflects the translation progress at call time.
 */
@Override
public String getJsonStreamGraph() {
    return JsonPlanGenerator.generateJsonStreamGraph(
            adaptiveGraphManager.getStreamGraphContext().getStreamGraph(),
            adaptiveGraphManager.getStreamNodeIdsToJobVertexMap(),
            getPendingOperatorCount());
}

private int getParallelism(int streamNodeId) {
return adaptiveGraphManager
.getStreamGraphContext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,11 @@ int getConsumersMaxParallelism(
* @return the number of pending operators.
*/
int getPendingOperatorCount();

/**
 * Retrieves the JSON representation of the stream graph for the original job.
 *
 * <p>NOTE(review): the default (non-adaptive) implementation returns an empty string — callers
 * should treat a blank result as "no stream graph available".
 *
 * @return the JSON representation of the stream graph.
 */
String getJsonStreamGraph();
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,9 @@ public int getConsumersMaxParallelism(
public int getPendingOperatorCount() {
    // This default context never defers operator translation, so nothing is ever pending.
    return 0;
}

@Override
public String getJsonStreamGraph() {
    // No stream graph is available in this context. The REST handler treats blank values as
    // "absent" (isNullOrWhitespaceOnly check), so an empty string is the correct sentinel.
    return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public class AdaptiveGraphManager implements AdaptiveGraphGenerator {
// We need cache all job vertices to create JobEdge for downstream vertex.
private final Map<Integer, JobVertex> startNodeToJobVertexMap;

private final Map<Integer, JobVertexID> streamNodeIdsToJobVertexMap;

// Records the ID of the job vertex that has completed execution.
private final Set<JobVertexID> finishedJobVertices;

Expand Down Expand Up @@ -159,6 +161,7 @@ public AdaptiveGraphManager(

this.jobVertexToStartNodeMap = new HashMap<>();
this.jobVertexToChainedStreamNodeIdsMap = new HashMap<>();
this.streamNodeIdsToJobVertexMap = new HashMap<>();

this.finishedJobVertices = new HashSet<>();

Expand Down Expand Up @@ -226,6 +229,15 @@ public List<Integer> getStreamNodeIdsByJobVertexId(JobVertexID jobVertexId) {
return jobVertexToChainedStreamNodeIdsMap.get(jobVertexId);
}

/**
 * Retrieves the mapping between stream node IDs and job vertices.
 *
 * @return An unmodifiable view of the map of stream node IDs to job vertices.
 */
public Map<Integer, JobVertexID> getStreamNodeIdsToJobVertexMap() {
    // Expose a read-only view: this map is internal mutable state that is still being
    // populated as job vertices are created (see recordCreatedJobVerticesInfo), and callers
    // (e.g. the JSON stream graph generator) only ever read from it.
    return java.util.Collections.unmodifiableMap(this.streamNodeIdsToJobVertexMap);
}

/**
* Retrieves the ID of the stream node that produces the IntermediateDataSet.
*
Expand Down Expand Up @@ -433,6 +445,7 @@ private void recordCreatedJobVerticesInfo(JobVertexBuildContext jobVertexBuildCo
.computeIfAbsent(
jobVertex.getID(), key -> new ArrayList<>())
.add(node.getId());
streamNodeIdsToJobVertexMap.put(node.getId(), jobVertex.getID());
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,12 @@ public int getSourceId() {
/** Returns the unique id of the wrapped stream edge. */
public String getEdgeId() {
    return streamEdge.getEdgeId();
}

/**
 * Returns the partitioner of the wrapped edge, wrapped in an immutable view.
 *
 * <p>Note: a new wrapper instance is allocated on every call.
 */
public ImmutableStreamPartitioner getPartitioner() {
    return new ImmutableStreamPartitioner(streamEdge.getPartitioner());
}

/**
 * Returns the exchange mode of the wrapped edge, wrapped in an immutable view.
 *
 * <p>Note: a new wrapper instance is allocated on every call.
 */
public ImmutableStreamExchangeMode getExchangeMode() {
    return new ImmutableStreamExchangeMode(streamEdge.getExchangeMode());
}
}
Loading