[server] Enhance the producer timestamp fetch logic to include both data and heartbeat messages. (linkedin#1422)

Update the topic metadata producer timestamp fetch logic to also consider heartbeat messages. Previously, the getProducerTimestampOfLastDataMessage function considered only data messages when retrieving the producer timestamp from the most recently consumed messages. For real-time topics without data ingestion, however, the most recent messages can all be heartbeat control messages, so the ready-to-serve check searched endlessly as new heartbeat messages kept arriving.
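For illustration, a minimal, self-contained sketch of the selection rule described above. The types and names here (RecordKey, ConsumedRecord, HEART_BEAT_KEY_BYTES) are hypothetical stand-ins, not Venice classes; in the actual change the check compares the record key bytes against KafkaKey.HEART_BEAT.getKey(), as shown in the diff below.

import java.util.Arrays;
import java.util.List;

public class ProducerTimestampSketch {
  // Hypothetical placeholder for the heartbeat key bytes; the real value comes from KafkaKey.HEART_BEAT.getKey().
  static final byte[] HEART_BEAT_KEY_BYTES = new byte[] { 0x01 };

  // Hypothetical stand-ins for KafkaKey and PubSubMessage.
  record RecordKey(boolean controlMessage, byte[] keyBytes) {}
  record ConsumedRecord(RecordKey key, long producerTimestampMs) {}

  // A record can supply the producer timestamp if it is a data message
  // or a heartbeat control message (the case added by this commit).
  static boolean suppliesProducerTimestamp(RecordKey key) {
    return !key.controlMessage() || Arrays.equals(key.keyBytes(), HEART_BEAT_KEY_BYTES);
  }

  // Scan the most recently consumed records from the end and return the
  // producer timestamp of the first usable one; -1 means none qualified.
  static long producerTimestampOfLastUsableMessage(List<ConsumedRecord> lastConsumedRecords) {
    for (int i = lastConsumedRecords.size() - 1; i >= 0; i--) {
      ConsumedRecord record = lastConsumedRecords.get(i);
      if (suppliesProducerTimestamp(record.key())) {
        return record.producerTimestampMs();
      }
    }
    return -1;
  }
}

With only heartbeat records in the list, the scan now terminates with their producer timestamp instead of reporting that no data message was found.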

Co-authored-by: Hao Xu <[email protected]>
haoxu07 and Hao Xu authored Jan 22, 2025
1 parent cd915b9 commit fe1500f
Showing 2 changed files with 18 additions and 2 deletions.
@@ -504,7 +504,8 @@ long getProducerTimestampOfLastDataMessage(PubSubTopicPartition pubSubTopicPartition
     // iterate in reverse order to find the first data message (not control message) from the end
     for (int i = lastConsumedRecords.size() - 1; i >= 0; i--) {
       PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record = lastConsumedRecords.get(i);
-      if (!record.getKey().isControlMessage()) {
+      if (!record.getKey().isControlMessage()
+          || Arrays.equals(record.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
         stats.recordLatency(GET_PRODUCER_TIMESTAMP_OF_LAST_DATA_MESSAGE, startTime);
         // note that the timestamp is the producer timestamp and not the pubsub message (broker) timestamp
         return record.getValue().getProducerMetadata().getMessageTimestamp();
@@ -383,6 +383,15 @@ public void testConsumeLatestRecords() {
     verify(consumerMock, times(2)).unSubscribe(eq(topicPartition));
   }
 
+  private PubSubMessage getHeartBeatPubSubMessage(PubSubTopicPartition topicPartition, long offset) {
+    KafkaKey key = KafkaKey.HEART_BEAT;
+    KafkaMessageEnvelope val = mock(KafkaMessageEnvelope.class);
+    ProducerMetadata producerMetadata = new ProducerMetadata();
+    producerMetadata.setMessageTimestamp(System.nanoTime());
+    when(val.getProducerMetadata()).thenReturn(producerMetadata);
+    return new ImmutablePubSubMessage(key, val, topicPartition, offset, System.currentTimeMillis(), 512);
+  }
+
   private PubSubMessage getPubSubMessage(PubSubTopicPartition topicPartition, boolean isControlMessage, long offset) {
     KafkaKey key = mock(KafkaKey.class);
     when(key.isControlMessage()).thenReturn(isControlMessage);
@@ -403,14 +412,20 @@ public void testGetProducerTimestampOfLastDataMessage() {
     assertEquals(timestamp, PUBSUB_NO_PRODUCER_TIME_IN_EMPTY_TOPIC_PARTITION);
     verify(metadataFetcherSpy, times(1)).consumeLatestRecords(eq(topicPartition), anyInt());
 
-    // test when there are no data messages to consume
+    // test when there are no data messages and heartbeat messages to consume
     PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> cm = getPubSubMessage(topicPartition, true, 5);
     doReturn(Collections.singletonList(cm)).when(metadataFetcherSpy).consumeLatestRecords(eq(topicPartition), anyInt());
     Throwable t = expectThrows(
         VeniceException.class,
         () -> metadataFetcherSpy.getProducerTimestampOfLastDataMessage(topicPartition));
     assertTrue(t.getMessage().contains("No data message found in topic-partition"));
 
+    // test when there are heartbeat messages but no data messages to consume
+    PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> hm = getHeartBeatPubSubMessage(topicPartition, 6);
+    doReturn(Collections.singletonList(hm)).when(metadataFetcherSpy).consumeLatestRecords(eq(topicPartition), anyInt());
+    timestamp = metadataFetcherSpy.getProducerTimestampOfLastDataMessage(topicPartition);
+    assertEquals(timestamp, hm.getValue().getProducerMetadata().getMessageTimestamp());
+
     // test when there are data messages to consume
     PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> dm0 = getPubSubMessage(topicPartition, false, 4);
     doReturn(Collections.singletonList(dm0)).when(metadataFetcherSpy)
