Skip to content

Commit

Permalink
[FLINK-XXX][table] Add support convert alter materialized table as qu…
Browse files Browse the repository at this point in the history
…ery node to operation
  • Loading branch information
hackergin committed Jan 2, 2025
1 parent 4a09fad commit ff460b5
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.apache.flink.table.operations.materializedtable;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;
import org.apache.flink.table.operations.QueryOperation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;

/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
@Internal
public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {

protected final ObjectIdentifier tableIdentifier;

private final QueryOperation queryOperation;

public AlterMaterializedTableAsQueryOperation(
ObjectIdentifier tableIdentifier, QueryOperation queryOperation) {
super(tableIdentifier);
this.tableIdentifier = tableIdentifier;
this.queryOperation = queryOperation;
}

public QueryOperation getQueryOperation() {
return queryOperation;
}

@Override
public TableResultInternal execute(Context ctx) {
ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable();
if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) {
throw new ValidationException(
String.format(
"Table %s is not a materialized table, does not support materialized table related operation.",
tableIdentifier));
}

ResolvedCatalogMaterializedTable oldResolvedMaterializedTable =
(ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;

// validate new schema and derived origin primary key and watermark spec
ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema();
ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema();
List<TableChange> tableChanges =
validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream()
.map(TableChange::add)
.collect(Collectors.toList());
ResolvedSchema newResolvedSchema =
new ResolvedSchema(
resolvedQuerySchema.getColumns(),
oldResolvedSchema.getWatermarkSpecs(),
oldResolvedSchema.getPrimaryKey().orElse(null));
Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build();

// update schema and definition query
String definitionQuery = queryOperation.asSerializableString();
CatalogMaterializedTable catalogMaterializedTable =
oldResolvedMaterializedTable.getOrigin().copy(newSchema, definitionQuery);

ResolvedCatalogMaterializedTable newResolvedMaterializedTable =
new ResolvedCatalogMaterializedTable(catalogMaterializedTable, newResolvedSchema);
ctx.getCatalogManager()
.alterTable(newResolvedMaterializedTable, tableChanges, tableIdentifier, false);

return TableResultImpl.TABLE_RESULT_OK;
}

@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("identifier", tableIdentifier);
return OperationUtils.formatWithChildren(
"ALTER MATERIALIZED TABLE",
params,
Collections.singletonList(queryOperation),
Operation::asSummaryString);
}

private List<Column> validateAndExtractNewColumns(
ResolvedSchema oldSchema, ResolvedSchema newSchema) {
List<Column> newAddedColumns = new ArrayList<>();

if (oldSchema.getColumns().size() > newSchema.getColumns().size()) {
throw new ValidationException(
"Cannot alter table. The new schema has fewer columns than the original schema.");
}

for (int i = 0; i < oldSchema.getColumns().size(); i++) {
Column oldColumn = oldSchema.getColumns().get(i);
Column newColumn = newSchema.getColumns().get(i);
if (!oldColumn.equals(newColumn)) {
throw new ValidationException(
String.format(
"Cannot alter table. The schema of existing column %s has changed.",
oldColumn.getName()));
}
}
for (int i = oldSchema.getColumns().size(); i < newSchema.getColumns().size(); i++) {
Column newColumn = newSchema.getColumns().get(i);
newAddedColumns.add(newColumn.copy(newColumn.getDataType().nullable()));
}
return newAddedColumns;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.apache.flink.table.planner.operations.converters;

import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;

import org.apache.calcite.sql.SqlNode;

/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */
public class SqlAlterMaterializedTableAsQueryConverter
implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> {

@Override
public Operation convertSqlNode(
SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery,
ConvertContext context) {
UnresolvedIdentifier unresolvedIdentifier =
UnresolvedIdentifier.of(sqlAlterMaterializedTableAsQuery.fullTableName());
ObjectIdentifier identifier =
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);

// get query schema and definition query
SqlNode validateQuery =
context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery());
PlannerQueryOperation queryOperation =
new PlannerQueryOperation(
context.toRelRoot(validateQuery).project(),
() -> context.toQuotedSqlString(validateQuery));
return new AlterMaterializedTableAsQueryOperation(identifier, queryOperation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class SqlNodeConverters {
register(new SqlAlterMaterializedTableRefreshConverter());
register(new SqlAlterMaterializedTableSuspendConverter());
register(new SqlAlterMaterializedTableResumeConverter());
register(new SqlAlterMaterializedTableAsQueryConverter());
register(new SqlDropMaterializedTableConverter());
register(new SqlShowTablesConverter());
register(new SqlShowViewsConverter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
Expand Down Expand Up @@ -391,6 +392,17 @@ void testAlterMaterializedTableResume() {
.isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])");
}

@Test
void testAlterMaterializedTableAsQuery() {
final String sql = "ALTER MATERIALIZED TABLE mtbl1 AS SELECT * FROM t1";
Operation operation = parse(sql);
assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class);
assertThat(operation.asSummaryString())
.isEqualTo(
"ALTER MATERIALIZED TABLE: (identifier: [`builtin`.`default`.`mtbl1`])\n"
+ " PlannerNode:");
}

@Test
void testDropMaterializedTable() {
final String sql = "DROP MATERIALIZED TABLE mtbl1";
Expand Down

0 comments on commit ff460b5

Please sign in to comment.