Allow connector task to use event's source timestamp when feature is enabled

The current implementation passes the event's source timestamp only when the timestamp type is
LogAppendTime; otherwise it passes the event's read time. This breaks features that rely on the
original source timestamp.
This commit changes the following:
- Use the event's source timestamp when the feature is enabled
- Fall back to the default behavior when the feature is disabled
- Add associated test cases
sanjay24 committed Apr 13, 2020
1 parent 23031e6 commit 63de991
Showing 5 changed files with 156 additions and 13 deletions.
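To make the behavior change concrete: the ternary added to translate() in the first file below is equivalent to this if/else sketch. This is an illustration, not code from the commit; ConsumerRecord, TimestampType, and Instant are the same types the connector task already imports.

    // Sketch: how the events-source timestamp is chosen after this commit.
    static long selectEventsSourceTimestamp(ConsumerRecord<?, ?> fromKafka, Instant readTime,
        boolean preserveEventSourceTimestamp) {
      if (preserveEventSourceTimestamp) {
        // Feature enabled: always keep the source event's own timestamp.
        return fromKafka.timestamp();
      }
      if (fromKafka.timestampType() == TimestampType.LOG_APPEND_TIME) {
        // Default behavior: use the broker-assigned append time.
        return fromKafka.timestamp();
      }
      // Default behavior for CreateTime (or unknown): fall back to the read time.
      return readTime.toEpochMilli();
    }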
@@ -122,6 +122,9 @@ public class KafkaMirrorMakerConnectorTask extends AbstractKafkaBasedConnectorTask
private long _minInFlightMessagesThreshold;
private int _flowControlTriggerCount = 0;

// Whether to preserve the event's source timestamp when translating records
private boolean _preserveEventSourceTimestamp = false;

/**
* Constructor for KafkaMirrorMakerConnectorTask
* @param config Task configuration properties
@@ -143,11 +146,13 @@ public KafkaMirrorMakerConnectorTask(KafkaBasedConnectorConfig config, Datastrea
_destinationTopicPrefix = task.getDatastreams().get(0).getMetadata()
.getOrDefault(DatastreamMetadataConstants.DESTINATION_TOPIC_PREFIX, DEFAULT_DESTINATION_TOPIC_PREFIX);
_dynamicMetricsManager = DynamicMetricsManager.getInstance();
_preserveEventSourceTimestamp = Boolean.parseBoolean(task.getDatastreams().get(0).getMetadata()
.getOrDefault(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, Boolean.FALSE.toString()));

if (_enablePartitionAssignment) {
LOG.info("Enable Brooklin partition assignment");
}

LOG.info("Preserve event source timestamp is set to {}", _preserveEventSourceTimestamp);
LOG.info("Destination topic prefix has been set to {}", _destinationTopicPrefix);

if (_isFlushlessModeEnabled) {
@@ -223,7 +228,7 @@ private Set<TopicPartition> getAssignedTopicPartitionFromTask() {

@Override
protected DatastreamProducerRecord translate(ConsumerRecord<?, ?> fromKafka, Instant readTime) {
-    long eventsSourceTimestamp =
+    long eventsSourceTimestamp = _preserveEventSourceTimestamp ? fromKafka.timestamp() :
         fromKafka.timestampType() == TimestampType.LOG_APPEND_TIME ? fromKafka.timestamp() : readTime.toEpochMilli();
HashMap<String, String> metadata = new HashMap<>();
metadata.put(KAFKA_ORIGIN_CLUSTER, _mirrorMakerSource.getBrokerListString());
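The feature is opt-in per datastream: the constructor change above reads PRESERVE_EVENT_SOURCE_TIMESTAMP from the first datastream's metadata. A minimal sketch of opting in, as the tests below do:

    // Enable source-timestamp preservation on a datastream before creating the task.
    datastream.getMetadata().put(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP,
        Boolean.TRUE.toString());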
@@ -6,6 +6,8 @@
package com.linkedin.datastream.connectors.kafka.mirrormaker;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
@@ -16,6 +18,7 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.testng.Assert;

@@ -54,15 +57,16 @@ static Properties getKafkaProducerProperties(DatastreamEmbeddedZookeeperKafkaCluster

static void produceEvents(String topic, int destinationPartition, int numEvents,
DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
-    produceEventsToPartition(topic, destinationPartition, numEvents, kafkaCluster);
+    produceEventsToPartitionAsync(topic, destinationPartition, numEvents, kafkaCluster);
}

static void produceEvents(String topic, int numEvents, DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
-    produceEventsToPartition(topic, null, numEvents, kafkaCluster);
+    produceEventsToPartitionAsync(topic, null, numEvents, kafkaCluster);
}

-  static void produceEventsToPartition(String topic, Integer destinationPartition, int numEvents,
-      DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
+  static void produceEventsToPartitionAsync(String topic, Integer destinationPartition, int numEvents,
+      DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {

try (Producer<byte[], byte[]> producer = new KafkaProducer<>(getKafkaProducerProperties(kafkaCluster))) {
for (int i = 0; i < numEvents; i++) {
producer.send(new ProducerRecord<>(topic, destinationPartition, ("key-" + i).getBytes(Charsets.UTF_8),
@@ -76,6 +80,38 @@ static void produceEventsToPartition(String topic, Integer destinationPartition,
}
}

static List<RecordMetadata> produceEventsToPartitionSync(String topic, Integer destinationPartition, int numEvents,
List<Long> eventSourceTimeStamps, DatastreamEmbeddedZookeeperKafkaCluster kafkaCluster) {
if (eventSourceTimeStamps != null && numEvents != eventSourceTimeStamps.size()) {
      Assert.fail("Number of source timestamps doesn't match the number of events. Required: " +
          numEvents + ", supplied: " + eventSourceTimeStamps.size());
} else if (eventSourceTimeStamps == null) {
eventSourceTimeStamps = new ArrayList<>();
for (int i = 0; i < numEvents; i++) {
eventSourceTimeStamps.add(null);
}
}
List<RecordMetadata> recordMetadataList = new ArrayList<>();

    try (Producer<byte[], byte[]> producer = new KafkaProducer<>(getKafkaProducerProperties(kafkaCluster))) {
      for (int i = 0; i < numEvents; i++) {
        RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, destinationPartition,
            eventSourceTimeStamps.get(i), ("key-" + i).getBytes(Charsets.UTF_8),
            ("value-" + i).getBytes(Charsets.UTF_8))).get();
        recordMetadataList.add(metadata);
      }
    } catch (InterruptedException interruptException) {
      // Restore the interrupt flag so callers can observe the interruption.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Failed to send message.", interruptException);
    } catch (Exception exception) {
      throw new RuntimeException("Failed to send message.", exception);
    }
return recordMetadataList;
}
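
The synchronous variant blocks on each send future (producer.send(...).get()) so it can return the broker's RecordMetadata; for a topic configured with LogAppendTime, RecordMetadata.timestamp() carries the broker-assigned timestamp, which the tests below use as the expected value. A usage sketch (topic name and timestamps are illustrative; assumes java.util.Arrays):

    // Produce three records with explicit source timestamps and capture broker metadata.
    List<Long> timestamps = Arrays.asList(1582766709000L, 1582766709001L, 1582766709002L);
    List<RecordMetadata> metadataList = KafkaMirrorMakerConnectorTestUtils
        .produceEventsToPartitionSync("exampleTopic", null, 3, timestamps, kafkaCluster);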

static Datastream createDatastream(String name, String broker, String sourceRegex, StringMap metadata) {
DatastreamSource source = new DatastreamSource();
source.setConnectionString("kafka://" + broker + "/" + sourceRegex);
Expand Down
@@ -6,6 +6,7 @@
package com.linkedin.datastream.connectors.kafka.mirrormaker;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -21,8 +22,10 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -79,6 +82,8 @@ public class TestKafkaMirrorMakerConnectorTask extends BaseKafkaZkTest {

private static final long CONNECTOR_AWAIT_STOP_TIMEOUT_MS = 30000;
private static final Logger LOG = LoggerFactory.getLogger(TestKafkaMirrorMakerConnectorTask.class);
private static final String MESSAGE_TIMESTAMP_TYPE_CREATE_TIME = "CreateTime";
private static final String MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME = "LogAppendTime";

@Test
public void testConsumeFromMultipleTopics() throws Exception {
@@ -127,6 +132,35 @@ public void testConsumeFromMultipleTopics() throws Exception {
Assert.assertTrue(connectorTask.awaitStop(CONNECTOR_AWAIT_STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS),
"did not shut down on time");
}

@Test
public void testPreserveSourceEventTimestampForSourceLogTimestampTypeAsCreateTime() throws Exception {
testEventSendWithTimestamp("topicWithCreateTime", 3, MESSAGE_TIMESTAMP_TYPE_CREATE_TIME, Boolean.TRUE);
}

@Test
public void testPreserveSourceEventTimestampForSourceLogTimestampTypeAsLogAppendTime() throws Exception {
testEventSendWithTimestamp("topicWithLogAppendTime", 3, MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME, Boolean.TRUE);
}

@Test
public void testPreserveSourceEventTimestampNotSetForSourceLogTimestampTypeAsLogAppendTime() throws Exception {
testEventSendWithTimestamp("preserveNotSet_1", 3, MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME, null);
}

@Test
public void testPreserveSourceEventTimestampSetToFalseForSourceLogTimestampTypeAsLogAppendTime() throws Exception {
testEventSendWithTimestamp("preserveNotSet_2", 3, MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME, Boolean.FALSE);
}

@Test
public void testPreserveSourceEventTimestampNotSetForSourceLogTimestampTypeAsCreateTime() throws Exception {
testEventSendWithTimestamp("preserveTsTest_3", 3, MESSAGE_TIMESTAMP_TYPE_CREATE_TIME, null);
}

@Test
public void testPreserveSourceEventTimestampSetToFalseForSourceLogTimestampTypeAsCreateTime() throws Exception {
testEventSendWithTimestamp("preserveTsTest_4", 3, MESSAGE_TIMESTAMP_TYPE_CREATE_TIME, Boolean.FALSE);
}

@Test
public void testConsumeFromMultipleTopicsWithDestinationTopicPrefixMetadata() throws Exception {
Expand Down Expand Up @@ -202,7 +236,7 @@ public void testIdentityPartitioningEnabled() throws Exception {
// produce an event to half of the partitions
Set<Integer> expectedPartitionsWithData = new HashSet<>();
for (int i = 0; i < partitionCount; i += 2) {
-      KafkaMirrorMakerConnectorTestUtils.produceEventsToPartition(yummyTopic, i, 1, _kafkaCluster);
+      KafkaMirrorMakerConnectorTestUtils.produceEvents(yummyTopic, i, 1, _kafkaCluster);
expectedPartitionsWithData.add(i);
}

@@ -1054,4 +1088,65 @@ private void validatePausedPartitionsMetrics(String task, String stream, long nu
== numConfigPausedPartitions, POLL_PERIOD_MS, POLL_TIMEOUT_MS),
"numConfigPausedPartitions metric failed to update");
}
}

private void testEventSendWithTimestamp(String topicName, int numberOfEvents, String messageTimestampType,
    Boolean preserveSourceEventTimestamp) throws Exception {
  Assert.assertTrue(messageTimestampType.equals(MESSAGE_TIMESTAMP_TYPE_CREATE_TIME) ||
      messageTimestampType.equals(MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME));
Properties topicProperties = new Properties();
topicProperties.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, messageTimestampType);
createTopic(_zkUtils, topicName, 1, topicProperties);

Datastream datastream = KafkaMirrorMakerConnectorTestUtils.createDatastream(topicName + "_Stream", _broker, "^" + topicName + "$");
if (preserveSourceEventTimestamp != null) {
datastream.getMetadata().put(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, preserveSourceEventTimestamp.toString());
}

DatastreamTaskImpl task = new DatastreamTaskImpl(Collections.singletonList(datastream));
MockDatastreamEventProducer datastreamProducer = new MockDatastreamEventProducer();
task.setEventProducer(datastreamProducer);

Long sourceTimestampBase = 1582766709000L;
List<Long> eventSourceTimestamps = new ArrayList<>();
for (int index = 0; index < numberOfEvents; ++index) {
eventSourceTimestamps.add(sourceTimestampBase + index);
}

KafkaMirrorMakerConnectorTask connectorTask =
KafkaMirrorMakerConnectorTestUtils.createKafkaMirrorMakerConnectorTask(task);

List<RecordMetadata> recordMetadataList = KafkaMirrorMakerConnectorTestUtils.produceEventsToPartitionSync(topicName,
null, numberOfEvents, eventSourceTimestamps, _kafkaCluster);

KafkaMirrorMakerConnectorTestUtils.runKafkaMirrorMakerConnectorTask(connectorTask);

  Assert.assertEquals(recordMetadataList.size(), numberOfEvents, "not all records were produced to the source topic");
  if (!PollUtils.poll(() -> datastreamProducer.getEvents().size() == numberOfEvents, POLL_PERIOD_MS, POLL_TIMEOUT_MS)) {
    Assert.fail("did not transfer the messages within timeout; transferred " + datastreamProducer.getEvents().size());
  }

  // Replace the expected timestamps with the broker-assigned ones when the topic uses LogAppendTime
  if (MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME.equals(messageTimestampType)) {
for (int index = 0; index < numberOfEvents; ++index) {
eventSourceTimestamps.set(index, recordMetadataList.get(index).timestamp());
}
}
List<DatastreamProducerRecord> records = datastreamProducer.getEvents();

List<Long> readTimestamps = new ArrayList<>();

for (DatastreamProducerRecord record : records) {
readTimestamps.add(record.getEventsSourceTimestamp());
}

  if (preserveSourceEventTimestamp != null && preserveSourceEventTimestamp) {
    Assert.assertEquals(readTimestamps, eventSourceTimestamps, "source and destination timestamps don't match!");
  } else if (MESSAGE_TIMESTAMP_TYPE_CREATE_TIME.equals(messageTimestampType)) {
    // Without the feature, CreateTime events are stamped with the read time, which differs from the source time.
    Assert.assertNotEquals(readTimestamps, eventSourceTimestamps);
  } else if (MESSAGE_TIMESTAMP_TYPE_LOG_APPEND_TIME.equals(messageTimestampType)) {
    // LogAppendTime events carry the broker-assigned timestamp in both modes.
    Assert.assertEquals(readTimestamps, eventSourceTimestamps);
  }
connectorTask.stop();
Assert.assertTrue(connectorTask.awaitStop(CONNECTOR_AWAIT_STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS),
"did not shut down on time");
}
}
@@ -325,7 +325,7 @@ private void testEventSendWithTimestamp(int numberOfEvents, int numberOfPartitions
KafkaTestUtils.waitForTopicCreation(_zkUtils, topicName, _kafkaCluster.getBrokers());

LOG.info(String.format("Topic %s created with %d partitions and topic properties %s", topicName, numberOfPartitions,
-        new Properties()));
+        topicProperties));
// Specify source event timestamps to assert against later
Long sourceTimestampBase = 1582766709000L;
List<Long> eventSourceTimestamps = new ArrayList<>();
Expand Down
@@ -63,14 +63,21 @@ public void afterMethodTeardown() {
_broker = null;
}

-  protected static void createTopic(ZkUtils zkUtils, String topic, int partitionCount) {
+  protected static void createTopic(ZkUtils zkUtils, String topic, int partitionCount, Properties properties) {
     if (!AdminUtils.topicExists(zkUtils, topic)) {
-      Properties properties = new Properties();
-      properties.put("message.timestamp.type", "LogAppendTime");
-      AdminUtils.createTopic(zkUtils, topic, partitionCount, 1, properties, null);
+      // Preserve the previous default when no properties are supplied: LogAppendTime topics.
+      Properties topicConfig = properties;
+      if (topicConfig == null) {
+        topicConfig = new Properties();
+        topicConfig.put("message.timestamp.type", "LogAppendTime");
+      }
+      AdminUtils.createTopic(zkUtils, topic, partitionCount, 1, topicConfig, null);
}
}

protected static void createTopic(ZkUtils zkUtils, String topic, int partitionCount) {
createTopic(zkUtils, topic, partitionCount, null);
}

protected static void createTopic(ZkUtils zkUtils, String topic) {
createTopic(zkUtils, topic, 1);
}
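
Usage sketch for the new overload (mirrors the createTopic call in testEventSendWithTimestamp; the topic name is illustrative, and TopicConfig is org.apache.kafka.common.config.TopicConfig as imported by the test class):

    // Create a single-partition topic whose messages keep their producer-supplied CreateTime.
    Properties topicProperties = new Properties();
    topicProperties.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
    createTopic(_zkUtils, "topicWithCreateTime", 1, topicProperties);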
