diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 9abf15e9b..070aac857 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -219,6 +219,9 @@ class KafkaConsumer(six.Iterator):
             metrics. Default: 2
         metrics_sample_window_ms (int): The maximum age in milliseconds of
             samples used to compute metrics. Default: 30000
+        coordinator_not_ready_retry_timeout_ms (int): The timeout used to detect
+            that the Kafka coordinator is not available. If 'None', the default
+            behavior of polling indefinitely is retained. Default: None
         selector (selectors.BaseSelector): Provide a specific selector
             implementation to use for I/O multiplexing.
             Default: selectors.DefaultSelector
@@ -288,6 +291,7 @@ class KafkaConsumer(six.Iterator):
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,
         'metric_group_prefix': 'consumer',
+        'coordinator_not_ready_retry_timeout_ms': None,
         'selector': selectors.DefaultSelector,
         'exclude_internal_topics': True,
         'sasl_mechanism': None,
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index b1775670b..5d9799620 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -85,6 +85,7 @@ class BaseCoordinator(object):
         'group_id': 'kafka-python-default-group',
         'session_timeout_ms': 10000,
         'heartbeat_interval_ms': 3000,
+        'coordinator_not_ready_retry_timeout_ms': None,
         'max_poll_interval_ms': 300000,
         'retry_backoff_ms': 100,
         'api_version': (0, 10, 1),
@@ -98,7 +99,7 @@ def __init__(self, client, metrics, **configs):
                 partition assignment (if enabled), and to use for fetching and
                 committing offsets. Default: 'kafka-python-default-group'
             session_timeout_ms (int): The timeout used to detect failures when
-                using Kafka's group managementment facilities. Default: 30000
+                using Kafka's group management facilities. Default: 30000
             heartbeat_interval_ms (int): The expected time in milliseconds
                 between heartbeats to the consumer coordinator when using
                 Kafka's group management feature. Heartbeats are used to ensure
@@ -108,6 +109,10 @@ def __init__(self, client, metrics, **configs):
                 should be set no higher than 1/3 of that value. It can be
                 adjusted even lower to control the expected time for normal
                 rebalances. Default: 3000
+            coordinator_not_ready_retry_timeout_ms (int): The timeout used to
+                detect that the Kafka coordinator is not available. If 'None',
+                the default behavior of polling indefinitely is retained.
+                Default: None
             retry_backoff_ms (int): Milliseconds to backoff when retrying on
                 errors. Default: 100.
         """
@@ -240,10 +245,12 @@ def coordinator(self):
         else:
             return self.coordinator_id
 
-    def ensure_coordinator_ready(self):
+    def ensure_coordinator_ready(self, timeout_ms=None):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
+        retry_timeout_ms = timeout_ms or self.config['coordinator_not_ready_retry_timeout_ms']
+        retry_start_time_in_secs = time.time()
         with self._lock:
             while self.coordinator_unknown():
@@ -261,7 +268,15 @@ def ensure_coordinator_ready(self):
                 if future.failed():
                     if future.retriable():
-                        if getattr(future.exception, 'invalid_metadata', False):
+                        if retry_timeout_ms is not None and isinstance(
+                                future.exception, (Errors.NodeNotReadyError, Errors.NoBrokersAvailable)):
+                            remaining_retry_timeout_ms = retry_timeout_ms - (
+                                time.time() - retry_start_time_in_secs) * 1000
+                            if remaining_retry_timeout_ms <= 0:
+                                raise future.exception  # pylint: disable-msg=raising-bad-type
+                            self._client.poll(timeout_ms=min(
+                                self.config['retry_backoff_ms'], remaining_retry_timeout_ms))
+                        elif getattr(future.exception, 'invalid_metadata', False):
                             log.debug('Requesting metadata for group coordinator request: %s', future.exception)
                             metadata_update = self._client.cluster.request_update()
                             self._client.poll(future=metadata_update)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cb1de0d2e..ffbc72fea 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -37,7 +37,8 @@ class ConsumerCoordinator(BaseCoordinator):
         'retry_backoff_ms': 100,
         'api_version': (0, 10, 1),
         'exclude_internal_topics': True,
-        'metric_group_prefix': 'consumer'
+        'metric_group_prefix': 'consumer',
+        'coordinator_not_ready_retry_timeout_ms': None
     }
 
     def __init__(self, client, subscription, metrics, **configs):
@@ -68,13 +69,17 @@ def __init__(self, client, subscription, metrics, **configs):
                 adjusted even lower to control the expected time for normal
                 rebalances. Default: 3000
             session_timeout_ms (int): The timeout used to detect failures when
-                using Kafka's group managementment facilities. Default: 30000
+                using Kafka's group management facilities. Default: 30000
             retry_backoff_ms (int): Milliseconds to backoff when retrying on
                 errors. Default: 100.
             exclude_internal_topics (bool): Whether records from internal topics
                 (such as offsets) should be exposed to the consumer. If set to
                 True the only way to receive records from an internal topic is
                 subscribing to it. Requires 0.10+. Default: True
+            coordinator_not_ready_retry_timeout_ms (int): The timeout used to
+                detect that the Kafka coordinator is not available. If 'None',
+                the default behavior of polling indefinitely is retained.
+                Default: None.
         """
         super(ConsumerCoordinator, self).__init__(client, metrics, **configs)