Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36994][table] Support ALTER MATERIALIZED TABLE As <Query> statement #25880

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

hackergin
Copy link
Contributor

What is the purpose of the change

Add support query modification for materialized table

Brief change log

**

  • Add support parse materialized table query modification statements
  • Add support convert materialized table node to operation
  • Add support execution of query modification of materialized table

Verifying this change

Some unit test and ITCase will be added in MaterializedTableStatementITCase to verify these changes.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (Will be added in a separated pr)

@hackergin hackergin changed the title Drat: [FLIP-492] Support query modification for materialized table Draft: [FLIP-492] Support query modification for materialized table Jan 2, 2025
@flinkbot
Copy link
Collaborator

flinkbot commented Jan 2, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@lsyldliu lsyldliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hackergin Thanks for your contribution, I left some comments.

}
}
for (int i = oldSchema.getColumns().size(); i < newSchema.getColumns().size(); i++) {
Column newColumn = newSchema.getColumns().get(i);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the new added column is duplicated with old column, what behavior is? I think you should a test case to verify it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my testing, if duplicate field names appear in a query, the latter field names will automatically have suffixes added, which seems consistent with the logic of the Calcite framework. For example, if there are two fields named a, the second a will automatically be renamed to a0.

@hackergin hackergin changed the title Draft: [FLIP-492] Support query modification for materialized table [FLINK-36993][table] Support ALTER MATERIALIZED TABLE As <Query> statement Jan 5, 2025
@hackergin hackergin force-pushed the FLIP-492-poc branch 2 times, most recently from 90c372c to a204971 Compare January 6, 2025 03:14
@hackergin hackergin changed the title [FLINK-36993][table] Support ALTER MATERIALIZED TABLE As <Query> statement [FLINK-36994][table] Support ALTER MATERIALIZED TABLE As <Query> statement Jan 6, 2025
Copy link
Contributor

@lsyldliu lsyldliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating, I left some comments.

@@ -1374,4 +1391,24 @@ public String toString() {
+ '}';
}
}

/* A table change to modify the definition query. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/* A table change to modify the definition query. */
/** A table change to modify the definition query. */

@@ -394,7 +404,7 @@ static ModifyRefreshHandler modifyRefreshHandler(
* </pre>
*/
@PublicEvolving
class AddColumn implements TableChange {
class AddColumn implements CatalogTableChange, MaterializedTableChange {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please give more java docs to explain it also support materialized table

@Internal
public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {

private final List<TableChange> columnChanges;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use MaterializedTableChange


private final List<TableChange> columnChanges;

private final TableChange definitionQueryChange;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can put it the above list, If you need it, you can filter it from the list.

"ALTER MATERIALIZED TABLE",
params,
Collections.emptyList(),
Operation::asSummaryString);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we only need to print the definition query here.

TableChange.AddColumn addColumn = (TableChange.AddColumn) tableChange;
rollbackChanges.add(TableChange.dropColumn(addColumn.getColumn().getName()));
} else if (tableChange instanceof TableChange.ModifyRefreshHandler) {
rollbackChanges.add(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct, we should use the refresh handler of rollback refresh job

.isEqualTo(
Collections.singletonList(
Column.physical("order_amount_sum", DataTypes.INT())));
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should verify the definition query is equals.

.isEqualTo(
Collections.singletonList(
Column.physical("order_amount_sum", DataTypes.INT())));
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

.isEqualTo(oldTable.getResolvedSchema().getPrimaryKey());
assertThat(newTable.getResolvedSchema().getWatermarkSpecs())
.isEqualTo(oldTable.getResolvedSchema().getWatermarkSpecs());
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema()))
.isEqualTo(Collections.singletonList(Column.physical("pv", DataTypes.INT())));
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants