Skip to content
This repository has been archived by the owner on Dec 2, 2024. It is now read-only.

Commit

Permalink
Improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Giuseppe Landolfi committed Mar 7, 2019
1 parent 92f5411 commit 9d53fad
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/main/java/ch/ricardo/kafka/consumergroupexporter/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ private static void exportConsumerGroup(String groupId, OffsetResetStrategy fall
for (TopicPartition tp : partitions.keySet()) {
// get the offset in the source cluster
long offset = partitions.get(tp).offset();
log.info(tp.toString() + " committed offset on source cluster is " + offset);

// get the timestamp associated to the offset
dataCenterConsumer.assign(ImmutableList.of(tp));
Expand All @@ -75,7 +76,7 @@ private static void exportConsumerGroup(String groupId, OffsetResetStrategy fall
long timestamp = 0;

if (consumerRecords.isEmpty()) {
log.warn(tp.toString() + ": no message could be consumed");
log.warn(tp.toString() + ": no message could be consumed from source cluster");
} else {
ConsumerRecord record = (ConsumerRecord) consumerRecords.iterator().next();

Expand All @@ -90,7 +91,7 @@ private static void exportConsumerGroup(String groupId, OffsetResetStrategy fall
if (remoteOffset != null && remoteOffset.get(tp) != null) {
long remoteOffsetValue = remoteOffset.get(tp).offset() + 1L;

log.info(tp.toString() + ": translated to remote offset " + remoteOffsetValue);
log.info(tp.toString() + ": translated to offset " + remoteOffsetValue + " on destination cluster");

cloudConsumer.commitSync(ImmutableMap.of(tp, new OffsetAndMetadata(remoteOffsetValue)));
continue;
Expand Down

0 comments on commit 9d53fad

Please sign in to comment.