Skip to content

Commit

Permalink
Eliminate sharding.Ring interface (#450)
Browse files Browse the repository at this point in the history
  • Loading branch information
timebertt authored Feb 2, 2025
1 parent 450e5be commit ef2fa92
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 86 deletions.
5 changes: 1 addition & 4 deletions pkg/controller/controllerring/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,7 @@ func (r *Reconciler) reconcileWebhooks(ctx context.Context, controllerRing *shar
}

// add ring-specific path to webhook client config
webhookPath, err := sharder.WebhookPathFor(controllerRing)
if err != nil {
return err
}
webhookPath := sharder.WebhookPathForControllerRing(controllerRing)

if service := webhook.ClientConfig.Service; service != nil {
service.Path = ptr.To(path.Join(ptr.Deref(service.Path, ""), webhookPath))
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/sharder/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (r *Reconciler) resyncResource(
ctx context.Context,
log logr.Logger,
gr metav1.GroupResource,
ring sharding.Ring,
ring *shardingv1alpha1.ControllerRing,
namespaces sets.Set[string],
hashRing *consistenthash.Ring,
shards leases.Shards,
Expand Down Expand Up @@ -194,7 +194,7 @@ func (r *Reconciler) resyncObject(
log logr.Logger,
gr metav1.GroupResource,
obj *metav1.PartialObjectMetadata,
ring sharding.Ring,
ring *shardingv1alpha1.ControllerRing,
hashRing *consistenthash.Ring,
shards leases.Shards,
controlled bool,
Expand Down Expand Up @@ -243,7 +243,7 @@ func (r *Reconciler) resyncObject(
}

shardingmetrics.DrainsTotal.WithLabelValues(
ring.GetName(), gr.Group, gr.Resource,
ring.Name, gr.Group, gr.Resource,
).Inc()

// object will go through the sharder webhook when shard removes the drain label, which will perform the assignment
Expand All @@ -264,7 +264,7 @@ func (r *Reconciler) resyncObject(
}

shardingmetrics.MovementsTotal.WithLabelValues(
ring.GetName(), gr.Group, gr.Resource,
ring.Name, gr.Group, gr.Resource,
).Inc()

return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/sharding/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
)

// KeyFuncForResource returns the key function that maps the given resource or its controller dependening on whether
// the resource is listed as a resource or controlled resource in the given ring.
func KeyFuncForResource(gr metav1.GroupResource, ring Ring) (KeyFunc, error) {
func KeyFuncForResource(gr metav1.GroupResource, ring *shardingv1alpha1.ControllerRing) (KeyFunc, error) {
ringResources := sets.New[metav1.GroupResource]()
controlledResources := sets.New[metav1.GroupResource]()

Expand Down
33 changes: 0 additions & 33 deletions pkg/sharding/ring.go

This file was deleted.

6 changes: 3 additions & 3 deletions pkg/sharding/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

coordinationv1 "k8s.io/api/coordination/v1"

"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding"
shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/consistenthash"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/leases"
shardingmetrics "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/metrics"
Expand All @@ -32,13 +32,13 @@ import (
// This is a central function in the sharding implementation bringing together the leases package with the
// consistenthash package.
// In short, it determines the subset of available shards and constructs a new consistenthash.Ring with it.
func FromLeases(ringObj sharding.Ring, leaseList *coordinationv1.LeaseList, now time.Time) (*consistenthash.Ring, leases.Shards) {
func FromLeases(controllerRing *shardingv1alpha1.ControllerRing, leaseList *coordinationv1.LeaseList, now time.Time) (*consistenthash.Ring, leases.Shards) {
// determine ready shards and calculate hash ring
shards := leases.ToShards(leaseList.Items, now)
availableShards := shards.AvailableShards().IDs()
ring := consistenthash.New(nil, 0, availableShards...)

shardingmetrics.RingCalculationsTotal.WithLabelValues(ringObj.GetName()).Inc()
shardingmetrics.RingCalculationsTotal.WithLabelValues(controllerRing.Name).Inc()

return ring, shards
}
37 changes: 10 additions & 27 deletions pkg/webhook/sharder/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,13 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding"
)

const (
// HandlerName is the name of the webhook handler.
HandlerName = "sharder"
// WebhookPathPrefix is the path prefix at which the handler should be registered.
WebhookPathPrefix = "/webhooks/sharder/"
)
Expand All @@ -58,41 +54,28 @@ func (h *Handler) AddToManager(mgr manager.Manager) error {

const pathControllerRing = "controllerring"

// WebhookPathFor returns the webhook handler path that should be used for implementing the given ring object.
// It is the reverse of RingForWebhookPath.
func WebhookPathFor(obj client.Object) (string, error) {
switch obj.(type) {
case *shardingv1alpha1.ControllerRing:
return path.Join(WebhookPathPrefix, pathControllerRing, obj.GetName()), nil
default:
return "", fmt.Errorf("unexpected kind %T", obj)
}
// WebhookPathForControllerRing returns the webhook handler path that should be used for implementing the given
// ControllerRing. It is the reverse of ControllerRingForWebhookPath.
func WebhookPathForControllerRing(ring *shardingv1alpha1.ControllerRing) string {
return path.Join(WebhookPathPrefix, pathControllerRing, ring.Name)
}

// RingForWebhookPath returns the ring object that is associated with the given webhook handler path.
// It is the reverse of WebhookPathFor.
func RingForWebhookPath(requestPath string) (sharding.Ring, error) {
// ControllerRingForWebhookPath returns the ControllerRing that is associated with the given webhook handler path.
// It is the reverse of WebhookPathForControllerRing.
func ControllerRingForWebhookPath(requestPath string) (*shardingv1alpha1.ControllerRing, error) {
if !strings.HasPrefix(requestPath, WebhookPathPrefix) {
return nil, fmt.Errorf("unexpected request path: %s", requestPath)
}

parts := strings.SplitN(strings.TrimPrefix(requestPath, WebhookPathPrefix), "/", 3)
if len(parts) < 2 {
if len(parts) != 2 {
return nil, fmt.Errorf("unexpected request path: %s", requestPath)
}

var ring sharding.Ring
switch parts[0] {
case pathControllerRing:
if len(parts) != 2 {
return nil, fmt.Errorf("unexpected request path: %s", requestPath)
}
ring = &shardingv1alpha1.ControllerRing{ObjectMeta: metav1.ObjectMeta{Name: parts[1]}}
default:
if parts[0] != pathControllerRing {
return nil, fmt.Errorf("unexpected request path: %s", requestPath)
}

return ring, nil
return &shardingv1alpha1.ControllerRing{ObjectMeta: metav1.ObjectMeta{Name: parts[1]}}, nil
}

type ctxKey int
Expand Down
29 changes: 15 additions & 14 deletions pkg/webhook/sharder/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding"
shardingmetrics "github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/metrics"
"github.com/timebertt/kubernetes-controller-sharding/pkg/sharding/ring"
Expand All @@ -46,9 +47,9 @@ type Handler struct {
func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.Response {
log := logf.FromContext(ctx)

ringObj, err := RingForRequest(ctx, h.Reader)
controllerRing, err := ControllerRingForRequest(ctx, h.Reader)
if err != nil {
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error determining ring for request: %w", err))
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error determining ControllerRing for request: %w", err))
}

// Unfortunately, admission.Decoder / runtime.Decoder can't handle decoding into PartialObjectMetadata.
Expand All @@ -58,7 +59,7 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error decoding object: %w", err))
}

labelShard := ringObj.LabelShard()
labelShard := controllerRing.LabelShard()

// Don't touch labels that the object already has, we can't simply reassign it because the active shard might still
// be working on it.
Expand All @@ -69,7 +70,7 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R
keyFunc, err := sharding.KeyFuncForResource(metav1.GroupResource{
Group: req.Resource.Group,
Resource: req.Resource.Resource,
}, ringObj)
}, controllerRing)
if err != nil {
return admission.Errored(http.StatusBadRequest, fmt.Errorf("error deteriming hash key func for object: %w", err))
}
Expand All @@ -84,15 +85,15 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R

// collect list of shards in the ring
leaseList := &coordinationv1.LeaseList{}
if err := h.Reader.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: ringObj.LeaseSelector()}); err != nil {
if err := h.Reader.List(ctx, leaseList, client.MatchingLabelsSelector{Selector: controllerRing.LeaseSelector()}); err != nil {
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error listing Leases for ControllerRing: %w", err))
}

// get ring from cache and hash the object onto the ring
hashRing, _ := ring.FromLeases(ringObj, leaseList, h.Clock.Now())
hashRing, _ := ring.FromLeases(controllerRing, leaseList, h.Clock.Now())
shard := hashRing.Hash(key)

log.V(1).Info("Assigning object for ring", "ring", client.ObjectKeyFromObject(ringObj), "shard", shard)
log.V(1).Info("Assigning object for ControllerRing", "controllerRing", client.ObjectKeyFromObject(controllerRing), "shard", shard)

patches := make([]jsonpatch.JsonPatchOperation, 0, 2)
if obj.Labels == nil {
Expand All @@ -103,30 +104,30 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R

if !ptr.Deref(req.DryRun, false) {
shardingmetrics.AssignmentsTotal.WithLabelValues(
ringObj.GetName(), req.Resource.Group, req.Resource.Resource,
controllerRing.Name, req.Resource.Group, req.Resource.Resource,
).Inc()
}

return admission.Patched("assigning object", patches...)
}

// RingForRequest returns the Ring object matching the requests' path.
func RingForRequest(ctx context.Context, c client.Reader) (sharding.Ring, error) {
// ControllerRingForRequest returns the Ring object matching the requests' path.
func ControllerRingForRequest(ctx context.Context, c client.Reader) (*shardingv1alpha1.ControllerRing, error) {
requestPath, err := RequestPathFromContext(ctx)
if err != nil {
return nil, err
}

ring, err := RingForWebhookPath(requestPath)
controllerRing, err := ControllerRingForWebhookPath(requestPath)
if err != nil {
return nil, err
}

if err := c.Get(ctx, client.ObjectKeyFromObject(ring), ring); err != nil {
return nil, fmt.Errorf("error getting ring: %w", err)
if err := c.Get(ctx, client.ObjectKeyFromObject(controllerRing), controllerRing); err != nil {
return nil, fmt.Errorf("error getting ControllerRing: %w", err)
}

return ring, nil
return controllerRing, nil
}

// rfc6901Encoder can escape / characters in label keys for inclusion in JSON patch paths.
Expand Down

0 comments on commit ef2fa92

Please sign in to comment.