Skip to content

KIP-345 Consumer group static membership #2625

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
8 changes: 4 additions & 4 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
from kafka.client_async import KafkaClient, selectors
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0, ConsumerProtocol_v0
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, UnknownTopicOrPartitionError,
Expand Down Expand Up @@ -1316,7 +1316,7 @@ def _describe_consumer_groups_process_response(self, response):
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
if group_information_name == 'protocol_type':
protocol_type = described_group_information
protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
protocol_type_is_consumer = (protocol_type == ConsumerProtocol_v0.PROTOCOL_TYPE or not protocol_type)
if isinstance(group_information_field, Array):
member_information_list = []
member_schema = group_information_field.array_of
Expand All @@ -1325,9 +1325,9 @@ def _describe_consumer_groups_process_response(self, response):
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
if protocol_type_is_consumer:
if member_name == 'member_metadata' and member:
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
member_information.append(ConsumerProtocolMemberMetadata_v0.decode(member))
elif member_name == 'member_assignment' and member:
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
member_information.append(ConsumerProtocolMemberAssignment_v0.decode(member))
else:
member_information.append(member)
member_info_tuple = MemberInformation._make(member_information)
Expand Down
24 changes: 24 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import copy
import logging
import re
import socket
import time

Expand Down Expand Up @@ -57,6 +58,14 @@ class KafkaConsumer(six.Iterator):
committing offsets. If None, auto-partition assignment (via
group coordinator) and offset commits are disabled.
Default: None
group_instance_id (str): A unique identifier of the consumer instance
provided by end user. Only non-empty strings are permitted. If set,
the consumer is treated as a static member, which means that only
one instance with this ID is allowed in the consumer group at any
time. This can be used in combination with a larger session timeout
to avoid group rebalances caused by transient unavailability (e.g.
process restarts). If not set, the consumer will join the group as
a dynamic member, which is the traditional behavior. Default: None
key_deserializer (callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (callable): Any callable that takes a
Expand Down Expand Up @@ -276,6 +285,7 @@ class KafkaConsumer(six.Iterator):
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
'group_id': None,
'group_instance_id': None,
'key_deserializer': None,
'value_deserializer': None,
'enable_incremental_fetch_sessions': True,
Expand Down Expand Up @@ -408,6 +418,10 @@ def __init__(self, *topics, **configs):
"Request timeout (%s) must be larger than session timeout (%s)" %
(self.config['request_timeout_ms'], self.config['session_timeout_ms']))

if self.config['group_instance_id'] is not None:
if self.config['group_id'] is None:
raise KafkaConfigurationError("group_instance_id requires group_id")

self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
self._client, self._subscription, metrics=self._metrics, **self.config)
Expand All @@ -423,6 +437,16 @@ def __init__(self, *topics, **configs):
self._subscription.subscribe(topics=topics)
self._client.set_topics(topics)

def _validate_group_instance_id(self, group_instance_id):
if not group_instance_id or not isinstance(group_instance_id, str):
raise KafkaConfigurationError("group_instance_id must be non-empty string")
if group_instance_id in (".", ".."):
raise KafkaConfigurationError("group_instance_id cannot be \".\" or \"..\"")
if len(group_instance_id) > 249:
raise KafkaConfigurationError("group_instance_id can't be longer than 249 characters")
if not re.match(r'^[A-Za-z0-9\.\_\-]+$', group_instance_id):
raise KafkaConfigurationError("group_instance_id is illegal: it contains a character other than ASCII alphanumerics, '.', '_' and '-'")

def bootstrap_connected(self):
    """Whether the underlying network client still holds its bootstrap connection."""
    connected = self._client.bootstrap_connected()
    return connected
Expand Down
5 changes: 3 additions & 2 deletions kafka/coordinator/assignors/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ def assign(self, cluster, members):

Arguments:
cluster (ClusterMetadata): metadata for use in assignment
members (dict of {member_id: MemberMetadata}): decoded metadata for
each member in the group.
members (dict of {member_id: Subscription}): decoded metadata
for each member in the group, including group_instance_id
when available.

Returns:
dict: {member_id: MemberAssignment}
Expand Down
27 changes: 16 additions & 11 deletions kafka/coordinator/assignors/range.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from __future__ import absolute_import

import collections
import itertools
import logging

from kafka.vendor import six

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0

log = logging.getLogger(__name__)

Expand All @@ -32,45 +33,49 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
version = 0

@classmethod
def assign(cls, cluster, member_metadata):
def assign(cls, cluster, group_subscriptions):
consumers_per_topic = collections.defaultdict(list)
for member, metadata in six.iteritems(member_metadata):
for topic in metadata.subscription:
consumers_per_topic[topic].append(member)
for member_id, subscription in six.iteritems(group_subscriptions):
for topic in subscription.topics:
consumers_per_topic[topic].append((subscription.group_instance_id, member_id))

# construct {member_id: {topic: [partition, ...]}}
assignment = collections.defaultdict(dict)

for topic in consumers_per_topic:
# group by static members (True) v dynamic members (False)
grouped = {k: list(g) for k, g in itertools.groupby(consumers_per_topic[topic], key=lambda ids: ids[0] is not None)}
consumers_per_topic[topic] = sorted(grouped.get(True, [])) + sorted(grouped.get(False, [])) # sorted static members first, then sorted dynamic

for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
partitions = cluster.partitions_for_topic(topic)
if partitions is None:
log.warning('No partition metadata for topic %s', topic)
continue
partitions = sorted(partitions)
consumers_for_topic.sort()

partitions_per_consumer = len(partitions) // len(consumers_for_topic)
consumers_with_extra = len(partitions) % len(consumers_for_topic)

for i, member in enumerate(consumers_for_topic):
for i, (_group_instance_id, member_id) in enumerate(consumers_for_topic):
start = partitions_per_consumer * i
start += min(i, consumers_with_extra)
length = partitions_per_consumer
if not i + 1 > consumers_with_extra:
length += 1
assignment[member][topic] = partitions[start:start+length]
assignment[member_id][topic] = partitions[start:start+length]

protocol_assignment = {}
for member_id in member_metadata:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
for member_id in group_subscriptions:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
cls.version,
sorted(assignment[member_id].items()),
b'')
return protocol_assignment

@classmethod
def metadata(cls, topics):
return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')

@classmethod
def on_assignment(cls, assignment):
Expand Down
25 changes: 15 additions & 10 deletions kafka/coordinator/assignors/roundrobin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from kafka.vendor import six

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
from kafka.structs import TopicPartition

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -49,10 +49,10 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
version = 0

@classmethod
def assign(cls, cluster, member_metadata):
def assign(cls, cluster, group_subscriptions):
all_topics = set()
for metadata in six.itervalues(member_metadata):
all_topics.update(metadata.subscription)
for subscription in six.itervalues(group_subscriptions):
all_topics.update(subscription.topics)

all_topic_partitions = []
for topic in all_topics:
Expand All @@ -67,29 +67,34 @@ def assign(cls, cluster, member_metadata):
# construct {member_id: {topic: [partition, ...]}}
assignment = collections.defaultdict(lambda: collections.defaultdict(list))

member_iter = itertools.cycle(sorted(member_metadata.keys()))
# Sort static and dynamic members separately to maintain stable static assignments
ungrouped = [(subscription.group_instance_id, member_id) for member_id, subscription in six.iteritems(group_subscriptions)]
grouped = {k: list(g) for k, g in itertools.groupby(ungrouped, key=lambda ids: ids[0] is not None)}
member_list = sorted(grouped.get(True, [])) + sorted(grouped.get(False, [])) # sorted static members first, then sorted dynamic
member_iter = itertools.cycle(member_list)

for partition in all_topic_partitions:
member_id = next(member_iter)
_group_instance_id, member_id = next(member_iter)

# Because we constructed all_topic_partitions from the set of
# member subscribed topics, we should be safe assuming that
# each topic in all_topic_partitions is in at least one member
# subscription; otherwise this could yield an infinite loop
while partition.topic not in member_metadata[member_id].subscription:
while partition.topic not in group_subscriptions[member_id].topics:
member_id = next(member_iter)
assignment[member_id][partition.topic].append(partition.partition)

protocol_assignment = {}
for member_id in member_metadata:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
for member_id in group_subscriptions:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
cls.version,
sorted(assignment[member_id].items()),
b'')
return protocol_assignment

@classmethod
def metadata(cls, topics):
return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')

@classmethod
def on_assignment(cls, assignment):
Expand Down
17 changes: 9 additions & 8 deletions kafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
from kafka.coordinator.protocol import Schema
from kafka.protocol.struct import Struct
from kafka.protocol.types import String, Array, Int32
Expand Down Expand Up @@ -66,6 +66,7 @@ class StickyAssignorUserDataV1(Struct):

class StickyAssignmentExecutor:
def __init__(self, cluster, members):
# a mapping of member_id => StickyAssignorMemberMetadataV1
self.members = members
# a mapping between consumers and their assigned partitions that is updated during assignment procedure
self.current_assignment = defaultdict(list)
Expand Down Expand Up @@ -603,7 +604,7 @@ def assign(cls, cluster, members):

assignment = {}
for member_id in members:
assignment[member_id] = ConsumerProtocolMemberAssignment(
assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
cls.version, sorted(executor.get_final_assignment(member_id)), b''
)
return assignment
Expand All @@ -625,24 +626,24 @@ def parse_member_metadata(cls, metadata):
user_data = metadata.user_data
if not user_data:
return StickyAssignorMemberMetadataV1(
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics
)

try:
decoded_user_data = StickyAssignorUserDataV1.decode(user_data)
except Exception as e:
except Exception:
# ignore the consumer's previous assignment if it cannot be parsed
log.error("Could not parse member data", e) # pylint: disable=logging-too-many-args
log.exception("Could not parse member data")
return StickyAssignorMemberMetadataV1(
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics
)

member_partitions = []
for topic, partitions in decoded_user_data.previous_assignment: # pylint: disable=no-member
member_partitions.extend([TopicPartition(topic, partition) for partition in partitions])
return StickyAssignorMemberMetadataV1(
# pylint: disable=no-member
partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.subscription
partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.topics
)

@classmethod
Expand All @@ -661,7 +662,7 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1):
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
data = StickyAssignorUserDataV1(list(partitions_by_topic.items()), generation)
user_data = data.encode()
return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data)
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), user_data)

@classmethod
def on_assignment(cls, assignment):
Expand Down
Loading
Loading