Flink: Backport apache#8553 to v1.15, v1.16
Peter Vary committed Nov 24, 2023
1 parent 21b3eea commit 27333e3
Showing 32 changed files with 2,460 additions and 86 deletions.
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
@@ -58,15 +59,20 @@
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +86,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState>
private final ReaderFunction<T> readerFunction;
private final SplitAssignerFactory assignerFactory;
private final SerializableComparator<IcebergSourceSplit> splitComparator;
private final SerializableRecordEmitter<T> emitter;

// Can't use SerializableTable as enumerator needs a regular table
// that can discover table changes
@@ -91,13 +98,15 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState>
ReaderFunction<T> readerFunction,
SplitAssignerFactory assignerFactory,
SerializableComparator<IcebergSourceSplit> splitComparator,
Table table) {
Table table,
SerializableRecordEmitter<T> emitter) {
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.splitComparator = splitComparator;
this.table = table;
this.emitter = emitter;
}

String name() {
@@ -152,7 +161,8 @@ public Boundedness getBoundedness() {
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
IcebergSourceReaderMetrics metrics =
new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name());
return new IcebergSourceReader<>(metrics, readerFunction, splitComparator, readerContext);
return new IcebergSourceReader<>(
emitter, metrics, readerFunction, splitComparator, readerContext);
}

@Override
@@ -216,6 +226,8 @@ public static class Builder<T> {
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private String watermarkColumn;
private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -237,6 +249,9 @@ public Builder<T> table(Table newTable) {
}

public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
Preconditions.checkArgument(
watermarkColumn == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -429,6 +444,33 @@ public Builder<T> setAll(Map<String, String> properties) {
return this;
}

/**
* Emits watermarks once per split, based on the min value of the column statistics from the
* file metadata in the given split. The generated watermarks are also used for ordering the
* splits for read. Accepted column types are timestamp/timestamptz/long. For long columns,
* consider setting {@link #watermarkTimeUnit(TimeUnit)}.
*
* <p>Consider setting `read.split.open-file-cost` to prevent combining small files into a
* single split when the watermark is used for watermark alignment.
*/
public Builder<T> watermarkColumn(String columnName) {
Preconditions.checkArgument(
splitAssignerFactory == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.watermarkColumn = columnName;
return this;
}

/**
* When the type of the {@link #watermarkColumn} is {@link
* org.apache.iceberg.types.Types.LongType}, sets the {@link TimeUnit} used to convert the
* value. The default value is {@link TimeUnit#MICROSECONDS}.
*/
public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
this.watermarkTimeUnit = timeUnit;
return this;
}

/** @deprecated Use {@link #setAll} instead. */
@Deprecated
public Builder<T> properties(Map<String, String> properties) {
@@ -453,6 +495,18 @@ public IcebergSource<T> build() {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}

SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
if (watermarkColumn != null) {
// Column statistics are needed for watermark generation
contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));

SplitWatermarkExtractor watermarkExtractor =
new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
splitAssignerFactory =
new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
}

ScanContext context = contextBuilder.build();
if (readerFunction == null) {
if (table instanceof BaseMetadataTable) {
@@ -485,8 +539,14 @@ public IcebergSource<T> build() {

checkRequired();
// Since the builder already loaded the table, pass it to the source to avoid double loading
return new IcebergSource<T>(
tableLoader, context, readerFunction, splitAssignerFactory, splitComparator, table);
return new IcebergSource<>(
tableLoader,
context,
readerFunction,
splitAssignerFactory,
splitComparator,
table,
emitter);
}

private void checkRequired() {
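
To show how the new builder options fit together, here is a minimal usage sketch. It is an illustration under stated assumptions, not part of this commit: the table path and the LONG column name "event_ts" are hypothetical, forRowData(), tableLoader(), and streaming() are pre-existing builder methods, and watermarkColumn()/watermarkTimeUnit() are the ones added here.

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkSourceExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Hypothetical table location; any TableLoader works here.
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events");

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(loader)
            .streaming(true)
            // Added by this change: derive per-split watermarks from the column
            // statistics of the hypothetical LONG column "event_ts".
            .watermarkColumn("event_ts")
            // Only consulted for LONG columns; timestamp columns always use microseconds.
            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
            .build();

    // The source emits the watermarks itself, so no extra strategy is needed here.
    DataStream<RowData> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "iceberg-events");

    stream.print();
    env.execute("Iceberg watermark example");
  }
}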
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import java.io.Serializable;
import java.util.Comparator;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;

/**
* {@link SplitWatermarkExtractor} implementation which uses Iceberg timestamp column statistics
* to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the
* {@link WatermarkExtractorRecordEmitter} along with the actual records.
*/
@Internal
public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable {
private final int eventTimeFieldId;
private final String eventTimeFieldName;
private final TimeUnit timeUnit;

/**
* Creates the extractor.
*
* @param schema The schema of the Table
* @param eventTimeFieldName The column which should be used as an event time
* @param timeUnit Used for converting the long value to epoch milliseconds
*/
public ColumnStatsWatermarkExtractor(
Schema schema, String eventTimeFieldName, TimeUnit timeUnit) {
Types.NestedField field = schema.findField(eventTimeFieldName);
TypeID typeID = field.type().typeId();
Preconditions.checkArgument(
typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
"Found %s, expected a LONG or TIMESTAMP column for watermark generation.",
typeID);
this.eventTimeFieldId = field.fieldId();
this.eventTimeFieldName = eventTimeFieldName;
// Use the timeUnit only for Long columns.
this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
}

@VisibleForTesting
ColumnStatsWatermarkExtractor(int eventTimeFieldId, String eventTimeFieldName) {
this.eventTimeFieldId = eventTimeFieldId;
this.eventTimeFieldName = eventTimeFieldName;
this.timeUnit = TimeUnit.MICROSECONDS;
}

/**
* Get the watermark for a split using column statistics.
*
* @param split The split
* @return The watermark
* @throws IllegalArgumentException if there are no statistics for the column
*/
@Override
public long extractWatermark(IcebergSourceSplit split) {
return split.task().files().stream()
.map(
scanTask -> {
Preconditions.checkArgument(
scanTask.file().lowerBounds() != null
&& scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
"Missing statistics for column name = %s in file = %s",
eventTimeFieldName,
scanTask.file());
return timeUnit.toMillis(
Conversions.fromByteBuffer(
Types.LongType.get(), scanTask.file().lowerBounds().get(eventTimeFieldId)));
})
.min(Comparator.comparingLong(l -> l))
.get();
}
}
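
As a reading aid, the sketch below mirrors what build() does when a watermark column is configured: a single extractor instance drives both watermark emission and split ordering. The wrapper class and helper methods are hypothetical; the Iceberg types and factory calls are the ones used in the diff above.

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;

class WatermarkWiringSketch {
  // Hypothetical helper mirroring the wiring in IcebergSource.Builder#build():
  // splits are assigned in ascending watermark order.
  static OrderedSplitAssignerFactory orderedByWatermark(Schema schema, String column) {
    SplitWatermarkExtractor extractor =
        new ColumnStatsWatermarkExtractor(schema, column, TimeUnit.MICROSECONDS);
    SerializableComparator<IcebergSourceSplit> comparator =
        SplitComparators.watermark(extractor);
    return new OrderedSplitAssignerFactory(comparator);
  }

  // Watermark of a split = min lower bound across its files, converted to millis.
  static long splitWatermark(Schema schema, String column, IcebergSourceSplit split) {
    return new ColumnStatsWatermarkExtractor(schema, column, TimeUnit.MICROSECONDS)
        .extractWatermark(split);
  }
}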
@@ -35,13 +35,14 @@ public class IcebergSourceReader<T>
RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {

public IcebergSourceReader(
SerializableRecordEmitter<T> emitter,
IcebergSourceReaderMetrics metrics,
ReaderFunction<T> readerFunction,
SerializableComparator<IcebergSourceSplit> splitComparator,
SourceReaderContext context) {
super(
() -> new IcebergSourceSplitReader<>(metrics, readerFunction, splitComparator, context),
new IcebergSourceRecordEmitter<>(),
emitter,
context.getConfiguration(),
context);
}
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import java.io.Serializable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

@Internal
@FunctionalInterface
public interface SerializableRecordEmitter<T>
extends RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit>, Serializable {
static <T> SerializableRecordEmitter<T> defaultEmitter() {
return (element, output, split) -> {
output.collect(element.record());
split.updatePosition(element.fileOffset(), element.recordOffset());
};
}

static <T> SerializableRecordEmitter<T> emitterWithWatermark(SplitWatermarkExtractor extractor) {
return new WatermarkExtractorRecordEmitter<>(extractor);
}
}
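
Because SerializableRecordEmitter is a functional interface, variants beyond the two factory methods can be written as lambdas. The counting emitter below is a hypothetical sketch only (the interface is marked @Internal, so this is not a supported extension point); it behaves like defaultEmitter() but additionally counts emitted records.

import java.util.concurrent.atomic.AtomicLong;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;

class EmitterSketch {
  // Hypothetical: same behavior as defaultEmitter(), plus a record counter.
  // AtomicLong is Serializable, so the captured lambda stays serializable.
  static <T> SerializableRecordEmitter<T> countingEmitter(AtomicLong counter) {
    return (element, output, split) -> {
      output.collect(element.record());
      split.updatePosition(element.fileOffset(), element.recordOffset());
      counter.incrementAndGet();
    };
  }
}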
@@ -18,19 +18,11 @@
*/
package org.apache.iceberg.flink.source.reader;

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import java.io.Serializable;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

final class IcebergSourceRecordEmitter<T>
implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {

IcebergSourceRecordEmitter() {}

@Override
public void emitRecord(
RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
output.collect(element.record());
split.updatePosition(element.fileOffset(), element.recordOffset());
}
/** The interface used to extract watermarks from splits. */
public interface SplitWatermarkExtractor extends Serializable {
/** Get the watermark for a split. */
long extractWatermark(IcebergSourceSplit split);
}
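
An implementation only has to map a split to a long watermark. A minimal hypothetical implementation, useful e.g. as a test stub, that ignores column statistics entirely:

import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

// Hypothetical stub returning a fixed watermark for every split; the interface
// already extends Serializable, so a plain field is all the state needed.
public class FixedWatermarkExtractor implements SplitWatermarkExtractor {
  private final long watermarkMillis;

  public FixedWatermarkExtractor(long watermarkMillis) {
    this.watermarkMillis = watermarkMillis;
  }

  @Override
  public long extractWatermark(IcebergSourceSplit split) {
    return watermarkMillis;
  }
}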