-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Federated Clients: implement commitSync() and committed() #129
base: master
Are you sure you want to change the base?
Conversation
} | ||
|
||
try { | ||
Thread.sleep(Math.min(deadlineTimeMs - System.currentTimeMillis(), _retryBackoffMs)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can deadlineTimeMs - System.currentTimeMillis() be negative and cause infinite sleep?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sleep would throw for a negative
} | ||
|
||
try { | ||
if (!countDownLatch.await(deadlineTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question here, not sure if we need to consider negative values for await here.
_nonexistentTopics = Collections.emptySet(); | ||
} | ||
|
||
public LocationLookupResult(/*Map<ClusterDescriptor, T> valuesByCluster, */Set<String> nonexistentTopics) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should you just delete the commented out part ?
} | ||
|
||
// ATTN: UnknownTopicOrPartitionException may be received - this is a retriable exception.. | ||
// if not resolved by the time, timeout exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"by the time" - "within the timeout" ?
long now = System.currentTimeMillis(); | ||
long deadlineTimeMs = now + timeout.toMillis(); | ||
while (now < deadlineTimeMs) { | ||
PartitionKeyedMapLookupResult offsetsByClusterResult = getPartitionKeyedMapsByCluster(offsets); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the while() loop executes multiple times (because some topics done exist?) it will commit the same offsets for existing topicPartitions N times. i think on successful commit the TPs committed should be taken out and not committed over again - its just wasteful
} | ||
|
||
@Override | ||
synchronized public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) { | ||
throw new UnsupportedOperationException("Not implemented yet"); | ||
if (partition == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably want to null check timeout as well
@@ -552,17 +589,48 @@ synchronized public long position(TopicPartition partition, Duration timeout) { | |||
|
|||
@Override | |||
synchronized public OffsetAndMetadata committed(TopicPartition partition) { | |||
throw new UnsupportedOperationException("Not implemented yet"); | |||
|
|||
return committed(partition, _defaultApiTimeout); | |||
} | |||
|
|||
@Override | |||
synchronized public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does vanilla block here for non-existent TPs? if so, we should probably document this
No description provided.