Skip to content

Commit 0656c75

Browse files
authored
[FLINK-37974][table] Rename argument traits to ROW_SEMANTIC_TABLE/SET_SEMANTIC_TABLE
1 parent 97377ef commit 0656c75

File tree

31 files changed

+394
-370
lines changed

31 files changed

+394
-370
lines changed

docs/content.zh/docs/dev/table/functions/ptfs.md

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ import static org.apache.flink.table.api.Expressions.*;
8181
// A PTF that takes a table argument, conceptually viewing the table as a row.
8282
// The result is never stateful and derived purely based on the current row.
8383
public static class Greeting extends ProcessTableFunction<String> {
84-
public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_ROW) Row input) {
84+
public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
8585
collect("Hello " + input.getFieldAs("name") + "!");
8686
}
8787
}
@@ -140,7 +140,7 @@ public static class GreetingWithMemory extends ProcessTableFunction<String> {
140140
public long counter = 0L;
141141
}
142142

143-
public void eval(@StateHint CountState state, @ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input) {
143+
public void eval(@StateHint CountState state, @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
144144
state.counter++;
145145
collect("Hello " + input.getFieldAs("name") + ", your " + state.counter + " time?");
146146
}
@@ -204,7 +204,7 @@ public static class GreetingWithFollowUp extends ProcessTableFunction<String> {
204204
public void eval(
205205
Context ctx,
206206
@StateHint StayState state,
207-
@ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input
207+
@ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input
208208
) {
209209
state.name = input.getFieldAs("name");
210210
state.counter++;
@@ -276,7 +276,7 @@ Table Semantics and Virtual Processors
276276
PTFs can produce a new table by consuming tables as arguments. For scalability, input tables are distributed across
277277
so-called "virtual processors". A virtual processor, as defined by the SQL standard, executes a PTF instance and has
278278
access only to a portion of the entire table. The argument declaration decides about the size of the portion and
279-
co-location of data. Conceptually, tables can be processed either "as row" (i.e. with row semantics) or "as set"
279+
co-location of data. Conceptually, tables can be processed either "per row" (i.e. with row semantics) or "per set"
280280
(i.e. with set semantics).
281281

282282
{{<img alt="PTF Table Semantics" src="/fig/table-streaming/ptf_table_semantics.png" width="80%">}}
@@ -313,7 +313,7 @@ Given a PTF `TableFilter` that has been implemented as:
313313
```java
314314
@DataTypeHint("ROW<threshold INT, score BIGINT>")
315315
public static class TableFilter extends ProcessTableFunction<Row> {
316-
public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_ROW) Row input, int threshold) {
316+
public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input, int threshold) {
317317
long score = input.getFieldAs("score");
318318
if (score > threshold) {
319319
collect(Row.of(threshold, score));
@@ -326,7 +326,7 @@ public static class TableFilter extends ProcessTableFunction<Row> {
326326

327327
The effective call signature is:
328328
```text
329-
TableFilter(input => {TABLE, TABLE AS ROW}, threshold => INT NOT NULL, on_time => DESCRIPTOR, uid => STRING)
329+
TableFilter(input => {TABLE, ROW SEMANTIC TABLE}, threshold => INT NOT NULL, on_time => DESCRIPTOR, uid => STRING)
330330
```
331331

332332
Both `on_time` and `uid` are optional by default. The `on_time` is required if time semantics are needed. The `uid` must
@@ -472,7 +472,7 @@ The `@ArgumentHint` annotation enables declaring the name, data type, and traits
472472

473473
In most cases, the system can automatically infer the name and data type reflectively, so they do not need to be
474474
specified. However, traits must be provided explicitly, particularly when defining the argument's kind. The argument
475-
of a PTF can be set to `ArgumentTrait.SCALAR`, `ArgumentTrait.TABLE_AS_SET`, or `ArgumentTrait.TABLE_AS_ROW`. By default,
475+
of a PTF can be set to `ArgumentTrait.SCALAR`, `ArgumentTrait.ROW_SEMANTIC_TABLE`, or `ArgumentTrait.SET_SEMANTIC_TABLE`. By default,
476476
arguments are treated as scalar values.
477477

478478
The following examples show usages of the `@ArgumentHint` annotation:
@@ -485,7 +485,7 @@ The following examples show usages of the `@ArgumentHint` annotation:
485485
class ThresholdFunction extends ProcessTableFunction<Integer> {
486486
public void eval(
487487
// For table arguments, a data type for Row is optional (leading to polymorphic behavior)
488-
@ArgumentHint(value = ArgumentTrait.TABLE_AS_SET, name = "input_table") Row t,
488+
@ArgumentHint(value = ArgumentTrait.SET_SEMANTIC_TABLE, name = "input_table") Row t,
489489
// Scalar arguments require a data type either explicit or via reflection
490490
@ArgumentHint(value = ArgumentTrait.SCALAR, name = "threshold") Integer threshold
491491
) {
@@ -501,7 +501,7 @@ class ThresholdFunction extends ProcessTableFunction<Integer> {
501501

502502
#### Table Arguments
503503

504-
The traits `ArgumentTrait.TABLE_AS_SET` and `ArgumentTrait.TABLE_AS_ROW` define table arguments.
504+
The traits `ArgumentTrait.SET_SEMANTIC_TABLE` and `ArgumentTrait.ROW_SEMANTIC_TABLE` define table arguments.
505505

506506
Table arguments can declare a concrete data type (of either row or structured type) or accept any type of row in a
507507
polymorphic fashion.
@@ -523,7 +523,7 @@ e.g. `f(my_table_arg => TABLE t)`.
523523
class MyPTF extends ProcessTableFunction<String> {
524524
public void eval(
525525
Context ctx,
526-
@ArgumentHint(value = ArgumentTrait.TABLE_AS_SET, type = @DataTypeHint("ROW<s STRING>")) Row t
526+
@ArgumentHint(value = ArgumentTrait.SET_SEMANTIC_TABLE, type = @DataTypeHint("ROW<s STRING>")) Row t
527527
) {
528528
TableSemantics semantics = ctx.tableSemanticsFor("t");
529529
// Always returns "ROW < s STRING >"
@@ -536,7 +536,7 @@ class MyPTF extends ProcessTableFunction<String> {
536536
class MyPTF extends ProcessTableFunction<String> {
537537
public void eval(
538538
Context ctx,
539-
@ArgumentHint(value = ArgumentTrait.TABLE_AS_SET) Customer c
539+
@ArgumentHint(value = ArgumentTrait.SET_SEMANTIC_TABLE) Customer c
540540
) {
541541
TableSemantics semantics = ctx.tableSemanticsFor("c");
542542
// Always returns structured type of "Customer"
@@ -549,7 +549,7 @@ class MyPTF extends ProcessTableFunction<String> {
549549
class MyPTF extends ProcessTableFunction<String> {
550550
public void eval(
551551
Context ctx,
552-
@ArgumentHint(value = ArgumentTrait.TABLE_AS_SET) Row t
552+
@ArgumentHint(value = ArgumentTrait.SET_SEMANTIC_TABLE) Row t
553553
) {
554554
TableSemantics semantics = ctx.tableSemanticsFor("t");
555555
// Always returns "ROW" but content depends on the table that is passed into the call
@@ -572,7 +572,7 @@ information about the input tables and other services provided by the framework.
572572
// Function that accesses the Context for reading the PARTITION BY columns and
573573
// excluding them when building a result string
574574
class ConcatNonKeysFunction extends ProcessTableFunction<String> {
575-
public void eval(Context ctx, @ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row inputTable) {
575+
public void eval(Context ctx, @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row inputTable) {
576576
TableSemantics semantics = ctx.tableSemanticsFor("inputTable");
577577

578578
List<Integer> keys = Arrays.asList(semantics.partitionByColumns());
@@ -626,7 +626,7 @@ class CountingFunction extends ProcessTableFunction<String> {
626626

627627
public void eval(
628628
@StateHint CountState memory,
629-
@ArgumentHint(TABLE_AS_SET) Row input
629+
@ArgumentHint(SET_SEMANTIC_TABLE) Row input
630630
) {
631631
memory.count++;
632632
collect("Seen rows: " + memory.count);
@@ -641,7 +641,7 @@ class CountingFunction extends ProcessTableFunction<String> {
641641

642642
public void eval(
643643
@StateHint SeenState memory,
644-
@ArgumentHint(TABLE_AS_SET) Row input
644+
@ArgumentHint(SET_SEMANTIC_TABLE) Row input
645645
) {
646646
if (memory.first == null) {
647647
memory.first = input.toString();
@@ -655,7 +655,7 @@ class CountingFunction extends ProcessTableFunction<String> {
655655
class CountingFunction extends ProcessTableFunction<String> {
656656
public void eval(
657657
@StateHint(type = @DataTypeHint("ROW<count BIGINT>")) Row memory,
658-
@ArgumentHint(TABLE_AS_SET) Row input
658+
@ArgumentHint(SET_SEMANTIC_TABLE) Row input
659659
) {
660660
Long newCount = 1L;
661661
if (memory.getField("count") != null) {
@@ -699,7 +699,7 @@ class CountingFunction extends ProcessTableFunction<String> {
699699
@StateHint(ttl = "1 hour") SomeState shortTermState,
700700
@StateHint(ttl = "1 day") SomeState longTermState,
701701
@StateHint SomeState infiniteState, // potentially influenced by table.exec.state.ttl
702-
@ArgumentHint(TABLE_AS_SET) Row input
702+
@ArgumentHint(SET_SEMANTIC_TABLE) Row input
703703
) {
704704
...
705705
}
@@ -749,7 +749,7 @@ large state is stored as a map state.
749749
class LargeHistoryFunction extends ProcessTableFunction<String> {
750750
public void eval(
751751
@StateHint MapView<String, Integer> largeMemory,
752-
@ArgumentHint(TABLE_AS_SET) Row input
752+
@ArgumentHint(SET_SEMANTIC_TABLE) Row input
753753
) {
754754
String eventId = input.getFieldAs("eventId");
755755
Integer count = largeMemory.get(eventId);
@@ -778,7 +778,7 @@ and the `MAP` data type for map views.
778778
class LargeHistoryFunction extends ProcessTableFunction<String> {
779779
public void eval(
780780
@StateHint(type = @DataTypeHint("ARRAY<ROW<s STRING, i INT>>")) ListView<Row> largeMemory,
781-
@ArgumentHint(TABLE_AS_SET) Row input
781+
@ArgumentHint(SET_SEMANTIC_TABLE) Row input
782782
) {
783783
...
784784
}
@@ -806,7 +806,7 @@ class CountingFunction extends ProcessTableFunction<String> {
806806
public void eval(
807807
Context ctx,
808808
@StateHint(ttl = "1 day") SeenState memory,
809-
@ArgumentHint(TABLE_AS_SET) Row input
809+
@ArgumentHint(SET_SEMANTIC_TABLE) Row input
810810
) {
811811
if (memory.first == null) {
812812
memory.first = input.toString();
@@ -866,7 +866,7 @@ Once an `on_time` argument is provided, timers can be used. The following motiva
866866
public static class PingLaterFunction extends ProcessTableFunction<String> {
867867
public void eval(
868868
Context ctx,
869-
@ArgumentHint({ArgumentTrait.TABLE_AS_SET, ArgumentTrait.REQUIRE_ON_TIME}) Row input
869+
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME}) Row input
870870
) {
871871
TimeContext<Instant> timeCtx = ctx.timeContext(Instant.class);
872872
// Replaces an existing timer and thus potentially resets the minute if necessary
@@ -986,7 +986,7 @@ class TimerFunction extends ProcessTableFunction<String> {
986986
public void eval(
987987
Context ctx,
988988
@StateHint SeenState memory,
989-
@ArgumentHint({TABLE_AS_SET, REQUIRE_ON_TIME}) Row input
989+
@ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row input
990990
) {
991991
TimeContext<Instant> timeCtx = ctx.timeContext(Instant.class);
992992
if (memory.seen == null) {
@@ -1068,11 +1068,11 @@ public static class GreetingWithLastPurchase extends ProcessTableFunction<String
10681068
public String lastItem;
10691069
}
10701070

1071-
// The eval() method takes two @ArgumentHint(TABLE_AS_SET) arguments
1071+
// The eval() method takes two @ArgumentHint(SET_SEMANTIC_TABLE) arguments
10721072
public void eval(
10731073
@StateHint LastItemState state,
1074-
@ArgumentHint(TABLE_AS_SET) Row visit,
1075-
@ArgumentHint(TABLE_AS_SET) Row purchase) {
1074+
@ArgumentHint(SET_SEMANTIC_TABLE) Row visit,
1075+
@ArgumentHint(SET_SEMANTIC_TABLE) Row purchase) {
10761076

10771077
// Process row from table Purchases
10781078
if (purchase != null) {
@@ -1374,7 +1374,7 @@ upsertsOnly.execute().print();
13741374

13751375
@DataTypeHint("ROW<flag STRING, sum INT>")
13761376
public static class ToChangelogFunction extends ProcessTableFunction<Row> {
1377-
public void eval(@ArgumentHint({TABLE_AS_SET, SUPPORT_UPDATES}) Row input) {
1377+
public void eval(@ArgumentHint({SET_SEMANTIC_TABLE, SUPPORT_UPDATES}) Row input) {
13781378
// Forwards the sum column and includes the row's kind as a string column.
13791379
Row changelogRow =
13801380
Row.of(
@@ -1434,7 +1434,7 @@ called for each step:
14341434
regardless of nullability constraints. {@link ChangelogContext#getRequiredChangelogMode()}
14351435
indicates whether a downstream operator requires full deletes.
14361436

1437-
Emitting changelogs is only valid for PTFs that take table arguments with set semantics (see `ArgumentTrait.TABLE_AS_SET`).
1437+
Emitting changelogs is only valid for PTFs that take table arguments with set semantics (see `ArgumentTrait.SET_SEMANTIC_TABLE`).
14381438
In case of upserts, the upsert key must be equal to the PARTITION BY key.
14391439

14401440
It is perfectly valid for a `ChangelogFunction` implementation to return a fixed `ChangelogMode`, regardless of the
@@ -1493,7 +1493,7 @@ public static class CustomAggregation
14931493
public Integer sum = 0;
14941494
}
14951495

1496-
public void eval(@StateHint Accumulator state, @ArgumentHint(TABLE_AS_SET) Row input) {
1496+
public void eval(@StateHint Accumulator state, @ArgumentHint(SET_SEMANTIC_TABLE) Row input) {
14971497
int score = input.getFieldAs("score");
14981498

14991499
// A negative state indicates that the partition
@@ -1614,7 +1614,7 @@ public static class CheckoutProcessor extends ProcessTableFunction<Row> {
16141614
public void eval(
16151615
Context ctx,
16161616
@StateHint ShoppingCart cart,
1617-
@ArgumentHint({TABLE_AS_SET, REQUIRE_ON_TIME}) Row events,
1617+
@ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row events,
16181618
Duration reminderInterval,
16191619
Duration timeoutInterval
16201620
) {
@@ -1808,24 +1808,24 @@ public static class Joiner extends ProcessTableFunction<JoinResult> {
18081808
public void eval(
18091809
Context ctx,
18101810
@StateHint(ttl = "1 hour") JoinResult seen,
1811-
@ArgumentHint(TABLE_AS_SET) Order order,
1812-
@ArgumentHint(TABLE_AS_SET) Payment payment
1811+
@ArgumentHint(SET_SEMANTIC_TABLE) Order order,
1812+
@ArgumentHint(SET_SEMANTIC_TABLE) Payment payment
18131813
) {
1814-
if (input.order != null) {
1814+
if (order != null) {
18151815
if (seen.order != null) {
18161816
// skip duplicates
18171817
return;
18181818
} else {
18191819
// wait for matching payment
1820-
seen.order = input.order;
1820+
seen.order = order;
18211821
}
1822-
} else if (input.payment != null) {
1822+
} else if (payment != null) {
18231823
if (seen.payment != null) {
18241824
// skip duplicates
18251825
return;
18261826
} else {
18271827
// wait for matching order
1828-
seen.payment = input.payment;
1828+
seen.payment = payment;
18291829
}
18301830
}
18311831

0 commit comments

Comments
 (0)