Skip to content

Commit

Permalink
[cdc] Add the latest_schema state at schema evolution operator to reduce the latest schema access frequency (#4535)
Browse files Browse the repository at this point in the history
  • Loading branch information
GangYang-HX authored Nov 28, 2024
1 parent fdcdd09 commit 2f93b7b
Show file tree
Hide file tree
Showing 3 changed files with 312 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.types;

import java.util.Objects;

/**
 * Value object used to decide whether two fields are "the same" field.
 *
 * <p>Only the name, type and description of the source {@link DataField} are captured; two
 * identifiers are equal exactly when all three components are equal. Nothing else from the
 * {@link DataField} takes part in the comparison.
 *
 * <p>All fields are final and {@link #equals(Object)} / {@link #hashCode()} are consistent with
 * each other, so instances can be used as elements of hash-based collections.
 */
public class FieldIdentifier {
    private final String name;
    private final DataType type;
    private final String description;

    /**
     * Creates an identifier from the name, type and description of the given field.
     *
     * @param dataField the field to derive the identifier from; must not be {@code null}
     */
    public FieldIdentifier(DataField dataField) {
        this.name = dataField.name();
        this.type = dataField.type();
        this.description = dataField.description();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        FieldIdentifier field = (FieldIdentifier) o;
        return Objects.equals(name, field.name)
                && Objects.equals(type, field.type)
                && Objects.equals(description, field.description);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, type, description);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.FieldIdentifier;
import org.apache.paimon.types.RowType;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A {@link ProcessFunction} to handle schema changes. New schema is represented by a list of {@link
Expand All @@ -43,19 +50,51 @@ public class UpdatedDataFieldsProcessFunction

/** Identifier handed to {@code applySchemaChange} together with every extracted schema change. */
private final Identifier identifier;

/**
 * Locally cached identifiers of the fields of the latest schema. It starts empty, is consulted
 * by {@code processElement} to skip fields that are already known, and is replaced as a whole
 * by {@code updateLatestFields()}.
 */
private Set<FieldIdentifier> latestFields;

/**
 * Creates the function with an empty cache of known fields.
 *
 * @param schemaManager schema manager used to extract and apply schema changes
 * @param identifier identifier passed along when a schema change is applied
 * @param catalogLoader loader forwarded to the super class
 */
public UpdatedDataFieldsProcessFunction(
        SchemaManager schemaManager, Identifier identifier, Catalog.Loader catalogLoader) {
    super(catalogLoader);
    // Nothing is known about the latest schema yet; the cache is filled after the first
    // element that actually carries a change.
    this.latestFields = new HashSet<>();
    this.identifier = identifier;
    this.schemaManager = schemaManager;
}

@Override
public void processElement(
List<DataField> updatedDataFields, Context context, Collector<Void> collector)
throws Exception {
for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, updatedDataFields)) {
List<DataField> actualUpdatedDataFields =
updatedDataFields.stream()
.filter(
dataField ->
!latestDataFieldContain(new FieldIdentifier(dataField)))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(actualUpdatedDataFields)) {
return;
}
for (SchemaChange schemaChange :
extractSchemaChanges(schemaManager, actualUpdatedDataFields)) {
applySchemaChange(schemaManager, schemaChange, identifier);
}
/**
* Here, actualUpdatedDataFields cannot be used to update latestFields because there is a
* non-SchemaChange.AddColumn scenario. Otherwise, the previously existing fields cannot be
* modified again.
*/
updateLatestFields();
}

/**
 * Tells whether the cached latest schema already holds a field equal to the given one.
 *
 * @param dataField identifier of the incoming field
 * @return {@code true} if an equal identifier is in the cache
 */
private boolean latestDataFieldContain(FieldIdentifier dataField) {
    // FieldIdentifier implements equals/hashCode, so a set lookup gives the same answer as
    // scanning every cached element with Objects.equals.
    return latestFields.contains(dataField);
}

/**
 * Replaces the cached field identifiers with those of the fields of the latest schema reported
 * by the schema manager.
 */
private void updateLatestFields() {
    RowType latestRowType = schemaManager.latest().get().logicalRowType();
    latestFields =
            latestRowType.getFields().stream()
                    .map(FieldIdentifier::new)
                    .collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.VarCharType;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.List;

/**
 * Runs {@link UpdatedDataFieldsProcessFunction} over a sequence of field lists that change one
 * aspect of the schema at a time.
 *
 * <p>NOTE(review): the test contains no assertions; it only verifies that the job runs to
 * completion without throwing. Consider asserting on the resulting table schema.
 */
public class SchemaEvolutionTest extends TableTestBase {

    private static final String TABLE_NAME = "MyTable";
    private static final String TEST_DESC = "test description.";
    private static final String TEST_DESC_2 = "test description 2.";
    private static final String SOMEONE_DESC = "Someone's desc.";

    private FileStoreTable table;

    /**
     * Builds the five field lists fed to the operator. The lists share col_0 and col_4..col_20
     * and differ only in col_1, col_2 and col_3.
     */
    private static List<List<DataField>> prepareData() {
        // Baseline list.
        List<DataField> upField1 =
                fieldsWith(
                        new DataField(1, "col_1", new IntType(), TEST_DESC),
                        new DataField(2, "col_2", new IntType(), TEST_DESC),
                        new DataField(3, "col_3", new VarCharType(), SOMEONE_DESC));
        // col_1 changes type from INT to BIGINT.
        List<DataField> upField2 =
                fieldsWith(
                        new DataField(1, "col_1", new BigIntType(), TEST_DESC),
                        new DataField(2, "col_2", new IntType(), TEST_DESC),
                        new DataField(3, "col_3", new VarCharType(), SOMEONE_DESC));
        // col_2 changes its description.
        List<DataField> upField3 =
                fieldsWith(
                        new DataField(1, "col_1", new BigIntType(), TEST_DESC),
                        new DataField(2, "col_2", new IntType(), TEST_DESC_2),
                        new DataField(3, "col_3", new VarCharType(), SOMEONE_DESC));
        // col_2 carries the first description again and field 3 is named col_3_1.
        List<DataField> upField4 =
                fieldsWith(
                        new DataField(1, "col_1", new BigIntType(), TEST_DESC),
                        new DataField(2, "col_2", new IntType(), TEST_DESC),
                        new DataField(3, "col_3_1", new VarCharType(), SOMEONE_DESC));
        // Field 2 is named col_2_1 with type BIGINT and the second description.
        List<DataField> upField5 =
                fieldsWith(
                        new DataField(1, "col_1", new BigIntType(), TEST_DESC),
                        new DataField(2, "col_2_1", new BigIntType(), TEST_DESC_2),
                        new DataField(3, "col_3", new VarCharType(), SOMEONE_DESC));
        return Arrays.asList(upField1, upField2, upField3, upField4, upField5);
    }

    /**
     * Returns a 21-element field list in which positions 1 to 3 are the given fields and every
     * other position holds the field shared by all test inputs. Fresh {@link DataField}
     * instances are created on every call.
     */
    private static List<DataField> fieldsWith(DataField col1, DataField col2, DataField col3) {
        return Arrays.asList(
                new DataField(0, "col_0", new VarCharType(), TEST_DESC),
                col1,
                col2,
                col3,
                new DataField(4, "col_4", new VarCharType(), SOMEONE_DESC),
                new DataField(5, "col_5", new VarCharType(), SOMEONE_DESC),
                new DataField(6, "col_6", new DecimalType(), SOMEONE_DESC),
                new DataField(7, "col_7", new VarCharType(), SOMEONE_DESC),
                new DataField(8, "col_8", new VarCharType(), SOMEONE_DESC),
                new DataField(9, "col_9", new VarCharType(), SOMEONE_DESC),
                new DataField(10, "col_10", new VarCharType(), SOMEONE_DESC),
                new DataField(11, "col_11", new VarCharType(), SOMEONE_DESC),
                new DataField(12, "col_12", new DoubleType(), SOMEONE_DESC),
                new DataField(13, "col_13", new VarCharType(), SOMEONE_DESC),
                new DataField(14, "col_14", new VarCharType(), SOMEONE_DESC),
                new DataField(15, "col_15", new VarCharType(), SOMEONE_DESC),
                new DataField(16, "col_16", new VarCharType(), SOMEONE_DESC),
                new DataField(17, "col_17", new VarCharType(), SOMEONE_DESC),
                new DataField(18, "col_18", new VarCharType(), SOMEONE_DESC),
                new DataField(19, "col_19", new VarCharType(), SOMEONE_DESC),
                new DataField(20, "col_20", new VarCharType(), SOMEONE_DESC));
    }

    /** Creates the table the schema changes are applied to. */
    @BeforeEach
    public void before() throws Exception {
        FileIO fileIO = LocalFileIO.create();
        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, TABLE_NAME));
        Schema schema =
                Schema.newBuilder()
                        .column("pk", DataTypes.INT())
                        .column("pt1", DataTypes.INT())
                        .column("pt2", DataTypes.INT())
                        .column("col1", DataTypes.INT())
                        .partitionKeys("pt1", "pt2")
                        .primaryKey("pk", "pt1", "pt2")
                        .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
                        .option(CoreOptions.BUCKET.key(), "2")
                        .option(CoreOptions.SEQUENCE_FIELD.key(), "col1")
                        .build();
        TableSchema tableSchema =
                SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);
        table = FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
    }

    /** Streams the prepared field lists through the operator with parallelism 1. */
    @Test
    public void testSchemaEvolution() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<List<DataField>> upDataFieldStream = env.fromCollection(prepareData());
        Options options = new Options();
        options.set("warehouse", tempPath.toString());
        final Catalog.Loader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(options);
        Identifier identifier = Identifier.create(database, TABLE_NAME);
        DataStream<Void> schemaChangeProcessFunction =
                upDataFieldStream
                        .process(
                                new UpdatedDataFieldsProcessFunction(
                                        new SchemaManager(table.fileIO(), table.location()),
                                        identifier,
                                        catalogLoader))
                        .name("Schema Evolution");
        schemaChangeProcessFunction.getTransformation().setParallelism(1);
        schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
        env.execute();
    }
}

0 comments on commit 2f93b7b

Please sign in to comment.