Skip to content

Commit

Permalink
[FLINK-36993][table] Support executing ALTER MATERIALIZED TABLE AS op…
Browse files Browse the repository at this point in the history
…eration
  • Loading branch information
hackergin committed Jan 6, 2025
1 parent 81dfefe commit a204971
Show file tree
Hide file tree
Showing 2 changed files with 433 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.operations.command.DescribeJobOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
Expand Down Expand Up @@ -174,6 +175,9 @@ public ResultFetcher callMaterializedTableOperation(
} else if (op instanceof DropMaterializedTableOperation) {
return callDropMaterializedTableOperation(
operationExecutor, handle, (DropMaterializedTableOperation) op);
} else if (op instanceof AlterMaterializedTableAsQueryOperation) {
return callAlterMaterializedTableAsQueryOperation(
operationExecutor, handle, (AlterMaterializedTableAsQueryOperation) op);
}

throw new SqlExecutionException(
Expand Down Expand Up @@ -804,6 +808,114 @@ protected static String getRefreshStatement(
return insertStatement.toString();
}

private ResultFetcher callAlterMaterializedTableAsQueryOperation(
OperationExecutor operationExecutor,
OperationHandle handle,
AlterMaterializedTableAsQueryOperation op) {
ObjectIdentifier tableIdentifier = op.getTableIdentifier();
CatalogMaterializedTable oldMaterializedTable =
getCatalogMaterializedTable(operationExecutor, tableIdentifier);

if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) {
// direct apply the alter operation
List<TableChange> changes = new ArrayList<>(op.getColumnChanges());
changes.add(op.getDefinitionQueryChange());
AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
tableIdentifier, changes, op.getNewMaterializedTable());
return operationExecutor.callExecutableOperation(
handle, alterMaterializedTableChangeOperation);
}

// 1. suspend the materialized table
if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
!= oldMaterializedTable.getRefreshStatus()) {
suspendContinuousRefreshJob(
operationExecutor, handle, tableIdentifier, oldMaterializedTable);
}

// 2. replace query definition and resume the materialized table
// alter materialized table schema & query definition & refresh handler
// we should reset the savepoint path after the alter operation
ContinuousRefreshHandler refreshHandler =
deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler());
ContinuousRefreshHandler resetHandler =
new ContinuousRefreshHandler(
refreshHandler.getExecutionTarget(), refreshHandler.getJobId());
byte[] serializedBytes = serializeContinuousHandler(resetHandler);

List<TableChange> tableChanges = new ArrayList<>(op.getColumnChanges());
tableChanges.add(op.getDefinitionQueryChange());
tableChanges.add(
TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), serializedBytes));
CatalogMaterializedTable updatedMaterializedTable =
op.getNewMaterializedTable()
.copy(
op.getNewMaterializedTable().getRefreshStatus(),
resetHandler.asSummaryString(),
serializedBytes);
AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
new AlterMaterializedTableChangeOperation(
tableIdentifier, tableChanges, updatedMaterializedTable);
operationExecutor.callExecutableOperation(handle, alterMaterializedTableChangeOperation);

// 3. resume the materialized table
if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
!= oldMaterializedTable.getRefreshStatus()) {
try {
executeContinuousRefreshJob(
operationExecutor,
handle,
updatedMaterializedTable,
tableIdentifier,
Collections.emptyMap(),
Optional.empty());
} catch (Exception e) {
// rollback the alter operation
LOG.warn(
"Failed to resume the continuous refresh job for materialized table {}, rollback the alter operation.",
tableIdentifier,
e);
AlterMaterializedTableChangeOperation rollbackChangeOperation =
generateRollbackAlterMaterializedTableOperation(
oldMaterializedTable, alterMaterializedTableChangeOperation);
operationExecutor.callExecutableOperation(handle, rollbackChangeOperation);

throw new SqlExecutionException(
String.format(
"Failed to Alter Materialized Table As Query Operation for materialized table %s.",
tableIdentifier),
e);
}
}

return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedTableOperation(
CatalogMaterializedTable oldMaterializedTable,
AlterMaterializedTableChangeOperation op) {
List<TableChange> tableChanges = op.getTableChanges();
List<TableChange> rollbackChanges = new ArrayList<>();

for (TableChange tableChange : tableChanges) {
if (tableChange instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) tableChange;
rollbackChanges.add(TableChange.dropColumn(addColumn.getColumn().getName()));
} else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
rollbackChanges.add(
TableChange.modifyRefreshHandler(
oldMaterializedTable.getRefreshHandlerDescription().orElse(null),
oldMaterializedTable.getSerializedRefreshHandler()));
} else {
throw new ValidationException("Unsupported table change type.");
}
}

return new AlterMaterializedTableChangeOperation(
op.getTableIdentifier(), rollbackChanges, oldMaterializedTable);
}

private ResultFetcher callDropMaterializedTableOperation(
OperationExecutor operationExecutor,
OperationHandle handle,
Expand Down
Loading

0 comments on commit a204971

Please sign in to comment.