BaseReplacePartitions.java
@@ -31,7 +31,7 @@ public class BaseReplacePartitions extends MergingSnapshotProducer<ReplacePartit
private boolean validateConflictingData = false;
private boolean validateConflictingDeletes = false;

BaseReplacePartitions(String tableName, TableOperations ops) {
protected BaseReplacePartitions(String tableName, TableOperations ops) {
super(tableName, ops);
set(SnapshotSummary.REPLACE_PARTITIONS_PROP, "true");
replacedPartitions = PartitionSet.create(ops.current().specsById());
DynamicCommitter.java
@@ -31,13 +31,15 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.CommitSummary;
@@ -69,8 +71,6 @@
*/
@Internal
class DynamicCommitter implements Committer<DynamicCommittable> {

private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
private static final Logger LOG = LoggerFactory.getLogger(DynamicCommitter.class);
private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
private static final WriteResult EMPTY_WRITE_RESULT =
@@ -79,13 +79,9 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
.addDeleteFiles(Lists.newArrayList())
.build();

private static final long INITIAL_CHECKPOINT_ID = -1L;

@VisibleForTesting
static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";

private static final String FLINK_JOB_ID = "flink.job-id";
private static final String OPERATOR_ID = "flink.operator-id";
private final Map<String, String> snapshotProperties;
private final boolean replacePartitions;
private final DynamicCommitterMetrics committerMetrics;
@@ -138,7 +134,7 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
long maxCommittedCheckpointId =
getMaxCommittedCheckpointId(
MaxCommittedCheckpointIdValidator.getMaxCommittedCheckpointId(
table, last.jobId(), last.operatorId(), entry.getKey().branch());
// Mark the already committed FilesCommittable(s) as finished
entry
@@ -155,31 +151,6 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
}
}

private static long getMaxCommittedCheckpointId(
Table table, String flinkJobId, String operatorId, String branch) {
Snapshot snapshot = table.snapshot(branch);
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

while (snapshot != null) {
Map<String, String> summary = snapshot.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
String snapshotOperatorId = summary.get(OPERATOR_ID);
if (flinkJobId.equals(snapshotFlinkJobId)
&& (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
if (value != null) {
lastCommittedCheckpointId = Long.parseLong(value);
break;
}
}

Long parentSnapshotId = snapshot.parentId();
snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
}

return lastCommittedCheckpointId;
}

/**
* Commits the data to the Iceberg table by reading the file data from the {@link DeltaManifests}
* ordered by the checkpointId, and writing the new snapshot to the Iceberg table. The {@link
@@ -274,9 +245,17 @@ private void replacePartitions(
CommitSummary summary,
String newFlinkJobId,
String operatorId) {
long checkpointId = pendingResults.lastKey();

// Iceberg tables are unsorted. So the order of the append data does not matter.
// Hence, we commit everything in one snapshot.
ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
ReplacePartitions dynamicOverwrite =
new FlinkReplacePartitions(
fullTableName(table),
tableOperations(table),
new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId))
.validateFromSnapshot(table.snapshot(branch))
.scanManifestsWith(workerPool);

for (List<WriteResult> writeResults : pendingResults.values()) {
for (WriteResult result : writeResults) {
@@ -306,7 +285,14 @@ private void commitDeltaTxn(
long checkpointId = e.getKey();
List<WriteResult> writeResults = e.getValue();

RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
RowDelta rowDelta =
new FlinkRowDelta(
fullTableName(table),
tableOperations(table),
new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId))
.validateFromSnapshot(table.snapshot(branch))
.scanManifestsWith(workerPool);

for (WriteResult result : writeResults) {
// Row delta validations are not needed for streaming changes that write equality deletes.
// Equality deletes are applied to data in all previous sequence numbers, so retries may
@@ -350,9 +336,8 @@ void commitOperation(
snapshotProperties.forEach(operation::set);
// custom snapshot metadata properties will be overridden if they conflict with internal ones
// used by the sink.
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
operation.set(OPERATOR_ID, operatorId);
MaxCommittedCheckpointIdValidator.setFlinkProperties(
operation, checkpointId, newFlinkJobId, operatorId);
operation.toBranch(branch);

long startNano = System.nanoTime();
@@ -370,6 +355,19 @@
}
}

private String fullTableName(Table table) {
return CatalogUtil.fullTableName(catalog.name(), TableIdentifier.parse(table.name()));
}

private static TableOperations tableOperations(Table table) {
if (table instanceof HasTableOperations) {
return ((HasTableOperations) table).operations();
}

throw new IllegalArgumentException(
"Catalog tables must implement: " + HasTableOperations.class.getSimpleName());
}

@Override
public void close() throws IOException {
workerPool.shutdown();
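MaxCommittedCheckpointIdValidator itself is not included in this diff. A minimal sketch of what it might look like, reconstructed from the getMaxCommittedCheckpointId logic and snapshot-summary property constants removed from DynamicCommitter above; only the call sites (the static lookup, setFlinkProperties, and the Consumer<Snapshot> constructor) come from the diff, while the class shape and the accept() behavior are assumptions:

package org.apache.iceberg.flink.sink.dynamic;

import java.util.Map;
import java.util.function.Consumer;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;

/** Hypothetical reconstruction; the real class in the PR may differ. */
class MaxCommittedCheckpointIdValidator implements Consumer<Snapshot> {
  // Property names match the constants previously defined in DynamicCommitter.
  static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
  static final String FLINK_JOB_ID = "flink.job-id";
  static final String OPERATOR_ID = "flink.operator-id";
  static final long INITIAL_CHECKPOINT_ID = -1L;

  private final long checkpointId;
  private final String jobId;
  private final String operatorId;

  MaxCommittedCheckpointIdValidator(long checkpointId, String jobId, String operatorId) {
    this.checkpointId = checkpointId;
    this.jobId = jobId;
    this.operatorId = operatorId;
  }

  /** Stamps the Flink bookkeeping properties onto the summary of the pending snapshot. */
  static void setFlinkProperties(
      SnapshotUpdate<?> operation, long checkpointId, String jobId, String operatorId) {
    operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
    operation.set(FLINK_JOB_ID, jobId);
    operation.set(OPERATOR_ID, operatorId);
  }

  /** Same ancestry walk as the getMaxCommittedCheckpointId method removed from DynamicCommitter. */
  static long getMaxCommittedCheckpointId(
      Table table, String flinkJobId, String operatorId, String branch) {
    Snapshot snapshot = table.snapshot(branch);
    long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

    while (snapshot != null) {
      Map<String, String> summary = snapshot.summary();
      String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
      String snapshotOperatorId = summary.get(OPERATOR_ID);
      if (flinkJobId.equals(snapshotFlinkJobId)
          && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
        String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
        if (value != null) {
          lastCommittedCheckpointId = Long.parseLong(value);
          break;
        }
      }

      Long parentSnapshotId = snapshot.parentId();
      snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
    }

    return lastCommittedCheckpointId;
  }

  // Assumption: reject a concurrent snapshot that already recorded this checkpoint (or a later
  // one) for the same job and operator, i.e. a duplicate commit of the same Flink checkpoint.
  @Override
  public void accept(Snapshot snapshot) {
    Map<String, String> summary = snapshot.summary();
    String committed = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
    if (jobId.equals(summary.get(FLINK_JOB_ID))
        && operatorId.equals(summary.get(OPERATOR_ID))
        && committed != null
        && Long.parseLong(committed) >= checkpointId) {
      throw new IllegalStateException(
          "Checkpoint " + checkpointId + " was already committed by job " + jobId);
    }
  }
}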
FlinkReplacePartitions.java (new file)
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.iceberg.BaseReplacePartitions;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

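/**
 * A {@link BaseReplacePartitions} that additionally runs the injected snapshot check: every
 * snapshot committed after the one passed to {@link #validateFromSnapshot(Snapshot)} (up to the
 * current branch head) is handed to the given {@code Consumer<Snapshot>} during commit validation.
 */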
class FlinkReplacePartitions extends BaseReplacePartitions
implements FlinkSnapshotValidator<FlinkReplacePartitions> {
private final Consumer<Snapshot> snapshotValidator;

private Long startingSnapshotId = null; // check all versions by default

FlinkReplacePartitions(
String tableName, TableOperations ops, Consumer<Snapshot> snapshotValidator) {
super(tableName, ops);
this.snapshotValidator = snapshotValidator;
}

@Override
public void validateSnapshot(Snapshot snapshot) {
snapshotValidator.accept(snapshot);
}

@Nullable
@Override
public Long startingSnapshotId() {
return startingSnapshotId;
}

@Override
public void validate(TableMetadata base, Snapshot parent) {
super.validate(base, parent);
validateSnapshots(base, parent);
}

FlinkReplacePartitions validateFromSnapshot(@Nullable Snapshot snapshot) {
if (snapshot != null) {
super.validateFromSnapshot(snapshot.snapshotId());
startingSnapshotId = snapshot.snapshotId();
}

return this;
}
}
FlinkRowDelta.java (new file)
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.iceberg.BaseRowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

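/**
 * A {@link BaseRowDelta} that additionally runs the injected snapshot check: every snapshot
 * committed after the one passed to {@link #validateFromSnapshot(Snapshot)} (up to the current
 * branch head) is handed to the given {@code Consumer<Snapshot>} during commit validation.
 */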
class FlinkRowDelta extends BaseRowDelta implements FlinkSnapshotValidator<FlinkRowDelta> {
private final Consumer<Snapshot> snapshotValidator;

private Long startingSnapshotId = null; // check all versions by default

FlinkRowDelta(String tableName, TableOperations ops, Consumer<Snapshot> snapshotValidator) {
super(tableName, ops);
this.snapshotValidator = snapshotValidator;
}

@Override
public void validateSnapshot(Snapshot snapshot) {
snapshotValidator.accept(snapshot);
}

@Nullable
@Override
public Long startingSnapshotId() {
return startingSnapshotId;
}

@Override
protected void validate(TableMetadata base, Snapshot parent) {
super.validate(base, parent);
validateSnapshots(base, parent);
}

FlinkRowDelta validateFromSnapshot(@Nullable Snapshot snapshot) {
if (snapshot != null) {
super.validateFromSnapshot(snapshot.snapshotId());
startingSnapshotId = snapshot.snapshotId();
}

return this;
}
}
FlinkSnapshotValidator.java (new file)
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.dynamic;

import javax.annotation.Nullable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.util.SnapshotUtil;

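/**
 * Mixin for snapshot producers that validate snapshots committed concurrently with the current
 * operation: {@link #validateSnapshots(TableMetadata, Snapshot)} walks the ancestry from the given
 * parent snapshot down to, but not including, {@link #startingSnapshotId()} (all ancestors when
 * the id is null) and passes each snapshot to {@link #validateSnapshot(Snapshot)}.
 */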
interface FlinkSnapshotValidator<T> {
void validateSnapshot(Snapshot snapshot);

@Nullable
Long startingSnapshotId();

default void validateSnapshots(TableMetadata base, @Nullable Snapshot parent) {
if (parent == null) {
return;
}

SnapshotUtil.ancestorsBetween(parent.snapshotId(), startingSnapshotId(), base::snapshot)
.forEach(this::validateSnapshot);
}
}