Skip to content

Commit

Permalink
Revert "Use hash.BucketSet for standard leader election mode (#1490)" (
Browse files Browse the repository at this point in the history
…#1518)

This reverts commit 8d80e63.
  • Loading branch information
vagababov authored Jul 18, 2020
1 parent 25be382 commit 6cd4568
Showing 1 changed file with 39 additions and 26 deletions.
65 changes: 39 additions & 26 deletions leaderelection/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package leaderelection
import (
"context"
"fmt"
"hash/fnv"
"strings"
"sync"

Expand Down Expand Up @@ -81,11 +82,11 @@ type Elector interface {

// BuildElector builds a leaderelection.LeaderElector for the named LeaderAware
// reconciler using a builder added to the context via WithStandardLeaderElectorBuilder.
func BuildElector(ctx context.Context, la reconciler.LeaderAware, queueName string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) {
func BuildElector(ctx context.Context, la reconciler.LeaderAware, name string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) {
if val := ctx.Value(builderKey{}); val != nil {
switch builder := val.(type) {
case *standardBuilder:
return builder.buildElector(ctx, la, queueName, enq)
return builder.buildElector(ctx, la, name, enq)
case *statefulSetBuilder:
return builder.buildElector(ctx, la, enq)
}
Expand All @@ -106,17 +107,24 @@ type standardBuilder struct {
}

func (b *standardBuilder) buildElector(ctx context.Context, la reconciler.LeaderAware,
queueName string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) {
name string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) {
logger := logging.FromContext(ctx)

id, err := UniqueID()
if err != nil {
return nil, err
}

bkts := newStandardBuckets(queueName, b.lec)
electors := make([]Elector, 0, b.lec.Buckets)
for _, bkt := range bkts {
buckets := make([]Elector, 0, b.lec.Buckets)
for i := uint32(0); i < b.lec.Buckets; i++ {
bkt := &bucket{
// The resource name is the lowercase:
// {component}.{workqueue}.{index}-of-{total}
name: strings.ToLower(fmt.Sprintf("%s.%s.%02d-of-%02d", b.lec.Component, name, i, b.lec.Buckets)),
index: i,
total: b.lec.Buckets,
}

rl, err := resourcelock.New(KnativeResourceLock,
system.Namespace(), // use namespace we are running in
bkt.Name(),
Expand Down Expand Up @@ -160,27 +168,9 @@ func (b *standardBuilder) buildElector(ctx context.Context, la reconciler.Leader
// if lec.WatchDog != nil {
// lec.WatchDog.SetLeaderElection(le)
// }
electors = append(electors, &runUntilCancelled{Elector: le})
}
return &runAll{les: electors}, nil
}

func newStandardBuckets(queueName string, cc ComponentConfig) []reconciler.Bucket {
names := make(sets.String, cc.Buckets)
for i := uint32(0); i < cc.Buckets; i++ {
names.Insert(standardBucketName(i, queueName, cc))
buckets = append(buckets, &runUntilCancelled{Elector: le})
}
bs := hash.NewBucketSet(names)

bkts := make([]reconciler.Bucket, 0, cc.Buckets)
for name := range names {
bkts = append(bkts, hash.NewBucket(name, bs))
}
return bkts
}

func standardBucketName(ordinal uint32, queueName string, cc ComponentConfig) string {
return strings.ToLower(fmt.Sprintf("%s.%s.%02d-of-%02d", cc.Component, queueName, ordinal, cc.Buckets))
return &runAll{les: buckets}, nil
}

type statefulSetBuilder struct {
Expand Down Expand Up @@ -279,3 +269,26 @@ func (ruc *runUntilCancelled) Run(ctx context.Context) {
}
}
}

type bucket struct {
name string

// We are bucket {index} of {total}
index uint32
total uint32
}

var _ reconciler.Bucket = (*bucket)(nil)

// Name implements reconciler.Bucket
func (b *bucket) Name() string {
return b.name
}

// Has implements reconciler.Bucket
func (b *bucket) Has(nn types.NamespacedName) bool {
h := fnv.New32a()
h.Write([]byte(nn.Namespace + "." + nn.Name))
ii := h.Sum32() % b.total
return b.index == ii
}

0 comments on commit 6cd4568

Please sign in to comment.