From 5dce3cc8bcd9f6a1ec4318684a162c7789ae104d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?=
Date: Mon, 17 Jun 2024 15:11:44 +0200
Subject: [PATCH 1/2] Allow delayed downscale of subset of pods (#156)

* When checking the downscale delay in the statefulset, allow downscale if some pods at the end of the statefulset are ready to be downscaled.

* CHANGELOG.md
---
 CHANGELOG.md                               |  1 +
 pkg/controller/controller_test.go          | 30 ++++++++-
 pkg/controller/custom_resource_replicas.go | 30 +++++----
 pkg/controller/delay.go                    | 72 ++++++++++++++--------
 4 files changed, 93 insertions(+), 40 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a27078cb1..b4690e7c9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@
 * [ENHANCEMENT] Include unique IDs of webhook requests in logs for easier debugging. #150
 * [ENHANCEMENT] Include k8s operation username in request debug logs. #152
 * [ENHANCEMENT] `rollout-max-unavailable` annotation can now be specified as percentage, e.g.: `rollout-max-unavailable: 25%`. Resulting value is computed as `floor(replicas * percentage)`, but is never less than 1. #153
+* [ENHANCEMENT] Delayed downscale of a statefulset can now reduce replicas earlier, if a subset of pods at the end of the statefulset has already reached its delay. #156
 * [BUGFIX] Fix a mangled error log in controller's delayed downscale code. #154
 
 ## v0.16.0
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 265d85435..e7776c9bd 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -655,15 +655,15 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)
 			},
 		},
 
-		"scale down is not allowed if delay time was not reached on one pod": {
+		"scale down is not allowed if delay time was not reached on one pod at the end of statefulset": {
 			statefulSets: []runtime.Object{
 				mockStatefulSet("ingester-zone-b", withReplicas(5, 5),
 					withMirrorReplicasAnnotations("test", customResourceGVK),
 					withDelayedDownscaleAnnotations(time.Hour, "http://pod/prepare-delayed-downscale")),
 			},
 			httpResponses: map[string]httpResponse{
-				"POST http://ingester-zone-b-4.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-70*time.Minute).Unix())},
-				"POST http://ingester-zone-b-3.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Unix())},
+				"POST http://ingester-zone-b-4.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Unix())},
+				"POST http://ingester-zone-b-3.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-70*time.Minute).Unix())},
 			},
 			customResourceScaleSpecReplicas:   3, // We want to downscale to 3 replicas only.
 			customResourceScaleStatusReplicas: 5,
@@ -676,6 +676,30 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)
 			},
 		},
 
+		"limited scale down by 2 replicas is allowed if delay time was reached on some pods at the end of statefulset": {
+			statefulSets: []runtime.Object{
+				mockStatefulSet("ingester-zone-b", withReplicas(5, 5),
+					withMirrorReplicasAnnotations("test", customResourceGVK),
+					withDelayedDownscaleAnnotations(time.Hour, "http://pod/prepare-delayed-downscale")),
+			},
+			httpResponses: map[string]httpResponse{
+				"POST http://ingester-zone-b-4.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-70*time.Minute).Unix())},
+				"POST http://ingester-zone-b-3.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-75*time.Minute).Unix())},
+				"POST http://ingester-zone-b-2.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-30*time.Minute).Unix())}, // cannot be scaled down yet, as 1h has not elapsed
+				"POST http://ingester-zone-b-1.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale": {statusCode: 200, body: fmt.Sprintf(`{"timestamp": %d}`, now.Add(-75*time.Minute).Unix())},
+			},
+			customResourceScaleSpecReplicas:   1, // We want to downscale to single replica
+			customResourceScaleStatusReplicas: 5,
+			expectedPatchedSets:               map[string][]string{"ingester-zone-b": {`{"spec":{"replicas":3}}`}}, // Scaledown by 2 replicas (from 5 to 3) is allowed.
+			expectedHttpRequests: []string{
+				"DELETE http://ingester-zone-b-0.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale",
+				"POST http://ingester-zone-b-1.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale",
+				"POST http://ingester-zone-b-2.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale",
+				"POST http://ingester-zone-b-3.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale",
+				"POST http://ingester-zone-b-4.ingester-zone-b.test.svc.cluster.local./prepare-delayed-downscale",
+			},
+		},
+
 		"scale down is not allowed, if POST returns non-200 HTTP status code, even if returned timestamps are outside of delay": {
 			statefulSets: []runtime.Object{
 				mockStatefulSet("ingester-zone-b", withReplicas(5, 5),
diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go
index 127513c39..bb61d4142 100644
--- a/pkg/controller/custom_resource_replicas.go
+++ b/pkg/controller/custom_resource_replicas.go
@@ -34,10 +34,10 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 
 		referenceResource := fmt.Sprintf("%s/%s", referenceGVR.Resource, referenceName)
 
-		desiredReplicas := scaleObj.Spec.Replicas
-		if currentReplicas == desiredReplicas {
-			updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, desiredReplicas)
-			cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, desiredReplicas)
+		referenceResourceDesiredReplicas := scaleObj.Spec.Replicas
+		if currentReplicas == referenceResourceDesiredReplicas {
+			updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, referenceResourceDesiredReplicas)
+			cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, referenceResourceDesiredReplicas)
 			// No change in the number of replicas: don't log because this will be the result most of the time.
 			continue
 		}
 
@@ -45,26 +45,34 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 		// We're going to change number of replicas on the statefulset.
 		// If there is delayed downscale configured on the statefulset, we will first handle delay part, and only if that succeeds,
 		// continue with downscaling or upscaling.
-		if err := checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, desiredReplicas); err != nil {
-			level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check", "group", groupName, "name", sts.GetName(), "currentReplicas", currentReplicas, "desiredReplicas", desiredReplicas, "err", err)
+		desiredReplicas, err := checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas)
+		if err != nil {
+			level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
+				"group", groupName,
+				"name", sts.GetName(),
+				"currentReplicas", currentReplicas,
+				"referenceResourceDesiredReplicas", referenceResourceDesiredReplicas,
+				"err", err,
+			)
 
 			updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, currentReplicas)
 			// If delay has not been reached, we can check next statefulset.
 			continue
 		}
 
-		direction := ""
+		logMsg := ""
 		if desiredReplicas > currentReplicas {
-			direction = "up"
+			logMsg = "scaling up statefulset to match replicas in the reference resource"
 		} else if desiredReplicas < currentReplicas {
-			direction = "down"
+			logMsg = "scaling down statefulset to computed desired replicas, based on replicas in the reference resource and elapsed downscale delays"
 		}
 
-		level.Info(c.logger).Log("msg", fmt.Sprintf("scaling %s statefulset to match reference resource", direction),
+		level.Info(c.logger).Log("msg", logMsg,
 			"group", groupName,
 			"name", sts.GetName(),
 			"currentReplicas", currentReplicas,
-			"desiredReplicas", desiredReplicas,
+			"referenceResourceDesiredReplicas", referenceResourceDesiredReplicas,
+			"computedDesiredReplicas", desiredReplicas,
 			"referenceResource", referenceResource,
 		)
 
diff --git a/pkg/controller/delay.go b/pkg/controller/delay.go
index d959e3da2..da671a0f0 100644
--- a/pkg/controller/delay.go
+++ b/pkg/controller/delay.go
@@ -36,21 +36,27 @@ func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger,
 	callCancelDelayedDownscale(ctx, logger, httpClient, endpoints)
 }
 
-func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32) error {
+// Checks if the downscale delay has been reached on replicas in the [desiredReplicas, currentReplicas) range.
+// If there is a range of replicas at the end of the statefulset for which the delay has been reached, this function
+// returns the updated desired replicas that the statefulset can be scaled to.
+func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32) (updatedDesiredReplicas int32, _ error) {
 	if currentReplicas == desiredReplicas {
 		// should not happen
-		return nil
+		return currentReplicas, nil
 	}
 
 	delay, prepareURL, err := parseDelayedDownscaleAnnotations(sts.GetAnnotations())
-	if delay == 0 || prepareURL == nil || err != nil {
-		return err
+	if err != nil {
+		return currentReplicas, err
+	}
+	if delay == 0 || prepareURL == nil {
+		return desiredReplicas, err
 	}
 
 	if desiredReplicas >= currentReplicas {
 		callCancelDelayedDownscale(ctx, logger, httpClient, createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(currentReplicas), prepareURL))
 		// Proceed even if calling cancel of delayed downscale fails. We call cancellation repeatedly, so it will happen during next reconcile.
-		return nil
+		return desiredReplicas, nil
 	}
 
 	{
@@ -61,19 +67,34 @@ func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulS
 
 	// Replicas in [desired, current) interval are going to be stopped.
 	downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), int(desiredReplicas), int(currentReplicas), prepareURL)
-	maxPrepareTime, err := callPrepareDownscaleAndReturnMaxPrepareTimestamp(ctx, logger, httpClient, downscaleEndpoints)
+	elapsedTimeSinceDownscaleInitiated, err := callPrepareDownscaleAndReturnElapsedDurationsSinceInitiatedDownscale(ctx, logger, httpClient, downscaleEndpoints)
 	if err != nil {
-		return fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
+		return currentReplicas, fmt.Errorf("failed to prepare pods for delayed downscale: %v", err)
+	}
+
+	// Find how many pods from the end of the statefulset we can already scale down.
+	allowedDesiredReplicas := currentReplicas
+	for replica := currentReplicas - 1; replica >= desiredReplicas; replica-- {
+		elapsed, ok := elapsedTimeSinceDownscaleInitiated[int(replica)]
+		if !ok {
+			break
+		}
+
+		if elapsed < delay {
+			break
+		}
+
+		// We can scale down this replica.
+		allowedDesiredReplicas--
 	}
 
-	elapsedSinceMaxTime := time.Since(maxPrepareTime)
-	if elapsedSinceMaxTime < delay {
-		return fmt.Errorf("configured downscale delay %v has not been reached for all pods. elapsed time: %v", delay, elapsedSinceMaxTime)
+	if allowedDesiredReplicas == currentReplicas {
+		return currentReplicas, fmt.Errorf("configured downscale delay %v has not been reached for any pods at the end of the statefulset replicas range", delay)
 	}
 
-	// We can proceed with downscale!
-	level.Info(logger).Log("msg", "downscale delay has been reached on all downscaled pods", "name", sts.GetName(), "delay", delay, "elapsed", elapsedSinceMaxTime)
-	return nil
+	// We can proceed with downscale on at least one pod.
+	level.Info(logger).Log("msg", "downscale delay has been reached on some downscaled pods", "name", sts.GetName(), "delay", delay, "originalDesiredReplicas", desiredReplicas, "allowedDesiredReplicas", allowedDesiredReplicas)
+	return allowedDesiredReplicas, nil
 }
 
 func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Duration, *url.URL, error) {
@@ -106,7 +127,7 @@ type endpoint struct {
 	namespace string
 	podName   string
 	url       url.URL
-	index     int
+	replica   int
 }
 
 // Create prepare-downscale endpoints for pods with index in [from, to) range. URL is fully reused except for host, which is replaced with pod's FQDN.
@@ -122,7 +143,7 @@ func createPrepareDownscaleEndpoints(namespace, serviceName string, from, to int
 		ep := endpoint{
 			namespace: namespace,
 			podName:   fmt.Sprintf("%v-%v", serviceName, index),
-			index:     index,
+			replica:   index,
 		}
 		ep.url = *url
 
@@ -134,14 +155,14 @@ func createPrepareDownscaleEndpoints(namespace, serviceName string, from, to int
 	return eps
 }
 
-func callPrepareDownscaleAndReturnMaxPrepareTimestamp(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) (time.Time, error) {
+func callPrepareDownscaleAndReturnElapsedDurationsSinceInitiatedDownscale(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) (map[int]time.Duration, error) {
 	if len(endpoints) == 0 {
-		return time.Time{}, fmt.Errorf("no endpoints")
+		return nil, fmt.Errorf("no endpoints")
 	}
 
 	var (
-		maxTimeMu sync.Mutex
-		maxTime   time.Time
+		timestampsMu sync.Mutex
+		timestamps   = map[int]time.Duration{}
 	)
 
 	type expectedResponse struct {
@@ -195,19 +216,18 @@ func callPrepareDownscaleAndReturnMaxPrepareTimestamp(ctx context.Context, logge
 			}
 
 			t := time.Unix(r.Timestamp, 0)
+			elapsed := time.Since(t)
 
-			maxTimeMu.Lock()
-			if t.After(maxTime) {
-				maxTime = t
-			}
-			maxTimeMu.Unlock()
+			timestampsMu.Lock()
+			timestamps[ep.replica] = elapsed
+			timestampsMu.Unlock()
 
-			level.Debug(epLogger).Log("msg", "HTTP POST request to endpoint succeded", "timestamp", t.UTC().Format(time.RFC3339))
+			level.Debug(epLogger).Log("msg", "HTTP POST request to endpoint succeeded", "timestamp", t.UTC().Format(time.RFC3339), "elapsed", elapsed)
 			return nil
 		})
 	}
 	err := g.Wait()
-	return maxTime, err
+	return timestamps, err
 }
 
 func callCancelDelayedDownscale(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) {

From f5bef38c0daa9335c7489c3ac2371e4fbde1595e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?=
Date: Mon, 17 Jun 2024 15:56:53 +0200
Subject: [PATCH 2/2] Update changelog for v0.17.0. (#157)

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b4690e7c9..f03845fe5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
 
 ## main / unreleased
 
+## v0.17.0
+
 * [CHANGE] The docker base images are now based off distroless images rather than Alpine. #149
   * The standard base image is now `gcr.io/distroless/static-debian12:nonroot`.
   * The boringcrypto base image is now `gcr.io/distroless/base-nossl-debian12:nonroot` (for glibc).
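
Note (not part of the patches above): the core of patch 1 is the trailing-replica computation added to checkScalingDelay, which walks from the highest replica ordinal down towards the requested target and stops at the first replica whose prepare-downscale delay has not yet elapsed. The standalone Go sketch below is illustrative only; the allowedReplicas helper and the hard-coded durations are assumptions for the example and do not exist in the patch. It uses the same numbers as the new "limited scale down by 2 replicas" test case.

package main

import (
	"fmt"
	"time"
)

// allowedReplicas walks from the highest replica ordinal down towards desiredReplicas
// and stops at the first replica whose prepare-downscale delay has not elapsed yet
// (or whose prepare call did not report a timestamp). Only the contiguous tail of
// replicas that are past their delay is allowed to be removed.
func allowedReplicas(currentReplicas, desiredReplicas int32, delay time.Duration, elapsed map[int]time.Duration) int32 {
	allowed := currentReplicas
	for replica := currentReplicas - 1; replica >= desiredReplicas; replica-- {
		e, ok := elapsed[int(replica)]
		if !ok || e < delay {
			break
		}
		allowed--
	}
	return allowed
}

func main() {
	// 5 replicas, target of 1, 1h delay: replicas 4 and 3 are past the delay,
	// replica 2 is not, so only a scale-down from 5 to 3 is allowed.
	elapsed := map[int]time.Duration{
		4: 70 * time.Minute,
		3: 75 * time.Minute,
		2: 30 * time.Minute,
		1: 75 * time.Minute,
	}
	fmt.Println(allowedReplicas(5, 1, time.Hour, elapsed)) // prints 3
}

Running the sketch prints 3, matching the `{"spec":{"replicas":3}}` patch expected by the new test case.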