diff --git a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java index ba592b27a..19ed622d1 100644 --- a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java +++ b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/PartitionerConfig.java @@ -101,6 +101,13 @@ public class PartitionerConfig extends AbstractConfig implements ComposableConfi public static final String TIMESTAMP_FIELD_NAME_DEFAULT = "timestamp"; public static final String TIMESTAMP_FIELD_NAME_DISPLAY = "Record Field for Timestamp Extractor"; + public static final String TIMESTAMP_UNIT_NAME_CONFIG = "timestamp.unit"; + public static final String TIMESTAMP_UNIT_NAME_DOC = + "The time unit to be used for timestamp by the timestamp extractor."; + public static final String TIMESTAMP_UNIT_NAME_DEFAULT = "ms"; + public static final String TIMESTAMP_UNIT_NAME_DISPLAY = + "Time Unit of specified Record Field for Timestamp Extractor"; + /** * Create a new configuration definition. * @@ -208,6 +215,16 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender partitionerClassRecom ++orderInGroup, Width.LONG, TIMESTAMP_FIELD_NAME_DISPLAY); + + configDef.define(TIMESTAMP_UNIT_NAME_CONFIG, + Type.STRING, + TIMESTAMP_UNIT_NAME_DEFAULT, + Importance.MEDIUM, + TIMESTAMP_UNIT_NAME_DOC, + group, + ++orderInGroup, + Width.LONG, + TIMESTAMP_UNIT_NAME_DISPLAY); } return configDef; diff --git a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/TimeBasedPartitioner.java b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/TimeBasedPartitioner.java index e1bba1cfe..9a089e652 100644 --- a/partitioner/src/main/java/io/confluent/connect/storage/partitioner/TimeBasedPartitioner.java +++ b/partitioner/src/main/java/io/confluent/connect/storage/partitioner/TimeBasedPartitioner.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; import io.confluent.connect.storage.common.SchemaGenerator; import io.confluent.connect.storage.common.StorageCommonConfig; @@ -266,15 +267,32 @@ public Long extract(ConnectRecord record) { public static class RecordFieldTimestampExtractor implements TimestampExtractor { private String fieldName; private DateTimeFormatter dateTime; + private TimeUnit timeUnit; @Override public void configure(Map config) { fieldName = (String) config.get(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG); dateTime = ISODateTimeFormat.dateTimeParser(); + timeUnit = getTimeUnit(config); + } + + private TimeUnit getTimeUnit(Map config) { + String timestampTimeUnit = (String) config.get(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG); + + if ("s".equalsIgnoreCase(timestampTimeUnit)) { + return TimeUnit.SECONDS; + } + + return TimeUnit.MILLISECONDS; } @Override public Long extract(ConnectRecord record) { + Long timestamp = extractTimestamp(record); + return timeUnit == TimeUnit.SECONDS ? timestamp * 1000 : timestamp; + } + + private Long extractTimestamp(ConnectRecord record) { Object value = record.value(); if (value instanceof Struct) { Struct struct = (Struct) value; diff --git a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java index 9fa9625ec..a1820b913 100644 --- a/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java +++ b/partitioner/src/test/java/io/confluent/connect/storage/partitioner/TimeBasedPartitionerTest.java @@ -15,6 +15,10 @@ package io.confluent.connect.storage.partitioner; +import io.confluent.connect.storage.StorageSinkTestBase; +import io.confluent.connect.storage.common.StorageCommonConfig; +import io.confluent.connect.storage.errors.PartitionException; +import io.confluent.connect.storage.util.DateTimeUtils; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Time; @@ -487,6 +491,17 @@ public void testNumericRecordFieldTimeMap() { ts = DATE_TIME.getMillis(); testMapNumericTimestampPartitionEncoding( partitioner, timeField, ts, Schema.INT64_SCHEMA, DATE_TIME); + + Map configOverride = new HashMap<>(); + configOverride.put(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG, "s"); + TimeBasedPartitioner secondsPartitioner = configurePartitioner( + new TimeBasedPartitioner<>(), timeField, configOverride); + + assertThat(secondsPartitioner.getTimestampExtractor(), + instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class)); + + testMapNumericTimestampPartitionEncoding( + secondsPartitioner, timeField, ((long) ts)/1000, Schema.INT64_SCHEMA, DATE_TIME); } @Test @@ -498,6 +513,14 @@ public void testRecordFieldTimeDateExtractor() { assertThat(partitioner.getTimestampExtractor(), instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class)); + Map configOverride = new HashMap<>(); + configOverride.put(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG, "s"); + TimeBasedPartitioner secondsPartitioner = configurePartitioner( + new TimeBasedPartitioner<>(), timeField, configOverride); + + assertThat(secondsPartitioner.getTimestampExtractor(), + instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class)); + DateTime moment = new DateTime(2015, 4, 2, 1, 0, 0, 0, DateTimeZone.forID(TIME_ZONE)); String expectedPartition = "year=2015/month=4/day=2/hour=1"; @@ -506,6 +529,11 @@ public void testRecordFieldTimeDateExtractor() { String encodedPartition = partitioner.encodePartition(sinkRecord); assertEquals(expectedPartition, encodedPartition); + long rawTimestampSeconds = moment.getMillis()/1000; + sinkRecord = createSinkRecord(rawTimestampSeconds); + encodedPartition = secondsPartitioner.encodePartition(sinkRecord); + assertEquals(expectedPartition, encodedPartition); + String timestamp = ISODateTimeFormat.dateTimeNoMillis().print(moment); sinkRecord = createSinkRecord(Schema.STRING_SCHEMA, timestamp); encodedPartition = partitioner.encodePartition(sinkRecord); @@ -525,6 +553,11 @@ public void testRecordFieldTimeDateExtractor() { encodedPartition = partitioner.encodePartition(sinkRecord); assertEquals("year=1970/month=1/day=1/hour=0", encodedPartition); + int shortTimestampSeconds = shortTimestamp/1000; + sinkRecord = createSinkRecord(Schema.INT32_SCHEMA, shortTimestampSeconds); + encodedPartition = secondsPartitioner.encodePartition(sinkRecord); + assertEquals("year=1970/month=1/day=1/hour=0", encodedPartition); + // Struct - Date extraction sinkRecord = createSinkRecord(rawTimestamp); String structEncodedPartition = partitioner.encodePartition(sinkRecord); @@ -539,8 +572,9 @@ public void testRecordFieldTimeDateExtractor() { @Test public void testNestedRecordFieldTimeExtractor() throws Exception { + String timeField = "nested.timestamp"; TimeBasedPartitioner partitioner = configurePartitioner( - new TimeBasedPartitioner<>(), "nested.timestamp", null); + new TimeBasedPartitioner<>(), timeField, null); assertThat(partitioner.getTimestampExtractor(), instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class)); @@ -551,6 +585,20 @@ public void testNestedRecordFieldTimeExtractor() throws Exception { String encodedPartition = partitioner.encodePartition(sinkRecord); validateEncodedPartition(encodedPartition); + + Map configOverride = new HashMap<>(); + configOverride.put(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG, "s"); + TimeBasedPartitioner secondsPartitioner = configurePartitioner( + new TimeBasedPartitioner<>(), timeField, configOverride); + + assertThat(secondsPartitioner.getTimestampExtractor(), + instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class)); + + sinkRecord = createSinkRecordWithNestedTimestampField(timestamp/1000); + + encodedPartition = secondsPartitioner.encodePartition(sinkRecord); + + validateEncodedPartition(encodedPartition); } @Test