From 9e823ce42583f92ff5e409eacf82d93bc168762b Mon Sep 17 00:00:00 2001 From: Pan Yuepeng Date: Sat, 9 Aug 2025 00:28:29 +0800 Subject: [PATCH] [FLINK-38229][runtime-web] Enhanced Job History Retention Policies for HistoryServer --- .../history_server_configuration.html | 14 +- .../configuration/HistoryServerOptions.java | 96 ++++++++ .../webmonitor/history/HistoryServer.java | 10 +- .../history/HistoryServerArchiveFetcher.java | 18 +- .../CompositeBasedJobRetainedStrategy.java | 193 +++++++++++++++ .../JobArchivesRetainedStrategy.java | 34 +++ ...CompositeBasedJobRetainedStrategyTest.java | 229 ++++++++++++++++++ 7 files changed, 573 insertions(+), 21 deletions(-) create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeBasedJobRetainedStrategy.java create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobArchivesRetainedStrategy.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeBasedJobRetainedStrategyTest.java diff --git a/docs/layouts/shortcodes/generated/history_server_configuration.html b/docs/layouts/shortcodes/generated/history_server_configuration.html index 3f80eb1fbdf1b..179b1c7cc0c75 100644 --- a/docs/layouts/shortcodes/generated/history_server_configuration.html +++ b/docs/layouts/shortcodes/generated/history_server_configuration.html @@ -27,10 +27,16 @@ Interval for refreshing the archived job directories. -
historyserver.archive.retained-jobs
- -1 - Integer - The maximum number of jobs to retain in each archive directory defined by `historyserver.archive.fs.dir`. If set to `-1`(default), there is no limit to the number of archives. If set to `0` or less than `-1` HistoryServer will throw an IllegalConfigurationException. +
historyserver.archive.retained-jobs.mode
+ None +

Enum

+ The mode to retain the jobs archived in each archive directory defined by `historyserver.archive.fs.dir`.

Possible values: + + +
historyserver.archive.retained-jobs.thresholds
+ quantity:-1,ttl:0ms + Map + When the parameter `historyserver.archive.retained-jobs.mode` is enabled, the value of the current configuration item will take effect.
historyserver.log.jobmanager.url-pattern
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java index 0ead9cdeed1a6..4ca1f8bad672c 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java @@ -22,9 +22,12 @@ import org.apache.flink.configuration.description.Description; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import static org.apache.flink.configuration.ConfigOptions.key; import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.configuration.description.TextElement.text; /** The set of configuration options relating to the HistoryServer. */ @PublicEvolving @@ -126,6 +129,11 @@ public class HistoryServerOptions { "Enable HTTPs access to the HistoryServer web frontend. This is applicable only when the" + " global SSL flag security.ssl.enabled is set to true."); + /** + * @deprecated Use {@link HistoryServerOptions#HISTORY_SERVER_RETAINED_JOBS_MODE and + * HistoryServerOptions#HISTORY_SERVER_RETAINED_JOBS_MODE_TTL_THRESHOLDS} to instead. + */ + @Deprecated public static final ConfigOption HISTORY_SERVER_RETAINED_JOBS = key("historyserver.archive.retained-jobs") .intType() @@ -143,5 +151,93 @@ public class HistoryServerOptions { code("IllegalConfigurationException")) .build()); + public static final ConfigOption HISTORY_SERVER_RETAINED_JOBS_MODE = + key("historyserver.archive.retained-jobs.mode") + .enumType(JobArchivedRetainedMode.class) + .defaultValue(JobArchivedRetainedMode.None) + .withDescription( + Description.builder() + .text( + String.format( + "The mode to retain the jobs archived in each archive directory defined by `%s`. ", + HISTORY_SERVER_ARCHIVE_DIRS.key())) + .list( + text( + "The %s mode will retain all jobs archived in each archive directory.", + code(JobArchivedRetainedMode.None.name())), + text( + "The %s mode will retain the jobs archived whose modification time is not out of the configured time to live in each archive directory.", + code(JobArchivedRetainedMode.Ttl.name())), + text( + "The %s mode will retain the jobs archived whose modification index is smaller than the configured quantity in each archive directory.", + code(JobArchivedRetainedMode.Quantity.name())), + text( + "The %s mode will retain the jobs archived whose conditions are both met with %s rule and %s rule in each archive directory.", + code( + JobArchivedRetainedMode.TtlAndQuantity + .name()), + code(JobArchivedRetainedMode.Ttl.name()), + code(JobArchivedRetainedMode.Quantity.name())), + text( + "The %s mode will retain the jobs archived whose conditions are met with %s rule or %s rule in each archive directory.", + code( + JobArchivedRetainedMode.TtlOrQuantity + .name()), + code(JobArchivedRetainedMode.Ttl.name()), + code(JobArchivedRetainedMode.Quantity.name()))) + .build()); + + public static final ConfigOption> + HISTORY_SERVER_RETAINED_JOBS_MODE_THRESHOLDS = + key("historyserver.archive.retained-jobs.thresholds") + .mapType() + .defaultValue( + new HashMap<>() { + { + put("ttl", "0ms"); + put("quantity", "-1"); + } + }) + .withDescription( + Description.builder() + .text( + String.format( + "When the parameter `%s` is enabled, the value of the current configuration item will take effect. ", + HISTORY_SERVER_RETAINED_JOBS_MODE + .key())) + .list( + text( + "The 'ttl' sub-option represents the time to live for the jobs archived in each archive directory. " + + "The default value is `0 ms`, " + + "which means that when the TTL-based retention policy is enabled, all jobs archived will be retained. "), + text( + "The 'quantity' sub-option represents the quantity of the jobs archived in each archive directory " + + "based on the descending job archived modification time. " + + "The default value is `-1`, " + + "which means that when the quantity-based retention policy is enabled, all jobs archived will be retained. ")) + .build()); + + public enum JobArchivedRetainedMode { + /** Keep all jobs archive files. */ + None, + /** Keep the jobs archive files whose modified time in the time to live duration. */ + Ttl, + /** + * Keep the jobs archive files whose ordered index based on modified time is smaller or + * equals to the quantity threshold. + */ + Quantity, + /** + * Keep the jobs archive files whose conditions are meet with {@link + * JobArchivedRetainedMode#Ttl} and {@link JobArchivedRetainedMode#Quantity}. + */ + TtlAndQuantity, + /** + * Keep the jobs archive files whose conditions are meet with {@link + * JobArchivedRetainedMode#Ttl} or {@link JobArchivedRetainedMode#Quantity}. + */ + TtlOrQuantity + } + private HistoryServerOptions() {} } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 00b96e0bbf3d2..127f085fd4f52 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HistoryServerOptions; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.plugin.PluginUtils; @@ -38,6 +37,7 @@ import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.Runnables; +import org.apache.flink.runtime.webmonitor.history.retaining.CompositeBasedJobRetainedStrategy; import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil; import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.util.ExceptionUtils; @@ -238,19 +238,13 @@ public HistoryServer( refreshIntervalMillis = config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL).toMillis(); - int maxHistorySize = config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS); - if (maxHistorySize == 0 || maxHistorySize < -1) { - throw new IllegalConfigurationException( - "Cannot set %s to 0 or less than -1", - HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key()); - } archiveFetcher = new HistoryServerArchiveFetcher( refreshDirs, webDir, jobArchiveEventListener, cleanupExpiredArchives, - maxHistorySize); + CompositeBasedJobRetainedStrategy.createFrom(config)); this.shutdownHook = ShutdownHookUtil.addShutdownHook( diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index a8d782354a05d..89999f2fe8e8f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; +import org.apache.flink.runtime.webmonitor.history.retaining.JobArchivesRetainedStrategy; import org.apache.flink.util.FileUtils; import org.apache.flink.util.jackson.JacksonMapperFactory; @@ -112,8 +113,7 @@ public ArchiveEventType getType() { private final List refreshDirs; private final Consumer jobArchiveEventListener; private final boolean processExpiredArchiveDeletion; - private final boolean processBeyondLimitArchiveDeletion; - private final int maxHistorySize; + private final JobArchivesRetainedStrategy jobRetainedStrategy; /** Cache of all available jobs identified by their id. */ private final Map> cachedArchivesPerRefreshDirectory; @@ -127,13 +127,12 @@ public ArchiveEventType getType() { File webDir, Consumer jobArchiveEventListener, boolean cleanupExpiredArchives, - int maxHistorySize) + JobArchivesRetainedStrategy jobRetainedStrategy) throws IOException { this.refreshDirs = checkNotNull(refreshDirs); this.jobArchiveEventListener = jobArchiveEventListener; this.processExpiredArchiveDeletion = cleanupExpiredArchives; - this.maxHistorySize = maxHistorySize; - this.processBeyondLimitArchiveDeletion = this.maxHistorySize > 0; + this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy); this.cachedArchivesPerRefreshDirectory = new HashMap<>(); for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>()); @@ -176,7 +175,7 @@ void fetchArchives() { continue; } - int historySize = 0; + int fileOrderedIndexOnModifiedTime = 0; for (FileStatus jobArchive : jobArchives) { Path jobArchivePath = jobArchive.getPath(); String jobID = jobArchivePath.getName(); @@ -186,8 +185,9 @@ void fetchArchives() { jobsToRemove.get(refreshDir).remove(jobID); - historySize++; - if (historySize > maxHistorySize && processBeyondLimitArchiveDeletion) { + fileOrderedIndexOnModifiedTime++; + if (!jobRetainedStrategy.shouldRetain( + jobArchive, fileOrderedIndexOnModifiedTime)) { archivesBeyondSizeLimit .computeIfAbsent(refreshDir, ignored -> new HashSet<>()) .add(jobArchivePath); @@ -220,7 +220,7 @@ void fetchArchives() { && processExpiredArchiveDeletion) { events.addAll(cleanupExpiredJobs(jobsToRemove)); } - if (!archivesBeyondSizeLimit.isEmpty() && processBeyondLimitArchiveDeletion) { + if (!archivesBeyondSizeLimit.isEmpty()) { events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondSizeLimit)); } if (!events.isEmpty()) { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeBasedJobRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeBasedJobRetainedStrategy.java new file mode 100644 index 0000000000000..588d9885d5e52 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeBasedJobRetainedStrategy.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.history.retaining; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.HistoryServerOptions.JobArchivedRetainedMode; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.util.TimeUtils; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS; +import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS_MODE; +import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS_MODE_THRESHOLDS; + +/** The composite based retained strategy. */ +public class CompositeBasedJobRetainedStrategy implements JobArchivesRetainedStrategy { + + public static JobArchivesRetainedStrategy createFrom(ReadableConfig config) { + Optional retainedModeOpt = + config.getOptional(HISTORY_SERVER_RETAINED_JOBS_MODE); + JobArchivedRetainedMode retainedMode; + if (!retainedModeOpt.isPresent()) { + int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS); + if (maxHistorySizeByOldKey == 0 || maxHistorySizeByOldKey < -1) { + throw new IllegalConfigurationException( + "Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key()); + } + return maxHistorySizeByOldKey != HISTORY_SERVER_RETAINED_JOBS.defaultValue() + ? new CompositeBasedJobRetainedStrategy( + LogicEvaluate.OR, + new QuantityBasedJobRetainedStrategy(maxHistorySizeByOldKey)) + : new CompositeBasedJobRetainedStrategy(LogicEvaluate.OR); + } else { + retainedMode = retainedModeOpt.get(); + } + + return getTargetJobRetainedStrategy(config, retainedMode); + } + + private static CompositeBasedJobRetainedStrategy getTargetJobRetainedStrategy( + ReadableConfig config, JobArchivedRetainedMode retainedMode) { + Map thresholds = config.get(HISTORY_SERVER_RETAINED_JOBS_MODE_THRESHOLDS); + + switch (retainedMode) { + case None: + return new CompositeBasedJobRetainedStrategy(LogicEvaluate.OR); + case Ttl: + return new CompositeBasedJobRetainedStrategy( + LogicEvaluate.OR, new TimeToLiveBasedJobRetainedStrategy(thresholds)); + case Quantity: + return new CompositeBasedJobRetainedStrategy( + LogicEvaluate.OR, new QuantityBasedJobRetainedStrategy(thresholds)); + case TtlAndQuantity: + return new CompositeBasedJobRetainedStrategy( + LogicEvaluate.AND, + new TimeToLiveBasedJobRetainedStrategy(thresholds), + new QuantityBasedJobRetainedStrategy(thresholds)); + case TtlOrQuantity: + return new CompositeBasedJobRetainedStrategy( + LogicEvaluate.OR, + new TimeToLiveBasedJobRetainedStrategy(thresholds), + new QuantityBasedJobRetainedStrategy(thresholds)); + default: + throw new IllegalConfigurationException( + "Unsupported retained mode " + retainedMode); + } + } + + /** The enum to represent the logic to evaluate the target retained strategies. */ + enum LogicEvaluate { + /** Logic OR Rule to evaluate the target strategies. */ + OR, + /** Logic AND Rule to evaluate the target strategies. */ + AND, + } + + private final LogicEvaluate logicEvaluate; + private final List strategies; + + CompositeBasedJobRetainedStrategy( + LogicEvaluate logicEvaluate, JobArchivesRetainedStrategy... strategies) { + this.logicEvaluate = logicEvaluate; + this.strategies = + strategies == null || strategies.length == 0 + ? Collections.emptyList() + : Arrays.asList(strategies); + } + + @Override + public boolean shouldRetain(FileStatus file, int fileOrderedIndex) { + if (strategies.isEmpty()) { + return true; + } + + if (logicEvaluate == LogicEvaluate.OR) { + return strategies.stream().anyMatch(s -> s.shouldRetain(file, fileOrderedIndex)); + } else if (logicEvaluate == LogicEvaluate.AND) { + return strategies.stream().allMatch(s -> s.shouldRetain(file, fileOrderedIndex)); + } + return true; + } + + @VisibleForTesting + public List getStrategies() { + return Collections.unmodifiableList(strategies); + } + + @VisibleForTesting + public LogicEvaluate getLogicEvaluate() { + return logicEvaluate; + } +} + +/** The time to live based retained strategy. */ +class TimeToLiveBasedJobRetainedStrategy implements JobArchivesRetainedStrategy { + + static final String TTL_KEY = "ttl"; + + private final Duration ttlThreshold; + + TimeToLiveBasedJobRetainedStrategy(Map thresholds) { + this.ttlThreshold = + TimeUtils.parseDuration( + thresholds.getOrDefault( + TTL_KEY, + HISTORY_SERVER_RETAINED_JOBS_MODE_THRESHOLDS + .defaultValue() + .get(TTL_KEY))); + } + + @Override + public boolean shouldRetain(FileStatus file, int fileOrderedIndex) { + if (ttlThreshold == null || ttlThreshold.toMillis() <= 0L) { + return true; + } + return Instant.now().toEpochMilli() - file.getModificationTime() < ttlThreshold.toMillis(); + } +} + +/** The job quantity based retained strategy. */ +class QuantityBasedJobRetainedStrategy implements JobArchivesRetainedStrategy { + + static final String QUANTITY_KEY = "quantity"; + + private final Integer quantityThreshold; + + QuantityBasedJobRetainedStrategy(Map thresholds) { + this.quantityThreshold = + Integer.parseInt( + thresholds.getOrDefault( + QUANTITY_KEY, + HISTORY_SERVER_RETAINED_JOBS_MODE_THRESHOLDS + .defaultValue() + .get(QUANTITY_KEY))); + } + + QuantityBasedJobRetainedStrategy(int quantityThreshold) { + this.quantityThreshold = quantityThreshold; + } + + @Override + public boolean shouldRetain(FileStatus file, int fileOrderedIndex) { + if (quantityThreshold == null || quantityThreshold < 0) { + return true; + } + return quantityThreshold >= fileOrderedIndex; + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobArchivesRetainedStrategy.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobArchivesRetainedStrategy.java new file mode 100644 index 0000000000000..f00cb20994b8d --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/retaining/JobArchivesRetainedStrategy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.history.retaining; + +import org.apache.flink.core.fs.FileStatus; + +/** To define the strategy interface to judge whether the file should be retained. */ +public interface JobArchivesRetainedStrategy { + + /** + * Judge whether the file should be retained. + * + * @param file the target file to judge. + * @param fileOrderedIndex the specified order index position of the target file, + * @return The result that indicates whether the file should be retained. + */ + boolean shouldRetain(FileStatus file, int fileOrderedIndex); +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeBasedJobRetainedStrategyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeBasedJobRetainedStrategyTest.java new file mode 100644 index 0000000000000..6ae7f6601808d --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/retaining/CompositeBasedJobRetainedStrategyTest.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.history.retaining; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.Path; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS; +import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS_MODE; +import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS_MODE_THRESHOLDS; +import static org.apache.flink.configuration.HistoryServerOptions.JobArchivedRetainedMode; +import static org.apache.flink.runtime.webmonitor.history.retaining.CompositeBasedJobRetainedStrategy.LogicEvaluate.AND; +import static org.apache.flink.runtime.webmonitor.history.retaining.CompositeBasedJobRetainedStrategy.LogicEvaluate.OR; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Testing for {@link CompositeBasedJobRetainedStrategy}. */ +class CompositeBasedJobRetainedStrategyTest { + + @Test + void testCompatibilityBetweenOldSizedAndModeBasedJobRetainedStrategy() { + final Configuration confWithoutNewOptionValue = new Configuration(); + CompositeBasedJobRetainedStrategy targetJobRetainedStrategy; + + targetJobRetainedStrategy = + (CompositeBasedJobRetainedStrategy) + CompositeBasedJobRetainedStrategy.createFrom(confWithoutNewOptionValue); + assertThat(targetJobRetainedStrategy.getStrategies()).isEmpty(); + + confWithoutNewOptionValue.set(HISTORY_SERVER_RETAINED_JOBS, -2); + assertIllegalConfigurationException(confWithoutNewOptionValue); + + confWithoutNewOptionValue.set(HISTORY_SERVER_RETAINED_JOBS, 0); + assertIllegalConfigurationException(confWithoutNewOptionValue); + + confWithoutNewOptionValue.set(HISTORY_SERVER_RETAINED_JOBS, 1); + assertOnlyDeprecatedOptionEvaluateLogic( + confWithoutNewOptionValue, QuantityBasedJobRetainedStrategy.class); + + Configuration confWithNewOptionValue = new Configuration(); + confWithNewOptionValue.set( + HISTORY_SERVER_RETAINED_JOBS_MODE, JobArchivedRetainedMode.Quantity); + assertOnlyDeprecatedOptionEvaluateLogic( + confWithNewOptionValue, QuantityBasedJobRetainedStrategy.class); + + confWithNewOptionValue.set(HISTORY_SERVER_RETAINED_JOBS_MODE, JobArchivedRetainedMode.Ttl); + assertOnlyDeprecatedOptionEvaluateLogic( + confWithNewOptionValue, TimeToLiveBasedJobRetainedStrategy.class); + + confWithNewOptionValue.set( + HISTORY_SERVER_RETAINED_JOBS_MODE, JobArchivedRetainedMode.TtlAndQuantity); + assertComposedStrategies(confWithNewOptionValue, AND); + + confWithNewOptionValue.set( + HISTORY_SERVER_RETAINED_JOBS_MODE, JobArchivedRetainedMode.TtlOrQuantity); + assertComposedStrategies(confWithNewOptionValue, OR); + } + + private static void assertComposedStrategies( + Configuration confWithNewOptionValue, + CompositeBasedJobRetainedStrategy.LogicEvaluate logicEvaluate) { + CompositeBasedJobRetainedStrategy targetJobRetainedStrategy; + targetJobRetainedStrategy = + (CompositeBasedJobRetainedStrategy) + CompositeBasedJobRetainedStrategy.createFrom(confWithNewOptionValue); + assertThat(targetJobRetainedStrategy.getStrategies()) + .hasSize(2) + .hasOnlyElementsOfTypes( + QuantityBasedJobRetainedStrategy.class, + TimeToLiveBasedJobRetainedStrategy.class); + assertThat(targetJobRetainedStrategy.getLogicEvaluate()).isEqualTo(logicEvaluate); + } + + private static void assertOnlyDeprecatedOptionEvaluateLogic( + Configuration confWithoutNewOptionValue, Class targetStrategyClass) { + CompositeBasedJobRetainedStrategy targetJobRetainedStrategy; + targetJobRetainedStrategy = + (CompositeBasedJobRetainedStrategy) + CompositeBasedJobRetainedStrategy.createFrom(confWithoutNewOptionValue); + assertThat(targetJobRetainedStrategy.getStrategies()) + .hasSize(1) + .hasExactlyElementsOfTypes(targetStrategyClass); + } + + private static void assertIllegalConfigurationException( + Configuration confWithoutNewOptionValue) { + assertThatThrownBy( + () -> + CompositeBasedJobRetainedStrategy.createFrom( + confWithoutNewOptionValue)) + .isInstanceOf(IllegalConfigurationException.class); + } + + @Test + void testTimeToLiveBasedJobRetainedStrategy() { + Map props = new HashMap<>(); + JobArchivesRetainedStrategy strategy = new TimeToLiveBasedJobRetainedStrategy(props); + assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue(); + assertThat( + strategy.shouldRetain( + new TestingFileStatus( + Instant.now().toEpochMilli() + - Duration.ofMinutes(1).toMillis()), + 1)) + .isTrue(); + + props = Map.of("ttl", "1min"); + strategy = new TimeToLiveBasedJobRetainedStrategy(props); + assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue(); + assertThat( + strategy.shouldRetain( + new TestingFileStatus( + Instant.now().toEpochMilli() + - Duration.ofMinutes(1).toMillis()), + 1)) + .isFalse(); + } + + @Test + void testQuantityBasedJobRetainedStrategy() { + Map props = new HashMap<>(); + JobArchivesRetainedStrategy strategy = new TimeToLiveBasedJobRetainedStrategy(props); + assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue(); + assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isTrue(); + + props = Map.of("quantity", "2"); + strategy = new QuantityBasedJobRetainedStrategy(props); + assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue(); + assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isFalse(); + } + + @Test + void testCompositeBasedJobRetainedStrategy() { + + final long outOfTtlMillis = + Instant.now().toEpochMilli() - Duration.ofMinutes(2L).toMillis(); + + final Configuration conf = new Configuration(); + conf.set( + HISTORY_SERVER_RETAINED_JOBS_MODE_THRESHOLDS, + Map.of("quantity", "2", "ttl", "1min")); + conf.set(HISTORY_SERVER_RETAINED_JOBS_MODE, JobArchivedRetainedMode.TtlAndQuantity); + JobArchivesRetainedStrategy strategy = CompositeBasedJobRetainedStrategy.createFrom(conf); + assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isFalse(); + assertThat(strategy.shouldRetain(new TestingFileStatus(), 10)).isFalse(); + assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isFalse(); + assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue(); + + conf.set(HISTORY_SERVER_RETAINED_JOBS_MODE, JobArchivedRetainedMode.TtlOrQuantity); + strategy = CompositeBasedJobRetainedStrategy.createFrom(conf); + assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 1)).isTrue(); + assertThat(strategy.shouldRetain(new TestingFileStatus(), 3)).isTrue(); + assertThat(strategy.shouldRetain(new TestingFileStatus(), 1)).isTrue(); + assertThat(strategy.shouldRetain(new TestingFileStatus(outOfTtlMillis), 3)).isFalse(); + } + + private static final class TestingFileStatus implements FileStatus { + + private final long modificationTime; + + TestingFileStatus() { + this(Instant.now().toEpochMilli()); + } + + TestingFileStatus(long modificationTime) { + this.modificationTime = modificationTime; + } + + @Override + public long getLen() { + return 0; + } + + @Override + public long getBlockSize() { + return 0; + } + + @Override + public short getReplication() { + return 0; + } + + @Override + public long getModificationTime() { + return modificationTime; + } + + @Override + public long getAccessTime() { + return 0; + } + + @Override + public boolean isDir() { + return false; + } + + @Override + public Path getPath() { + return null; + } + } +}