Flink: Maintenance - TableManager + ExpireSnapshots #11144

Merged · 12 commits · Nov 4, 2024
Steven's new comments
Peter Vary committed Sep 26, 2024
commit e3dd6f45c3e6d43d4ea3df73f664edbb4c723bed
@@ -112,7 +112,7 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
.transform(
operatorName(DELETE_FILES_OPERATOR_NAME),
TypeInformation.of(Void.class),
- new DeleteFilesProcessor(name(), tableLoader(), deleteBatchSize))
+ new DeleteFilesProcessor(name(), tableLoader().loadTable(), deleteBatchSize))
.uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.setParallelism(parallelism());
@@ -218,8 +218,6 @@ DataStream<TaskResult> append(
slotSharingGroup = mainSlotSharingGroup;
}

- tableLoader.open();
-
return append(sourceStream);
}
}
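With open() gone here, the task builders no longer manage the shared loader; TableMaintenance.Builder.append() now clones it and closes the clone via try-with-resources (see the TableMaintenance hunk below). A minimal sketch of that ownership pattern, assuming only that TableLoader is Closeable and that clone() yields an independent loader:

try (TableLoader loader = tableLoader.clone()) {
  // build the job graph against the short-lived clone
} // the clone is closed here even if graph construction fails, hence append() throws IOException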
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.maintenance.api;

+ import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
@@ -57,9 +58,7 @@ public class TableMaintenance {
static final String FILTER_OPERATOR_NAME_PREFIX = "Filter ";
static final String LOCK_REMOVER_OPERATOR_NAME = "Lock remover";

- private TableMaintenance() {
-   // Do not instantiate directly
- }
+ private TableMaintenance() {}

/**
* Use when the change stream is already provided, like in the {@link
@@ -214,7 +213,7 @@ public Builder add(MaintenanceTaskBuilder<?> task) {
}

/** Builds the task graph for the maintenance tasks. */
- public void append() {
+ public void append() throws IOException {
Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task");
Preconditions.checkNotNull(uidSuffix, "Uid suffix should not be null");

@@ -225,73 +224,67 @@ public void append() {
evaluators.add(taskBuilders.get(i).evaluator());
}

- DataStream<Trigger> triggers =
-     DataStreamUtils.reinterpretAsKeyedStream(changeStream(), unused -> true)
-         .process(
-             new TriggerManager(
-                 tableLoader,
-                 lockFactory,
-                 taskNames,
-                 evaluators,
-                 rateLimit.toMillis(),
-                 lockCheckDelay.toMillis()))
-         .name(TRIGGER_MANAGER_OPERATOR_NAME)
-         .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
-         .slotSharingGroup(slotSharingGroup)
-         .forceNonParallel()
-         .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
-         .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
-         .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
-         .slotSharingGroup(slotSharingGroup)
-         .forceNonParallel();
-
- // Add the specific tasks
- DataStream<TaskResult> unioned = null;
- for (int i = 0; i < taskBuilders.size(); ++i) {
-   int finalIndex = i;
-   DataStream<Trigger> filtered =
-       triggers
-           .filter(t -> t.taskId() != null && t.taskId() == finalIndex)
-           .name(FILTER_OPERATOR_NAME_PREFIX + i)
-           .forceNonParallel()
-           .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix)
-           .slotSharingGroup(slotSharingGroup);
-   MaintenanceTaskBuilder<?> builder = taskBuilders.get(i);
-   DataStream<TaskResult> result =
-       builder.append(
-           filtered,
-           i,
-           taskNames.get(i),
-           tableLoader,
-           uidSuffix,
-           slotSharingGroup,
-           parallelism);
-   if (unioned == null) {
-     unioned = result;
-   } else {
-     unioned = unioned.union(result);
-   }
- }
-
- // Add the LockRemover to the end
- unioned
-     .transform(
-         LOCK_REMOVER_OPERATOR_NAME,
-         TypeInformation.of(Void.class),
-         new LockRemover(lockFactory, taskNames))
-     .forceNonParallel()
-     .uid("lock-remover-" + uidSuffix)
-     .slotSharingGroup(slotSharingGroup);
+ try (TableLoader loader = tableLoader.clone()) {
+   DataStream<Trigger> triggers =
+       DataStreamUtils.reinterpretAsKeyedStream(changeStream(loader), unused -> true)
+           .process(
+               new TriggerManager(
+                   loader,
+                   lockFactory,
+                   taskNames,
+                   evaluators,
+                   rateLimit.toMillis(),
+                   lockCheckDelay.toMillis()))
+           .name(TRIGGER_MANAGER_OPERATOR_NAME)
+           .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+           .slotSharingGroup(slotSharingGroup)
+           .forceNonParallel()
+           .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
+           .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
+           .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
+           .slotSharingGroup(slotSharingGroup)
+           .forceNonParallel();
+
+   // Add the specific tasks
+   DataStream<TaskResult> unioned = null;
+   for (int i = 0; i < taskBuilders.size(); ++i) {
+     int finalIndex = i;
+     DataStream<Trigger> filtered =
+         triggers
+             .filter(t -> t.taskId() != null && t.taskId() == finalIndex)
+             .name(FILTER_OPERATOR_NAME_PREFIX + i)
+             .forceNonParallel()
+             .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix)
+             .slotSharingGroup(slotSharingGroup);
+     MaintenanceTaskBuilder<?> builder = taskBuilders.get(i);
+     DataStream<TaskResult> result =
+         builder.append(
+             filtered, i, taskNames.get(i), loader, uidSuffix, slotSharingGroup, parallelism);
+     if (unioned == null) {
+       unioned = result;
+     } else {
+       unioned = unioned.union(result);
+     }
+   }
+
+   // Add the LockRemover to the end
+   unioned
+       .transform(
+           LOCK_REMOVER_OPERATOR_NAME,
+           TypeInformation.of(Void.class),
+           new LockRemover(lockFactory, taskNames))
+       .forceNonParallel()
+       .uid("lock-remover-" + uidSuffix)
+       .slotSharingGroup(slotSharingGroup);
+ }
}
}

- private DataStream<TableChange> changeStream() {
+ private DataStream<TableChange> changeStream(TableLoader loader) {
if (inputStream == null) {
// Create a monitor source to provide the TableChange stream
MonitorSource source =
new MonitorSource(
- tableLoader,
- RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()),
- maxReadBack);
+ loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack);
return env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME)
.uid(SOURCE_OPERATOR_NAME + uidSuffix)
.slotSharingGroup(slotSharingGroup)
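For orientation, a hedged sketch of how the reworked builder is driven end to end; the table path, uid suffix, and lock factory are placeholders (construction of a real TriggerLockFactory is omitted), and the call chain mirrors the tests below:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;
import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory; // package assumed

public class MaintenanceSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl"); // assumed path
    TriggerLockFactory lockFactory = createLockFactory(); // hypothetical helper, not part of this diff

    TableMaintenance.forTable(env, tableLoader, lockFactory)
        .uidSuffix("maintenance-1") // assumed suffix
        .add(ExpireSnapshots.builder()) // append() requires at least one task
        .append(); // now declares IOException: the cloned loader is closed after graph construction
    env.execute("Table maintenance"); // assumed job name
  }

  private static TriggerLockFactory createLockFactory() {
    throw new UnsupportedOperationException("supply a real TriggerLockFactory here");
  }
}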
@@ -26,7 +26,6 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.Table;
- import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
@@ -50,12 +49,10 @@ public class DeleteFilesProcessor extends AbstractStreamOperator<Void>
private transient Counter failedCounter;
private transient Counter succeededCounter;

- public DeleteFilesProcessor(String name, TableLoader tableLoader, int batchSize) {
+ public DeleteFilesProcessor(String name, Table table, int batchSize) {
  Preconditions.checkNotNull(name, "Name should not be null");
- Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+ Preconditions.checkNotNull(table, "Table should not be null");

- tableLoader.open();
- Table table = tableLoader.loadTable();
FileIO fileIO = table.io();
Preconditions.checkArgument(
fileIO instanceof SupportsBulkOperations,
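The operator no longer opens or loads anything itself, so the caller must supply a live Table up front. A minimal caller-side sketch (name and batch size are illustrative):

tableLoader.open(); // the caller owns the loader lifecycle now
Table table = tableLoader.loadTable(); // resolved once, before the operator is constructed
DeleteFilesProcessor processor = new DeleteFilesProcessor("expire-snapshots", table, 10);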
@@ -121,6 +121,7 @@ public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> o
public void close() throws Exception {
super.close();

+ tableLoader.close();
if (plannerPoolSize != null) {
plannerPool.shutdown();
}
@@ -28,7 +28,6 @@
import java.util.Set;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
- import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.SimpleDataUtil;
@@ -47,6 +46,7 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase {
void before() {
MetricsReporterFactoryForTests.reset();
this.table = createTable();
+ tableLoader().open();
}

@Test
@@ -95,8 +95,6 @@ void testFailure() throws Exception {
insert(table, 1, "a");
insert(table, 2, "b");

- SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
-
ExpireSnapshots.builder()
.append(
infra.triggerStream(),
@@ -130,7 +128,8 @@ void testFailure() throws Exception {
closeJobClient(jobClient);
}

- // Check the metrics
+ // Check the metrics. There are no expired snapshots or data files because ExpireSnapshots has
+ // no max age or number of snapshots set, so no files are removed.
MetricsReporterFactoryForTests.assertCounters(
new ImmutableMap.Builder<String, Long>()
.put(
@@ -224,7 +224,7 @@ void testMetrics() throws Exception {
}

@Test
- void testUidAndSlotSharingGroup() {
+ void testUidAndSlotSharingGroup() throws IOException {
TableMaintenance.forChangeStream(
new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
tableLoader(),
@@ -243,7 +243,7 @@ void testUidAndSlotSharingGroup() {
}

@Test
- void testUidAndSlotSharingGroupUnset() {
+ void testUidAndSlotSharingGroupUnset() throws IOException {
TableMaintenance.forChangeStream(
new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
tableLoader(),
@@ -256,7 +256,7 @@ void testUidAndSlotSharingGroupUnset() {
}

@Test
- void testUidAndSlotSharingGroupInherit() {
+ void testUidAndSlotSharingGroupInherit() throws IOException {
TableMaintenance.forChangeStream(
new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
tableLoader(),
@@ -271,7 +271,7 @@ void testUidAndSlotSharingGroupInherit() {
}

@Test
- void testUidAndSlotSharingGroupOverWrite() {
+ void testUidAndSlotSharingGroupOverWrite() throws IOException {
String anotherUid = "Another-UID";
String anotherSlotSharingGroup = "Another-SlotSharingGroup";
TableMaintenance.forChangeStream(
@@ -312,7 +312,7 @@ void testUidAndSlotSharingGroupOverWrite() {
}

@Test
- void testUidAndSlotSharingGroupForMonitorSource() {
+ void testUidAndSlotSharingGroupForMonitorSource() throws IOException {
TableMaintenance.forTable(env, tableLoader(), LOCK_FACTORY)
.uidSuffix(UID_SUFFIX)
.slotSharingGroup(SLOT_SHARING_GROUP)
@@ -87,9 +87,11 @@ void testInvalidURIScheme() throws Exception {
}

private void deleteFile(TableLoader tableLoader, String fileName) throws Exception {
+ tableLoader().open();
try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
new OneInputStreamOperatorTestHarness<>(
- new DeleteFilesProcessor(DUMMY_NAME, tableLoader, 10), StringSerializer.INSTANCE)) {
+ new DeleteFilesProcessor(DUMMY_NAME, tableLoader.loadTable(), 10),
+ StringSerializer.INSTANCE)) {
testHarness.open();
testHarness.processElement(fileName, System.currentTimeMillis());
testHarness.processWatermark(EVENT_TIME);