From 7e2f9e70df38e88211b318463b0fbd5002367974 Mon Sep 17 00:00:00 2001 From: DhruvaPatil98 Date: Tue, 25 Feb 2020 10:53:31 +0530 Subject: [PATCH] Fix keyerror #19 --- aiokafka/consumer/fetcher.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index 1515356..943e62a 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -1079,7 +1079,7 @@ async def fetched_records(self, partitions, timeout=0, max_records=None): if not self._subscriptions.is_assigned(tp): del self._records[tp] continue - res_or_error = self._records[tp] + res_or_error = self._records.get(tp) if type(res_or_error) == FetchResult: records = res_or_error.getall(max_records) if not res_or_error.has_more(): @@ -1101,9 +1101,9 @@ async def fetched_records(self, partitions, timeout=0, max_records=None): return drained else: # Remove error, so we can fetch on partition again - del self._records[tp] + self._records.pop(tp, None) self._notify(self._wait_consume_future) - res_or_error.check_raise() + getattr(res_or_error, 'check_raise', lambda: None)() if drained or not timeout: return drained