Flink: Emit watermarks from the IcebergSource #8553

Merged · 15 commits · Nov 23, 2023
GenericAppenderHelper.java
@@ -39,6 +39,7 @@
public class GenericAppenderHelper {

private static final String ORC_CONFIG_PREFIX = "^orc.*";
private static final String PARQUET_CONFIG_PATTERN = ".*parquet.*";

private final Table table;
private final FileFormat fileFormat;
@@ -120,6 +121,10 @@ private static DataFile appendToLocalFile(
appenderFactory.setAll(conf.getValByRegex(ORC_CONFIG_PREFIX));
}

if (FileFormat.PARQUET.equals(format) && conf != null) {
appenderFactory.setAll(conf.getValByRegex(PARQUET_CONFIG_PATTERN));
}

FileAppender<Record> appender = appenderFactory.newAppender(Files.localOutput(file), format);
try (FileAppender<Record> fileAppender = appender) {
fileAppender.addAll(records);
IcebergSource.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
@@ -58,15 +59,20 @@
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +86,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private final ReaderFunction<T> readerFunction;
private final SplitAssignerFactory assignerFactory;
private final SerializableComparator<IcebergSourceSplit> splitComparator;
private final SerializableRecordEmitter<T> emitter;

// Can't use SerializableTable as enumerator needs a regular table
// that can discover table changes
@@ -91,13 +98,15 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
ReaderFunction<T> readerFunction,
SplitAssignerFactory assignerFactory,
SerializableComparator<IcebergSourceSplit> splitComparator,
Table table) {
Table table,
SerializableRecordEmitter<T> emitter) {
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.splitComparator = splitComparator;
this.table = table;
this.emitter = emitter;
}

String name() {
@@ -152,7 +161,8 @@ public Boundedness getBoundedness() {
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
IcebergSourceReaderMetrics metrics =
new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name());
return new IcebergSourceReader<>(metrics, readerFunction, splitComparator, readerContext);
return new IcebergSourceReader<>(
emitter, metrics, readerFunction, splitComparator, readerContext);
}

@Override
@@ -216,6 +226,8 @@ public static class Builder<T> {
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private String watermarkColumn;
private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -237,6 +249,9 @@ public Builder<T> table(Table newTable) {
}

public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
Preconditions.checkArgument(
watermarkColumn == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -429,6 +444,33 @@ public Builder<T> setAll(Map<String, String> properties) {
return this;
}

/**
* Emits watermarks once per split based on the min value of column statistics from files
* metadata in the given split. The generated watermarks are also used for ordering the splits
* for read. Accepted column types are timestamp/timestamptz/long. For long columns consider
* setting {@link #watermarkTimeUnit(TimeUnit)}.
*
* <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
* split when the watermark is used for watermark alignment.
*/
public Builder<T> watermarkColumn(String columnName) {
Preconditions.checkArgument(
splitAssignerFactory == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.watermarkColumn = columnName;
return this;
}

/**
* When the type of the {@link #watermarkColumn} is {@link
* org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the
* value. The default value is {@link TimeUnit#MICROSECONDS}.
*/
public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
this.watermarkTimeUnit = timeUnit;
return this;
}

/** @deprecated Use {@link #setAll} instead. */
@Deprecated
public Builder<T> properties(Map<String, String> properties) {
@@ -453,6 +495,18 @@ public IcebergSource<T> build() {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}

SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
if (watermarkColumn != null) {
stevenzwu (Contributor) commented on Nov 17, 2023:

read.split.open-file-cost doesn't affect splitting big files; it just avoids bundling multiple smaller files into one split.

Surfacing an earlier comment again on avoiding multiple small files in one split: it can increase out-of-orderness, as multiple files with different time ranges are merged into one split.

pvary (Contributor, Author) replied:

The focus of the feature is correct watermark generation: we need to make sure that the watermarks are emitted in order, but that does not automatically mean the records need to be emitted in order too. These are two different aspects of a data stream.

In the case of combined splits we do not advance the watermark, so they don't cause issues for watermark generation. The user can decide whether record out-of-orderness is a problem for them. If it is, they can set the configuration; if they have enough memory to keep the state, they can decide that reading speed (combining files into splits) is more important than reading files in order.

stevenzwu (Contributor) replied:

Those are reasonable points. I would be interested in other people's take.

To me, it is more important to limit the out-of-orderness by default; that is the whole point of watermark alignment. I would be OK with sacrificing some read throughput with smaller files. Also, with stateful applications source read throughput is rarely the bottleneck; the typical bottleneck is the stateful operator.

dchristle (Contributor) commented on Nov 21, 2023:

At least when we use event-time alignment, the only bound we expect to be respected is the main "max out-of-orderness" constraint; the ordering of rows before the watermark is advanced isn't something we rely on. Our custom operators that use event time won't emit results until the watermark is advanced beyond our timers anyway. So even if the rows were received in perfect order, they'd still be buffered into state, which suggests to me there is little speed or memory benefit. Am I missing anything here?

We do use some custom process-time operators where, in theory, less out-of-orderness would give more accurate results. But we discard the results emitted during the Iceberg backfill phase anyway, since our data are partitioned by day and the out-of-orderness allowed by the ~25 hour constraint (we set it slightly above 24 hours as a precaution to avoid the aligner getting stuck with daily partitioned files) is too high for accurate-enough results.

I'm a bit confused about how read.split.open-file-cost relates to the code line this discussion is tagged at, and maybe I'm not fully understanding.

  1. Does the current code try to respect read.split.open-file-cost when selecting files to include in a split?

  2. The only other case I can think of where the ordering of row reads is key is in minimizing "straggler" files that would hold up reads, i.e. if all files within a perfectly sorted daily partition have been read except for the one with the earliest timestamp, a max out-of-orderness of ~24 hours would mean most SplitReaders are idle, since they cannot read more than 24 hours ahead of the min timestamp. But AFAIK, the current PR's enumerator sorts the files by min timestamp and assigns them in order for this exact reason, right?

pvary (Contributor, Author) replied:

@dchristle: The code line is mostly irrelevant for the conversation 😄

The Iceberg planner collects the files which should be scanned by the execution engine and creates ScanTasks from them. If a file is big, it creates multiple splits for that file, so multiple readers can read it in parallel. OTOH, if there are multiple small files, it combines them into a CombinedScanTask which is read by a single reader, decreasing the number of splits, split assignments, etc.

We generate a single watermark for every ScanTask. One CombinedScanTask could group multiple data files with a wide range of timestamps, so generating a single watermark for it could be suboptimal. Setting read.split.open-file-cost can prevent the creation of such CombinedScanTasks and can result in better-ordered input and finer-grained watermarks.

Another reviewer commented:

@stevenzwu @pvary, given that the combining logic was already present before this PR and is independent of it, I suggest we keep the current default behaviour (as it is currently done in the PR).

@stevenzwu, if you feel that the default combining logic should change for the Flink source, please open a separate discussion, but I personally feel that the current default is reasonable. This way Flink aligns with the default Iceberg planner behaviour, and this should not be discussed further here, I think.

stevenzwu (Contributor) commented on Nov 22, 2023:

I am fine with tabling this as a follow-up.

  • The default combining logic is fine for batch processing, where ordering doesn't matter. For streaming reads and watermark alignment, ordering is important as it affects data buffering in the Flink state. That is precisely what watermark alignment is intended to solve.
  • Flink watermark alignment has been designed mostly with unbounded splits (like Kafka) in mind. Also, within an unbounded Kafka split, records are FIFO; there is a natural order within a split.

@pvary, not sure every user understands the internal details. At least we can document this option of disabling combining for better ordering. Then users can make an informed choice. @dchristle can probably also chime in and help review the doc change in a follow-up PR.

pvary (Contributor, Author) replied:

Watermark alignment is only one use case for watermarks; watermarks can also be used for handling late data and for windowing. So for now, I added this to the javadoc of the watermarkColumn method:

     * <p>Consider setting `read.split.open-file-cost` to prevent combining small files to a single
     * split when the watermark is used for watermark alignment.

I hope this will help users.
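As a concrete illustration of that hint, a minimal sketch of setting the property on the table (assuming a `Table` handle named `table`; the 128 MB value is arbitrary and only for illustration):

```java
import org.apache.iceberg.Table;

class SplitCombiningHint {
  // Sketch: make each small file "cost" a full split's worth of bytes so the planner
  // does not bundle many of them into a single CombinedScanTask.
  static void discourageSplitCombining(Table table) {
    table
        .updateProperties()
        .set("read.split.open-file-cost", String.valueOf(134217728L)) // 128 MB, illustrative
        .commit();
  }
}
```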

// Column statistics is needed for watermark generation
contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));

SplitWatermarkExtractor watermarkExtractor =
new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
splitAssignerFactory =
new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
}

ScanContext context = contextBuilder.build();
if (readerFunction == null) {
if (table instanceof BaseMetadataTable) {
@@ -485,8 +539,14 @@ public IcebergSource<T> build() {

checkRequired();
// Since builder already load the table, pass it to the source to avoid double loading
return new IcebergSource<T>(
tableLoader, context, readerFunction, splitAssignerFactory, splitComparator, table);
return new IcebergSource<>(
tableLoader,
context,
readerFunction,
splitAssignerFactory,
splitComparator,
table,
emitter);
}

private void checkRequired() {
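Putting the new builder options together, a minimal sketch of a streaming read that emits per-split watermarks and joins a Flink watermark alignment group (the table location, column name `event_ts`, group name, and ~25 hour drift are assumptions for illustration, not values from the PR):

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkSourceSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Hypothetical table location; any TableLoader works here.
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events");

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(loader)
            .streaming(true)
            .monitorInterval(Duration.ofSeconds(30))
            // Emit one watermark per split from the `event_ts` column statistics
            // and order the splits by that watermark.
            .watermarkColumn("event_ts")
            // Only consulted for LONG columns; timestamp columns are always microseconds.
            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
            .build();

    DataStream<RowData> stream =
        env.fromSource(
            source,
            // The split-level watermarks are emitted by the source itself; the strategy
            // is only used here to join the alignment group.
            WatermarkStrategy.<RowData>noWatermarks()
                .withWatermarkAlignment("iceberg-group", Duration.ofHours(25)),
            "iceberg-events",
            TypeInformation.of(RowData.class));

    stream.print();
    env.execute("Iceberg watermark sketch");
  }
}
```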
ColumnStatsWatermarkExtractor.java (new file)
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import java.io.Serializable;
import java.util.Comparator;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;

/**
* {@link SplitWatermarkExtractor} implementation which uses an Iceberg timestamp column statistics
* to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by the {@link
* WatermarkExtractorRecordEmitter} along with the actual records.
*/
@Internal
public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, Serializable {
private final int eventTimeFieldId;
private final String eventTimeFieldName;
private final TimeUnit timeUnit;

/**
* Creates the extractor.
*
* @param schema The schema of the Table
* @param eventTimeFieldName The column which should be used as an event time
* @param timeUnit Used for converting the long value to epoch milliseconds
*/
public ColumnStatsWatermarkExtractor(
Schema schema, String eventTimeFieldName, TimeUnit timeUnit) {
Types.NestedField field = schema.findField(eventTimeFieldName);
TypeID typeID = field.type().typeId();
Preconditions.checkArgument(
typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
"Found %s, expected a LONG or TIMESTAMP column for watermark generation.",
typeID);
this.eventTimeFieldId = field.fieldId();
this.eventTimeFieldName = eventTimeFieldName;
// Use the timeUnit only for Long columns.
this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
A reviewer (Contributor) commented:

@pvary let's create an issue to follow up on the time unit when PR #9008 is merged.

pvary (Contributor, Author) replied:

Thanks for the pointer! Created #9137 to tackle this.

}

@VisibleForTesting
ColumnStatsWatermarkExtractor(int eventTimeFieldId, String eventTimeFieldName) {
this.eventTimeFieldId = eventTimeFieldId;
this.eventTimeFieldName = eventTimeFieldName;
this.timeUnit = TimeUnit.MICROSECONDS;
}

/**
* Get the watermark for a split using column statistics.
*
* @param split The split
* @return The watermark
* @throws IllegalArgumentException if there is no statistics for the column
*/
@Override
public long extractWatermark(IcebergSourceSplit split) {
return split.task().files().stream()
.map(
scanTask -> {
Preconditions.checkArgument(
scanTask.file().lowerBounds() != null
&& scanTask.file().lowerBounds().get(eventTimeFieldId) != null,
"Missing statistics for column name = %s in file = %s",
eventTimeFieldName,
scanTask.file());
return timeUnit.toMillis(
Conversions.fromByteBuffer(
Types.LongType.get(), scanTask.file().lowerBounds().get(eventTimeFieldId)));
})
.min(Comparator.comparingLong(l -> l))
.get();
}
}
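To make the time-unit handling concrete, a small worked conversion (the value is made up): for a LONG event-time column stored in microseconds, the extractor converts each file's lower bound to milliseconds before taking the minimum across the split.

```java
import java.util.concurrent.TimeUnit;

class WatermarkUnitSketch {
  public static void main(String[] args) {
    // Per-file minimum of the event-time column, taken from Iceberg column stats
    // (illustrative value, roughly 2023-11-14 in epoch microseconds).
    long lowerBoundMicros = 1_700_000_000_000_000L;

    // Same conversion the extractor applies for LONG columns with MICROSECONDS:
    long watermarkMillis = TimeUnit.MICROSECONDS.toMillis(lowerBoundMicros);
    System.out.println(watermarkMillis); // 1700000000000
  }
}
```

Timestamp and timestamptz columns skip the configured unit and are always treated as microseconds, as the constructor above shows.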
IcebergSourceReader.java
@@ -35,13 +35,14 @@ public class IcebergSourceReader<T>
RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {

public IcebergSourceReader(
SerializableRecordEmitter<T> emitter,
IcebergSourceReaderMetrics metrics,
ReaderFunction<T> readerFunction,
SerializableComparator<IcebergSourceSplit> splitComparator,
SourceReaderContext context) {
super(
() -> new IcebergSourceSplitReader<>(metrics, readerFunction, splitComparator, context),
new IcebergSourceRecordEmitter<>(),
emitter,
context.getConfiguration(),
context);
}
SerializableRecordEmitter.java (new file)
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import java.io.Serializable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

@Internal
@FunctionalInterface
public interface SerializableRecordEmitter<T>
extends RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit>, Serializable {
static <T> SerializableRecordEmitter<T> defaultEmitter() {
return (element, output, split) -> {
output.collect(element.record());
split.updatePosition(element.fileOffset(), element.recordOffset());
};
}

static <T> SerializableRecordEmitter<T> emitterWithWatermark(SplitWatermarkExtractor extractor) {
return new WatermarkExtractorRecordEmitter<>(extractor);
}
}
IcebergSourceRecordEmitter.java → SplitWatermarkExtractor.java
@@ -18,19 +18,11 @@
*/
package org.apache.iceberg.flink.source.reader;

-import org.apache.flink.api.connector.source.SourceOutput;
-import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import java.io.Serializable;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

-final class IcebergSourceRecordEmitter<T>
-    implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
-
-  IcebergSourceRecordEmitter() {}
-
-  @Override
-  public void emitRecord(
-      RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
-    output.collect(element.record());
-    split.updatePosition(element.fileOffset(), element.recordOffset());
-  }
+/** The interface used to extract watermarks from splits. */
+public interface SplitWatermarkExtractor extends Serializable {
+  /** Get the watermark for a split. */
+  long extractWatermark(IcebergSourceSplit split);
}
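Because the interface is a single method, alternative extractors are easy to sketch. The class below is a hypothetical example (not part of the PR) that assigns the same watermark to every split:

```java
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

/** Illustrative only: assigns one fixed watermark to every split. */
public class FixedWatermarkExtractor implements SplitWatermarkExtractor {
  private final long watermarkMillis;

  public FixedWatermarkExtractor(long watermarkMillis) {
    this.watermarkMillis = watermarkMillis;
  }

  @Override
  public long extractWatermark(IcebergSourceSplit split) {
    // Every split reports the same watermark, so split ordering is unaffected.
    return watermarkMillis;
  }
}
```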