Skip to content

Commit

Permalink
Flink: Backport PR apache#10179 to Flink 1.20 for v2 sink (apache#11011)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodmeneses authored and zachdisc committed Dec 12, 2024
1 parent 6b490b8 commit 4717c86
Show file tree
Hide file tree
Showing 23 changed files with 4,129 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FlinkManifestUtil {

private static final Logger LOG = LoggerFactory.getLogger(FlinkManifestUtil.class);
private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;

Expand Down Expand Up @@ -129,4 +134,26 @@ static WriteResult readCompletedFiles(

return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build();
}

/**
 * Deletes each of the given manifest files through the table's {@code FileIO}, one at a time.
 *
 * <p>A failure to delete one manifest is logged as a warning and does not stop the loop; the
 * remaining manifests are still attempted and no exception from the delete is propagated.
 *
 * @param table table whose {@code io()} is used for deletion and whose name is logged on failure
 * @param manifests manifest files to delete
 * @param newFlinkJobId Flink job id, included in the warning message only
 * @param checkpointId checkpoint id, included in the warning message only
 */
static void deleteCommittedManifests(
    Table table, List<ManifestFile> manifests, String newFlinkJobId, long checkpointId) {
  for (ManifestFile committed : manifests) {
    try {
      table.io().deleteFile(committed.path());
    } catch (Exception deleteFailure) {
      // Cleanup is best effort: the checkpoint has already completed, so only report the
      // leftover file together with enough context to locate it.
      LOG.warn(
          "The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
          MoreObjects.toStringHelper(FlinkManifestUtil.class)
              .add("tableName", table.name())
              .add("flinkJobId", newFlinkJobId)
              .add("checkpointId", checkpointId)
              .add("manifestPath", committed.path())
              .toString(),
          deleteFailure);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);

// Find out the equality field id list based on the user-provided equality field column names.
List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
List<Integer> equalityFieldIds =
SinkUtil.checkAndGetEqualityFieldIds(table, equalityFieldColumns);

RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
int writerParallelism =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/**
 * Commit payload for a single checkpoint: the serialized {@link
 * org.apache.iceberg.flink.sink.DeltaManifests} bytes together with the (jobId, operatorId,
 * checkpointId) triplet that identifies the commit.
 *
 * <p>Instances travel from the Writer to the Aggregator operator and from the Aggregator to the
 * Committer; {@link IcebergCommittableSerializer} handles the serialization on both hops.
 *
 * <p>The manifest array is stored and returned by reference (no defensive copy). Equality and
 * hashing compare the array by content.
 */
class IcebergCommittable implements Serializable {
  private final byte[] manifest;
  private final String jobId;
  private final String operatorId;
  private final long checkpointId;

  /**
   * @param manifest serialized manifest bytes; kept by reference
   * @param jobId id of the job that produced this committable
   * @param operatorId id of the operator that produced this committable
   * @param checkpointId checkpoint this committable belongs to
   */
  IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) {
    this.manifest = manifest;
    this.jobId = jobId;
    this.operatorId = operatorId;
    this.checkpointId = checkpointId;
  }

  /** Returns the manifest bytes exactly as passed to the constructor (same array instance). */
  byte[] manifest() {
    return manifest;
  }

  /** Returns the job id. */
  String jobId() {
    return jobId;
  }

  /** Returns the operator id. */
  String operatorId() {
    return operatorId;
  }

  /** Returns the checkpoint id, boxed from the primitive field. */
  Long checkpointId() {
    return checkpointId;
  }

  /** Renders the identifying triplet; the manifest bytes are intentionally left out. */
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("jobId", jobId)
        .add("checkpointId", checkpointId)
        .add("operatorId", operatorId)
        .toString();
  }

  /** Two committables are equal when they have the same exact class and all four fields match. */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }

    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    IcebergCommittable other = (IcebergCommittable) o;
    if (checkpointId != other.checkpointId) {
      return false;
    }

    if (!Arrays.equals(manifest, other.manifest)) {
      return false;
    }

    return Objects.equals(jobId, other.jobId) && Objects.equals(operatorId, other.operatorId);
  }

  /** Hash over the identifying triplet combined with the content hash of the manifest bytes. */
  @Override
  public int hashCode() {
    return 31 * Objects.hash(jobId, operatorId, checkpointId) + Arrays.hashCode(manifest);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

/**
 * Versioned binary serializer for {@link IcebergCommittable}, used when committables are sent from
 * the Writer to the Aggregator operator and from the Aggregator to the Committer.
 *
 * <p>Version 1 layout, in order:
 *
 * <ol>
 *   <li>job id, written with {@code writeUTF}
 *   <li>operator id, written with {@code writeUTF}
 *   <li>checkpoint id, written as a {@code long}
 *   <li>manifest length, written as an {@code int}
 *   <li>the manifest bytes
 * </ol>
 */
class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
  private static final int VERSION = 1;

  @Override
  public int getVersion() {
    return VERSION;
  }

  /**
   * Encodes the committable using the version 1 layout described on the class.
   *
   * @param committable the committable to encode
   * @return the encoded bytes
   * @throws IOException if writing to the in-memory stream fails
   */
  @Override
  public byte[] serialize(IcebergCommittable committable) throws IOException {
    byte[] manifest = committable.manifest();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
    view.writeUTF(committable.jobId());
    view.writeUTF(committable.operatorId());
    view.writeLong(committable.checkpointId());
    view.writeInt(manifest.length);
    view.write(manifest);
    return out.toByteArray();
  }

  /**
   * Decodes a committable previously produced by {@link #serialize(IcebergCommittable)}.
   *
   * @param version the version the bytes were written with; only {@code 1} is understood
   * @param serialized the encoded bytes
   * @return the decoded committable
   * @throws IOException if the version is unknown, the manifest length is negative, or the input
   *     ends before all announced bytes could be read
   */
  @Override
  public IcebergCommittable deserialize(int version, byte[] serialized) throws IOException {
    if (version != VERSION) {
      throw new IOException("Unrecognized version or corrupt state: " + version);
    }

    DataInputDeserializer view = new DataInputDeserializer(serialized);
    String jobId = view.readUTF();
    String operatorId = view.readUTF();
    long checkpointId = view.readLong();
    int manifestLen = view.readInt();
    if (manifestLen < 0) {
      // Report corruption as a checked IOException instead of an unchecked
      // NegativeArraySizeException from the array allocation below.
      throw new IOException("Corrupt state: negative manifest length " + manifestLen);
    }

    byte[] manifestBuf = new byte[manifestLen];
    // readFully fails with EOFException on truncated input, whereas read(byte[]) may return
    // fewer bytes than requested and leave the tail of the buffer zero-filled.
    view.readFully(manifestBuf);
    return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId);
  }
}
Loading

0 comments on commit 4717c86

Please sign in to comment.