Skip to content

Commit

Permalink
[FLINK-36277][table] Create a DynamicTableSource instead of registeri…
Browse files Browse the repository at this point in the history
…ng a legacy TableSource in tests and remove TableEnvironmentInternal#registerTableSourceInternal
  • Loading branch information
xuyangzhong authored and lsyldliu committed Jan 3, 2025
1 parent c7b6212 commit bbbd160
Show file tree
Hide file tree
Showing 154 changed files with 8,285 additions and 8,081 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1347,8 +1347,8 @@ protected List<Transformation<?>> translate(List<ModifyOperation> modifyOperatio
return planner.translate(modifyOperations);
}

@Override
public void registerTableSourceInternal(String name, TableSource<?> tableSource) {
/** TODO FLINK-36132 Remove this method later. */
private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
validateTableSource(tableSource);
ObjectIdentifier objectIdentifier =
catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,6 @@ default String explainInternal(List<Operation> operations, ExplainDetail... extr
String explainInternal(
List<Operation> operations, ExplainFormat format, ExplainDetail... extraDetails);

/**
* Registers an external {@link TableSource} in this {@link TableEnvironment}'s catalog.
* Registered tables can be referenced in SQL queries.
*
* <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
* it will be inaccessible in the current session. To make the permanent object available again
* one can drop the corresponding temporary object.
*
* @param name The name under which the {@link TableSource} is registered.
* @param tableSource The {@link TableSource} to register.
*/
void registerTableSourceInternal(String name, TableSource<?> tableSource);

/**
* Registers an external {@link TableSink} with already configured field names and field types
* in this {@link TableEnvironment}'s catalog. Registered sink tables can be referenced in SQL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@
* : : +- Exchange(distribution=[hash[a]])
* : : +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
* : : +- Calc(select=[a])
* : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
* : : +- TableSourceScan(table=[[default_catalog, default_database, x]], fields=[a, b, c])
* : +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count1$0) AS cnt2], reuse_id=[1])
* : +- Exchange(distribution=[hash[d]])
* : +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(*) AS count1$0])
* : +- Calc(select=[d])
* : +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
* : +- TableSourceScan(table=[[default_catalog, default_database, y]], fields=[d, e, f])
* +- Calc(select=[cnt1, CAST(cnt2) AS cnt2])
* +- HashJoin(joinType=[LeftOuterJoin], where=[=(d, a)], select=[d, cnt2, a, cnt1], build=[right])
* :- Reused(reference_id=[1])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
import org.apache.calcite.plan.RelOptRule.{any, operandJ}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.{Aggregate, Filter, RelFactories}
import org.apache.calcite.rex.{RexShuttle, _}
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeFamily
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.fun.SqlCountAggFunction
Expand All @@ -39,11 +39,11 @@ import scala.collection.JavaConversions._
* {{{
* LogicalProject(a=[$0], b=[$1], c=[$2])
* +- LogicalJoin(condition=[$3], joinType=[semi])
* :- LogicalTableScan(table=[[x, source: [TestTableSource(a, b, c)]]])
* :- LogicalTableScan(table=[[x]])
* +- LogicalProject($f0=[IS NOT NULL($0)])
* +- LogicalAggregate(group=[{}], m=[MIN($0)])
* +- LogicalProject(i=[true])
* +- LogicalTableScan(table=[[y, source: [TestTableSource(d, e, f)]]])
* +- LogicalTableScan(table=[[y]])
* }}}
*/
class FlinkRewriteSubQueryRule(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ import scala.collection.JavaConversions._
* +- FlinkLogicalExpand(projects=[a, b, c, $f3, $f4, $e])
* +- FlinkLogicalCalc(select=[a, b, c, MOD(HASH_CODE(b), 1024) AS $f3,
* MOD(HASH_CODE(c), 1024) AS $f4])
* +- FlinkLogicalTableSourceScan(table=[[MyTable,
* source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
* +- FlinkLogicalTableSourceScan(table=[[MyTable]], fields=[a, b, c])
* }}}
*
* '$e = 1' is equivalent to 'group by a, hash(b) % 1024' '$e = 2' is equivalent to 'group by a,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ org.apache.flink.table.planner.plan.stream.sql.TestTableFactory
org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory
org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory
org.apache.flink.table.planner.factories.TestProcedureCatalogFactory
org.apache.flink.table.planner.utils.TestSimpleDynamicTableSourceFactory
Original file line number Diff line number Diff line change
Expand Up @@ -450,27 +450,27 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[true])
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
== Optimized Physical Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
== Optimized Execution Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testExplainWithTableSourceScan[extended=false]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
== Optimized Physical Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
== Optimized Execution Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,27 +72,27 @@ DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a,
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
== Optimized Physical Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
== Optimized Execution Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testExplainTableSourceScan[extended=true]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
== Optimized Physical Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
== Optimized Execution Plan ==
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Loading

0 comments on commit bbbd160

Please sign in to comment.