From 530a30c98cb0507d724da888dd95a6c0e2e1b2f6 Mon Sep 17 00:00:00 2001 From: Daeho Kim Date: Tue, 1 Jun 2021 01:59:41 +0900 Subject: [PATCH] feat: add ChainedPartitioner --- .../partitioner/ChainedPartitioner.java | 145 ++++++++++++ .../partitioner/PartitionerConfig.java | 24 +- .../partitioner/ChainedPartitionerTest.java | 210 ++++++++++++++++++ 3 files changed, 377 insertions(+), 2 deletions(-) create mode 100644 partitioner/src/main/java/io/confluent/connect/storage/partitioner/ChainedPartitioner.java create mode 100644 partitioner/src/test/java/io/confluent/connect/storage/partitioner/ChainedPartitionerTest.java diff --git a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/ChainedPartitioner.java b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/ChainedPartitioner.java new file mode 100644 index 000000000..12e13aecc --- /dev/null +++ b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/ChainedPartitioner.java @@ -0,0 +1,145 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.storage.partitioner; + +import io.confluent.connect.storage.common.GenericRecommender; +import io.confluent.connect.storage.common.StorageCommonConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class ChainedPartitioner extends DefaultPartitioner { + private static final Logger log = LoggerFactory.getLogger(ChainedPartitioner.class); + + private final List> partitionerList = new ArrayList<>(); + + @SuppressWarnings("unchecked") + @Override + public void configure(Map config) { + super.configure(config); + + List aliasList = (List) config.get(PartitionerConfig.PARTITIONER_CHAIN_CONFIG); + log.debug("Partitioners alias : {}", aliasList); + + for (String alias : aliasList) { + try { + Partitioner partitioner = PartitionerFactory.newPartitioner(alias, config); + partitionerList.add(partitioner); + log.info("Partitioner is registered. alias: {}, {}", alias, partitioner.getClass()); + } catch (Exception e) { + log.error("Fail to registering partitioner. alias: {}, config: {}", alias, config); + throw new ConfigException(e.getMessage()); + } + } + } + + @Override + public String encodePartition(SinkRecord sinkRecord) { + return getPartitionString(sinkRecord, null); + } + + @Override + public String encodePartition(SinkRecord sinkRecord, long nowInMillis) { + return getPartitionString(sinkRecord, nowInMillis); + } + + private String getPartitionString(SinkRecord sinkRecord, Long nowInMillis) { + StringBuilder builder = new StringBuilder(); + + for (Partitioner partitioner : partitionerList) { + if (builder.length() > 0) { + builder.append(delim); + } + + if (nowInMillis == null) { + builder.append(partitioner.encodePartition(sinkRecord)); + } else { + builder.append(partitioner.encodePartition(sinkRecord, nowInMillis)); + } + } + return builder.toString(); + } + + @Override + public List partitionFields() { + return partitionerList.stream() + .map(Partitioner::partitionFields) + .reduce(new ArrayList<>(), (p1, p2) -> { + p1.addAll(p2); + return p1; + }); + } + + private static class PartitionerFactory { + private static final ConfigDef STORAGE_CONFIG_DEF + = StorageCommonConfig.newConfigDef(new GenericRecommender()); + private static final ConfigDef PARTITION_CONFIG_DEF + = PartitionerConfig.newConfigDef(new GenericRecommender()); + + @SuppressWarnings("unchecked") + public static Partitioner newPartitioner( + String alias, + Map config + ) throws ClassNotFoundException { + String prefix = PartitionerConfig.PARTITIONER_CHAIN_CONFIG + "." + alias + "."; + String partitionerClassName = (String) config.get(prefix + "class"); + Partitioner partitioner + = Utils.newInstance(partitionerClassName, Partitioner.class); + + Map partitionerConfig = new HashMap<>(); + partitionerConfig.putAll(getStorageConfig(config)); + partitionerConfig.putAll(getPartitionConfig(prefix, config)); + partitioner.configure(partitionerConfig); + return partitioner; + } + + private static Map getStorageConfig(Map config) { + Map storageConfig = new HashMap<>(); + Set storageConfigKeySet = STORAGE_CONFIG_DEF.names(); + for (String key : storageConfigKeySet) { + if (config.containsKey(key)) { + storageConfig.put(key, config.get(key)); + } + } + return storageConfig; + } + + @SuppressWarnings("unchecked") + private static Map getPartitionConfig( + String prefix, + Map config + ) { + Map props = new HashMap<>(); + for (String key : config.keySet()) { + if (key.startsWith(prefix)) { + String originalKey = key.replace(prefix, ""); + props.put(originalKey, config.get(key).toString()); + } + } + PartitionerConfig partitionerConfig = new PartitionerConfig(PARTITION_CONFIG_DEF, props); + return (Map) partitionerConfig.values(); + } + } +} diff --git a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java index 222e2dfb5..e543ae9f7 100644 --- a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java +++ b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java @@ -103,6 +103,12 @@ public class PartitionerConfig extends AbstractConfig implements ComposableConfi public static final String TIMESTAMP_FIELD_NAME_DEFAULT = "timestamp"; public static final String TIMESTAMP_FIELD_NAME_DISPLAY = "Record Field for Timestamp Extractor"; + public static final String PARTITIONER_CHAIN_CONFIG = "partitioner.chain"; + public static final String PARTITIONER_CHAIN_DOC = + "The Aliases is used to register for ``ChainedPartitioner`` which can use multiple" + + " partitioners."; + public static final String PARTITIONER_CHAIN_DISPLAY = "List of Alias for Partitioner Chaining"; + /** * Create a new configuration definition. * @@ -211,6 +217,17 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender partitionerClassRecom ++orderInGroup, Width.LONG, TIMESTAMP_FIELD_NAME_DISPLAY); + + configDef.define(PARTITIONER_CHAIN_CONFIG, + Type.LIST, + "", + Importance.MEDIUM, + PARTITIONER_CHAIN_DOC, + group, + ++orderInGroup, + Width.LONG, + PARTITIONER_CHAIN_DISPLAY, + new PartitionerClassDependentsRecommender()); } return configDef; @@ -218,11 +235,11 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender partitionerClassRecom public static class BooleanParentRecommender implements ConfigDef.Recommender { protected final String parentConfigName; - + public BooleanParentRecommender(String parentConfigName) { this.parentConfigName = parentConfigName; } - + @Override public List validValues(String name, Map connectorConfigs) { return new LinkedList<>(); @@ -262,6 +279,9 @@ public boolean visible(String name, Map connectorConfigs) { || name.equals(LOCALE_CONFIG) || name.equals(TIMEZONE_CONFIG); } + } else if (ChainedPartitioner.class.isAssignableFrom(partitioner)) { + // subclass of ChainedPartitioner + return name.equals(PARTITIONER_CHAIN_CONFIG); } else { // Custom partitioner. Allow all the dependent configs. return true; diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/ChainedPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/ChainedPartitionerTest.java new file mode 100644 index 000000000..6ef12c450 --- /dev/null +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/ChainedPartitionerTest.java @@ -0,0 +1,210 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.connect.storage.partitioner; + +import io.confluent.connect.storage.Storage; +import io.confluent.connect.storage.StorageSinkTestBase; +import io.confluent.connect.storage.common.StorageCommonConfig; +import org.apache.avro.file.SeekableInput; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.joda.time.DateTime; +import org.joda.time.DateTimeConstants; +import org.joda.time.DateTimeZone; +import org.junit.Test; + +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.endsWith; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class ChainedPartitionerTest extends StorageSinkTestBase { + + private static final String FIELD_PARTITIONER_ALIAS = "field"; + private static final String TIME_BASED_PARTITIONER_ALIAS = "timeBased"; + + private static final String TIME_ZONE = "America/Los_Angeles"; + private static final DateTimeZone DATE_TIME_ZONE = DateTimeZone.forID(TIME_ZONE); + private static final String PATH_FORMAT = "'year'=YYYY/'month'=M/'day'=d"; + + private static final int YEAR = 2020; + private static final int MONTH = DateTimeConstants.JUNE; + private static final int DAY = 1; + public static final DateTime DATE_TIME = + new DateTime(YEAR, MONTH, DAY, 1, 0, DATE_TIME_ZONE); + + @Test + public void testEncodePartition() { + String fieldName = "string,int"; + ChainedPartitioner partitioner = createChainedPartitioner(fieldName); + String path = getEncodedPartitionerPath(partitioner); + + Map m = new LinkedHashMap<>(); + m.put("string", "def"); + m.put("int", 12); + m.put("year", YEAR); + m.put("month", MONTH); + m.put("day", DAY); + assertThat(path, is(generateEncodedPartitionFromMap(m))); + } + + @Test + public void testNotAllowedPartitionerConfig() { + final String configKey = PartitionerConfig.PATH_FORMAT_CONFIG; + final String path = ""; + + TimeBasedConfig timeBasedConfig = new TimeBasedConfig(); + timeBasedConfig.addConfig(configKey, path); + + ConfigException e = assertThrows(ConfigException.class, () -> { + createChainedPartitioner(timeBasedConfig); + }); + assertThat(e.getMessage(), + endsWith("Path format cannot be empty.")); + } + + private ChainedPartitioner createChainedPartitioner(String fieldName) { + FieldConfig fieldConfig = new FieldConfig(fieldName); + TimeBasedConfig timeBasedConfig = new TimeBasedConfig(); + return createChainedPartitioner(fieldConfig, timeBasedConfig); + } + + private ChainedPartitioner createChainedPartitioner(ConfigTemplate... configTemplates) { + HashMap config = new HashMap<>(); + // storage common configs + config.put(StorageCommonConfig.STORAGE_CLASS_CONFIG, SimpleStorage.class.getName()); + config.put(StorageCommonConfig.DIRECTORY_DELIM_CONFIG, StorageCommonConfig.DIRECTORY_DELIM_DEFAULT); + + List aliasList = new ArrayList<>(); + for (ConfigTemplate ct : configTemplates) { + aliasList.add(ct.getAlias()); // get configs alias + config.putAll(ct.getConfig()); // combine config + } + config.put(PartitionerConfig.PARTITIONER_CHAIN_CONFIG, aliasList); + + ChainedPartitioner partitioner = new ChainedPartitioner<>(); + partitioner.configure(config); + return partitioner; + } + + private String getEncodedPartitionerPath(ChainedPartitioner partitioner) { + SinkRecord sinkRecord = createSinkRecord(DATE_TIME.getMillis()); + return partitioner.encodePartition(sinkRecord); + } + + private static class FieldConfig extends ConfigTemplate { + private FieldConfig(String fieldNames) { + super(FIELD_PARTITIONER_ALIAS, FieldPartitioner.class); + addConfig(PartitionerConfig.PARTITION_FIELD_NAME_CONFIG, fieldNames); + } + } + + private static class TimeBasedConfig extends ConfigTemplate { + private TimeBasedConfig() { + super(TIME_BASED_PARTITIONER_ALIAS, TimeBasedPartitioner.class); + addConfig(PartitionerConfig.TIMEZONE_CONFIG, TIME_ZONE); + addConfig(PartitionerConfig.PATH_FORMAT_CONFIG, PATH_FORMAT); + addConfig(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "Record"); + addConfig(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, "60000"); + addConfig(PartitionerConfig.LOCALE_CONFIG, Locale.US.toString()); + } + } + + private static class ConfigTemplate { + private final String alias; + private final String prefix; + private final Map config; + + @SuppressWarnings("rawtypes") + private ConfigTemplate(String alias, Class clazz) { + this.alias = alias; + this.prefix = PartitionerConfig.PARTITIONER_CHAIN_CONFIG + "." + alias + "."; + this.config = new HashMap<>(); + this.config.put(prefix + "class", clazz.getName()); + } + + public void addConfig(String key, String value) { + config.put(prefix + key, value); + } + + public String getAlias() { + return alias; + } + + public Map getConfig() { + return config; + } + } + + private static class SimpleStorage implements Storage> { + + @Override + public boolean exists(String path) { + return false; + } + + @Override + public boolean create(String path) { + return false; + } + + @Override + public OutputStream create(String path, StorageCommonConfig conf, boolean overwrite) { + return null; + } + + @Override + public SeekableInput open(String path, StorageCommonConfig conf) { + return null; + } + + @Override + public OutputStream append(String path) { + return null; + } + + @Override + public void delete(String path) { + } + + @Override + public List list(String path) { + return null; + } + + @Override + public void close() { + } + + @Override + public String url() { + return null; + } + + @Override + public StorageCommonConfig conf() { + return null; + } + } +} \ No newline at end of file