Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added a timestamp unit to partitioner config #122

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ public class PartitionerConfig extends AbstractConfig implements ComposableConfi
public static final String TIMESTAMP_FIELD_NAME_DEFAULT = "timestamp";
public static final String TIMESTAMP_FIELD_NAME_DISPLAY = "Record Field for Timestamp Extractor";

public static final String TIMESTAMP_UNIT_NAME_CONFIG = "timestamp.unit";
public static final String TIMESTAMP_UNIT_NAME_DOC =
"The time unit to be used for timestamp by the timestamp extractor.";
public static final String TIMESTAMP_UNIT_NAME_DEFAULT = "ms";
public static final String TIMESTAMP_UNIT_NAME_DISPLAY =
"Time Unit of specified Record Field for Timestamp Extractor";

/**
* Create a new configuration definition.
*
Expand Down Expand Up @@ -208,6 +215,16 @@ public static ConfigDef newConfigDef(ConfigDef.Recommender partitionerClassRecom
++orderInGroup,
Width.LONG,
TIMESTAMP_FIELD_NAME_DISPLAY);

configDef.define(TIMESTAMP_UNIT_NAME_CONFIG,
Type.STRING,
TIMESTAMP_UNIT_NAME_DEFAULT,
Importance.MEDIUM,
TIMESTAMP_UNIT_NAME_DOC,
group,
++orderInGroup,
Width.LONG,
TIMESTAMP_UNIT_NAME_DISPLAY);
}

return configDef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import io.confluent.connect.storage.common.SchemaGenerator;
import io.confluent.connect.storage.common.StorageCommonConfig;
Expand Down Expand Up @@ -266,15 +267,32 @@ public Long extract(ConnectRecord<?> record) {
public static class RecordFieldTimestampExtractor implements TimestampExtractor {
private String fieldName;
private DateTimeFormatter dateTime;
private TimeUnit timeUnit;

@Override
public void configure(Map<String, Object> config) {
fieldName = (String) config.get(PartitionerConfig.TIMESTAMP_FIELD_NAME_CONFIG);
dateTime = ISODateTimeFormat.dateTimeParser();
timeUnit = getTimeUnit(config);
}

private TimeUnit getTimeUnit(Map<String, Object> config) {
String timestampTimeUnit = (String) config.get(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG);

if ("s".equalsIgnoreCase(timestampTimeUnit)) {
return TimeUnit.SECONDS;
}

return TimeUnit.MILLISECONDS;
}

@Override
public Long extract(ConnectRecord<?> record) {
Long timestamp = extractTimestamp(record);
return timeUnit == TimeUnit.SECONDS ? timestamp * 1000 : timestamp;
}

private Long extractTimestamp(ConnectRecord<?> record) {
Object value = record.value();
if (value instanceof Struct) {
Struct struct = (Struct) value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

package io.confluent.connect.storage.partitioner;

import io.confluent.connect.storage.StorageSinkTestBase;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.util.DateTimeUtils;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -487,6 +491,17 @@ public void testNumericRecordFieldTimeMap() {
ts = DATE_TIME.getMillis();
testMapNumericTimestampPartitionEncoding(
partitioner, timeField, ts, Schema.INT64_SCHEMA, DATE_TIME);

Map<String, Object> configOverride = new HashMap<>();
configOverride.put(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG, "s");
TimeBasedPartitioner<String> secondsPartitioner = configurePartitioner(
new TimeBasedPartitioner<>(), timeField, configOverride);

assertThat(secondsPartitioner.getTimestampExtractor(),
instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class));

testMapNumericTimestampPartitionEncoding(
secondsPartitioner, timeField, ((long) ts)/1000, Schema.INT64_SCHEMA, DATE_TIME);
}

@Test
Expand All @@ -498,6 +513,14 @@ public void testRecordFieldTimeDateExtractor() {
assertThat(partitioner.getTimestampExtractor(),
instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class));

Map<String, Object> configOverride = new HashMap<>();
configOverride.put(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG, "s");
TimeBasedPartitioner<String> secondsPartitioner = configurePartitioner(
new TimeBasedPartitioner<>(), timeField, configOverride);

assertThat(secondsPartitioner.getTimestampExtractor(),
instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class));

DateTime moment = new DateTime(2015, 4, 2, 1, 0, 0, 0, DateTimeZone.forID(TIME_ZONE));
String expectedPartition = "year=2015/month=4/day=2/hour=1";

Expand All @@ -506,6 +529,11 @@ public void testRecordFieldTimeDateExtractor() {
String encodedPartition = partitioner.encodePartition(sinkRecord);
assertEquals(expectedPartition, encodedPartition);

long rawTimestampSeconds = moment.getMillis()/1000;
sinkRecord = createSinkRecord(rawTimestampSeconds);
encodedPartition = secondsPartitioner.encodePartition(sinkRecord);
assertEquals(expectedPartition, encodedPartition);

String timestamp = ISODateTimeFormat.dateTimeNoMillis().print(moment);
sinkRecord = createSinkRecord(Schema.STRING_SCHEMA, timestamp);
encodedPartition = partitioner.encodePartition(sinkRecord);
Expand All @@ -525,6 +553,11 @@ public void testRecordFieldTimeDateExtractor() {
encodedPartition = partitioner.encodePartition(sinkRecord);
assertEquals("year=1970/month=1/day=1/hour=0", encodedPartition);

int shortTimestampSeconds = shortTimestamp/1000;
sinkRecord = createSinkRecord(Schema.INT32_SCHEMA, shortTimestampSeconds);
encodedPartition = secondsPartitioner.encodePartition(sinkRecord);
assertEquals("year=1970/month=1/day=1/hour=0", encodedPartition);

// Struct - Date extraction
sinkRecord = createSinkRecord(rawTimestamp);
String structEncodedPartition = partitioner.encodePartition(sinkRecord);
Expand All @@ -539,8 +572,9 @@ public void testRecordFieldTimeDateExtractor() {

@Test
public void testNestedRecordFieldTimeExtractor() throws Exception {
String timeField = "nested.timestamp";
TimeBasedPartitioner<String> partitioner = configurePartitioner(
new TimeBasedPartitioner<>(), "nested.timestamp", null);
new TimeBasedPartitioner<>(), timeField, null);

assertThat(partitioner.getTimestampExtractor(),
instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class));
Expand All @@ -551,6 +585,20 @@ public void testNestedRecordFieldTimeExtractor() throws Exception {
String encodedPartition = partitioner.encodePartition(sinkRecord);

validateEncodedPartition(encodedPartition);

Map<String, Object> configOverride = new HashMap<>();
configOverride.put(PartitionerConfig.TIMESTAMP_UNIT_NAME_CONFIG, "s");
TimeBasedPartitioner<String> secondsPartitioner = configurePartitioner(
new TimeBasedPartitioner<>(), timeField, configOverride);

assertThat(secondsPartitioner.getTimestampExtractor(),
instanceOf(TimeBasedPartitioner.RecordFieldTimestampExtractor.class));

sinkRecord = createSinkRecordWithNestedTimestampField(timestamp/1000);

encodedPartition = secondsPartitioner.encodePartition(sinkRecord);

validateEncodedPartition(encodedPartition);
}

@Test
Expand Down