Skip to content

Commit

Permalink
[FLINK-36993][table] Support converting ALTER MATERIALIZED TABLE AS n…
Browse files Browse the repository at this point in the history
…ode to operation
  • Loading branch information
hackergin committed Jan 6, 2025
1 parent e0cfa9b commit 81dfefe
Show file tree
Hide file tree
Showing 5 changed files with 461 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.materializedtable;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
@Internal
public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {

private final List<TableChange> columnChanges;

private final TableChange definitionQueryChange;

private final CatalogMaterializedTable newMaterializedTable;

public AlterMaterializedTableAsQueryOperation(
ObjectIdentifier tableIdentifier,
List<TableChange> columnChanges,
TableChange definitionQueryChange,
CatalogMaterializedTable newMaterializedTable) {
super(tableIdentifier);
this.columnChanges = columnChanges;
this.definitionQueryChange = definitionQueryChange;
this.newMaterializedTable = newMaterializedTable;
}

public List<TableChange> getColumnChanges() {
return columnChanges;
}

public TableChange getDefinitionQueryChange() {
return definitionQueryChange;
}

public CatalogMaterializedTable getNewMaterializedTable() {
return newMaterializedTable;
}

@Override
public TableResultInternal execute(Context ctx) {
throw new UnsupportedOperationException(
"AlterMaterializedTableAsQueryOperation doesn't support ExecutableOperation yet.");
}

@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("identifier", tableIdentifier);
params.put("columnChanges", columnChanges);
params.put("definitionQueryChange", definitionQueryChange);

return OperationUtils.formatWithChildren(
"ALTER MATERIALIZED TABLE",
params,
Collections.emptyList(),
Operation::asSummaryString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Arrays;
import java.util.Objects;

/** {@link TableChange} represents the modification of the table. */
/** {@link TableChange} represents the modification of the {@link CatalogBaseTable}. */
@PublicEvolving
public interface TableChange {

Expand Down Expand Up @@ -380,6 +380,16 @@ static ModifyRefreshHandler modifyRefreshHandler(
return new ModifyRefreshHandler(refreshHandlerDesc, refreshHandlerBytes);
}

/**
* A table change to modify materialized table definition query.
*
* @param definitionQuery the modified definition query.
* @return a TableChange represents the modification.
*/
static ModifyDefinitionQuery modifyDefinitionQuery(String definitionQuery) {
return new ModifyDefinitionQuery(definitionQuery);
}

// --------------------------------------------------------------------------------------------
// Add Change
// --------------------------------------------------------------------------------------------
Expand All @@ -394,7 +404,7 @@ static ModifyRefreshHandler modifyRefreshHandler(
* </pre>
*/
@PublicEvolving
class AddColumn implements TableChange {
class AddColumn implements CatalogTableChange, MaterializedTableChange {

private final Column column;
private final ColumnPosition position;
Expand Down Expand Up @@ -447,7 +457,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class AddUniqueConstraint implements TableChange {
class AddUniqueConstraint implements CatalogTableChange {

private final UniqueConstraint constraint;

Expand Down Expand Up @@ -493,7 +503,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class AddDistribution implements TableChange {
class AddDistribution implements CatalogTableChange {

private final TableDistribution distribution;

Expand Down Expand Up @@ -539,7 +549,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class AddWatermark implements TableChange {
class AddWatermark implements CatalogTableChange {

private final WatermarkSpec watermarkSpec;

Expand Down Expand Up @@ -601,7 +611,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class ModifyColumn implements TableChange {
class ModifyColumn implements CatalogTableChange {

protected final Column oldColumn;
protected final Column newColumn;
Expand Down Expand Up @@ -848,7 +858,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class ModifyUniqueConstraint implements TableChange {
class ModifyUniqueConstraint implements CatalogTableChange {

private final UniqueConstraint newConstraint;

Expand Down Expand Up @@ -894,7 +904,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class ModifyDistribution implements TableChange {
class ModifyDistribution implements CatalogTableChange {

private final TableDistribution distribution;

Expand Down Expand Up @@ -940,7 +950,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class ModifyWatermark implements TableChange {
class ModifyWatermark implements CatalogTableChange {

private final WatermarkSpec newWatermark;

Expand Down Expand Up @@ -990,7 +1000,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class DropColumn implements TableChange {
class DropColumn implements CatalogTableChange {

private final String columnName;

Expand Down Expand Up @@ -1036,7 +1046,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class DropWatermark implements TableChange {
class DropWatermark implements CatalogTableChange {
static final DropWatermark INSTANCE = new DropWatermark();

@Override
Expand All @@ -1055,7 +1065,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class DropConstraint implements TableChange {
class DropConstraint implements CatalogTableChange {

private final String constraintName;

Expand Down Expand Up @@ -1101,7 +1111,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class DropDistribution implements TableChange {
class DropDistribution implements CatalogTableChange {
static final DropDistribution INSTANCE = new DropDistribution();

@Override
Expand All @@ -1124,7 +1134,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class SetOption implements TableChange {
class SetOption implements CatalogTableChange {

private final String key;
private final String value;
Expand Down Expand Up @@ -1177,7 +1187,7 @@ public String toString() {
* </pre>
*/
@PublicEvolving
class ResetOption implements TableChange {
class ResetOption implements CatalogTableChange {

private final String key;

Expand Down Expand Up @@ -1279,6 +1289,13 @@ public String toString() {
}
}

// --------------------------------------------------------------------------------------------
// Catalog table change
// --------------------------------------------------------------------------------------------
/** {@link CatalogTableChange} represents the modification of the CatalogTable. */
@PublicEvolving
interface CatalogTableChange extends TableChange {}

// --------------------------------------------------------------------------------------------
// Materialized table change
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1374,4 +1391,24 @@ public String toString() {
+ '}';
}
}

/* A table change to modify the definition query. */
@PublicEvolving
class ModifyDefinitionQuery implements MaterializedTableChange {

private final String definitionQuery;

public ModifyDefinitionQuery(String definitionQuery) {
this.definitionQuery = definitionQuery;
}

public String getDefinitionQuery() {
return definitionQuery;
}

@Override
public String toString() {
return "ModifyDefinitionQuery{" + "definitionQuery='" + definitionQuery + '\'' + '}';
}
}
}
Loading

0 comments on commit 81dfefe

Please sign in to comment.