diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index 1515356..943e62a 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -1079,7 +1079,7 @@ async def fetched_records(self, partitions, timeout=0, max_records=None): if not self._subscriptions.is_assigned(tp): del self._records[tp] continue - res_or_error = self._records[tp] + res_or_error = self._records.get(tp) if type(res_or_error) == FetchResult: records = res_or_error.getall(max_records) if not res_or_error.has_more(): @@ -1101,9 +1101,9 @@ async def fetched_records(self, partitions, timeout=0, max_records=None): return drained else: # Remove error, so we can fetch on partition again - del self._records[tp] + self._records.pop(tp, None) self._notify(self._wait_consume_future) - res_or_error.check_raise() + getattr(res_or_error, 'check_raise', lambda: None)() if drained or not timeout: return drained