[cdc] paimon cdc database sync support computed columns #4193

Open
wants to merge 1 commit into base: master
------------------------------------------------------------------------
@@ -37,7 +37,10 @@ public static List<ComputedColumn> buildComputedColumns(
return buildComputedColumns(computedColumnArgs, physicFields, true);
}

/** The caseSensitive only affects check. We don't change field names at building phase. */
/**
 * Filter computed columns based on the table columns. The caseSensitive flag only affects
 * the check; we don't change field names at the building phase.
 */
public static List<ComputedColumn> buildComputedColumns(
List<String> computedColumnArgs, List<DataField> physicFields, boolean caseSensitive) {
Map<String, DataType> typeMapping =
@@ -69,12 +72,15 @@ public static List<ComputedColumn> buildComputedColumns(
String[] args = expression.substring(left + 1, right).split(",");
checkArgument(args.length >= 1, "Computed column needs at least one argument.");

computedColumns.add(
new ComputedColumn(
columnName,
Expression.create(typeMapping, caseSensitive, exprName, args)));
String fieldReference = caseSensitive ? args[0] : args[0].toLowerCase();
// The cast function is a constant function and does not reference any column of the table
if ("cast".equals(exprName) || typeMapping.containsKey(fieldReference)) {
computedColumns.add(
new ComputedColumn(
columnName,
Expression.create(typeMapping, caseSensitive, exprName, args)));
}
}

return computedColumns;
}
}
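For illustration, a minimal sketch of the new filtering behaviour, assuming the enclosing utility class is ComputedColumnUtils and the usual column=expr(args) argument syntax; the table columns and expressions below are made up for this example:

import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;

import java.util.Arrays;
import java.util.List;

class ComputedColumnFilterExample {
    public static void main(String[] args) {
        // Hypothetical table with physical columns "id" and "name".
        List<DataField> physicFields =
                Arrays.asList(
                        new DataField(0, "id", DataTypes.INT()),
                        new DataField(1, "name", DataTypes.STRING()));

        // Expressions are illustrative; "dt" intentionally does not exist in this table.
        List<String> computedColumnArgs =
                Arrays.asList(
                        "name_part=substring(name,2)", // kept: "name" is a physical column
                        "dt_part=substring(dt,2)",     // dropped: "dt" is not in this table
                        "const_col=cast(1,INT)");      // kept: cast references no table column

        List<ComputedColumn> computedColumns =
                ComputedColumnUtils.buildComputedColumns(computedColumnArgs, physicFields, true);

        // With this patch the result holds only "name_part" and "const_col"; previously the
        // dangling "dt" reference would surface as an error when the expression was built.
        System.out.println(computedColumns);
    }
}

This is presumably the point of the change for database sync, where one computed-column argument list is applied to every matched table, and a given column may exist in only some of them.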
------------------------------------------------------------------------
@@ -26,7 +26,6 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -59,8 +58,7 @@ public static Schema getSchema(
int retry = 0;
int retryInterval = 1000;

AbstractRecordParser recordParser =
dataFormat.createParser(typeMapping, Collections.emptyList());
AbstractRecordParser recordParser = dataFormat.createParser(typeMapping);

while (true) {
Optional<Schema> schema =
------------------------------------------------------------------------
@@ -35,7 +35,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -58,6 +57,8 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
@Nullable protected String excludingTables;
protected List<FileStoreTable> tables = new ArrayList<>();
protected Map<String, List<String>> partitionKeyMultiple = new HashMap<>();
protected List<String> computedColumnArgs = new ArrayList<>();
protected Map<String, List<ComputedColumn>> computedColumnsMap = new HashMap<>();

public SyncDatabaseActionBase(
String warehouse,
@@ -119,6 +120,11 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) {
return this;
}

public SyncDatabaseActionBase withComputedColumnArgs(List<String> computedColumnArgs) {
this.computedColumnArgs = computedColumnArgs;
return this;
}

@Override
protected void validateCaseSensitivity() {
Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
@@ -128,8 +134,7 @@ protected void validateCaseSensitivity() {

@Override
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
return syncJobHandler.provideRecordParser(
Collections.emptyList(), typeMapping, metadataConverters);
return syncJobHandler.provideRecordParser(typeMapping, metadataConverters);
}

public SyncDatabaseActionBase withPartitionKeyMultiple(
@@ -161,13 +166,15 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
} catch (Catalog.DatabaseNotExistException e) {
throw new RuntimeException(e);
}
List<String> computedColumnArgs = this.computedColumnArgs;
return () ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder,
includingPattern,
excludingPattern,
tableNameConverter,
createdTables);
createdTables,
computedColumnArgs);
}

@Override
@@ -182,6 +189,7 @@ protected void buildSink(
.withTables(tables)
.withMode(mode)
.withTableOptions(tableConfig)
.withComputedColumnArgs(computedColumnArgs)
.build();
}
}
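A rough wiring sketch for the new hook, assuming an already-configured database-sync action; the concrete subclass, variable names, and the expression are placeholders:

// "action" stands for any concrete SyncDatabaseActionBase subclass; the expression is illustrative.
List<String> computedColumnArgs = Arrays.asList("pt=substring(create_time,1,10)");
action.withPrimaryKeys("id").withComputedColumnArgs(computedColumnArgs);
// The list is then forwarded both to RichCdcMultiplexRecordEventParser through
// buildEventParserFactory() and to the CDC sink builder in buildSink() above.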
------------------------------------------------------------------------
@@ -22,8 +22,10 @@
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;

import java.util.ArrayList;
import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
@@ -69,5 +71,10 @@ protected void withParams(MultipleParameterToolAdapter params, T action) {
String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}

if (params.has(COMPUTED_COLUMN)) {
action.withComputedColumnArgs(
new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
}
}
}
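From the command line, the database-sync actions can now accept one or more --computed_column definitions, same as table sync. A hedged example invocation; the action name, jar path, connector options, and the expression are illustrative and depend on the concrete source:

<FLINK_HOME>/bin/flink run /path/to/paimon-flink-action.jar \
    mysql_sync_database \
    --warehouse hdfs:///path/to/warehouse \
    --database target_db \
    --computed_column 'pt=substring(create_time,1,10)' \
    --mysql_conf hostname=<host> \
    --mysql_conf username=<user> \
    --mysql_conf password=<password> \
    --mysql_conf database-name=source_db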
------------------------------------------------------------------------
@@ -38,7 +38,6 @@
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;

import java.util.List;
import java.util.Map;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.KAFKA_CONF;
@@ -197,22 +196,18 @@ public void checkRequiredOption() {
}

public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordParser(
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
TypeMapping typeMapping, CdcMetadataConverter[] metadataConverters) {
switch (sourceType) {
case MYSQL:
return new MySqlRecordParser(
cdcSourceConfig, computedColumns, typeMapping, metadataConverters);
return new MySqlRecordParser(cdcSourceConfig, typeMapping, metadataConverters);
case POSTGRES:
return new PostgresRecordParser(
cdcSourceConfig, computedColumns, typeMapping, metadataConverters);
return new PostgresRecordParser(cdcSourceConfig, typeMapping, metadataConverters);
case KAFKA:
case PULSAR:
DataFormat dataFormat = provideDataFormat();
return dataFormat.createParser(typeMapping, computedColumns);
return dataFormat.createParser(typeMapping);
case MONGODB:
return new MongoDBRecordParser(computedColumns, cdcSourceConfig);
return new MongoDBRecordParser(cdcSourceConfig);
default:
throw new UnsupportedOperationException("Unknown source type " + sourceType);
}
------------------------------------------------------------------------
@@ -157,13 +157,14 @@ protected void beforeBuildingSourceSink() throws Exception {

@Override
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
return syncJobHandler.provideRecordParser(computedColumns, typeMapping, metadataConverters);
return syncJobHandler.provideRecordParser(typeMapping, metadataConverters);
}

@Override
protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory() {
boolean caseSensitive = this.allowUpperCase;
return () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
List<String> computedColumnArgs = this.computedColumnArgs;
return () -> new RichCdcMultiplexRecordEventParser(caseSensitive, computedColumnArgs);
}

@Override
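A note on the pattern in buildEventParserFactory() above (it mirrors the existing caseSensitive handling rather than anything new in this change): the returned EventParser.Factory is serialized and shipped with the job, so the fields the lambda needs are first copied into effectively final locals; a sketch of the difference, with the anti-pattern shown only as a comment:

// If the lambda read this.computedColumnArgs directly, it would capture "this", and the whole
// action object would then have to be serialized along with the factory.
boolean caseSensitive = this.allowUpperCase;
List<String> computedColumnArgs = this.computedColumnArgs;
return () -> new RichCdcMultiplexRecordEventParser(caseSensitive, computedColumnArgs);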
------------------------------------------------------------------------
@@ -58,4 +58,8 @@ public String convert(Identifier originIdentifier) {
+ originIdentifier.getObjectName();
return convert(rawName);
}

public boolean isCaseSensitive() {
return caseSensitive;
}
}
------------------------------------------------------------------------
@@ -19,14 +19,12 @@
package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.List;
import java.util.function.Function;

/** Data format common implementation of {@link DataFormat}. */
@@ -44,9 +42,8 @@ public abstract class AbstractDataFormat implements DataFormat {
pulsarDeserializer();

@Override
public AbstractRecordParser createParser(
TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
return parser().createParser(typeMapping, computedColumns);
public AbstractRecordParser createParser(TypeMapping typeMapping) {
return parser().createParser(typeMapping);
}

@Override
------------------------------------------------------------------------
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataTypes;
@@ -63,8 +62,8 @@ public abstract class AbstractJsonRecordParser extends AbstractRecordParser {

protected JsonNode root;

public AbstractJsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
super(typeMapping, computedColumns);
public AbstractJsonRecordParser(TypeMapping typeMapping) {
super(typeMapping);
}

protected void setRoot(CdcSourceRecord record) {
@@ -103,7 +102,6 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
}
return Objects.toString(entry.getValue());
}));
evalComputedColumns(rowData, rowTypeBuilder);
return rowData;
}

------------------------------------------------------------------------
@@ -19,14 +19,12 @@
package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
@@ -55,11 +53,9 @@ public abstract class AbstractRecordParser
protected static final String FIELD_TABLE = "table";
protected static final String FIELD_DATABASE = "database";
protected final TypeMapping typeMapping;
protected final List<ComputedColumn> computedColumns;

public AbstractRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
public AbstractRecordParser(TypeMapping typeMapping) {
this.typeMapping = typeMapping;
this.computedColumns = computedColumns;
}

@Nullable
@@ -112,18 +108,6 @@ protected boolean isDDL() {

protected abstract List<String> extractPrimaryKeys();

/** generate values for computed columns. */
protected void evalComputedColumns(
Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
computedColumns.forEach(
computedColumn -> {
rowData.put(
computedColumn.columnName(),
computedColumn.eval(rowData.get(computedColumn.fieldReference())));
rowTypeBuilder.field(computedColumn.columnName(), computedColumn.columnType());
});
}

/** Handle case sensitivity here. */
protected RichCdcMultiplexRecord createRecord(
RowKind rowKind, Map<String, String> data, List<DataField> paimonFields) {
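Since evalComputedColumns is deleted from this shared parser, the per-format record parsers no longer expand computed columns themselves; evaluation presumably moves to the event-parser side, where per-table column lists built from the raw arguments are available. For reference, an equivalent of the removed logic, using only the ComputedColumn methods visible in the deleted code (where exactly this runs after the change is not part of this diff):

// Sketch of the evaluation contract from the removed helper: each computed column reads the
// string value of its referenced field, derives a value, and contributes a typed field.
static void applyComputedColumns(
        List<ComputedColumn> computedColumns,
        Map<String, String> rowData,
        RowType.Builder rowTypeBuilder) {
    for (ComputedColumn computedColumn : computedColumns) {
        rowData.put(
                computedColumn.columnName(),
                computedColumn.eval(rowData.get(computedColumn.fieldReference())));
        rowTypeBuilder.field(computedColumn.columnName(), computedColumn.columnType());
    }
}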
------------------------------------------------------------------------
@@ -19,15 +19,12 @@
package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.List;

/**
* Supports the message queue's data format and provides definitions for the message queue's record
* deserialization class and parsing class {@link AbstractRecordParser}.
@@ -38,11 +35,9 @@ public interface DataFormat {
* Creates a new instance of {@link AbstractRecordParser} for this data format with the
* specified configurations.
*
* @param computedColumns List of computed columns to be considered by the parser.
* @return A new instance of {@link AbstractRecordParser}.
*/
AbstractRecordParser createParser(
TypeMapping typeMapping, List<ComputedColumn> computedColumns);
AbstractRecordParser createParser(TypeMapping typeMapping);

KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
Configuration cdcSourceConfig);
------------------------------------------------------------------------
@@ -18,11 +18,8 @@

package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;

import java.util.List;

/**
* Represents a factory for creating instances of {@link AbstractRecordParser}.
*
@@ -38,9 +35,7 @@ public interface RecordParserFactory {
* Creates a new instance of {@link AbstractRecordParser} with the specified configurations.
*
* @param typeMapping Data type mapping options.
* @param computedColumns List of computed columns to be considered by the parser.
* @return A new instance of {@link AbstractRecordParser}.
*/
AbstractRecordParser createParser(
TypeMapping typeMapping, List<ComputedColumn> computedColumns);
AbstractRecordParser createParser(TypeMapping typeMapping);
}
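With the computed-column parameter gone, RecordParserFactory stays a single-method interface, so a format can still register a constructor reference. A minimal sketch using the Canal parser from the next file of this diff (the typeMapping variable is assumed to be in scope):

// After this change, only the type mapping is needed to build a parser.
RecordParserFactory canalFactory = CanalRecordParser::new;
AbstractRecordParser parser = canalFactory.createParser(typeMapping);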
------------------------------------------------------------------------
@@ -18,7 +18,6 @@

package org.apache.paimon.flink.action.cdc.format.canal;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
@@ -84,8 +83,8 @@ protected boolean isDDL() {
return !isNull(node) && node.asBoolean();
}

public CanalRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
super(typeMapping, computedColumns);
public CanalRecordParser(TypeMapping typeMapping) {
super(typeMapping);
}

@Override
@@ -179,8 +178,6 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
rowData.put(entry.getKey(), Objects.toString(entry.getValue(), null));
}
}

evalComputedColumns(rowData, rowTypeBuilder);
return rowData;
}
