Skip to content

Commit 17862df

Browse files
committed
[FLINK-38511][table] Join that consumes cdc source without delete may be converted to delta join
1 parent e9c7d81 commit 17862df

File tree

10 files changed

+1445
-522
lines changed

10 files changed

+1445
-522
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@
7979
import org.slf4j.Logger;
8080
import org.slf4j.LoggerFactory;
8181

82+
import javax.annotation.Nullable;
83+
8284
import java.util.Arrays;
8385
import java.util.List;
8486
import java.util.Map;
@@ -116,6 +118,9 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
116118
"lookupRightTableJoinSpec";
117119
private static final String FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC = "lookupLeftTableJoinSpec";
118120

121+
private static final String FIELD_NAME_LEFT_UPSERT_KEY = "leftUpsertKey";
122+
private static final String FIELD_NAME_RIGHT_UPSERT_KEY = "rightUpsertKey";
123+
119124
private static final String FIELD_NAME_JOIN_TYPE = "joinType";
120125

121126
public static final String FIELD_NAME_ASYNC_OPTIONS = "asyncOptions";
@@ -134,6 +139,10 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
134139
@JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS)
135140
private final int[] leftJoinKeys;
136141

142+
@JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY)
143+
@JsonInclude(JsonInclude.Include.NON_NULL)
144+
private final int[] leftUpsertKeys;
145+
137146
// left (streaming) side join right (lookup) side
138147
@JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC)
139148
private final DeltaJoinSpec lookupRightTableJoinSpec;
@@ -143,6 +152,10 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
143152
@JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS)
144153
private final int[] rightJoinKeys;
145154

155+
@JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY)
156+
@JsonInclude(JsonInclude.Include.NON_NULL)
157+
private final int[] rightUpsertKeys;
158+
146159
// right (streaming) side join left (lookup) side
147160
@JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC)
148161
private final DeltaJoinSpec lookupLeftTableJoinSpec;
@@ -152,9 +165,11 @@ public StreamExecDeltaJoin(
152165
FlinkJoinType flinkJoinType,
153166
// delta join args related with the left side
154167
int[] leftJoinKeys,
168+
@Nullable int[] leftUpsertKeys,
155169
DeltaJoinSpec lookupRightTableJoinSpec,
156170
// delta join args related with the right side
157171
int[] rightJoinKeys,
172+
@Nullable int[] rightUpsertKeys,
158173
DeltaJoinSpec lookupLeftTableJoinSpec,
159174
InputProperty leftInputProperty,
160175
InputProperty rightInputProperty,
@@ -167,8 +182,10 @@ public StreamExecDeltaJoin(
167182
ExecNodeContext.newPersistedConfig(StreamExecDeltaJoin.class, tableConfig),
168183
flinkJoinType,
169184
leftJoinKeys,
185+
leftUpsertKeys,
170186
lookupRightTableJoinSpec,
171187
rightJoinKeys,
188+
rightUpsertKeys,
172189
lookupLeftTableJoinSpec,
173190
Lists.newArrayList(leftInputProperty, rightInputProperty),
174191
outputType,
@@ -183,9 +200,11 @@ public StreamExecDeltaJoin(
183200
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
184201
@JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType flinkJoinType,
185202
@JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS) int[] leftJoinKeys,
203+
@JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY) @Nullable int[] leftUpsertKeys,
186204
@JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC)
187205
DeltaJoinSpec lookupRightTableJoinSpec,
188206
@JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS) int[] rightJoinKeys,
207+
@JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY) @Nullable int[] rightUpsertKeys,
189208
@JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC)
190209
DeltaJoinSpec lookupLeftTableJoinSpec,
191210
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@@ -196,8 +215,10 @@ public StreamExecDeltaJoin(
196215

197216
this.flinkJoinType = flinkJoinType;
198217
this.leftJoinKeys = leftJoinKeys;
218+
this.leftUpsertKeys = leftUpsertKeys;
199219
this.lookupRightTableJoinSpec = lookupRightTableJoinSpec;
200220
this.rightJoinKeys = rightJoinKeys;
221+
this.rightUpsertKeys = rightUpsertKeys;
201222
this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec;
202223
this.asyncLookupOptions = asyncLookupOptions;
203224
}
@@ -236,17 +257,15 @@ protected Transformation<RowData> translateToPlanInternal(
236257
RowDataKeySelector leftJoinKeySelector =
237258
KeySelectorUtil.getRowDataSelector(
238259
classLoader, leftJoinKeys, InternalTypeInfo.of(leftStreamType));
239-
// currently, delta join only supports consuming INSERT-ONLY stream
240260
RowDataKeySelector leftUpsertKeySelector =
241-
getUpsertKeySelector(new int[0], leftStreamType, classLoader);
261+
getUpsertKeySelector(leftUpsertKeys, leftStreamType, classLoader);
242262

243263
// right side selector
244264
RowDataKeySelector rightJoinKeySelector =
245265
KeySelectorUtil.getRowDataSelector(
246266
classLoader, rightJoinKeys, InternalTypeInfo.of(rightStreamType));
247-
// currently, delta join only supports consuming INSERT-ONLY stream
248267
RowDataKeySelector rightUpsertKeySelector =
249-
getUpsertKeySelector(new int[0], rightStreamType, classLoader);
268+
getUpsertKeySelector(rightUpsertKeys, rightStreamType, classLoader);
250269

251270
StreamOperatorFactory<RowData> operatorFactory =
252271
createAsyncLookupDeltaJoin(
@@ -484,9 +503,9 @@ public RexNode visitInputRef(RexInputRef inputRef) {
484503
}
485504

486505
private RowDataKeySelector getUpsertKeySelector(
487-
int[] upsertKey, RowType rowType, ClassLoader classLoader) {
506+
@Nullable int[] upsertKey, RowType rowType, ClassLoader classLoader) {
488507
final int[] rightUpsertKeys;
489-
if (upsertKey.length > 0) {
508+
if (upsertKey != null && upsertKey.length > 0) {
490509
rightUpsertKeys = upsertKey;
491510
} else {
492511
rightUpsertKeys = IntStream.range(0, rowType.getFields().size()).toArray();

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,39 +22,36 @@
2222
import org.apache.flink.table.api.TableConfig;
2323
import org.apache.flink.table.api.config.ExecutionConfigOptions;
2424
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
25+
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
2526
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
2627
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
2728
import org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec;
2829
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeltaJoin;
2930
import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
31+
import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
3032
import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
33+
import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;
3134
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
32-
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
3335

3436
import org.apache.calcite.plan.RelOptCluster;
3537
import org.apache.calcite.plan.RelTraitSet;
36-
import org.apache.calcite.rel.BiRel;
3738
import org.apache.calcite.rel.RelNode;
3839
import org.apache.calcite.rel.RelWriter;
39-
import org.apache.calcite.rel.core.JoinInfo;
40+
import org.apache.calcite.rel.core.Join;
41+
import org.apache.calcite.rel.core.JoinRelType;
4042
import org.apache.calcite.rel.hint.Hintable;
4143
import org.apache.calcite.rel.hint.RelHint;
4244
import org.apache.calcite.rel.type.RelDataType;
4345
import org.apache.calcite.rex.RexNode;
4446

47+
import java.util.Collections;
4548
import java.util.List;
4649
import java.util.Optional;
4750

4851
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
4952

5053
/** Stream physical RelNode for delta join. */
51-
public class StreamPhysicalDeltaJoin extends BiRel implements StreamPhysicalRel, Hintable {
52-
53-
private final FlinkJoinType joinType;
54-
55-
private final RexNode originalJoinCondition;
56-
57-
private final com.google.common.collect.ImmutableList<RelHint> hints;
54+
public class StreamPhysicalDeltaJoin extends Join implements StreamPhysicalRel, Hintable {
5855

5956
private final RelDataType rowType;
6057

@@ -70,15 +67,20 @@ public StreamPhysicalDeltaJoin(
7067
List<RelHint> hints,
7168
RelNode left,
7269
RelNode right,
73-
FlinkJoinType joinType,
70+
JoinRelType joinType,
7471
RexNode originalJoinCondition,
7572
DeltaJoinSpec lookupRightTableJoinSpec,
7673
DeltaJoinSpec lookupLeftTableJoinSpec,
7774
RelDataType rowType) {
78-
super(cluster, traitSet, left, right);
79-
this.hints = com.google.common.collect.ImmutableList.copyOf(hints);
80-
this.joinType = joinType;
81-
this.originalJoinCondition = originalJoinCondition;
75+
super(
76+
cluster,
77+
traitSet,
78+
hints,
79+
left,
80+
right,
81+
originalJoinCondition,
82+
Collections.emptySet(),
83+
joinType);
8284
this.lookupRightTableJoinSpec = lookupRightTableJoinSpec;
8385
this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec;
8486
this.rowType = rowType;
@@ -97,15 +99,20 @@ public ExecNode<?> translateToExecNode() {
9799
// scenarios to enhance throughput as much as possible.
98100
true,
99101
AsyncDataStream.OutputMode.ORDERED);
102+
FlinkRelMetadataQuery fmq =
103+
FlinkRelMetadataQuery.reuseOrCreate(this.getCluster().getMetadataQuery());
100104

101-
JoinInfo joinInfo = JoinInfo.of(left, right, originalJoinCondition);
105+
int[] leftUpsertKey = UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(left)).orElse(null);
106+
int[] rightUpsertKey = UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(right)).orElse(null);
102107

103108
return new StreamExecDeltaJoin(
104109
config,
105-
joinType,
110+
JoinTypeUtil.getFlinkJoinType(joinType),
106111
joinInfo.leftKeys.toIntArray(),
112+
leftUpsertKey,
107113
lookupRightTableJoinSpec,
108114
joinInfo.rightKeys.toIntArray(),
115+
rightUpsertKey,
109116
lookupLeftTableJoinSpec,
110117
InputProperty.DEFAULT,
111118
InputProperty.DEFAULT,
@@ -120,16 +127,21 @@ public boolean requireWatermark() {
120127
}
121128

122129
@Override
123-
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
124-
assert inputs.size() == 2;
130+
public Join copy(
131+
RelTraitSet traitSet,
132+
RexNode conditionExpr,
133+
RelNode left,
134+
RelNode right,
135+
JoinRelType joinType,
136+
boolean semiJoinDone) {
125137
return new StreamPhysicalDeltaJoin(
126138
getCluster(),
127139
traitSet,
128140
hints,
129-
inputs.get(0),
130-
inputs.get(1),
141+
left,
142+
right,
131143
joinType,
132-
originalJoinCondition,
144+
conditionExpr,
133145
lookupRightTableJoinSpec,
134146
lookupLeftTableJoinSpec,
135147
rowType);
@@ -147,12 +159,13 @@ public com.google.common.collect.ImmutableList<RelHint> getHints() {
147159

148160
@Override
149161
public RelWriter explainTerms(RelWriter pw) {
150-
return super.explainTerms(pw)
151-
.item("joinType", joinType.toString())
162+
return pw.input("left", left)
163+
.input("right", right)
164+
.item("joinType", JoinTypeUtil.getFlinkJoinType(joinType).toString())
152165
.item(
153166
"where",
154167
getExpressionString(
155-
originalJoinCondition,
168+
condition,
156169
JavaScalaConversionUtil.toScala(this.getRowType().getFieldNames())
157170
.toList(),
158171
JavaScalaConversionUtil.toScala(Optional.empty()),

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeltaJoin;
2626
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
2727
import org.apache.flink.table.planner.plan.utils.DeltaJoinUtil;
28-
import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
2928

3029
import org.apache.calcite.plan.RelOptRuleCall;
3130
import org.apache.calcite.plan.RelRule;
@@ -44,7 +43,9 @@
4443
* <li>The join is INNER join.
4544
* <li>There is at least one join key pair in the join.
4645
* <li>The downstream nodes of this join can accept duplicate changes.
47-
* <li>All join inputs are insert only streams.
46+
* <li>All join inputs produce either an "I" or an "I, UA" changelog.
47+
* <li>If this join outputs update records, the non-equi conditions must be applied on the upsert
48+
* keys of this join.
4849
* <li>All upstream nodes of this join are in {@code
4950
* DeltaJoinUtil#ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES}
5051
* <li>The join keys include at least one complete index in each source table of the join input.
@@ -92,7 +93,7 @@ private StreamPhysicalDeltaJoin convertToDeltaJoin(StreamPhysicalJoin join) {
9293
join.getHints(),
9394
join.getLeft(),
9495
join.getRight(),
95-
JoinTypeUtil.getFlinkJoinType(join.getJoinType()),
96+
join.getJoinType(),
9697
join.getCondition(),
9798
lookupRightTableJoinSpec,
9899
lookupLeftTableJoinSpec,

0 commit comments

Comments
 (0)