From 2f93470c6c5c50a49167b12ff4e1777aabc45836 Mon Sep 17 00:00:00 2001 From: "misha.gavela" Date: Wed, 25 Nov 2020 23:41:34 +0200 Subject: [PATCH] feat: logged cancelled exception as info messages we manual interrupt our connection so all inflight request which was canceled by this reason shouldn't log it as an error --- kafka/consumer/fetcher.py | 15 ++++++++++++--- test/test_fetcher.py | 4 ++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 7ff9daf7b..88858d324 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -124,8 +124,8 @@ def send_fetches(self): if self._client.ready(node_id): log.debug("Sending FetchRequest to node %s", node_id) future = self._client.send(node_id, request, wakeup=False) - future.add_callback(self._handle_fetch_response, request, time.time()) - future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id) + future.add_callback(self._handle_fetch_success, request, time.time()) + future.add_errback(self._handle_fetch_error, node_id) futures.append(future) self._fetch_futures.extend(futures) self._clean_done_fetch_futures() @@ -747,7 +747,7 @@ def _create_fetch_requests(self): partition_data) return requests - def _handle_fetch_response(self, request, send_time, response): + def _handle_fetch_success(self, request, send_time, response): """The callback for fetch completion""" fetch_offsets = {} for topic, partitions in request.topics: @@ -778,6 +778,15 @@ def _handle_fetch_response(self, request, send_time, response): self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms) self._sensors.fetch_latency.record((time.time() - send_time) * 1000) + def _handle_fetch_error(self, node_id, err): + """The callback for fetch error""" + msg = 'Fetch to node %s failed: %s' % (node_id, err) + + if isinstance(err, Errors.Cancelled): + log.info(msg) + else: + log.error(msg) + def _parse_fetched_data(self, completed_fetch): tp = completed_fetch.topic_partition fetch_offset = completed_fetch.fetched_offset diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 697f8be1f..556d24e93 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -373,8 +373,8 @@ def test_fetched_records(fetcher, topic, mocker): 1, ), ]) -def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_partitions): - fetcher._handle_fetch_response(fetch_request, time.time(), fetch_response) +def test__handle_fetch_success(fetcher, fetch_request, fetch_response, num_partitions): + fetcher._handle_fetch_success(fetch_request, time.time(), fetch_response) assert len(fetcher._completed_fetches) == num_partitions