Skip to content

Commit

Permalink
[FLINK-36883] Adapt time attributes from the VIEWs query (#25779)
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys authored Dec 11, 2024
1 parent 4d4cb34 commit 529f640
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@

import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;

import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelRecordType;

import javax.annotation.Nullable;

import java.util.AbstractList;
import java.util.List;

/**
Expand Down Expand Up @@ -56,6 +60,62 @@ public SqlCatalogViewTable(
public RelNode convertToRel(ToRelContext context) {
RelNode original =
context.expandView(rowType, view.getExpandedQuery(), viewPath, names).project();
return RelOptUtil.createCastRel(original, rowType, true);
RelDataType castTargetType =
adaptTimeAttributes(
original.getRowType(), rowType, context.getCluster().getTypeFactory());
return RelOptUtil.createCastRel(original, castTargetType, true);
}

private static RelDataType adaptTimeAttributes(
RelDataType queryType, RelDataType targetType, RelDataTypeFactory typeFactory) {
if (queryType instanceof RelRecordType) {
if (RelOptUtil.areRowTypesEqual(queryType, targetType, true)) {
return targetType;
} else if (targetType.getFieldCount() != queryType.getFieldCount()) {
throw new IllegalArgumentException(
"Field counts are not equal: queryType ["
+ queryType
+ "]"
+ " castRowType ["
+ targetType
+ "]");
} else {
return adaptTimeAttributeInRecord(
(RelRecordType) queryType, (RelRecordType) targetType, typeFactory);
}
} else {
return adaptTimeAttributeInSimpleType(queryType, targetType, typeFactory);
}
}

private static RelDataType adaptTimeAttributeInRecord(
RelRecordType queryType, RelRecordType targetType, RelDataTypeFactory typeFactory) {
RelDataType structType =
typeFactory.createStructType(
targetType.getStructKind(),
new AbstractList<>() {
public RelDataType get(int index) {
RelDataType targetFieldType =
(targetType.getFieldList().get(index)).getType();
RelDataType queryFieldType =
(queryType.getFieldList().get(index)).getType();
return adaptTimeAttributes(
queryFieldType, targetFieldType, typeFactory);
}

public int size() {
return targetType.getFieldCount();
}
},
targetType.getFieldNames());
return typeFactory.createTypeWithNullability(structType, targetType.isNullable());
}

private static RelDataType adaptTimeAttributeInSimpleType(
RelDataType queryType, RelDataType targetType, RelDataTypeFactory typeFactory) {
if (queryType instanceof TimeIndicatorRelDataType) {
return typeFactory.createTypeWithNullability(queryType, targetType.isNullable());
}
return targetType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@
package org.apache.flink.table.planner.catalog;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
Expand All @@ -44,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
Expand All @@ -58,10 +64,10 @@ private static Collection<Boolean> parameters() {
return Arrays.asList(true, false);
}

@Parameter private boolean isStreamingMode;
@Parameter private boolean streamingMode;

private TableTestUtil getTestUtil() {
if (isStreamingMode) {
if (streamingMode) {
return streamTestUtil(TableConfig.getDefault());
} else {
return batchTestUtil(TableConfig.getDefault());
Expand All @@ -75,7 +81,7 @@ void testResolvingSchemaOfCustomCatalogTableSql() throws Exception {
GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
genericInMemoryCatalog.createTable(
new ObjectPath("default", "testTable"),
new CustomCatalogTable(isStreamingMode),
new CustomCatalogTable(streamingMode),
false);
tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);
tableEnvironment.executeSql(
Expand All @@ -92,7 +98,7 @@ void testResolvingSchemaOfCustomCatalogTableTableApi() throws Exception {
GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
genericInMemoryCatalog.createTable(
new ObjectPath("default", "testTable"),
new CustomCatalogTable(isStreamingMode),
new CustomCatalogTable(streamingMode),
false);
tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);

Expand All @@ -107,7 +113,7 @@ void testResolvingSchemaOfCustomCatalogTableTableApi() throws Exception {

@TestTemplate
void testResolvingProctimeOfCustomTableSql() throws Exception {
if (!isStreamingMode) {
if (!streamingMode) {
// proctime not supported in batch
return;
}
Expand All @@ -116,7 +122,7 @@ void testResolvingProctimeOfCustomTableSql() throws Exception {
GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
genericInMemoryCatalog.createTable(
new ObjectPath("default", "testTable"),
new CustomCatalogTable(isStreamingMode),
new CustomCatalogTable(streamingMode),
false);
tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);

Expand All @@ -127,7 +133,7 @@ void testResolvingProctimeOfCustomTableSql() throws Exception {

@TestTemplate
void testResolvingProctimeOfCustomTableTableApi() throws Exception {
if (!isStreamingMode) {
if (!streamingMode) {
// proctime not supported in batch
return;
}
Expand All @@ -136,7 +142,7 @@ void testResolvingProctimeOfCustomTableTableApi() throws Exception {
GenericInMemoryCatalog genericInMemoryCatalog = new GenericInMemoryCatalog("in-memory");
genericInMemoryCatalog.createTable(
new ObjectPath("default", "testTable"),
new CustomCatalogTable(isStreamingMode),
new CustomCatalogTable(streamingMode),
false);
tableEnvironment.registerCatalog("testCatalog", genericInMemoryCatalog);

Expand All @@ -149,12 +155,119 @@ void testResolvingProctimeOfCustomTableTableApi() throws Exception {
testUtil.verifyExecPlan(table);
}

@TestTemplate
void testTimeAttributeOfView() {
if (!streamingMode) {
// time attributes not supported in batch
return;
}
TableTestUtil testUtil = getTestUtil();
TableEnvironment tableEnvironment = testUtil.getTableEnv();
tableEnvironment.registerCatalog("cat", new CustomCatalog("cat"));
tableEnvironment.executeSql(
"CREATE TABLE t(i INT, ts TIMESTAMP_LTZ(3), WATERMARK FOR "
+ "ts AS ts) WITH ('connector' = 'datagen')");
tableEnvironment.executeSql("CREATE VIEW `cat`.`default`.v AS SELECT * FROM t");
testUtil.verifyExecPlan(
"SELECT sum(i), window_start "
+ "FROM TUMBLE(\n"
+ " DATA => TABLE `cat`.`default`.v,\n"
+ " TIMECOL => DESCRIPTOR(ts),\n"
+ " SIZE => INTERVAL '10' MINUTES)\n"
+ "GROUP BY window_start, window_end");
}

private static class CustomCatalog extends GenericInMemoryCatalog {
public CustomCatalog(String name) {
super(name);
}

@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException {
CatalogBaseTable table = super.getTable(tablePath);
if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
return new CustomView((CatalogView) table);
}
return table;
}
}

private static class CustomView implements CatalogView {

private final CatalogView origin;

public CustomView(CatalogView table) {
this.origin = table;
}

@Override
public String getOriginalQuery() {
return origin.getOriginalQuery();
}

@Override
public String getExpandedQuery() {
return origin.getExpandedQuery();
}

@Override
public Map<String, String> getOptions() {
return origin.getOptions();
}

@Override
public Schema getUnresolvedSchema() {
Schema originalSchema = origin.getUnresolvedSchema();
return Schema.newBuilder()
.fromColumns(
originalSchema.getColumns().stream()
.map(
c -> {
if (c instanceof UnresolvedPhysicalColumn) {
DataType dataType =
(DataType)
((UnresolvedPhysicalColumn) c)
.getDataType();
String stringType =
dataType.getLogicalType()
.asSerializableString();
return new UnresolvedPhysicalColumn(
c.getName(), DataTypes.of(stringType));
}
throw new UnsupportedOperationException(
"Unexpected column type");
})
.collect(Collectors.toList()))
.build();
}

@Override
public String getComment() {
return origin.getComment();
}

@Override
public CatalogBaseTable copy() {
return new CustomView((CatalogView) origin.copy());
}

@Override
public Optional<String> getDescription() {
return origin.getDescription();
}

@Override
public Optional<String> getDetailedDescription() {
return origin.getDetailedDescription();
}
}

private static class CustomCatalogTable implements CatalogTable {

private final boolean isStreamingMode;
private final boolean streamingMode;

private CustomCatalogTable(boolean isStreamingMode) {
this.isStreamingMode = isStreamingMode;
private CustomCatalogTable(boolean streamingMode) {
this.streamingMode = streamingMode;
}

@Override
Expand All @@ -181,7 +294,7 @@ public Map<String, String> toProperties() {
public Map<String, String> getOptions() {
Map<String, String> map = new HashMap<>();
map.put("connector", "values");
map.put("bounded", Boolean.toString(!isStreamingMode));
map.put("bounded", Boolean.toString(!streamingMode));
return map;
}

Expand Down
Loading

0 comments on commit 529f640

Please sign in to comment.