diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java index a87de9641..f10bf0e06 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java @@ -21,6 +21,8 @@ import org.apache.flink.autoscaler.JobAutoScaler; import org.apache.flink.autoscaler.JobAutoScalerContext; import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.autoscaler.validation.AutoscalerValidator; +import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -41,6 +43,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -51,6 +54,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL; import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL; import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM; @@ -69,6 +73,7 @@ public class StandaloneAutoscalerExecutor(); this.baseConf = new UnmodifiableConfiguration(conf); + this.autoscalerValidator = new DefaultAutoscalerValidator(); } public void start() { @@ -189,7 +195,19 @@ private void cleanupStoppedJob(Collection jobList) { protected void scalingSingleJob(Context jobContext) { try { MDC.put("job.key", jobContext.getJobKey().toString()); - autoScaler.scale(jobContext); + Optional validationError = + autoscalerValidator.validateAutoscalerOptions(jobContext.getConfiguration()); + if (validationError.isPresent()) { + eventHandler.handleEvent( + jobContext, + AutoScalerEventHandler.Type.Warning, + "AutoScaler Options Validation", + validationError.get(), + null, + baseConf.get(SCALING_EVENT_INTERVAL)); + } else { + autoScaler.scale(jobContext); + } } catch (Throwable e) { LOG.error("Error while scaling job", e); eventHandler.handleException(jobContext, AUTOSCALER_ERROR, e); diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerValidatorTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerValidatorTest.java new file mode 100644 index 000000000..4b895bb4f --- /dev/null +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerValidatorTest.java @@ -0,0 +1,87 @@ +package org.apache.flink.autoscaler.standalone; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.JobAutoScaler; +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.event.TestingEventCollector; +import org.apache.flink.configuration.Configuration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import static org.assertj.core.api.Assertions.assertThat; + +class StandaloneAutoscalerValidatorTest { + private List> jobList; + private TestingEventCollector> eventCollector; + private ConcurrentHashMap scaleCounter; + private Configuration correctConfiguration; + private Configuration invalidConfiguration; + + @BeforeEach + void setUp() { + jobList = new ArrayList<>(); + eventCollector = new TestingEventCollector<>(); + scaleCounter = new ConcurrentHashMap<>(); + + correctConfiguration = new Configuration(); + correctConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true); + + invalidConfiguration = new Configuration(); + invalidConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true); + invalidConfiguration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, -1.0); + } + + @Test + void testAutoScalerWithInvalidConfig() { + JobAutoScalerContext validJob = createJobAutoScalerContext(correctConfiguration); + JobAutoScalerContext invalidJob = createJobAutoScalerContext(invalidConfiguration); + + jobList.add(validJob); + jobList.add(invalidJob); + + try (StandaloneAutoscalerExecutor> autoscalerExecutor = + new StandaloneAutoscalerExecutor<>( + new Configuration(), + baseConf -> jobList, + eventCollector, + new JobAutoScaler<>() { + @Override + public void scale(JobAutoScalerContext context) { + scaleCounter.merge(context.getJobKey(), 1, Integer::sum); + } + + @Override + public void cleanup(JobAutoScalerContext context) { + // No cleanup required for the test + } + })) { + + List> scaledFutures = autoscalerExecutor.scaling(); + + // Verification triggers two scaling tasks + assertThat(scaledFutures).hasSize(2); + + // Only legally configured tasks are scaled + assertThat(scaleCounter).hasSize(1).containsKey(validJob.getJobKey()); + + // Verification Event Collector captures an event + assertThat(eventCollector.events).hasSize(1); + assertThat(eventCollector.events) + .allMatch(event -> event.getContext().equals(invalidJob)); + } + } + + private JobAutoScalerContext createJobAutoScalerContext(Configuration configuration) { + JobID jobID = new JobID(); + return new JobAutoScalerContext<>( + jobID, jobID, JobStatus.RUNNING, configuration, null, null); + } +} diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java new file mode 100644 index 000000000..9c293b335 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.validation; + +import org.apache.flink.configuration.Configuration; + +import java.util.Optional; + +/** Validator for Autoscaler. */ +public interface AutoscalerValidator { + + /** + * Validate autoscaler config and return optional error. + * + * @param flinkConf autoscaler config + * @return Optional error string, should be present iff validation resulted in an error + */ + Optional validateAutoscalerOptions(Configuration flinkConf); +} diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/DefaultAutoscalerValidator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/DefaultAutoscalerValidator.java new file mode 100644 index 000000000..a62c36bf6 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/DefaultAutoscalerValidator.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.validation; + +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.utils.CalendarUtils; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; + +import java.util.Optional; + +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; + +/** Default implementation of {@link AutoscalerValidator}. */ +public class DefaultAutoscalerValidator implements AutoscalerValidator { + + public Optional validateAutoscalerOptions(Configuration flinkConf) { + + if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) { + return Optional.empty(); + } + return firstPresent( + validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d), + validateNumber(flinkConf, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d), + validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d), + validateNumber(flinkConf, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d), + validateNumber(flinkConf, UTILIZATION_MAX, flinkConf.get(UTILIZATION_TARGET), 1.0d), + validateNumber(flinkConf, UTILIZATION_MIN, 0.0d, flinkConf.get(UTILIZATION_TARGET)), + CalendarUtils.validateExcludedPeriods(flinkConf)); + } + + @SafeVarargs + private static Optional firstPresent(Optional... errOpts) { + for (Optional opt : errOpts) { + if (opt.isPresent()) { + return opt; + } + } + return Optional.empty(); + } + + private static Optional validateNumber( + Configuration flinkConfiguration, + ConfigOption autoScalerConfig, + Double min, + Double max) { + try { + var configValue = flinkConfiguration.get(autoScalerConfig); + if (configValue != null) { + double value = configValue.doubleValue(); + if ((min != null && value < min) || (max != null && value > max)) { + return Optional.of( + String.format( + "The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]", + autoScalerConfig.key(), + min != null ? min.toString() : "-Infinity", + max != null ? max.toString() : "+Infinity")); + } + } + return Optional.empty(); + } catch (IllegalArgumentException e) { + return Optional.of( + String.format( + "Invalid value in the autoscaler config %s", autoScalerConfig.key())); + } + } + + private static Optional validateNumber( + Configuration flinkConfiguration, ConfigOption autoScalerConfig, Double min) { + return validateNumber(flinkConfiguration, autoScalerConfig, min, null); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java index e2b5db856..3e15cd70e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java @@ -17,10 +17,9 @@ package org.apache.flink.kubernetes.operator.validation; -import org.apache.flink.autoscaler.config.AutoScalerOptions; -import org.apache.flink.autoscaler.utils.CalendarUtils; +import org.apache.flink.autoscaler.validation.AutoscalerValidator; +import org.apache.flink.autoscaler.validation.DefaultAutoscalerValidator; import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; @@ -65,10 +64,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN; -import static org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET; - /** Default validator implementation for {@link FlinkDeployment}. */ public class DefaultValidator implements FlinkResourceValidator { @@ -87,9 +82,11 @@ public class DefaultValidator implements FlinkResourceValidator { Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME); private final FlinkConfigManager configManager; + private final AutoscalerValidator autoscalerValidator; public DefaultValidator(FlinkConfigManager configManager) { this.configManager = configManager; + this.autoscalerValidator = new DefaultAutoscalerValidator(); } @Override @@ -597,62 +594,12 @@ private Optional validateServiceAccount(String serviceAccount) { return Optional.empty(); } - public static Optional validateAutoScalerFlinkConfiguration( + public Optional validateAutoScalerFlinkConfiguration( Map effectiveConfig) { if (effectiveConfig == null) { return Optional.empty(); } Configuration flinkConfiguration = Configuration.fromMap(effectiveConfig); - if (!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) { - return Optional.empty(); - } - return firstPresent( - validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d), - validateNumber(flinkConfiguration, AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d), - validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 1.0d), - validateNumber( - flinkConfiguration, AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d), - validateNumber( - flinkConfiguration, - UTILIZATION_MAX, - flinkConfiguration.get(UTILIZATION_TARGET), - 1.0d), - validateNumber( - flinkConfiguration, - UTILIZATION_MIN, - 0.0d, - flinkConfiguration.get(UTILIZATION_TARGET)), - CalendarUtils.validateExcludedPeriods(flinkConfiguration)); - } - - private static Optional validateNumber( - Configuration flinkConfiguration, - ConfigOption autoScalerConfig, - Double min, - Double max) { - try { - var configValue = flinkConfiguration.get(autoScalerConfig); - if (configValue != null) { - double value = configValue.doubleValue(); - if ((min != null && value < min) || (max != null && value > max)) { - return Optional.of( - String.format( - "The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]", - autoScalerConfig.key(), - min != null ? min.toString() : "-Infinity", - max != null ? max.toString() : "+Infinity")); - } - } - return Optional.empty(); - } catch (IllegalArgumentException e) { - return Optional.of( - String.format( - "Invalid value in the autoscaler config %s", autoScalerConfig.key())); - } - } - - private static Optional validateNumber( - Configuration flinkConfiguration, ConfigOption autoScalerConfig, Double min) { - return validateNumber(flinkConfiguration, autoScalerConfig, min, null); + return autoscalerValidator.validateAutoscalerOptions(flinkConfiguration); } }