Skip to content

Commit

Permalink
cherry pick of: fc5f6b0 (#18)
Browse files Browse the repository at this point in the history
* cherry pick of: fc5f6b0

MINOR: Add Timer to simplify timeout bookkeeping and use it in the consumer (apache#5087)

We currently do a lot of manual bookkeeping for timeouts, which is both error-prone and distracting. This patch adds a new `Timer` class to simplify this logic and to limit unnecessary calls to read the system time. In particular, this helps with nested timeout operations. The consumer has been updated to use the new class.

Reviewers: Ismael Juma <[email protected]>, Guozhang Wang <[email protected]>

* Restore sensor functionality that was removed from the patch.
  • Loading branch information
smccauliff authored Apr 24, 2019
1 parent d2c093b commit eae1c4c
Show file tree
Hide file tree
Showing 20 changed files with 924 additions and 687 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

Expand Down Expand Up @@ -756,7 +757,7 @@ private KafkaConsumer(ConsumerConfig config,
groupId,
maxPollIntervalMs,
sessionTimeoutMs,
new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
assignors,
this.metadata,
this.subscriptions,
Expand Down Expand Up @@ -1090,7 +1091,7 @@ public void assign(Collection<TopicPartition> partitions) {
* offset for the subscribed list of partitions
*
*
* @param timeout The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
* @param timeoutMs The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
* If 0, returns immediately with any records that are available currently in the buffer, else returns empty.
* Must not be negative.
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
Expand All @@ -1116,8 +1117,8 @@ public void assign(Collection<TopicPartition> partitions) {
*/
@Deprecated
@Override
public ConsumerRecords<K, V> poll(final long timeout) {
return poll(timeout, false);
public ConsumerRecords<K, V> poll(final long timeoutMs) {
return poll(time.timer(timeoutMs), false);
}

/**
Expand Down Expand Up @@ -1158,41 +1159,31 @@ public ConsumerRecords<K, V> poll(final long timeout) {
*/
@Override
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(timeout.toMillis(), true);
return poll(time.timer(timeout), true);
}

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");

if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}

// poll for new data until the timeout expires
long elapsedTime = 0L;
do {

client.maybeTriggerWakeup();

final long metadataEnd;
if (includeMetadataInTimeout) {
final long metadataStart = time.milliseconds();
if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
metadataEnd = time.milliseconds();
elapsedTime += metadataEnd - metadataStart;
} else {
while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
metadataEnd = time.milliseconds();
}

final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));

final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
Expand All @@ -1206,10 +1197,7 @@ private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMe

return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
final long fetchEnd = time.milliseconds();
elapsedTime += fetchEnd - metadataEnd;

} while (elapsedTime < timeoutMs);
} while (timer.notExpired());

return ConsumerRecords.empty();
} finally {
Expand All @@ -1220,18 +1208,16 @@ private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMe
/**
* Visible for testing
*/
boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) {
final long startMs = time.milliseconds();
if (!coordinator.poll(timeoutMs)) {
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
if (!coordinator.poll(timer)) {
return false;
}

return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs));
return updateFetchPositions(timer);
}

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
final long startMs = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
long pollTimeout = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

// if data is available already, return it immediately
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
Expand All @@ -1251,11 +1237,13 @@ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final lon
pollTimeout = retryBackoffMs;
}

client.poll(pollTimeout, startMs, () -> {
Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
});
timer.update(pollTimer.currentTimeMs());

// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
Expand All @@ -1266,10 +1254,6 @@ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final lon
return fetcher.fetchedRecords();
}

private long remainingTimeAtLeastZero(final long timeoutMs, final long elapsedTime) {
return Math.max(0, timeoutMs - elapsedTime);
}

/**
* Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
* partitions.
Expand Down Expand Up @@ -1338,7 +1322,7 @@ public void commitSync() {
public void commitSync(Duration timeout) {
acquireAndEnsureOpen();
try {
if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), timeout.toMillis())) {
if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
"committing the current consumed offsets");
}
Expand Down Expand Up @@ -1420,7 +1404,7 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
acquireAndEnsureOpen();
try {
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), timeout.toMillis())) {
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
"committing offsets " + offsets);
}
Expand Down Expand Up @@ -1627,30 +1611,23 @@ public long position(TopicPartition partition) {
*/
@Override
public long position(TopicPartition partition, final Duration timeout) {
final long timeoutMs = timeout.toMillis();
acquireAndEnsureOpen();
try {
if (!this.subscriptions.isAssigned(partition))
throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.position(partition);
final long startMs = time.milliseconds();
long finishMs = startMs;

while (offset == null && finishMs - startMs < timeoutMs) {
// batch update fetch positions for any partitions without a valid position
if (!updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs))) {
break;
}
finishMs = time.milliseconds();

client.poll(remainingTimeAtLeastZero(timeoutMs, finishMs - startMs));
offset = this.subscriptions.position(partition);
finishMs = time.milliseconds();
}
if (offset == null)
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
"for partition " + partition + " could be determined");
return offset;
Timer timer = time.timer(timeout);
do {
Long offset = this.subscriptions.position(partition);
if (offset != null)
return offset;

updateFetchPositions(timer);
client.poll(timer);
} while (timer.notExpired());

throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
"for partition " + partition + " could be determined");
} finally {
release();
}
Expand Down Expand Up @@ -1708,7 +1685,7 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
acquireAndEnsureOpen();
try {
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
Collections.singleton(partition), timeout.toMillis());
Collections.singleton(partition), time.timer(timeout));
if (offsets == null) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
"committed offset for partition " + partition + " could be determined");
Expand Down Expand Up @@ -1771,15 +1748,15 @@ public List<PartitionInfo> partitionsFor(String topic) {
@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
acquireAndEnsureOpen();
long timeoutMs = timeout.toMillis();
try {
Cluster cluster = this.metadata.fetch();
List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
if (!parts.isEmpty())
return parts;

Timer timer = time.timer(requestTimeoutMs);
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topic), true), timeoutMs);
new MetadataRequest.Builder(Collections.singletonList(topic), true), timer);
return topicMetadata.get(topic);
} finally {
release();
Expand Down Expand Up @@ -1824,7 +1801,7 @@ public Map<String, List<PartitionInfo>> listTopics() {
public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
acquireAndEnsureOpen();
try {
return fetcher.getAllTopicMetadata(timeout.toMillis());
return fetcher.getAllTopicMetadata(time.timer(timeout));
} finally {
release();
}
Expand Down Expand Up @@ -1949,7 +1926,7 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio
throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
entry.getValue() + ". The target time cannot be negative.");
}
return fetcher.offsetsByTimes(timestampsToSearch, timeout.toMillis());
return fetcher.offsetsByTimes(timestampsToSearch, time.timer(timeout));
} finally {
release();
}
Expand Down Expand Up @@ -1994,7 +1971,7 @@ public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> par
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
acquireAndEnsureOpen();
try {
return fetcher.beginningOffsets(partitions, timeout.toMillis());
return fetcher.beginningOffsets(partitions, time.timer(timeout));
} finally {
release();
}
Expand Down Expand Up @@ -2049,7 +2026,7 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partition
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
acquireAndEnsureOpen();
try {
return fetcher.endOffsets(partitions, timeout.toMillis());
return fetcher.endOffsets(partitions, time.timer(timeout));
} finally {
release();
}
Expand Down Expand Up @@ -2148,7 +2125,7 @@ private void close(long timeoutMs, boolean swallowException) {
AtomicReference<Throwable> firstException = new AtomicReference<>();
try {
if (coordinator != null)
coordinator.close(Math.min(timeoutMs, requestTimeoutMs));
coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
} catch (Throwable t) {
firstException.compareAndSet(null, t);
log.error("Failed to close coordinator", t);
Expand Down Expand Up @@ -2179,7 +2156,7 @@ private void close(long timeoutMs, boolean swallowException) {
* defined
* @return true iff the operation completed without timing out
*/
private boolean updateFetchPositions(final long timeoutMs) {
private boolean updateFetchPositions(final Timer timer) {
cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions();
if (cachedSubscriptionHashAllFetchPositions) return true;

Expand All @@ -2188,7 +2165,7 @@ private boolean updateFetchPositions(final long timeoutMs) {
// coordinator lookup if there are partitions which have missing positions, so
// a consumer with manually assigned partitions can avoid a coordinator dependence
// by always ensuring that assigned partitions have an initial position.
if (!coordinator.refreshCommittedOffsetsIfNeeded(timeoutMs)) return false;
if (!coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;

// If there are partitions still needing a position and a reset policy is defined,
// request reset using the default policy. If no reset strategy is defined and there
Expand Down
Loading

0 comments on commit eae1c4c

Please sign in to comment.