diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java index 0ed547c134..098840d779 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java @@ -110,7 +110,7 @@ public boolean scale(FlinkResourceContext> registerResourceScalingMetrics(resource, ctx.getResourceMetricGroup()); var specAdjusted = - scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics); + scalingExecutor.scaleResource(ctx, autoScalerInfo, conf, evaluatedMetrics); autoScalerInfo.replaceInKubernetes(kubernetesClient); return specAdjusted; } catch (Exception e) { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java index f0d6a183fb..cc167469e6 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java @@ -19,11 +19,11 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; @@ -65,7 +65,7 @@ public JobVertexScaler(EventRecorder eventRecorder) { } public int computeScaleTargetParallelism( - AbstractFlinkResource resource, + FlinkResourceContext ctx, Configuration conf, JobVertexID vertex, Map evaluatedMetrics, @@ -112,7 +112,7 @@ public int computeScaleTargetParallelism( if (newParallelism == currentParallelism || blockScalingBasedOnPastActions( - resource, + ctx, vertex, conf, evaluatedMetrics, @@ -129,7 +129,7 @@ public int computeScaleTargetParallelism( } private boolean blockScalingBasedOnPastActions( - AbstractFlinkResource resource, + FlinkResourceContext ctx, JobVertexID vertex, Configuration conf, Map evaluatedMetrics, @@ -148,8 +148,7 @@ private boolean blockScalingBasedOnPastActions( if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) { if (scaledUp) { - return detectIneffectiveScaleUp( - resource, vertex, conf, evaluatedMetrics, lastSummary); + return detectIneffectiveScaleUp(ctx, vertex, conf, evaluatedMetrics, lastSummary); } else { return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs); } @@ -172,7 +171,7 @@ private boolean detectImmediateScaleDownAfterScaleUp( } private boolean detectIneffectiveScaleUp( - AbstractFlinkResource resource, + FlinkResourceContext ctx, JobVertexID vertex, Configuration conf, Map evaluatedMetrics, @@ -200,7 +199,7 @@ private boolean detectIneffectiveScaleUp( var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex); eventRecorder.triggerEvent( - resource, + ctx, EventRecorder.Type.Normal, EventRecorder.Reason.IneffectiveScaling, EventRecorder.Component.Operator, diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java index d0df57b73b..6d018f12b9 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java @@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -88,18 +89,18 @@ public ScalingExecutor( } public boolean scaleResource( - AbstractFlinkResource resource, + FlinkResourceContext ctx, AutoScalerInfo scalingInformation, Configuration conf, Map> evaluatedMetrics) { + var resource = ctx.getResource(); if (!stabilizationPeriodPassed(resource, conf)) { return false; } var scalingHistory = scalingInformation.getScalingHistory(); - var scalingSummaries = - computeScalingSummary(resource, conf, evaluatedMetrics, scalingHistory); + var scalingSummaries = computeScalingSummary(ctx, conf, evaluatedMetrics, scalingHistory); if (scalingSummaries.isEmpty()) { LOG.info("All job vertices are currently running at their target parallelism."); @@ -114,7 +115,7 @@ public boolean scaleResource( var scalingReport = scalingReport(scalingSummaries, scalingEnabled); eventRecorder.triggerEvent( - resource, + ctx, EventRecorder.Type.Normal, EventRecorder.Reason.ScalingReport, EventRecorder.Component.Operator, @@ -217,7 +218,7 @@ protected static boolean allVerticesWithinUtilizationTarget( } private Map computeScalingSummary( - AbstractFlinkResource resource, + FlinkResourceContext ctx, Configuration conf, Map> evaluatedMetrics, Map> scalingHistory) { @@ -235,7 +236,7 @@ private Map computeScalingSummary( (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent(); var newParallelism = jobVertexScaler.computeScaleTargetParallelism( - resource, + ctx, conf, v, metrics, diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java index d4c3d6a009..6817178dbf 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java @@ -23,6 +23,8 @@ import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -58,6 +60,8 @@ public class JobVertexScalerTest { private FlinkDeployment flinkDep; + private FlinkResourceContext ctx; + @BeforeEach public void setup() { flinkDep = TestUtils.buildApplicationCluster(); @@ -67,6 +71,7 @@ public void setup() { conf = new Configuration(); conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO); + ctx = new FlinkDeploymentContext(flinkDep, null, null, null, null); } @Test @@ -76,55 +81,55 @@ public void testParallelismScaling() { assertEquals( 5, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap())); + ctx, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); assertEquals( 8, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap())); + ctx, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); assertEquals( 10, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, op, evaluated(10, 80, 100), Collections.emptySortedMap())); + ctx, conf, op, evaluated(10, 80, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); assertEquals( 8, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, op, evaluated(10, 60, 100), Collections.emptySortedMap())); + ctx, conf, op, evaluated(10, 60, 100), Collections.emptySortedMap())); assertEquals( 8, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, op, evaluated(10, 59, 100), Collections.emptySortedMap())); + ctx, conf, op, evaluated(10, 59, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5); assertEquals( 10, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, op, evaluated(2, 100, 40), Collections.emptySortedMap())); + ctx, conf, op, evaluated(2, 100, 40), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); assertEquals( 4, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, op, evaluated(2, 100, 100), Collections.emptySortedMap())); + ctx, conf, op, evaluated(2, 100, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5); assertEquals( 5, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap())); + ctx, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6); assertEquals( 4, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap())); + ctx, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap())); } @Test @@ -168,7 +173,7 @@ public void testMinParallelismLimitIsUsed() { assertEquals( 5, vertexScaler.computeScaleTargetParallelism( - flinkDep, + ctx, conf, new JobVertexID(), evaluated(10, 100, 500), @@ -182,7 +187,7 @@ public void testMaxParallelismLimitIsUsed() { assertEquals( 10, vertexScaler.computeScaleTargetParallelism( - flinkDep, + ctx, conf, new JobVertexID(), evaluated(10, 500, 100), @@ -200,31 +205,27 @@ public void testScaleDownAfterScaleUpDetection() { var evaluated = evaluated(5, 100, 50); var history = new TreeMap(); assertEquals( - 10, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 10, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); history.put(clock.instant(), new ScalingSummary(5, 10, evaluated)); // Should not allow scale back down immediately evaluated = evaluated(10, 50, 100); assertEquals( - 10, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 10, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); // Pass some time... clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61)); vertexScaler.setClock(clock); assertEquals( - 5, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 5, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); history.put(clock.instant(), new ScalingSummary(10, 5, evaluated)); // Allow immediate scale up evaluated = evaluated(5, 100, 50); assertEquals( - 10, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 10, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); history.put(clock.instant(), new ScalingSummary(5, 10, evaluated)); } @@ -237,16 +238,14 @@ public void testIneffectiveScalingDetection() { var evaluated = evaluated(5, 100, 50); var history = new TreeMap(); assertEquals( - 10, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 10, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); history.put(Instant.now(), new ScalingSummary(5, 10, evaluated)); // Allow to scale higher if scaling was effective (80%) evaluated = evaluated(10, 180, 90); assertEquals( - 20, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 20, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); history.put(Instant.now(), new ScalingSummary(10, 20, evaluated)); @@ -254,44 +253,38 @@ public void testIneffectiveScalingDetection() { // 90 -> 94. Do not try to scale above 20 evaluated = evaluated(20, 180, 94); assertEquals( - 20, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 20, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); // Still considered ineffective (less than <10%) evaluated = evaluated(20, 180, 98); assertEquals( - 20, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 20, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); // Allow scale up if current parallelism doesnt match last (user rescaled manually) evaluated = evaluated(10, 180, 90); assertEquals( - 20, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 20, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); // Over 10%, effective evaluated = evaluated(20, 180, 100); assertEquals( - 36, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 36, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); // Ineffective but detection is turned off conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false); evaluated = evaluated(20, 180, 90); assertEquals( - 40, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 40, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true); // Allow scale down even if ineffective evaluated = evaluated(20, 45, 90); assertEquals( - 10, - vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); + 10, vertexScaler.computeScaleTargetParallelism(ctx, conf, op, evaluated, history)); assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); } @@ -306,7 +299,7 @@ public void testSendingIneffectiveScalingEvents() { assertEquals( 10, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, jobVertexID, evaluated, history)); + ctx, conf, jobVertexID, evaluated, history)); assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); history.put(Instant.now(), new ScalingSummary(5, 10, evaluated)); @@ -315,7 +308,7 @@ public void testSendingIneffectiveScalingEvents() { assertEquals( 20, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, jobVertexID, evaluated, history)); + ctx, conf, jobVertexID, evaluated, history)); assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); history.put(Instant.now(), new ScalingSummary(10, 20, evaluated)); assertEquals(0, eventCollector.events.size()); @@ -325,7 +318,7 @@ public void testSendingIneffectiveScalingEvents() { assertEquals( 20, vertexScaler.computeScaleTargetParallelism( - flinkDep, conf, jobVertexID, evaluated, history)); + ctx, conf, jobVertexID, evaluated, history)); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); assertEquals(1, eventCollector.events.size()); var event = eventCollector.events.poll(); diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java index 57c7445288..f6c96319f8 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -31,6 +31,8 @@ import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -84,6 +86,8 @@ public class MetricsCollectionAndEvaluationTest { private Instant startTime; + private FlinkResourceContext ctx; + @BeforeEach public void setup() { evaluator = new ScalingMetricEvaluator(); @@ -126,6 +130,8 @@ public void setup() { app.getStatus().getJobStatus().setStartTime(String.valueOf(startTime.toEpochMilli())); app.getStatus().getJobStatus().setUpdateTime(String.valueOf(startTime.toEpochMilli())); app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name()); + + ctx = new FlinkDeploymentContext(app, null, null, null, null); } @Test @@ -173,7 +179,7 @@ public void testEndToEnd() throws Exception { assertEquals(3, collectedMetrics.getMetricHistory().size()); var evaluation = evaluator.evaluate(conf, collectedMetrics); - scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation); + scalingExecutor.scaleResource(ctx, scalingInfo, conf, evaluation); var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); assertEquals(4, scaledParallelism.size()); @@ -187,7 +193,7 @@ public void testEndToEnd() throws Exception { conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.); evaluation = evaluator.evaluate(conf, collectedMetrics); - scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation); + scalingExecutor.scaleResource(ctx, scalingInfo, conf, evaluation); scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); assertEquals(4, scaledParallelism.get(source1)); @@ -393,7 +399,7 @@ public void testTolerateAbsenceOfPendingRecordsMetric() throws Exception { 625., evaluation.get(source1).get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent()); - scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation); + scalingExecutor.scaleResource(ctx, scalingInfo, conf, evaluation); var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); assertEquals(1, scaledParallelism.get(source1)); } @@ -430,7 +436,7 @@ public void testScaleDownWithZeroProcessingRate() throws Exception { 0., evaluation.get(source1).get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent()); - scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation); + scalingExecutor.scaleResource(ctx, scalingInfo, conf, evaluation); var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app); assertEquals(1, scaledParallelism.get(source1)); } diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java index cfe0695e97..884e62e519 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java @@ -25,6 +25,8 @@ import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -64,6 +66,8 @@ public class ScalingExecutorTest { private KubernetesClient kubernetesClient; private FlinkDeployment flinkDep; + private FlinkResourceContext ctx; + @BeforeEach public void setup() { eventCollector = new EventCollector(); @@ -82,6 +86,7 @@ public void setup() { jobStatus.setStartTime(String.valueOf(System.currentTimeMillis())); jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis())); jobStatus.setState(JobStatus.RUNNING.name()); + ctx = new FlinkDeploymentContext(flinkDep, null, null, null, null); } @Test @@ -97,35 +102,35 @@ public void testStabilizationPeriod() throws Exception { jobStatus.setUpdateTime(String.valueOf(clock.instant().toEpochMilli())); scalingDecisionExecutor.setClock(clock); - assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertFalse(scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); clock = Clock.offset(clock, Duration.ofSeconds(30)); scalingDecisionExecutor.setClock(clock); - assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertFalse(scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); clock = Clock.offset(clock, Duration.ofSeconds(20)); scalingDecisionExecutor.setClock(clock); - assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertFalse(scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); clock = Clock.offset(clock, Duration.ofSeconds(20)); scalingDecisionExecutor.setClock(clock); - assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertTrue(scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); // A job should not be considered stable in a non-RUNNING state jobStatus.setState(JobStatus.FAILING.name()); - assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertFalse(scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); jobStatus.setState(JobStatus.RUNNING.name()); jobStatus.setUpdateTime(String.valueOf(clock.instant().toEpochMilli())); - assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertFalse(scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); clock = Clock.offset(clock, Duration.ofSeconds(59)); scalingDecisionExecutor.setClock(clock); - assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertFalse(scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); clock = Clock.offset(clock, Duration.ofSeconds(2)); scalingDecisionExecutor.setClock(clock); - assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertTrue(scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); } @Test @@ -205,10 +210,10 @@ public void testVertexesExclusionForScaling() { evaluated(10, 80, 100)); // filter operator should not scale conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(filterOperatorHexString)); - assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertFalse(scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); // filter operator should scale conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of()); - assertTrue(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertTrue(scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); } @ParameterizedTest @@ -220,7 +225,7 @@ public void testScalingEvents(boolean scalingEnabled) { var scalingInfo = new AutoScalerInfo(new HashMap<>()); assertEquals( scalingEnabled, - scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + scalingDecisionExecutor.scaleResource(ctx, scalingInfo, conf, metrics)); assertEquals(1, eventCollector.events.size()); var event = eventCollector.events.poll(); assertTrue( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 16f1137e78..fcdcfbcfc4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -97,16 +97,18 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context josdkContext) { return DeleteControl.defaultDelete(); } + statusRecorder.updateStatusFromCache(flinkApp); + var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext); + ctx.setIgnoreEventErrors(true); + String msg = "Cleaning up " + FlinkDeployment.class.getSimpleName(); LOG.info(msg); eventRecorder.triggerEvent( - flinkApp, + ctx, EventRecorder.Type.Normal, EventRecorder.Reason.Cleanup, EventRecorder.Component.Operator, msg); - statusRecorder.updateStatusFromCache(flinkApp); - var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext); try { observerFactory.getOrCreate(flinkApp).observe(ctx); } catch (DeploymentFailedException dfe) { @@ -131,7 +133,7 @@ public UpdateControl reconcile(FlinkDeployment flinkApp, Contex var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext); try { observerFactory.getOrCreate(flinkApp).observe(ctx); - if (!validateDeployment(flinkApp)) { + if (!validateDeployment(ctx)) { statusRecorder.patchAndCacheStatus(flinkApp); return ReconciliationUtils.toUpdateControl( configManager.getOperatorConfiguration(), @@ -142,12 +144,12 @@ public UpdateControl reconcile(FlinkDeployment flinkApp, Contex statusRecorder.patchAndCacheStatus(flinkApp); reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx); } catch (RecoveryFailureException rfe) { - handleRecoveryFailed(flinkApp, rfe); + handleRecoveryFailed(ctx, rfe); } catch (DeploymentFailedException dfe) { - handleDeploymentFailed(flinkApp, dfe); + handleDeploymentFailed(ctx, dfe); } catch (Exception e) { eventRecorder.triggerEvent( - flinkApp, + ctx, EventRecorder.Type.Warning, "ClusterDeploymentException", e.getMessage(), @@ -161,26 +163,29 @@ public UpdateControl reconcile(FlinkDeployment flinkApp, Contex configManager.getOperatorConfiguration(), flinkApp, previousDeployment, true); } - private void handleDeploymentFailed(FlinkDeployment flinkApp, DeploymentFailedException dfe) { + private void handleDeploymentFailed( + FlinkResourceContext ctx, DeploymentFailedException dfe) { LOG.error("Flink Deployment failed", dfe); - flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR); - flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name()); + var status = ctx.getResource().getStatus(); + status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR); + status.getJobStatus().setState(JobStatus.RECONCILING.name()); ReconciliationUtils.updateForReconciliationError( - flinkApp, dfe, configManager.getOperatorConfiguration()); + ctx.getResource(), dfe, configManager.getOperatorConfiguration()); eventRecorder.triggerEvent( - flinkApp, + ctx, EventRecorder.Type.Warning, dfe.getReason(), dfe.getMessage(), EventRecorder.Component.JobManagerDeployment); } - private void handleRecoveryFailed(FlinkDeployment flinkApp, RecoveryFailureException rfe) { + private void handleRecoveryFailed( + FlinkResourceContext ctx, RecoveryFailureException rfe) { LOG.error("Flink recovery failed", rfe); ReconciliationUtils.updateForReconciliationError( - flinkApp, rfe, configManager.getOperatorConfiguration()); + ctx.getResource(), rfe, configManager.getOperatorConfiguration()); eventRecorder.triggerEvent( - flinkApp, + ctx, EventRecorder.Type.Warning, rfe.getReason(), rfe.getMessage(), @@ -206,12 +211,13 @@ public ErrorStatusUpdateControl updateErrorStatus( configManager.getOperatorConfiguration()); } - private boolean validateDeployment(FlinkDeployment deployment) { + private boolean validateDeployment(FlinkResourceContext ctx) { + var deployment = ctx.getResource(); for (FlinkResourceValidator validator : validators) { Optional validationError = validator.validateDeployment(deployment); if (validationError.isPresent()) { eventRecorder.triggerEvent( - deployment, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.ValidationError, EventRecorder.Component.Operator, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java index 0ae1b875fe..d8cb4204f8 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java @@ -26,6 +26,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.Setter; /** Context for reconciling a Flink resource. * */ @RequiredArgsConstructor @@ -34,6 +35,7 @@ public abstract class FlinkResourceContext josdkContext; @Getter private final KubernetesResourceMetricGroup resourceMetricGroup; + @Getter @Setter private boolean ignoreEventErrors; private Configuration observeConfig; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index 78ead73e2b..6fe8057e5e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -102,7 +102,7 @@ public UpdateControl reconcile( var ctx = ctxFactory.getResourceContext(flinkSessionJob, josdkContext); observer.observe(ctx); - if (!validateSessionJob(flinkSessionJob, josdkContext)) { + if (!validateSessionJob(ctx)) { statusRecorder.patchAndCacheStatus(flinkSessionJob); return ReconciliationUtils.toUpdateControl( configManager.getOperatorConfiguration(), flinkSessionJob, previousJob, false); @@ -113,7 +113,7 @@ public UpdateControl reconcile( reconciler.reconcile(ctx); } catch (Exception e) { eventRecorder.triggerEvent( - flinkSessionJob, + ctx, EventRecorder.Type.Warning, "SessionJobException", e.getMessage(), @@ -130,18 +130,19 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context josdkContext) { if (canaryResourceManager.handleCanaryResourceDeletion(sessionJob)) { return DeleteControl.defaultDelete(); } + var ctx = ctxFactory.getResourceContext(sessionJob, josdkContext); + ctx.setIgnoreEventErrors(true); String msg = "Cleaning up " + FlinkSessionJob.class.getSimpleName(); LOG.info(msg); eventRecorder.triggerEvent( - sessionJob, + ctx, EventRecorder.Type.Normal, EventRecorder.Reason.Cleanup, EventRecorder.Component.Operator, msg); statusRecorder.removeCachedStatus(sessionJob); - var ctx = ctxFactory.getResourceContext(sessionJob, josdkContext); return reconciler.cleanup(ctx); } @@ -163,20 +164,21 @@ public Map prepareEventSources( EventSourceUtils.getFlinkDeploymentInformerEventSource(context)); } - private boolean validateSessionJob(FlinkSessionJob sessionJob, Context context) { + private boolean validateSessionJob(FlinkResourceContext ctx) { for (FlinkResourceValidator validator : validators) { Optional validationError = validator.validateSessionJob( - sessionJob, context.getSecondaryResource(FlinkDeployment.class)); + ctx.getResource(), + ctx.getJosdkContext().getSecondaryResource(FlinkDeployment.class)); if (validationError.isPresent()) { eventRecorder.triggerEvent( - sessionJob, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.ValidationError, EventRecorder.Component.Operator, validationError.get()); return ReconciliationUtils.applyValidationErrorAndResetSpec( - sessionJob, + ctx.getResource(), validationError.get(), configManager.getOperatorConfiguration()); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java index ee68d53f2f..90e0c40e9a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java @@ -52,7 +52,7 @@ public final void observe(FlinkResourceContext ctx) { // Trigger resource specific observe logic observeInternal(ctx); - SavepointUtils.resetTriggerIfJobNotRunning(ctx.getResource(), eventRecorder); + SavepointUtils.resetTriggerIfJobNotRunning(ctx, eventRecorder); } /** diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java index 1062796f34..82cc707fb4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java @@ -17,7 +17,6 @@ package org.apache.flink.kubernetes.operator.observer; -import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.status.JobStatus; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; @@ -86,7 +85,7 @@ public boolean observe(FlinkResourceContext ctx) { if (targetJobStatusMessage.isEmpty()) { LOG.warn("No matching jobs found on the cluster"); ifRunningMoveToReconciling(jobStatus, previousJobStatus); - onTargetJobNotFound(resource, ctx.getObserveConfig()); + onTargetJobNotFound(ctx); return false; } else { updateJobStatus(ctx, targetJobStatusMessage.get()); @@ -97,7 +96,7 @@ public boolean observe(FlinkResourceContext ctx) { LOG.debug("No jobs found on the cluster"); // No jobs found on the cluster, it is possible that the jobmanager is still starting up ifRunningMoveToReconciling(jobStatus, previousJobStatus); - onNoJobsFound(resource, ctx.getObserveConfig()); + onNoJobsFound(ctx); return false; } } @@ -105,18 +104,16 @@ public boolean observe(FlinkResourceContext ctx) { /** * Callback when no matching target job was found on a cluster where jobs were found. * - * @param resource The Flink resource. - * @param config Deployed/observe configuration. + * @param ctx The Flink resource context. */ - protected abstract void onTargetJobNotFound(R resource, Configuration config); + protected abstract void onTargetJobNotFound(FlinkResourceContext ctx); /** * Callback when no jobs were found on the cluster. * - * @param resource The Flink resource. - * @param config Deployed/observe configuration. + * @param ctx The Flink resource context. */ - protected void onNoJobsFound(R resource, Configuration config) {} + protected void onNoJobsFound(FlinkResourceContext ctx) {} /** * If we observed the job previously in RUNNING state we move to RECONCILING instead as we are @@ -181,7 +178,7 @@ private void updateJobStatus(FlinkResourceContext ctx, JobStatusMessage clust setErrorIfPresent(ctx, clusterJobStatus); eventRecorder.triggerEvent( - resource, + ctx, EventRecorder.Type.Normal, EventRecorder.Reason.JobStatusChanged, EventRecorder.Component.Job, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java index 8b7042e16a..3690410cb0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java @@ -111,7 +111,7 @@ private void observeTriggeredSavepoint(FlinkResourceContext ctx, String jobI LOG.warn("Savepoint failed within grace period, retrying: " + err); } eventRecorder.triggerEvent( - resource, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.SavepointError, EventRecorder.Component.Operator, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java index cceb6aad44..9e6959703a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java @@ -155,7 +155,7 @@ protected void observeJmDeployment(FlinkResourceContext ctx) { if (previousJmStatus != JobManagerDeploymentStatus.MISSING && previousJmStatus != JobManagerDeploymentStatus.ERROR) { - onMissingDeployment(flinkApp); + onMissingDeployment(ctx); } } @@ -217,15 +217,15 @@ protected boolean isSuspendedJob(FlinkDeployment deployment) { && lastReconciledSpec.getJob().getState() == JobState.SUSPENDED; } - private void onMissingDeployment(FlinkDeployment deployment) { + private void onMissingDeployment(FlinkResourceContext ctx) { String err = "Missing JobManager deployment"; logger.error(err); ReconciliationUtils.updateForReconciliationError( - deployment, + ctx.getResource(), new MissingJobManagerException(err), configManager.getOperatorConfiguration()); eventRecorder.triggerEvent( - deployment, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.Missing, EventRecorder.Component.JobManagerDeployment, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java index a08cccda38..4d437ee3c0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java @@ -17,7 +17,6 @@ package org.apache.flink.kubernetes.operator.observer.deployment; -import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobStatus; @@ -88,31 +87,31 @@ protected Optional filterTargetJob( } @Override - protected void onTargetJobNotFound(FlinkDeployment resource, Configuration config) { + protected void onTargetJobNotFound(FlinkResourceContext ctx) { // This should never happen for application clusters, there is something // wrong - setUnknownJobError(resource); + setUnknownJobError(ctx); } /** * We found a job on an application cluster that doesn't match the expected job. Trigger * error. * - * @param deployment Application deployment. + * @param ctx Application deployment ctx. */ - private void setUnknownJobError(FlinkDeployment deployment) { - deployment + private void setUnknownJobError(FlinkResourceContext ctx) { + ctx.getResource() .getStatus() .getJobStatus() .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name()); String err = "Unrecognized Job for Application deployment"; logger.error(err); ReconciliationUtils.updateForReconciliationError( - deployment, + ctx.getResource(), new UnknownJobException(err), configManager.getOperatorConfiguration()); eventRecorder.triggerEvent( - deployment, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.Missing, EventRecorder.Component.Job, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java index a9b3df2330..c17474ede0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.operator.observer.sessionjob; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus; import org.apache.flink.kubernetes.operator.api.status.JobStatus; @@ -164,38 +163,36 @@ protected Optional filterTargetJob( } @Override - protected void onTargetJobNotFound(FlinkSessionJob resource, Configuration config) { - ifHaDisabledMarkSessionJobMissing(resource, config); + protected void onTargetJobNotFound(FlinkResourceContext ctx) { + ifHaDisabledMarkSessionJobMissing(ctx); } @Override - protected void onNoJobsFound(FlinkSessionJob resource, Configuration config) { - ifHaDisabledMarkSessionJobMissing(resource, config); + protected void onNoJobsFound(FlinkResourceContext ctx) { + ifHaDisabledMarkSessionJobMissing(ctx); } /** * When HA is disabled the session job will not recover on JM restarts. If the JM goes down * / restarted the session job should be marked missing. * - * @param sessionJob Flink session job. - * @param conf Flink config. + * @param ctx Flink session job context. */ - private void ifHaDisabledMarkSessionJobMissing( - FlinkSessionJob sessionJob, Configuration conf) { - if (HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) { + private void ifHaDisabledMarkSessionJobMissing(FlinkResourceContext ctx) { + if (HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())) { return; } - sessionJob + ctx.getResource() .getStatus() .getJobStatus() .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name()); LOG.error(MISSING_SESSION_JOB_ERR); ReconciliationUtils.updateForReconciliationError( - sessionJob, + ctx.getResource(), new MissingSessionJobException(MISSING_SESSION_JOB_ERR), configManager.getOperatorConfiguration()); eventRecorder.triggerEvent( - sessionJob, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.Missing, EventRecorder.Component.Job, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index 251b854718..200621ca15 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -143,7 +143,7 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { LOG.info(specChangeMessage); if (reconciliationStatus.getState() != ReconciliationState.UPGRADING) { eventRecorder.triggerEvent( - cr, + ctx, EventRecorder.Type.Normal, EventRecorder.Reason.SpecChanged, EventRecorder.Component.JobManagerDeployment, @@ -168,7 +168,7 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } LOG.warn(MSG_ROLLBACK); eventRecorder.triggerEvent( - cr, + ctx, EventRecorder.Type.Normal, EventRecorder.Reason.Rollback, EventRecorder.Component.JobManagerDeployment, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java index b20f0074e4..f471926b49 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java @@ -108,7 +108,7 @@ protected boolean reconcileSpecChange(FlinkResourceContext ctx, Configuratio } eventRecorder.triggerEvent( - resource, + ctx, EventRecorder.Type.Normal, EventRecorder.Reason.Suspended, EventRecorder.Component.JobManagerDeployment, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 346accaa88..dd7fc47dc2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -175,7 +175,7 @@ public void deploy( setJobIdIfNecessary(spec, relatedResource, deployConfig); eventRecorder.triggerEvent( - relatedResource, + ctx, EventRecorder.Type.Normal, EventRecorder.Reason.Submit, EventRecorder.Component.JobManagerDeployment, @@ -265,7 +265,7 @@ public boolean reconcileOtherChanges(FlinkResourceContext ctx) if (shouldRestartJobBecauseUnhealthy || shouldRecoverDeployment) { if (shouldRecoverDeployment) { eventRecorder.triggerEvent( - deployment, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.RecoverDeployment, EventRecorder.Component.Job, @@ -274,7 +274,7 @@ public boolean reconcileOtherChanges(FlinkResourceContext ctx) if (shouldRestartJobBecauseUnhealthy) { eventRecorder.triggerEvent( - deployment, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.RestartUnhealthyJob, EventRecorder.Component.Job, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index eb486a246e..3af84e2ccd 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -150,7 +150,7 @@ public DeleteControl cleanupInternal(FlinkResourceContext ctx) .map(job -> job.getMetadata().getName()) .collect(Collectors.toList())); if (eventRecorder.triggerEvent( - deployment, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.CleanupFailed, EventRecorder.Component.Operator, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index d3a3a51c93..ad39461039 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -21,10 +21,13 @@ import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.listener.AuditUtils; import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.client.KubernetesClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.function.BiConsumer; @@ -32,6 +35,8 @@ /** Helper class for creating Kubernetes events for Flink resources. */ public class EventRecorder { + private static final Logger LOG = LoggerFactory.getLogger(EventRecorder.class); + private final KubernetesClient client; private final BiConsumer, Event> eventListener; @@ -42,28 +47,37 @@ public EventRecorder( } public boolean triggerEvent( - AbstractFlinkResource resource, + FlinkResourceContext ctx, Type type, Reason reason, Component component, String message) { - return triggerEvent(resource, type, reason.toString(), message, component); + return triggerEvent(ctx, type, reason.toString(), message, component); } public boolean triggerEvent( - AbstractFlinkResource resource, + FlinkResourceContext ctx, Type type, String reason, String message, Component component) { - return EventUtils.createOrUpdateEvent( - client, - resource, - type, - reason, - message, - component, - e -> eventListener.accept(resource, e)); + try { + return EventUtils.createOrUpdateEvent( + client, + ctx.getResource(), + type, + reason, + message, + component, + e -> eventListener.accept(ctx.getResource(), e)); + } catch (Exception err) { + if (ctx.isIgnoreEventErrors()) { + LOG.error("Error while creating event, ignoring...", err); + return false; + } else { + throw err; + } + } } public static EventRecorder create( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java index ac7de49ef8..c02d3cc392 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java @@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.api.status.SavepointInfo; import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; +import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -190,7 +191,8 @@ public static boolean gracePeriodEnded(Configuration conf, SavepointInfo savepoi } public static void resetTriggerIfJobNotRunning( - AbstractFlinkResource resource, EventRecorder eventRecorder) { + FlinkResourceContext ctx, EventRecorder eventRecorder) { + var resource = ctx.getResource(); var status = resource.getStatus(); var jobStatus = status.getJobStatus(); if (!ReconciliationUtils.isJobRunning(status) @@ -200,7 +202,7 @@ public static void resetTriggerIfJobNotRunning( savepointInfo.resetTrigger(); LOG.error("Job is not running, cancelling savepoint operation"); eventRecorder.triggerEvent( - resource, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.SavepointError, EventRecorder.Component.Operator, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 541da7aa1f..e46e863954 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -47,6 +47,7 @@ import io.fabric8.kubernetes.api.model.networking.v1.Ingress; import io.fabric8.kubernetes.api.model.networking.v1.IngressRule; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -1128,6 +1129,25 @@ public void verifyCanaryHandling() throws Exception { assertEquals(0, testController.getCanaryResourceManager().getNumberOfActiveCanaries()); } + @Test + public void testEventErrorHandlingDuringCleanup() { + FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster(); + mockServer + .expect() + .post() + .withPath("/api/v1/namespaces/flink-operator-test/events") + .andReply( + 400, + r -> { + throw new KubernetesClientException("TestErr"); + }) + .always(); + var deleteControl = testController.cleanup(flinkDeployment, context); + assertTrue(deleteControl.isRemoveFinalizer()); + assertNotNull(deleteControl); + assertTrue(testController.events().isEmpty()); + } + private HasMetadata getIngress(FlinkDeployment deployment) { if (IngressUtils.ingressInNetworkingV1(kubernetesClient)) { return kubernetesClient diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java index 8eeb49854d..c518d38226 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java @@ -33,7 +33,9 @@ import org.apache.flink.runtime.client.JobStatusMessage; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.junit.jupiter.api.Assertions; @@ -55,6 +57,8 @@ /** {@link FlinkSessionJobController} tests. */ @EnableKubernetesMockClient(crud = true) class FlinkSessionJobControllerTest { + + private KubernetesMockServer mockServer; private KubernetesClient kubernetesClient; private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration()); private final Context context = TestUtils.createContextWithReadyFlinkDeployment(); @@ -511,4 +515,22 @@ public void verifyCanaryHandling() throws Exception { assertEquals(0, testController.getInternalStatusUpdateCount()); assertEquals(0, testController.getCanaryResourceManager().getNumberOfActiveCanaries()); } + + @Test + public void testEventErrorHandlingDuringCleanup() { + mockServer + .expect() + .post() + .withPath("/api/v1/namespaces/flink-operator-test/events") + .andReply( + 400, + r -> { + throw new KubernetesClientException("TestErr"); + }) + .always(); + var deleteControl = testController.cleanup(sessionJob, context); + assertTrue(deleteControl.isRemoveFinalizer()); + assertNotNull(deleteControl); + assertTrue(testController.events().isEmpty()); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java index 0df27576e9..cd52d49daf 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java @@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext; import org.apache.flink.kubernetes.operator.metrics.MetricManager; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; @@ -100,15 +101,16 @@ public void testListeners() { JobManagerDeploymentStatus.DEPLOYING, updateContext.getNewStatus().getJobManagerDeploymentStatus()); + var ctx = new FlinkDeploymentContext(deployment, null, null, null, null); eventRecorder.triggerEvent( - deployment, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.SavepointError, EventRecorder.Component.Operator, "err"); assertEquals(1, listener1.events.size()); eventRecorder.triggerEvent( - deployment, + ctx, EventRecorder.Type.Warning, EventRecorder.Reason.SavepointError, EventRecorder.Component.Operator,