Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36869][table] Introduce TemporalRowTimeJoinOperator in TemporalJoin with Async State API #25778

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGenUtils;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
Expand All @@ -50,6 +51,8 @@
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator;
import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator;
import org.apache.flink.table.runtime.operators.join.temporal.asyncprocessing.AsyncStateTemporalProcessTimeJoinOperator;
import org.apache.flink.table.runtime.operators.join.temporal.asyncprocessing.AsyncStateTemporalRowTimeJoinOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -265,23 +268,44 @@ private TwoInputStreamOperator<RowData, RowData, RowData> createJoinOperator(
long minRetentionTime = config.getStateRetentionTime();
long maxRetentionTime = TableConfigUtils.getMaxIdleStateRetentionTime(config);
if (rightTimeAttributeIndex >= 0) {
return new TemporalRowTimeJoinOperator(
InternalTypeInfo.of(leftInputType),
InternalTypeInfo.of(rightInputType),
generatedJoinCondition,
leftTimeAttributeIndex,
rightTimeAttributeIndex,
minRetentionTime,
maxRetentionTime,
isLeftOuterJoin);
} else {
if (isTemporalFunctionJoin) {
return new TemporalProcessTimeJoinOperator(
if (config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) {
return new AsyncStateTemporalRowTimeJoinOperator(
InternalTypeInfo.of(leftInputType),
InternalTypeInfo.of(rightInputType),
generatedJoinCondition,
leftTimeAttributeIndex,
rightTimeAttributeIndex,
minRetentionTime,
maxRetentionTime,
isLeftOuterJoin);
} else {
return new TemporalRowTimeJoinOperator(
InternalTypeInfo.of(leftInputType),
InternalTypeInfo.of(rightInputType),
generatedJoinCondition,
leftTimeAttributeIndex,
rightTimeAttributeIndex,
minRetentionTime,
maxRetentionTime,
isLeftOuterJoin);
}
} else {
if (isTemporalFunctionJoin) {
if (config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) {
return new AsyncStateTemporalProcessTimeJoinOperator(
InternalTypeInfo.of(rightInputType),
generatedJoinCondition,
minRetentionTime,
maxRetentionTime,
isLeftOuterJoin);
} else {
return new TemporalProcessTimeJoinOperator(
InternalTypeInfo.of(rightInputType),
generatedJoinCondition,
minRetentionTime,
maxRetentionTime,
isLeftOuterJoin);
}
} else {
// The exsiting TemporalProcessTimeJoinOperator has already supported temporal table
// join.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.factories.TestValuesTableFactory.{getResultsAsStrings, registerData}
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
import org.apache.flink.table.utils.LegacyRowExtension
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
import org.apache.flink.types.Row

import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
Expand All @@ -34,11 +34,22 @@ import org.junit.jupiter.api.extension.{ExtendWith, RegisterExtension}

import java.time.LocalDateTime
import java.time.format.DateTimeParseException
import java.util

import scala.collection.JavaConversions._

@ExtendWith(Array(classOf[ParameterizedTestExtension]))
class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(state) {
class TemporalJoinITCase(state: StateBackendMode, enableAsyncState: Boolean)
extends StreamingWithStateTestBase(state) {

@BeforeEach
override def before(): Unit = {
super.before()

tEnv.getConfig.set(
ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED,
Boolean.box(enableAsyncState))
}

@RegisterExtension private val _: EachCallbackWrapper[LegacyRowExtension] =
new EachCallbackWrapper[LegacyRowExtension](new LegacyRowExtension)
Expand Down Expand Up @@ -795,3 +806,14 @@ class TemporalJoinITCase(state: StateBackendMode) extends StreamingWithStateTest
TestValuesTableFactory.changelogRow(kind, objects.toArray: _*)
}
}

object TemporalJoinITCase {

@Parameters(name = "StateBackend={0}, EnableAsyncState={1}")
def parameters(): util.Collection[Array[java.lang.Object]] = {
Seq[Array[AnyRef]](
Array(HEAP_BACKEND, Boolean.box(false)),
Array(HEAP_BACKEND, Boolean.box(true)),
Array(ROCKSDB_BACKEND, Boolean.box(false)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public static JoinRecordAsyncStateView create(
inputSideSpec.getUniqueKeySelector(),
ttlConfig);
}

} else {
return new InputSideHasNoUniqueKey(ctx, stateName, recordType, ttlConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import java.util.Optional;

/**
* An abstract {@link TwoInputStreamOperator} that allows its subclasses to clean up their state
* based on a TTL. This TTL should be specified in the provided {@code minRetentionTime} and {@code
* maxRetentionTime}.
* An abstract {@link TwoInputStreamOperator} that allows its subclasses to clean up their state in
* sync state based on a TTL. This TTL should be specified in the provided {@code minRetentionTime}
* and {@code maxRetentionTime}.
*
* <p>For each known key, this operator registers a timer (in processing time) to fire after the TTL
* expires. When the timer fires, the subclass can decide which state to cleanup and what further
Expand Down Expand Up @@ -115,7 +115,7 @@ protected void registerProcessingCleanupTimer() throws IOException {
Optional<Long> currentCleanupTime =
Optional.ofNullable(latestRegisteredCleanupTimer.value());

if (!currentCleanupTime.isPresent()
if (currentCleanupTime.isEmpty()
|| (currentProcessingTime + minRetentionTime) > currentCleanupTime.get()) {

updateCleanupTimer(currentProcessingTime, currentCleanupTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.operators.join.temporal.utils.TemporalProcessTimeJoinHelper;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

/**
* The operator to temporal join a stream on processing time.
* The operator to temporal join a stream on processing time in sync state.
*
* <p>For temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime)) and temporal table
* join (FOR SYSTEM_TIME AS OF), they can reuse same processing-time operator implementation, the
Expand All @@ -60,6 +61,8 @@ public class TemporalProcessTimeJoinOperator extends BaseTwoInputStreamOperatorW
private transient GenericRowData rightNullRow;
private transient TimestampedCollector<RowData> collector;

private transient SyncStateTemporalProcessTimeJoinHelper temporalProcessTimeJoinHelper;

public TemporalProcessTimeJoinOperator(
InternalTypeInfo<RowData> rightType,
GeneratedJoinCondition generatedJoinCondition,
Expand Down Expand Up @@ -88,38 +91,21 @@ public void open() throws Exception {
this.rightNullRow = new GenericRowData(rightType.toRowSize());
// consider watermark from left stream only.
super.processWatermark2(Watermark.MAX_WATERMARK);
this.temporalProcessTimeJoinHelper = new SyncStateTemporalProcessTimeJoinHelper();
}

@Override
public void processElement1(StreamRecord<RowData> element) throws Exception {
RowData leftSideRow = element.getValue();
RowData rightSideRow = rightState.value();

if (rightSideRow == null) {
if (isLeftOuterJoin) {
collectJoinedRow(leftSideRow, rightNullRow);
} else {
return;
}
} else {
if (joinCondition.apply(leftSideRow, rightSideRow)) {
collectJoinedRow(leftSideRow, rightSideRow);
} else {
if (isLeftOuterJoin) {
collectJoinedRow(leftSideRow, rightNullRow);
}
}
temporalProcessTimeJoinHelper.processElement1(leftSideRow, rightSideRow);
if (rightSideRow != null) {
// register a cleanup timer only if the rightSideRow is not null
registerProcessingCleanupTimer();
}
}

private void collectJoinedRow(RowData leftRow, RowData rightRow) {
outRow.setRowKind(leftRow.getRowKind());
outRow.replace(leftRow, rightRow);
collector.collect(outRow);
}

@Override
public void processElement2(StreamRecord<RowData> element) throws Exception {
if (RowDataUtil.isAccumulateMsg(element.getValue())) {
Expand Down Expand Up @@ -150,4 +136,10 @@ public void cleanupState(long time) {
/** Invoked when an event-time timer fires. */
@Override
public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {}

private class SyncStateTemporalProcessTimeJoinHelper extends TemporalProcessTimeJoinHelper {
public SyncStateTemporalProcessTimeJoinHelper() {
super(isLeftOuterJoin, joinCondition, outRow, rightNullRow, collector);
}
}
}
Loading