Skip to content

Commit

Permalink
[FLINK-XXX][table] Support alter materialized table as query
Browse files Browse the repository at this point in the history
  • Loading branch information
hackergin committed Jan 2, 2025
1 parent ff460b5 commit f2205e0
Show file tree
Hide file tree
Showing 5 changed files with 446 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.operations.command.DescribeJobOperation;
import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
Expand Down Expand Up @@ -174,6 +175,9 @@ public ResultFetcher callMaterializedTableOperation(
} else if (op instanceof DropMaterializedTableOperation) {
return callDropMaterializedTableOperation(
operationExecutor, handle, (DropMaterializedTableOperation) op);
} else if (op instanceof AlterMaterializedTableAsQueryOperation) {
return callAlterMaterializedTableAsQueryOperation(
operationExecutor, handle, (AlterMaterializedTableAsQueryOperation) op);
}

throw new SqlExecutionException(
Expand Down Expand Up @@ -804,6 +808,78 @@ protected static String getRefreshStatement(
return insertStatement.toString();
}

private ResultFetcher callAlterMaterializedTableAsQueryOperation(
OperationExecutor operationExecutor,
OperationHandle handle,
AlterMaterializedTableAsQueryOperation op) {
ObjectIdentifier tableIdentifier = op.getTableIdentifier();
CatalogMaterializedTable materializedTable =
getCatalogMaterializedTable(operationExecutor, tableIdentifier);

// 1. suspend the materialized table
if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
!= materializedTable.getRefreshStatus()) {
if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
== materializedTable.getRefreshMode()) {
suspendContinuousRefreshJob(
operationExecutor, handle, tableIdentifier, materializedTable);
} else {
suspendRefreshWorkflow(
operationExecutor, handle, tableIdentifier, materializedTable);
}
}

// 2. replace query definition and resume the materialized table
// alter materialized table schema
operationExecutor.callExecutableOperation(handle, op);
ResolvedCatalogMaterializedTable updatedMaterializedTable =
getCatalogMaterializedTable(operationExecutor, tableIdentifier);

// 3. resume the materialized table
if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
!= materializedTable.getRefreshStatus()) {
if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
== materializedTable.getRefreshMode()) {
executeContinuousRefreshJob(
operationExecutor,
handle,
updatedMaterializedTable,
tableIdentifier,
Collections.emptyMap(),
Optional.empty());
} else {
// resume workflow
resumeRefreshWorkflow(
operationExecutor,
handle,
tableIdentifier,
updatedMaterializedTable,
Collections.emptyMap());
}
} else {
if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
== materializedTable.getRefreshMode()) {
// we should reset the savepoint path after the alter operation
ContinuousRefreshHandler refreshHandler =
deserializeContinuousHandler(
materializedTable.getSerializedRefreshHandler());
ContinuousRefreshHandler resetHandler =
new ContinuousRefreshHandler(
refreshHandler.getExecutionTarget(), refreshHandler.getJobId());
updateRefreshHandler(
operationExecutor,
handle,
tableIdentifier,
updatedMaterializedTable,
updatedMaterializedTable.getRefreshStatus(),
resetHandler.asSummaryString(),
serializeContinuousHandler(resetHandler));
}
}

return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
}

private ResultFetcher callDropMaterializedTableOperation(
OperationExecutor operationExecutor,
OperationHandle handle,
Expand Down
Loading

0 comments on commit f2205e0

Please sign in to comment.