Skip to content

Commit

Permalink
[LI-HOTFIX] Avoid excessive info logs in the client side for incremen…
Browse files Browse the repository at this point in the history
…tal fetch and print out more informative logs for throttling (#48)

* [LI-HOTFIX] Avoid excessive info logs in the client side for incremental fetch and
print out more informative logs for throttling

TICKET = KAFKA-8921

LI_DESCRIPTION =  

Currently in FetchSessionHandler::handleResponse, the following info log
will get printed out excessively when the session is evicted from the
server-side cache even though there is nothing wrong with the fetch
request and client cannot do much to improve it.
```
Node xxx was unable to process the fetch request with (sessionId=xxx,
epoch=xxx): FETCH_SESSION_ID_NOT_FOUND.
``` 

Moreover, when the fetch request gets throttled, the following info logs
will also get printed out, which are very misleading.
```
Node xxx sent an invalid full fetch response with ...
Node xxx sent an invalid incremental fetch response with ...
```

We should avoid logging these things in INFO level and print out more
informative logs for throttling.

 

EXIT_CRITERIA = MANUAL ["When the hotfix is pushed to apache/kafka"]
  • Loading branch information
hzxa21 authored Nov 7, 2019
1 parent 34dbd85 commit 456f47e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,18 +380,21 @@ private String responseDataToLogString(FetchResponse response) {
*/
public boolean handleResponse(FetchResponse response) {
if (response.error() != Errors.NONE) {
log.info("Node {} was unable to process the fetch request with {}: {}.",
node, nextMetadata, response.error());
if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) {
log.debug("Node {} was unable to process the fetch request with {}: {}. "
+ "This normally indicates the session has been evicted in the server side cache",
node, nextMetadata, response.error());
nextMetadata = FetchMetadata.INITIAL;
} else {
log.warn("Node {} was unable to process the fetch request with {}: {}.",
node, nextMetadata, response.error());
nextMetadata = nextMetadata.nextCloseExisting();
}
return false;
} else if (nextMetadata.isFull()) {
String problem = verifyFullFetchResponsePartitions(response);
if (problem != null) {
log.info("Node {} sent an invalid full fetch response with {}", node, problem);
log.debug("Node {} sent an invalid full fetch response with {}", node, problem);
nextMetadata = FetchMetadata.INITIAL;
return false;
} else if (response.sessionId() == INVALID_SESSION_ID) {
Expand All @@ -409,7 +412,7 @@ public boolean handleResponse(FetchResponse response) {
} else {
String problem = verifyIncrementalFetchResponsePartitions(response);
if (problem != null) {
log.info("Node {} sent an invalid incremental fetch response with {}", node, problem);
log.debug("Node {} sent an invalid incremental fetch response with {}", node, problem);
nextMetadata = nextMetadata.nextCloseExisting();
return false;
} else if (response.sessionId() == INVALID_SESSION_ID) {
Expand Down Expand Up @@ -440,4 +443,13 @@ public void handleError(Throwable t) {
log.info("Error sending fetch request {} to node {}: {}.", nextMetadata, node, t.toString());
nextMetadata = nextMetadata.nextCloseExisting();
}


public void maybeLogThrottledFetch(FetchResponse response, short version) {
int throttleTimeMs = response.throttleTimeMs();
if (throttleTimeMs > 0 && response.shouldClientThrottle(version)) {
log.info("FetchRequest to node {} is throttled for {} ms for session {}. Fetch session partitions={}", node,
throttleTimeMs, response.sessionId(), partitionsToLogString(sessionPartitions.keySet()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ public void onSuccess(ClientResponse resp) {
fetchTarget.id());
return;
}

handler.maybeLogThrottledFetch(response, resp.requestHeader().apiVersion());

if (!handler.handleResponse(response)) {
return;
}
Expand Down

0 comments on commit 456f47e

Please sign in to comment.