Skip to content

Commit

Permalink
Merge pull request #290 from lovoo/copartitioning-tolerate-topic-diff
Browse files Browse the repository at this point in the history
The default copartition strategy allows members to have different sets of topics.
  • Loading branch information
frairon authored Dec 7, 2020
2 parents 9790d84 + a1ac26f commit 8a87b5a
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 13 deletions.
40 changes: 35 additions & 5 deletions copartition_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,39 @@ import (
"github.com/Shopify/sarama"
)

// CopartitioningStrategy is the rebalance strategy necessary to guarantee the copartitioning
// when consuming multiple input topics with multiple processor instances
var CopartitioningStrategy = new(copartitioningStrategy)
var (
// CopartitioningStrategy is the rebalance strategy necessary to guarantee the copartitioning
// when consuming multiple input topics with multiple processor instances.
// This strategy tolerates different sets of topics per member of consumer group to allow
// rolling upgrades of processors.
//
// Note that the topic inconcistency needs to be only temporarily, otherwise not all topic partitions will be consumed as in
// the following example:
// Assume having topics X and Y, each with partitions [0,1,2]
// MemberA requests topic X
// MemberB requests topics X and Y, because topic Y was newly added to the processor.
//
// Then the strategy will plan as follows:
// MemberA: X: [0,1]
// MemberB: X: [2], Y:[2]
//
// That means partitions [0,1] from topic Y are not being consumed.
// So the assumption is that memberA will be redeployed so that after a second rebalance
// both members consume both topics and all partitions.
//
// If you do not use rolling upgrades, i.e. replace all members of a group simultaneously, it is
// safe to use the StrictCopartitioningStrategy
CopartitioningStrategy = new(copartitioningStrategy)

// StrictCopartitioningStrategy behaves like the copartitioning strategy but it will fail if two members of a consumer group
// request a different set of topics, which might indicate a bug or a reused consumer group name.
StrictCopartitioningStrategy = &copartitioningStrategy{
failOnInconsistentTopics: true,
}
)

type copartitioningStrategy struct {
failOnInconsistentTopics bool
}

// Name implements BalanceStrategy.
Expand Down Expand Up @@ -42,7 +70,9 @@ func (s *copartitioningStrategy) Plan(members map[string]sarama.ConsumerGroupMem
// (2) collect all members and check they consume the same topics
for memberID, meta := range members {
if !s.topicsEqual(allTopics, meta.Topics) {
return nil, fmt.Errorf("Error balancing. Not all members request the same list of topics. A group-name clash might be the reason: %#v", members)
if s.failOnInconsistentTopics {
return nil, fmt.Errorf("Error balancing. Not all members request the same list of topics. A group-name clash might be the reason: %#v", members)
}
}
allMembers = append(allMembers, memberID)
}
Expand All @@ -60,7 +90,7 @@ func (s *copartitioningStrategy) Plan(members map[string]sarama.ConsumerGroupMem
pos := float64(idx)
min := int(math.Floor(pos*step + 0.5))
max := int(math.Floor((pos+1)*step + 0.5))
for _, topic := range allTopics {
for _, topic := range members[memberID].Topics {
plan.Add(memberID, topic, allPartitions[min:max]...)
}
}
Expand Down
46 changes: 38 additions & 8 deletions copartition_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ func TestCopartitioningStrategy(t *testing.T) {
})

for _, ttest := range []struct {
name string
members map[string]sarama.ConsumerGroupMemberMetadata
topics map[string][]int32
hasError bool
expected sarama.BalanceStrategyPlan
name string
members map[string]sarama.ConsumerGroupMemberMetadata
topics map[string][]int32
hasError bool
useStrict bool
expected sarama.BalanceStrategyPlan
}{
{
name: "inconsistent-topic-members",
Expand All @@ -29,7 +30,8 @@ func TestCopartitioningStrategy(t *testing.T) {
topics: map[string][]int32{
"T2": {0, 1, 2},
},
hasError: true,
hasError: true,
useStrict: true,
},
{
name: "not-copartitioned",
Expand All @@ -54,7 +56,29 @@ func TestCopartitioningStrategy(t *testing.T) {
"T1": {0, 1, 2},
"T2": {0, 1, 2},
},
hasError: true,
hasError: true,
useStrict: true,
},
{
name: "tolerate-inconsistent-members",
members: map[string]sarama.ConsumerGroupMemberMetadata{
"M1": {Topics: []string{"T1", "T2"}},
"M2": {Topics: []string{"T2"}},
},
// topics are inconsistent with members
topics: map[string][]int32{
"T1": {0, 1, 2},
"T2": {0, 1, 2},
},
expected: sarama.BalanceStrategyPlan{
"M1": map[string][]int32{
"T1": {0, 1},
"T2": {0, 1},
},
"M2": map[string][]int32{
"T2": {2},
},
},
},
{
name: "single-member",
Expand Down Expand Up @@ -123,7 +147,13 @@ func TestCopartitioningStrategy(t *testing.T) {
},
} {
t.Run(ttest.name, func(t *testing.T) {
plan, err := CopartitioningStrategy.Plan(ttest.members, ttest.topics)

var strategy sarama.BalanceStrategy = CopartitioningStrategy
if ttest.useStrict {
strategy = StrictCopartitioningStrategy
}

plan, err := strategy.Plan(ttest.members, ttest.topics)
test.AssertEqual(t, err != nil, ttest.hasError)
if err == nil {
test.AssertTrue(t, reflect.DeepEqual(ttest.expected, plan), "expected", ttest.expected, "actual", plan)
Expand Down

0 comments on commit 8a87b5a

Please sign in to comment.