Skip to content

[FLINK-38229][runtime-web] Enhanced Job History Retention Policies for HistoryServer #26902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@
<td>Interval for refreshing the archived job directories.</td>
</tr>
<tr>
<td><h5>historyserver.archive.retained-jobs</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
<td>The maximum number of jobs to retain in each archive directory defined by `historyserver.archive.fs.dir`. If set to `-1`(default), there is no limit to the number of archives. If set to `0` or less than `-1` HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. </td>
<td><h5>historyserver.archive.retained-jobs.mode</h5></td>
<td style="word-wrap: break-word;">None</td>
<td><p>Enum</p></td>
<td>The mode to retain the jobs archived in each archive directory defined by `historyserver.archive.fs.dir`. <ul><li>The <code class="highlighter-rouge">None</code> mode will retain all jobs archived in each archive directory.</li><li>The <code class="highlighter-rouge">Ttl</code> mode will retain the jobs archived whose modification time is not out of the configured time to live in each archive directory.</li><li>The <code class="highlighter-rouge">Quantity</code> mode will retain the jobs archived whose modification index is smaller than the configured quantity in each archive directory.</li><li>The <code class="highlighter-rouge">TtlAndQuantity</code> mode will retain the jobs archived whose conditions are both met with <code class="highlighter-rouge">Ttl</code> rule and <code class="highlighter-rouge">Quantity</code> rule in each archive directory.</li><li>The <code class="highlighter-rouge">TtlOrQuantity</code> mode will retain the jobs archived whose conditions are met with <code class="highlighter-rouge">Ttl</code> rule or <code class="highlighter-rouge">Quantity</code> rule in each archive directory.</li></ul><br /><br />Possible values:<ul><li>"None"</li><li>"Ttl"</li><li>"Quantity"</li><li>"TtlAndQuantity"</li><li>"TtlOrQuantity"</li></ul></td>
</tr>
<tr>
<td><h5>historyserver.archive.retained-jobs.thresholds</h5></td>
<td style="word-wrap: break-word;">quantity:-1,ttl:0ms</td>
<td>Map</td>
<td>When the parameter `historyserver.archive.retained-jobs.mode` is enabled, the value of the current configuration item will take effect. <ul><li>The 'ttl' sub-option represents the time to live for the jobs archived in each archive directory. The default value is `0 ms`, which means that when the TTL-based retention policy is enabled, all jobs archived will be retained. </li><li>The 'quantity' sub-option represents the quantity of the jobs archived in each archive directory based on the descending job archived modification time. The default value is `-1`, which means that when the quantity-based retention policy is enabled, all jobs archived will be retained. </li></ul></td>
</tr>
<tr>
<td><h5>historyserver.log.jobmanager.url-pattern</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import org.apache.flink.configuration.description.Description;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;

/** The set of configuration options relating to the HistoryServer. */
@PublicEvolving
Expand Down Expand Up @@ -126,6 +129,11 @@ public class HistoryServerOptions {
"Enable HTTPs access to the HistoryServer web frontend. This is applicable only when the"
+ " global SSL flag security.ssl.enabled is set to true.");

/**
* @deprecated Use {@link HistoryServerOptions#HISTORY_SERVER_RETAINED_JOBS_MODE and
* HistoryServerOptions#HISTORY_SERVER_RETAINED_JOBS_MODE_TTL_THRESHOLDS} to instead.
*/
@Deprecated
public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS =
key("historyserver.archive.retained-jobs")
.intType()
Expand All @@ -143,5 +151,93 @@ public class HistoryServerOptions {
code("IllegalConfigurationException"))
.build());

public static final ConfigOption<JobArchivedRetainedMode> HISTORY_SERVER_RETAINED_JOBS_MODE =
key("historyserver.archive.retained-jobs.mode")
.enumType(JobArchivedRetainedMode.class)
.defaultValue(JobArchivedRetainedMode.None)
.withDescription(
Description.builder()
.text(
String.format(
"The mode to retain the jobs archived in each archive directory defined by `%s`. ",
HISTORY_SERVER_ARCHIVE_DIRS.key()))
.list(
text(
"The %s mode will retain all jobs archived in each archive directory.",
code(JobArchivedRetainedMode.None.name())),
text(
"The %s mode will retain the jobs archived whose modification time is not out of the configured time to live in each archive directory.",
code(JobArchivedRetainedMode.Ttl.name())),
text(
"The %s mode will retain the jobs archived whose modification index is smaller than the configured quantity in each archive directory.",
code(JobArchivedRetainedMode.Quantity.name())),
text(
"The %s mode will retain the jobs archived whose conditions are both met with %s rule and %s rule in each archive directory.",
code(
JobArchivedRetainedMode.TtlAndQuantity
.name()),
code(JobArchivedRetainedMode.Ttl.name()),
code(JobArchivedRetainedMode.Quantity.name())),
text(
"The %s mode will retain the jobs archived whose conditions are met with %s rule or %s rule in each archive directory.",
code(
JobArchivedRetainedMode.TtlOrQuantity
.name()),
code(JobArchivedRetainedMode.Ttl.name()),
code(JobArchivedRetainedMode.Quantity.name())))
.build());

public static final ConfigOption<Map<String, String>>
HISTORY_SERVER_RETAINED_JOBS_MODE_THRESHOLDS =
key("historyserver.archive.retained-jobs.thresholds")
.mapType()
.defaultValue(
new HashMap<>() {
{
put("ttl", "0ms");
put("quantity", "-1");
}
})
.withDescription(
Description.builder()
.text(
String.format(
"When the parameter `%s` is enabled, the value of the current configuration item will take effect. ",
HISTORY_SERVER_RETAINED_JOBS_MODE
.key()))
.list(
text(
"The 'ttl' sub-option represents the time to live for the jobs archived in each archive directory. "
+ "The default value is `0 ms`, "
+ "which means that when the TTL-based retention policy is enabled, all jobs archived will be retained. "),
text(
"The 'quantity' sub-option represents the quantity of the jobs archived in each archive directory "
+ "based on the descending job archived modification time. "
+ "The default value is `-1`, "
+ "which means that when the quantity-based retention policy is enabled, all jobs archived will be retained. "))
.build());

public enum JobArchivedRetainedMode {
/** Keep all jobs archive files. */
None,
/** Keep the jobs archive files whose modified time in the time to live duration. */
Ttl,
/**
* Keep the jobs archive files whose ordered index based on modified time is smaller or
* equals to the quantity threshold.
*/
Quantity,
/**
* Keep the jobs archive files whose conditions are meet with {@link
* JobArchivedRetainedMode#Ttl} and {@link JobArchivedRetainedMode#Quantity}.
*/
TtlAndQuantity,
/**
* Keep the jobs archive files whose conditions are meet with {@link
* JobArchivedRetainedMode#Ttl} or {@link JobArchivedRetainedMode#Quantity}.
*/
TtlOrQuantity
}

private HistoryServerOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.plugin.PluginUtils;
Expand All @@ -38,6 +37,7 @@
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.Runnables;
import org.apache.flink.runtime.webmonitor.history.retaining.CompositeBasedJobRetainedStrategy;
import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -238,19 +238,13 @@ public HistoryServer(

refreshIntervalMillis =
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL).toMillis();
int maxHistorySize = config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS);
if (maxHistorySize == 0 || maxHistorySize < -1) {
throw new IllegalConfigurationException(
"Cannot set %s to 0 or less than -1",
HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key());
}
archiveFetcher =
new HistoryServerArchiveFetcher(
refreshDirs,
webDir,
jobArchiveEventListener,
cleanupExpiredArchives,
maxHistorySize);
CompositeBasedJobRetainedStrategy.createFrom(config));

this.shutdownHook =
ShutdownHookUtil.addShutdownHook(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.webmonitor.history.retaining.JobArchivesRetainedStrategy;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.jackson.JacksonMapperFactory;

Expand Down Expand Up @@ -112,8 +113,7 @@ public ArchiveEventType getType() {
private final List<HistoryServer.RefreshLocation> refreshDirs;
private final Consumer<ArchiveEvent> jobArchiveEventListener;
private final boolean processExpiredArchiveDeletion;
private final boolean processBeyondLimitArchiveDeletion;
private final int maxHistorySize;
private final JobArchivesRetainedStrategy jobRetainedStrategy;

/** Cache of all available jobs identified by their id. */
private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
Expand All @@ -127,13 +127,12 @@ public ArchiveEventType getType() {
File webDir,
Consumer<ArchiveEvent> jobArchiveEventListener,
boolean cleanupExpiredArchives,
int maxHistorySize)
JobArchivesRetainedStrategy jobRetainedStrategy)
throws IOException {
this.refreshDirs = checkNotNull(refreshDirs);
this.jobArchiveEventListener = jobArchiveEventListener;
this.processExpiredArchiveDeletion = cleanupExpiredArchives;
this.maxHistorySize = maxHistorySize;
this.processBeyondLimitArchiveDeletion = this.maxHistorySize > 0;
this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy);
this.cachedArchivesPerRefreshDirectory = new HashMap<>();
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>());
Expand Down Expand Up @@ -176,7 +175,7 @@ void fetchArchives() {
continue;
}

int historySize = 0;
int fileOrderedIndexOnModifiedTime = 0;
for (FileStatus jobArchive : jobArchives) {
Path jobArchivePath = jobArchive.getPath();
String jobID = jobArchivePath.getName();
Expand All @@ -186,8 +185,9 @@ void fetchArchives() {

jobsToRemove.get(refreshDir).remove(jobID);

historySize++;
if (historySize > maxHistorySize && processBeyondLimitArchiveDeletion) {
fileOrderedIndexOnModifiedTime++;
if (!jobRetainedStrategy.shouldRetain(
jobArchive, fileOrderedIndexOnModifiedTime)) {
archivesBeyondSizeLimit
.computeIfAbsent(refreshDir, ignored -> new HashSet<>())
.add(jobArchivePath);
Expand Down Expand Up @@ -220,7 +220,7 @@ void fetchArchives() {
&& processExpiredArchiveDeletion) {
events.addAll(cleanupExpiredJobs(jobsToRemove));
}
if (!archivesBeyondSizeLimit.isEmpty() && processBeyondLimitArchiveDeletion) {
if (!archivesBeyondSizeLimit.isEmpty()) {
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondSizeLimit));
}
if (!events.isEmpty()) {
Expand Down
Loading