Skip to content

[FLINK-36994][table] Support ALTER MATERIALIZED TABLE As <Query> statement #25880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableChange.MaterializedTableChange;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
Expand All @@ -48,6 +49,7 @@
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.operations.command.DescribeJobOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
Expand Down Expand Up @@ -174,6 +176,9 @@ public ResultFetcher callMaterializedTableOperation(
} else if (op instanceof DropMaterializedTableOperation) {
return callDropMaterializedTableOperation(
operationExecutor, handle, (DropMaterializedTableOperation) op);
} else if (op instanceof AlterMaterializedTableAsQueryOperation) {
return callAlterMaterializedTableAsQueryOperation(
operationExecutor, handle, (AlterMaterializedTableAsQueryOperation) op);
}

throw new SqlExecutionException(
Expand Down Expand Up @@ -315,7 +320,7 @@ private ResultFetcher callAlterMaterializedTableSuspend(
return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

private void suspendContinuousRefreshJob(
private CatalogMaterializedTable suspendContinuousRefreshJob(
OperationExecutor operationExecutor,
OperationHandle handle,
ObjectIdentifier tableIdentifier,
Expand All @@ -341,7 +346,7 @@ private void suspendContinuousRefreshJob(
refreshHandler.getJobId(),
savepointPath);

updateRefreshHandler(
return updateRefreshHandler(
operationExecutor,
handle,
tableIdentifier,
Expand Down Expand Up @@ -804,6 +809,164 @@ protected static String getRefreshStatement(
return insertStatement.toString();
}

private ResultFetcher callAlterMaterializedTableAsQueryOperation(
OperationExecutor operationExecutor,
OperationHandle handle,
AlterMaterializedTableAsQueryOperation op) {
ObjectIdentifier tableIdentifier = op.getTableIdentifier();
CatalogMaterializedTable oldMaterializedTable =
getCatalogMaterializedTable(operationExecutor, tableIdentifier);

if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) {
// directly apply the alter operation
AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
tableIdentifier, op.getTableChanges(), op.getNewMaterializedTable());
return operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);
}

if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
== oldMaterializedTable.getRefreshStatus()) {
// 1. suspend the materialized table
CatalogMaterializedTable suspendMaterializedTable =
suspendContinuousRefreshJob(
operationExecutor, handle, tableIdentifier, oldMaterializedTable);

// 2. alter materialized table schema & query definition
CatalogMaterializedTable updatedMaterializedTable =
op.getNewMaterializedTable()
.copy(
suspendMaterializedTable.getRefreshStatus(),
suspendMaterializedTable
.getRefreshHandlerDescription()
.orElse(null),
suspendMaterializedTable.getSerializedRefreshHandler());
AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
tableIdentifier, op.getTableChanges(), updatedMaterializedTable);
operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);

// 3. resume the materialized table
try {
executeContinuousRefreshJob(
operationExecutor,
handle,
updatedMaterializedTable,
tableIdentifier,
Collections.emptyMap(),
Optional.empty());
} catch (Exception e) {
// Roll back the changes to the materialized table and restore the continuous
// refresh job
LOG.warn(
"Failed to start the continuous refresh job for materialized table {} using new query {}, rollback to origin query {}.",
tableIdentifier,
op.getNewMaterializedTable().getDefinitionQuery(),
suspendMaterializedTable.getDefinitionQuery(),
e);

AlterMaterializedTableChangeOperation rollbackChangeOperation =
generateRollbackAlterMaterializedTableOperation(
suspendMaterializedTable, alterMaterializedTableChangeOperation);
operationExecutor.callExecutableOperation(handle, rollbackChangeOperation);

ContinuousRefreshHandler continuousRefreshHandler =
deserializeContinuousHandler(
suspendMaterializedTable.getSerializedRefreshHandler());
executeContinuousRefreshJob(
operationExecutor,
handle,
suspendMaterializedTable,
tableIdentifier,
Collections.emptyMap(),
continuousRefreshHandler.getRestorePath());

throw new SqlExecutionException(
String.format(
"Failed to start the continuous refresh job using new query %s when altering materialized table %s select query.",
op.getNewMaterializedTable().getDefinitionQuery(), tableIdentifier),
e);
}
} else if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
== oldMaterializedTable.getRefreshStatus()) {
// alter schema & definition query & refresh handler (reset savepoint path of refresh
// handler)
List<MaterializedTableChange> tableChanges = new ArrayList<>(op.getTableChanges());
TableChange.ModifyRefreshHandler modifyRefreshHandler =
genereateResetSavepointTableChange(
oldMaterializedTable.getSerializedRefreshHandler());
tableChanges.add(modifyRefreshHandler);

CatalogMaterializedTable updatedMaterializedTable =
op.getNewMaterializedTable()
.copy(
oldMaterializedTable.getRefreshStatus(),
modifyRefreshHandler.getRefreshHandlerDesc(),
modifyRefreshHandler.getRefreshHandlerBytes());
AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
tableIdentifier, tableChanges, updatedMaterializedTable);

operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);
} else {
throw new SqlExecutionException(
String.format(
"Materialized table %s is being initialized and does not support alter operation.",
tableIdentifier));
}

return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedTableOperation(
CatalogMaterializedTable oldMaterializedTable,
AlterMaterializedTableChangeOperation op) {
List<MaterializedTableChange> tableChanges = op.getTableChanges();
List<MaterializedTableChange> rollbackChanges = new ArrayList<>();

for (TableChange tableChange : tableChanges) {
if (tableChange instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) tableChange;
rollbackChanges.add(TableChange.dropColumn(addColumn.getColumn().getName()));
} else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
rollbackChanges.add(
TableChange.modifyRefreshHandler(
oldMaterializedTable.getRefreshHandlerDescription().orElse(null),
oldMaterializedTable.getSerializedRefreshHandler()));
} else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
rollbackChanges.add(
TableChange.modifyDefinitionQuery(
oldMaterializedTable.getDefinitionQuery()));
} else {
throw new ValidationException(
String.format(
"Failed to generate rollback changes for materialized table '%s'. "
+ "Unsupported table change detected: %s. ",
op.getTableIdentifier(), tableChange));
}
}

return new AlterMaterializedTableChangeOperation(
op.getTableIdentifier(), rollbackChanges, oldMaterializedTable);
}

private TableChange.ModifyRefreshHandler genereateResetSavepointTableChange(
byte[] serializedContinuousHandler) {
ContinuousRefreshHandler continuousRefreshHandler =
deserializeContinuousHandler(serializedContinuousHandler);
ContinuousRefreshHandler resetedRefreshHandler =
new ContinuousRefreshHandler(
continuousRefreshHandler.getExecutionTarget(),
continuousRefreshHandler.getJobId());

return TableChange.modifyRefreshHandler(
resetedRefreshHandler.asSummaryString(),
serializeContinuousHandler(resetedRefreshHandler));
}

private ResultFetcher callDropMaterializedTableOperation(
OperationExecutor operationExecutor,
OperationHandle handle,
Expand Down Expand Up @@ -1018,7 +1181,7 @@ private ResolvedCatalogMaterializedTable getCatalogMaterializedTable(
return (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
}

private void updateRefreshHandler(
private CatalogMaterializedTable updateRefreshHandler(
OperationExecutor operationExecutor,
OperationHandle operationHandle,
ObjectIdentifier materializedTableIdentifier,
Expand All @@ -1029,7 +1192,7 @@ private void updateRefreshHandler(
CatalogMaterializedTable updatedMaterializedTable =
catalogMaterializedTable.copy(
refreshStatus, refreshHandlerSummary, serializedRefreshHandler);
List<TableChange> tableChanges = new ArrayList<>();
List<MaterializedTableChange> tableChanges = new ArrayList<>();
tableChanges.add(TableChange.modifyRefreshStatus(refreshStatus));
tableChanges.add(
TableChange.modifyRefreshHandler(refreshHandlerSummary, serializedRefreshHandler));
Expand All @@ -1039,6 +1202,8 @@ private void updateRefreshHandler(
// update RefreshHandler to Catalog
operationExecutor.callExecutableOperation(
operationHandle, alterMaterializedTableChangeOperation);

return updatedMaterializedTable;
}

/** Generate insert statement for materialized table. */
Expand Down
Loading