From f44f3c33abe25aa14f78d9f2a5674dda37003448 Mon Sep 17 00:00:00 2001 From: Yu Chen Date: Sun, 1 Sep 2024 14:12:53 +0800 Subject: [PATCH] [FLINK-36069][runtime/rest] Extending job detail rest API to expose json stream graph --- .../program/rest/RestClusterClientTest.java | 1 + .../src/test/resources/rest_api_v1.snapshot | 4 + .../executiongraph/AccessExecutionGraph.java | 7 + .../ArchivedExecutionGraph.java | 17 +- .../executiongraph/DefaultExecutionGraph.java | 5 + .../jobgraph/jsonplan/JsonPlanGenerator.java | 58 +++++ .../rest/handler/job/JobDetailsHandler.java | 9 +- .../rest/messages/job/JobDetailsInfo.java | 21 +- ...daptiveExecutionPlanSchedulingContext.java | 9 + .../ExecutionPlanSchedulingContext.java | 7 + ...daptiveExecutionPlanSchedulingContext.java | 5 + .../api/graph/AdaptiveGraphManager.java | 13 + .../api/graph/util/ImmutableStreamEdge.java | 8 + .../util/ImmutableStreamExchangeMode.java | 35 +++ .../api/graph/util/ImmutableStreamGraph.java | 18 +- .../api/graph/util/ImmutableStreamNode.java | 8 + .../util/ImmutableStreamPartitioner.java | 35 +++ .../jobgraph/jsonplan/JsonGeneratorTest.java | 67 +++++ .../jsonplan/JsonStreamGraphSchema.java | 240 ++++++++++++++++++ .../handler/job/JobDetailHandlerTest.java | 120 +++++++++ .../utils/ArchivedExecutionGraphBuilder.java | 9 +- .../rest/messages/job/JobDetailsInfoTest.java | 3 +- .../adapter/DefaultExecutionTopologyTest.java | 5 + .../StateTrackingMockExecutionGraph.java | 5 + 24 files changed, 693 insertions(+), 16 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamExchangeMode.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamPartitioner.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonStreamGraphSchema.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailHandlerTest.java diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 29968f192b944..07406eb1da78d 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -1277,6 +1277,7 @@ void testJobDetailsContainsSlotSharingGroupId() throws Exception { Collections.singletonMap(JobStatus.RUNNING, 1L), jobVertexDetailsInfos, Collections.singletonMap(ExecutionState.RUNNING, 1), + new JobPlanInfo.RawJson("{\"id\":\"1234\"}"), new JobPlanInfo.RawJson("{\"id\":\"1234\"}")); final TestJobDetailsInfoHandler jobDetailsInfoHandler = new TestJobDetailsInfoHandler(jobDetailsInfo); diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 2c3601bf15b81..d4c3a00cf2148 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -967,6 +967,10 @@ "plan" : { "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson" + }, + "stream-graph-plan" : { + "type" : "object", + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson" } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java index e9998b1a78fc8..8863eb3f5c38e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java @@ -47,6 +47,13 @@ public interface AccessExecutionGraph extends JobStatusProvider { */ String getJsonPlan(); + /** + * Returns the stream graph as a JSON string. + * + * @return stream graph as a JSON string + */ + String getJsonStreamGraph(); + /** * Returns the {@link JobID} for this execution graph. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java index f322523bcbf2a..04e62cd61ebcd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java @@ -113,6 +113,8 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl @Nullable private final String changelogStorageName; + @Nullable private final String jsonStreamGraph; + public ArchivedExecutionGraph( JobID jobID, String jobName, @@ -132,7 +134,8 @@ public ArchivedExecutionGraph( @Nullable String stateBackendName, @Nullable String checkpointStorageName, @Nullable TernaryBoolean stateChangelogEnabled, - @Nullable String changelogStorageName) { + @Nullable String changelogStorageName, + @Nullable String jsonStreamGraph) { this.jobID = Preconditions.checkNotNull(jobID); this.jobName = Preconditions.checkNotNull(jobName); @@ -153,6 +156,7 @@ public ArchivedExecutionGraph( this.checkpointStorageName = checkpointStorageName; this.stateChangelogEnabled = stateChangelogEnabled; this.changelogStorageName = changelogStorageName; + this.jsonStreamGraph = jsonStreamGraph; } // -------------------------------------------------------------------------------------------- @@ -162,6 +166,11 @@ public String getJsonPlan() { return jsonPlan; } + @Override + public String getJsonStreamGraph() { + return jsonStreamGraph; + } + @Override public JobID getJobID() { return jobID; @@ -366,7 +375,8 @@ public static ArchivedExecutionGraph createFrom( executionGraph.getStateBackendName().orElse(null), executionGraph.getCheckpointStorageName().orElse(null), executionGraph.isChangelogStateBackendEnabled(), - executionGraph.getChangelogStorageName().orElse(null)); + executionGraph.getChangelogStorageName().orElse(null), + executionGraph.getJsonStreamGraph()); } /** @@ -487,6 +497,7 @@ private static ArchivedExecutionGraph createSparseArchivedExecutionGraph( checkpointingSettings == null ? TernaryBoolean.UNDEFINED : checkpointingSettings.isChangelogStateBackendEnabled(), - checkpointingSettings == null ? null : "Unknown"); + checkpointingSettings == null ? null : "Unknown", + null); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 1114671a1b56c..e56bb2e3eb35f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -612,6 +612,11 @@ public void setJsonPlan(String jsonPlan) { this.jsonPlan = jsonPlan; } + @Override + public String getJsonStreamGraph() { + return executionPlanSchedulingContext.getJsonStreamGraph(); + } + @Override public String getJsonPlan() { return jsonPlan; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java index 6cdcc77cc537a..5d114e1e718b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java @@ -26,6 +26,9 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; +import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge; +import org.apache.flink.streaming.api.graph.util.ImmutableStreamGraph; +import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -34,6 +37,7 @@ import java.io.StringWriter; import java.util.List; +import java.util.Map; @Internal public class JsonPlanGenerator { @@ -170,4 +174,58 @@ public static String generatePlan( throw new RuntimeException("Failed to generate plan", e); } } + + public static String generateJsonStreamGraph( + ImmutableStreamGraph sg, + Map jobVertexIdMap, + int pendingOperatorCount) { + try (final StringWriter writer = new StringWriter(1024)) { + try (final JsonGenerator gen = new JsonFactory().createGenerator(writer)) { + // start of everything + gen.writeStartObject(); + + gen.writeNumberField("pending_operator_count", pendingOperatorCount); + + gen.writeArrayFieldStart("nodes"); + + // info per vertex + for (ImmutableStreamNode node : sg.getStreamNodes()) { + gen.writeStartObject(); + gen.writeStringField("id", String.valueOf(node.getId())); + gen.writeNumberField("parallelism", node.getParallelism()); + gen.writeStringField("operator", node.getOperatorName()); + gen.writeStringField("description", node.getOperatorDescription()); + if (jobVertexIdMap.containsKey(node.getId())) { + gen.writeStringField( + "job_vertex_id", jobVertexIdMap.get(node.getId()).toString()); + } + + // write the input edge properties + gen.writeArrayFieldStart("inputs"); + + List inEdges = node.getInEdges(); + for (int inputNum = 0; inputNum < inEdges.size(); inputNum++) { + ImmutableStreamEdge edge = inEdges.get(inputNum); + gen.writeStartObject(); + gen.writeNumberField("num", inputNum); + gen.writeStringField("id", String.valueOf(edge.getSourceId())); + gen.writeStringField("ship_strategy", edge.getPartitioner().toString()); + gen.writeStringField("exchange", edge.getExchangeMode().name()); + gen.writeEndObject(); + } + + gen.writeEndArray(); + + gen.writeEndObject(); + } + + // end of everything + gen.writeEndArray(); + gen.writeEndObject(); + } + return writer.toString(); + } catch (Exception e) { + throw new RuntimeException("Failed to generate json stream plan", e); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index efc49c1d8c711..5ac21afaa811d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; import javax.annotation.Nullable; @@ -141,6 +142,11 @@ private static JobDetailsInfo createJobDetailsInfo( executionState, jobVerticesPerState[executionState.ordinal()]); } + JobPlanInfo.RawJson jsonStreamGraph = null; + if (!StringUtils.isNullOrWhitespaceOnly(executionGraph.getJsonStreamGraph())) { + jsonStreamGraph = new JobPlanInfo.RawJson(executionGraph.getJsonStreamGraph()); + } + return new JobDetailsInfo( executionGraph.getJobID(), executionGraph.getJobName(), @@ -155,7 +161,8 @@ private static JobDetailsInfo createJobDetailsInfo( timestamps, jobVertexInfos, jobVerticesPerStateMap, - new JobPlanInfo.RawJson(executionGraph.getJsonPlan())); + new JobPlanInfo.RawJson(executionGraph.getJsonPlan()), + jsonStreamGraph); } private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java index 250407005c155..db1c95b5bfa14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java @@ -79,6 +79,8 @@ public class JobDetailsInfo implements ResponseBody { public static final String FIELD_NAME_JSON_PLAN = "plan"; + public static final String FIELD_NAME_JSON_STREAM_GRAPH_PLAN = "stream-graph-plan"; + @JsonProperty(FIELD_NAME_JOB_ID) @JsonSerialize(using = JobIDSerializer.class) private final JobID jobId; @@ -122,6 +124,9 @@ public class JobDetailsInfo implements ResponseBody { @JsonProperty(FIELD_NAME_JSON_PLAN) private final JobPlanInfo.RawJson jsonPlan; + @JsonProperty(FIELD_NAME_JSON_STREAM_GRAPH_PLAN) + private final JobPlanInfo.RawJson jsonStreamGraphPlan; + @JsonCreator public JobDetailsInfo( @JsonDeserialize(using = JobIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_ID) @@ -140,7 +145,9 @@ public JobDetailsInfo( Collection jobVertexInfos, @JsonProperty(FIELD_NAME_JOB_VERTICES_PER_STATE) Map jobVerticesPerState, - @JsonProperty(FIELD_NAME_JSON_PLAN) JobPlanInfo.RawJson jsonPlan) { + @JsonProperty(FIELD_NAME_JSON_PLAN) JobPlanInfo.RawJson jsonPlan, + @JsonProperty(FIELD_NAME_JSON_STREAM_GRAPH_PLAN) + JobPlanInfo.RawJson jsonStreamGraphPlan) { this.jobId = Preconditions.checkNotNull(jobId); this.name = Preconditions.checkNotNull(name); this.isStoppable = isStoppable; @@ -155,6 +162,7 @@ public JobDetailsInfo( this.jobVertexInfos = Preconditions.checkNotNull(jobVertexInfos); this.jobVerticesPerState = Preconditions.checkNotNull(jobVerticesPerState); this.jsonPlan = Preconditions.checkNotNull(jsonPlan); + this.jsonStreamGraphPlan = jsonStreamGraphPlan; } @Override @@ -179,7 +187,8 @@ public boolean equals(Object o) { && Objects.equals(timestamps, that.timestamps) && Objects.equals(jobVertexInfos, that.jobVertexInfos) && Objects.equals(jobVerticesPerState, that.jobVerticesPerState) - && Objects.equals(jsonPlan, that.jsonPlan); + && Objects.equals(jsonPlan, that.jsonPlan) + && Objects.equals(jsonStreamGraphPlan, that.jsonStreamGraphPlan); } @Override @@ -198,7 +207,8 @@ public int hashCode() { timestamps, jobVertexInfos, jobVerticesPerState, - jsonPlan); + jsonPlan, + jsonStreamGraphPlan); } @JsonIgnore @@ -271,6 +281,11 @@ public String getJsonPlan() { return jsonPlan.toString(); } + @JsonIgnore + public String getJsonStreamGraphPlan() { + return jsonStreamGraphPlan.toString(); + } + // --------------------------------------------------- // Static inner classes // --------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java index 5575f4e27be42..6c797bf71d0aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.streaming.api.graph.AdaptiveGraphManager; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode; @@ -118,6 +119,14 @@ public int getPendingOperatorCount() { return adaptiveGraphManager.getPendingOperatorsCount(); } + @Override + public String getJsonStreamGraph() { + return JsonPlanGenerator.generateJsonStreamGraph( + adaptiveGraphManager.getStreamGraphContext().getStreamGraph(), + adaptiveGraphManager.getStreamNodeIdsToJobVertexMap(), + getPendingOperatorCount()); + } + private int getParallelism(int streamNodeId) { return adaptiveGraphManager .getStreamGraphContext() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java index 86919814e9c9a..cba65c8371e87 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java @@ -58,4 +58,11 @@ int getConsumersMaxParallelism( * @return the number of pending operators. */ int getPendingOperatorCount(); + + /** + * Retrieves the JSON representation of the stream graph for the original job. + * + * @return the JSON representation of the stream graph. + */ + String getJsonStreamGraph(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java index 72de8edec66ec..a40ef22b910a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java @@ -107,4 +107,9 @@ public int getConsumersMaxParallelism( public int getPendingOperatorCount() { return 0; } + + @Override + public String getJsonStreamGraph() { + return ""; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java index bec248898b115..e9902828408d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java @@ -125,6 +125,8 @@ public class AdaptiveGraphManager implements AdaptiveGraphGenerator { // We need cache all job vertices to create JobEdge for downstream vertex. private final Map startNodeToJobVertexMap; + private final Map streamNodeIdsToJobVertexMap; + // Records the ID of the job vertex that has completed execution. private final Set finishedJobVertices; @@ -159,6 +161,7 @@ public AdaptiveGraphManager( this.jobVertexToStartNodeMap = new HashMap<>(); this.jobVertexToChainedStreamNodeIdsMap = new HashMap<>(); + this.streamNodeIdsToJobVertexMap = new HashMap<>(); this.finishedJobVertices = new HashSet<>(); @@ -226,6 +229,15 @@ public List getStreamNodeIdsByJobVertexId(JobVertexID jobVertexId) { return jobVertexToChainedStreamNodeIdsMap.get(jobVertexId); } + /** + * Retrieves the mapping between stream node IDs and job vertices. + * + * @return A map of stream node IDs to job vertices. + */ + public Map getStreamNodeIdsToJobVertexMap() { + return this.streamNodeIdsToJobVertexMap; + } + /** * Retrieves the ID of the stream node that produces the IntermediateDataSet. * @@ -433,6 +445,7 @@ private void recordCreatedJobVerticesInfo(JobVertexBuildContext jobVertexBuildCo .computeIfAbsent( jobVertex.getID(), key -> new ArrayList<>()) .add(node.getId()); + streamNodeIdsToJobVertexMap.put(node.getId(), jobVertex.getID()); }); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java index f0ab4d42e123c..4cb463ab29ab5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java @@ -45,4 +45,12 @@ public int getSourceId() { public String getEdgeId() { return streamEdge.getEdgeId(); } + + public ImmutableStreamPartitioner getPartitioner() { + return new ImmutableStreamPartitioner(streamEdge.getPartitioner()); + } + + public ImmutableStreamExchangeMode getExchangeMode() { + return new ImmutableStreamExchangeMode(streamEdge.getExchangeMode()); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamExchangeMode.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamExchangeMode.java new file mode 100644 index 0000000000000..4a9a59b3331bb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamExchangeMode.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; + +@Internal +public class ImmutableStreamExchangeMode { + private final StreamExchangeMode streamExchangeMode; + + public ImmutableStreamExchangeMode(StreamExchangeMode streamExchangeMode) { + this.streamExchangeMode = streamExchangeMode; + } + + public String name() { + return streamExchangeMode.name(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraph.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraph.java index eef1f001c314d..b154813eeeb10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamGraph.java @@ -20,27 +20,31 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import java.util.Collection; import java.util.HashMap; import java.util.Map; /** Helper class that provides read-only StreamGraph. */ @Internal public class ImmutableStreamGraph { - private final StreamGraph streamGraph; private final Map immutableStreamNodes; public ImmutableStreamGraph(StreamGraph streamGraph) { - this.streamGraph = streamGraph; this.immutableStreamNodes = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + immutableStreamNodes.put( + node.getId(), new ImmutableStreamNode(streamGraph.getStreamNode(node.getId()))); + } } public ImmutableStreamNode getStreamNode(Integer vertexId) { - if (streamGraph.getStreamNode(vertexId) == null) { - return null; - } - return immutableStreamNodes.computeIfAbsent( - vertexId, id -> new ImmutableStreamNode(streamGraph.getStreamNode(id))); + return immutableStreamNodes.get(vertexId); + } + + public Collection getStreamNodes() { + return immutableStreamNodes.values(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java index d4d3d76af198e..757513d154012 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java @@ -68,4 +68,12 @@ public int getMaxParallelism() { public int getParallelism() { return streamNode.getParallelism(); } + + public String getOperatorName() { + return streamNode.getOperatorName(); + } + + public String getOperatorDescription() { + return streamNode.getOperatorDescription(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamPartitioner.java new file mode 100644 index 0000000000000..0523a7c3172de --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamPartitioner.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; + +@Internal +public class ImmutableStreamPartitioner { + private final StreamPartitioner streamPartitioner; + + public ImmutableStreamPartitioner(StreamPartitioner streamPartitioner) { + this.streamPartitioner = streamPartitioner; + } + + public String toString() { + return streamPartitioner.toString(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java index 5eaf28a7bc1b5..d254233d8f0c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java @@ -25,15 +25,25 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.graph.util.ImmutableStreamGraph; import org.apache.flink.util.jackson.JacksonMapperFactory; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode; import org.junit.Test; +import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import static org.apache.flink.runtime.util.JobVertexConnectionUtils.connectNewDataSetAsInput; import static org.junit.Assert.assertEquals; @@ -151,4 +161,61 @@ private void checkVertexExists(String vertexId, JobGraph graph) { } fail("could not find vertex with id " + vertexId + " in JobGraph"); } + + @Test + public void testGenerateJsonStreamGraph() throws JsonProcessingException { + StreamExecutionEnvironment env = new StreamExecutionEnvironment(); + env.fromSequence(0L, 1L).disableChaining().print(); + StreamGraph streamGraph = env.getStreamGraph(); + Map jobVertexIdMap = new HashMap<>(); + String jsonStreamGraph = + JsonPlanGenerator.generateJsonStreamGraph( + new ImmutableStreamGraph(streamGraph), jobVertexIdMap, 2); + + ObjectMapper mapper = JacksonMapperFactory.createObjectMapper(); + JsonStreamGraphSchema parsedStreamGraph = + mapper.readValue(jsonStreamGraph, JsonStreamGraphSchema.class); + + assertEquals(2, parsedStreamGraph.getPendingOperatorCount()); + + List expectedJobVertexIds = new ArrayList<>(); + expectedJobVertexIds.add(null); + expectedJobVertexIds.add(null); + validateStreamGraph(streamGraph, parsedStreamGraph, expectedJobVertexIds); + + jobVertexIdMap.put(1, new JobVertexID()); + jobVertexIdMap.put(2, new JobVertexID()); + jsonStreamGraph = + JsonPlanGenerator.generateJsonStreamGraph( + new ImmutableStreamGraph(streamGraph), jobVertexIdMap, 0); + + parsedStreamGraph = mapper.readValue(jsonStreamGraph, JsonStreamGraphSchema.class); + assertEquals(0, parsedStreamGraph.getPendingOperatorCount()); + validateStreamGraph( + streamGraph, + parsedStreamGraph, + jobVertexIdMap.values().stream() + .map(JobVertexID::toString) + .collect(Collectors.toList())); + } + + public static void validateStreamGraph( + StreamGraph streamGraph, + JsonStreamGraphSchema parsedStreamGraph, + List expectedJobVertexIds) { + List realJobVertexIds = new ArrayList<>(); + parsedStreamGraph + .getNodes() + .forEach( + node -> { + StreamNode streamNode = + streamGraph.getStreamNode(Integer.parseInt(node.getId())); + assertEquals(node.getOperator(), streamNode.getOperatorName()); + assertEquals( + node.getParallelism(), (Integer) streamNode.getParallelism()); + assertEquals(node.getInputs().size(), streamNode.getInEdges().size()); + realJobVertexIds.add(node.getJobVertexId()); + }); + assertEquals(expectedJobVertexIds, realJobVertexIds); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonStreamGraphSchema.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonStreamGraphSchema.java new file mode 100644 index 0000000000000..919d5ad792b93 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonStreamGraphSchema.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.jsonplan; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +public class JsonStreamGraphSchema { + public static final String FIELD_NAME_NODES = "nodes"; + public static final String FIELD_PENDING_OPERATOR_COUNT = "pending_operator_count"; + + @JsonProperty(FIELD_NAME_NODES) + private List nodes; + + @JsonProperty(FIELD_PENDING_OPERATOR_COUNT) + private int pendingOperatorCount; + + @JsonCreator + public JsonStreamGraphSchema( + @JsonProperty(FIELD_NAME_NODES) List nodes, + @JsonProperty(FIELD_PENDING_OPERATOR_COUNT) int pendingOperatorCount) { + this.nodes = nodes; + this.pendingOperatorCount = pendingOperatorCount; + } + + @JsonIgnore + public List getNodes() { + return nodes; + } + + @JsonIgnore + public int getPendingOperatorCount() { + return pendingOperatorCount; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + JsonStreamGraphSchema that = (JsonStreamGraphSchema) o; + return Objects.equals(nodes, that.nodes) + && pendingOperatorCount == that.pendingOperatorCount; + } + + @Override + public int hashCode() { + return Objects.hash(nodes, pendingOperatorCount); + } + + public static class JsonStreamNodeSchema { + public static final String FIELD_NAME_NODE_ID = "id"; + public static final String FIELD_NAME_NODE_PARALLELISM = "parallelism"; + public static final String FIELD_NAME_NODE_OPERATOR = "operator"; + public static final String FIELD_NAME_NODE_DESCRIPTION = "description"; + public static final String FIELD_NAME_NODE_JOB_VERTEX_ID = "job_vertex_id"; + public static final String FIELD_NAME_NODE_INPUTS = "inputs"; + + @JsonProperty(FIELD_NAME_NODE_ID) + private final String id; + + @JsonProperty(FIELD_NAME_NODE_PARALLELISM) + private final Integer parallelism; + + @JsonProperty(FIELD_NAME_NODE_OPERATOR) + private final String operator; + + @JsonProperty(FIELD_NAME_NODE_DESCRIPTION) + private final String description; + + @JsonProperty(FIELD_NAME_NODE_JOB_VERTEX_ID) + private final String jobVertexId; + + @JsonProperty(FIELD_NAME_NODE_INPUTS) + private final List inputs; + + @JsonCreator + public JsonStreamNodeSchema( + @JsonProperty(FIELD_NAME_NODE_ID) String id, + @JsonProperty(FIELD_NAME_NODE_PARALLELISM) Integer parallelism, + @JsonProperty(FIELD_NAME_NODE_OPERATOR) String operator, + @JsonProperty(FIELD_NAME_NODE_DESCRIPTION) String description, + @JsonProperty(FIELD_NAME_NODE_JOB_VERTEX_ID) String jobVertexId, + @JsonProperty(FIELD_NAME_NODE_INPUTS) List inputs) { + this.id = id; + this.parallelism = parallelism; + this.operator = operator; + this.description = description; + this.jobVertexId = jobVertexId; + this.inputs = inputs; + } + + @JsonIgnore + public String getId() { + return id; + } + + @JsonIgnore + public Integer getParallelism() { + return parallelism; + } + + @JsonIgnore + public String getOperator() { + return operator; + } + + @JsonIgnore + public String getDescription() { + return description; + } + + @JsonIgnore + public String getJobVertexId() { + return jobVertexId; + } + + @JsonIgnore + public List getInputs() { + return inputs; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JsonStreamNodeSchema that = (JsonStreamNodeSchema) o; + return Objects.equals(id, that.id) + && Objects.equals(parallelism, that.parallelism) + && Objects.equals(operator, that.operator) + && Objects.equals(description, that.description) + && Objects.equals(jobVertexId, that.jobVertexId) + && Objects.equals(inputs, that.inputs); + } + + @Override + public int hashCode() { + return Objects.hash(id, parallelism, operator, description, jobVertexId, inputs); + } + } + + public static class JsonStreamEdgeSchema { + public static final String FIELD_NAME_EDGE_INPUT_NUM = "num"; + public static final String FIELD_NAME_EDGE_ID = "id"; + public static final String FIELD_NAME_EDGE_SHIP_STRATEGY = "ship_strategy"; + public static final String FIELD_NAME_EDGE_EXCHANGE = "exchange"; + + @JsonProperty(FIELD_NAME_EDGE_INPUT_NUM) + private final Integer num; + + @JsonProperty(FIELD_NAME_EDGE_ID) + private final String id; + + @JsonProperty(FIELD_NAME_EDGE_SHIP_STRATEGY) + private final String shipStrategy; + + @JsonProperty(FIELD_NAME_EDGE_EXCHANGE) + private final String exchange; + + @JsonCreator + public JsonStreamEdgeSchema( + @JsonProperty(FIELD_NAME_EDGE_INPUT_NUM) Integer num, + @JsonProperty(FIELD_NAME_EDGE_ID) String id, + @JsonProperty(FIELD_NAME_EDGE_SHIP_STRATEGY) String shipStrategy, + @JsonProperty(FIELD_NAME_EDGE_EXCHANGE) String exchange) { + this.num = num; + this.id = id; + this.shipStrategy = shipStrategy; + this.exchange = exchange; + } + + @JsonIgnore + public Integer getNum() { + return num; + } + + @JsonIgnore + public String getId() { + return id; + } + + @JsonIgnore + public String getShipStrategy() { + return shipStrategy; + } + + @JsonIgnore + public String getExchange() { + return exchange; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JsonStreamEdgeSchema that = (JsonStreamEdgeSchema) o; + return Objects.equals(num, that.num) + && Objects.equals(id, that.id) + && Objects.equals(shipStrategy, that.shipStrategy) + && Objects.equals(exchange, that.exchange); + } + + @Override + public int hashCode() { + return Objects.hash(num, id, shipStrategy, exchange); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailHandlerTest.java new file mode 100644 index 0000000000000..5a54c8b77b909 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailHandlerTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionConfigBuilder; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; +import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +public class JobDetailHandlerTest { + private JobDetailsHandler jobDetailsHandler; + private HandlerRequest handlerRequest; + private AccessExecutionGraph archivedExecutionGraph; + private final String expectedJsonStreamGraph = + "{\"pending_operator_count:2,\":\"nodes\":[{\"id\":\"1\",\"parallelism\":1,\"operator\":\"Source: Sequence Source\",\"description\":\"Source: Sequence Source\",\"inputs\":[]},{\"id\":\"2\",\"parallelism\":1,\"operator\":\"Sink: Print to Std. Out\",\"description\":\"Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"1\",\"ship_strategy\":\"FORWARD\",\"exchange\":\"UNDEFINED\"}]}]}"; + + private static HandlerRequest createRequest(JobID jobId) + throws HandlerRequestException { + Map pathParameters = new HashMap<>(); + pathParameters.put(JobIDPathParameter.KEY, jobId.toString()); + return HandlerRequest.resolveParametersAndCreate( + EmptyRequestBody.getInstance(), + new TaskManagerMessageParameters(), + pathParameters, + Collections.emptyMap(), + Collections.emptyList()); + } + + @BeforeEach + void setUp(@TempDir Path tempDir) throws HandlerRequestException { + GatewayRetriever leaderRetriever = + () -> CompletableFuture.completedFuture(null); + final RestHandlerConfiguration restHandlerConfiguration = + RestHandlerConfiguration.fromConfiguration(new Configuration()); + final MetricFetcher metricFetcher = + new MetricFetcherImpl<>( + () -> null, + address -> null, + Executors.directExecutor(), + Duration.ofMillis(1000L), + MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue().toMillis()); + final ArchivedExecutionConfig archivedExecutionConfig = + new ArchivedExecutionConfigBuilder().build(); + + archivedExecutionGraph = + new ArchivedExecutionGraphBuilder() + .setJsonStreamGraph(expectedJsonStreamGraph) + .setArchivedExecutionConfig(archivedExecutionConfig) + .build(); + jobDetailsHandler = + new JobDetailsHandler( + leaderRetriever, + TestingUtils.TIMEOUT, + Collections.emptyMap(), + JobDetailsHeaders.getInstance(), + new DefaultExecutionGraphCache( + restHandlerConfiguration.getTimeout(), + Duration.ofMillis(restHandlerConfiguration.getRefreshInterval())), + Executors.directExecutor(), + metricFetcher); + handlerRequest = createRequest(archivedExecutionGraph.getJobID()); + } + + @Test + void testGetJobDetailsWithJsonStreamGraph() throws RestHandlerException { + JobDetailsInfo jobDetailsInfo = + jobDetailsHandler.handleRequest(handlerRequest, archivedExecutionGraph); + assertThat(jobDetailsInfo.getJsonStreamGraphPlan()) + .isEqualTo(new JobPlanInfo.RawJson(expectedJsonStreamGraph).toString()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java index 58255a5b6dc28..d9839357cdb5d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java @@ -57,6 +57,7 @@ public class ArchivedExecutionGraphBuilder { private boolean isStoppable; private Map>> serializedUserAccumulators; private CheckpointStatsSnapshot checkpointStatsSnapshot; + private String jsonStreamGraph; public ArchivedExecutionGraphBuilder setJobID(JobID jobID) { this.jobID = jobID; @@ -101,6 +102,11 @@ public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) { return this; } + public ArchivedExecutionGraphBuilder setJsonStreamGraph(String jsonStreamGraph) { + this.jsonStreamGraph = jsonStreamGraph; + return this; + } + public ArchivedExecutionGraphBuilder setArchivedUserAccumulators( StringifiedAccumulatorResult[] archivedUserAccumulators) { this.archivedUserAccumulators = archivedUserAccumulators; @@ -171,6 +177,7 @@ public ArchivedExecutionGraph build() { "stateBackendName", "checkpointStorageName", TernaryBoolean.UNDEFINED, - "changelogStorageName"); + "changelogStorageName", + jsonStreamGraph); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java index 6bb69b357e0cd..27823804dae4b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java @@ -84,7 +84,8 @@ protected JobDetailsInfo getTestResponseInstance() throws Exception { timestamps, jobVertexInfos, jobVerticesPerState, - new JobPlanInfo.RawJson(jsonPlan)); + new JobPlanInfo.RawJson(jsonPlan), + null); } private JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(Random random) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java index 2b4813b2e9300..d61047a9bbe70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java @@ -449,5 +449,10 @@ public int getConsumersMaxParallelism( public int getPendingOperatorCount() { return 0; } + + @Override + public String getJsonStreamGraph() { + return ""; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java index ce49ff5bded0e..ad11b85490831 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java @@ -187,6 +187,11 @@ public String getJsonPlan() { return ""; } + @Override + public String getJsonStreamGraph() { + return ""; + } + @Override public void setJsonPlan(String jsonPlan) {}