PVC auto scale on a per ordinal basis (#380)
* PVC auto scale on a per ordinal basis

* don't try to apply patches below current pvc size

* deep copy resources

* don't include datasource in diff
agouin authored Oct 27, 2023
1 parent 4c4be57 commit 41a210e
Showing 13 changed files with 389 additions and 175 deletions.
5 changes: 5 additions & 0 deletions api/v1/cosmosfullnode_types.go

@@ -385,6 +385,11 @@ type AutoDataSource struct {
 	// If no VolumeSnapshots found, controller logs error and still creates PVC.
 	// +optional
 	VolumeSnapshotSelector map[string]string `json:"volumeSnapshotSelector"`
+
+	// If true, the volume snapshot selector will make sure the PVC
+	// is restored from a VolumeSnapshot on the same node.
+	// This is useful if the VolumeSnapshots are local to the node, e.g. for topolvm.
+	MatchInstance bool `json:"matchInstance"`
 }
 
 // RolloutStrategy is an update strategy that can be shared between several Cosmos CRDs.
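For orientation, a minimal sketch of opting into the new field. The cosmosv1 import path matches this repository; the function name and selector labels are illustrative assumptions, not taken from the commit:

    package main

    import (
    	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
    )

    // exampleAutoDataSource enables node-local snapshot matching.
    // Label values are hypothetical.
    func exampleAutoDataSource() *cosmosv1.AutoDataSource {
    	return &cosmosv1.AutoDataSource{
    		// Select candidate VolumeSnapshots in the same namespace.
    		VolumeSnapshotSelector: map[string]string{"app.kubernetes.io/name": "cosmoshub"},
    		// Only restore a PVC from a snapshot taken on the same node,
    		// useful when snapshots are node-local (e.g. topolvm).
    		MatchInstance: true,
    	}
    }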
2 changes: 1 addition & 1 deletion api/v1/self_healing_types.go

@@ -66,7 +66,7 @@ type HeightDriftMitigationSpec struct {
 type SelfHealingStatus struct {
 	// PVC auto-scaling status.
 	// +optional
-	PVCAutoScale *PVCAutoScaleStatus `json:"pvcAutoScale"`
+	PVCAutoScale map[string]*PVCAutoScaleStatus `json:"pvcAutoScaler"`
 }
 
 type PVCAutoScaleStatus struct {
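The status field becomes a map keyed by PVC name, so each ordinal's volume is tracked and resized independently instead of sharing a single status for the whole node set. A hedged sketch of reading it; the helper name is invented, and the exact key format (e.g. "pv-cosmoshub-0") is an assumption:

    package main

    import (
    	"fmt"

    	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
    )

    // logAutoScaleRequests prints each PVC's pending resize request.
    func logAutoScaleRequests(crd *cosmosv1.CosmosFullNode) {
    	for pvcName, st := range crd.Status.SelfHealing.PVCAutoScale {
    		fmt.Printf("%s: requested %s at %s\n",
    			pvcName, st.RequestedSize.String(), st.RequestedAt.Time)
    	}
    }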
14 changes: 12 additions & 2 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

52 changes: 35 additions & 17 deletions config/crd/bases/cosmos.strange.love_cosmosfullnodes.yaml

@@ -386,6 +386,12 @@ spec:
                       set; that field takes precedence. Configuring autoDataSource
                       may help bootstrap new replicas more quickly.
                     properties:
+                      matchInstance:
+                        description: If true, the volume snapshot selector will
+                          make sure the PVC is restored from a VolumeSnapshot
+                          on the same node. This is useful if the VolumeSnapshots
+                          are local to the node, e.g. for topolvm.
+                        type: boolean
                       volumeSnapshotSelector:
                         additionalProperties:
                           type: string
@@ -397,6 +403,8 @@ spec:
                           no VolumeSnapshots found, controller logs error and
                           still creates PVC.
                         type: object
+                    required:
+                    - matchInstance
                     type: object
                   dataSource:
                     description: 'Can be used to specify either: * An existing
@@ -5814,6 +5822,12 @@ spec:
              that field takes precedence. Configuring autoDataSource may
              help bootstrap new replicas more quickly.
            properties:
+              matchInstance:
+                description: If true, the volume snapshot selector will make
+                  sure the PVC is restored from a VolumeSnapshot on the same
+                  node. This is useful if the VolumeSnapshots are local to
+                  the node, e.g. for topolvm.
+                type: boolean
              volumeSnapshotSelector:
                additionalProperties:
                  type: string
@@ -5824,6 +5838,8 @@ spec:
              namespace as the CosmosFullNode. If no VolumeSnapshots found,
              controller logs error and still creates PVC.
            type: object
+          required:
+          - matchInstance
          type: object
        dataSource:
          description: 'Can be used to specify either: * An existing VolumeSnapshot
@@ -5986,24 +6002,26 @@ spec:
           selfHealing:
             description: Status set by the SelfHealing controller.
             properties:
-              pvcAutoScale:
+              pvcAutoScaler:
+                additionalProperties:
+                  properties:
+                    requestedAt:
+                      description: The timestamp the SelfHealing controller requested
+                        a PVC increase.
+                      format: date-time
+                      type: string
+                    requestedSize:
+                      anyOf:
+                      - type: integer
+                      - type: string
+                      description: The PVC size requested by the SelfHealing controller.
+                      pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+                      x-kubernetes-int-or-string: true
+                  required:
+                  - requestedAt
+                  - requestedSize
+                  type: object
                 description: PVC auto-scaling status.
-                properties:
-                  requestedAt:
-                    description: The timestamp the SelfHealing controller requested
-                      a PVC increase.
-                    format: date-time
-                    type: string
-                  requestedSize:
-                    anyOf:
-                    - type: integer
-                    - type: string
-                    description: The PVC size requested by the SelfHealing controller.
-                    pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
-                    x-kubernetes-int-or-string: true
-                required:
-                - requestedAt
-                - requestedSize
                 type: object
             type: object
           status:
18 changes: 15 additions & 3 deletions controllers/cosmosfullnode_controller.go

@@ -121,7 +121,9 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 
 	syncInfo := fullnode.SyncInfoStatus(ctx, crd, r.cacheController)
 
-	defer r.updateStatus(ctx, crd, syncInfo)
+	pvcStatusChanges := fullnode.PVCStatusChanges{}
+
+	defer r.updateStatus(ctx, crd, syncInfo, &pvcStatusChanges)
 
 	errs := &kube.ReconcileErrors{}
 
@@ -178,7 +180,7 @@ func (r *CosmosFullNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 	}
 
 	// Reconcile pvcs.
-	pvcRequeue, err := r.pvcControl.Reconcile(ctx, reporter, crd)
+	pvcRequeue, err := r.pvcControl.Reconcile(ctx, reporter, crd, &pvcStatusChanges)
 	if err != nil {
 		errs.Append(err)
 	}
@@ -221,7 +223,12 @@ func (r *CosmosFullNodeReconciler) resultWithErr(crd *cosmosv1.CosmosFullNode, e
 	return stopResult, err
 }
 
-func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmosv1.CosmosFullNode, syncInfo map[string]*cosmosv1.SyncInfoPodStatus) {
+func (r *CosmosFullNodeReconciler) updateStatus(
+	ctx context.Context,
+	crd *cosmosv1.CosmosFullNode,
+	syncInfo map[string]*cosmosv1.SyncInfoPodStatus,
+	pvcStatusChanges *fullnode.PVCStatusChanges,
+) {
 	if err := r.statusClient.SyncUpdate(ctx, client.ObjectKeyFromObject(crd), func(status *cosmosv1.FullNodeStatus) {
 		status.ObservedGeneration = crd.Status.ObservedGeneration
 		status.Phase = crd.Status.Phase
@@ -236,6 +243,11 @@ func (r *CosmosFullNodeReconciler) updateStatus(ctx context.Context, crd *cosmos
 				status.Height[k] = *v.Height
 			}
 		}
+		if status.SelfHealing.PVCAutoScale != nil {
+			for _, k := range pvcStatusChanges.Deleted {
+				delete(status.SelfHealing.PVCAutoScale, k)
+			}
+		}
 	}); err != nil {
 		log.FromContext(ctx).Error(err, "Failed to patch status")
 	}
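fullnode.PVCStatusChanges is defined in one of the files not rendered on this page, so only its use is visible in the hunks above. A hedged sketch of the shape the controller relies on, inferred solely from the calls in this diff (the field set is an assumption):

    // Sketch of the accumulator passed through Reconcile and read in updateStatus.
    // Inferred from usage; the real definition lives in internal/fullnode.
    type PVCStatusChanges struct {
    	// Deleted collects names of PVCs removed during this reconcile pass;
    	// updateStatus prunes their entries from status.SelfHealing.PVCAutoScale
    	// so stale per-PVC auto-scale records do not linger.
    	Deleted []string
    }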
3 changes: 3 additions & 0 deletions internal/fullnode/mock_test.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"sync"
 
+	snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
 	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
 	corev1 "k8s.io/api/core/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
@@ -58,6 +59,8 @@ func (m *mockClient[T]) Get(ctx context.Context, key client.ObjectKey, obj clien
 		*ref = m.Object.(corev1.PersistentVolumeClaim)
 	case *cosmosv1.CosmosFullNode:
 		*ref = m.Object.(cosmosv1.CosmosFullNode)
+	case *snapshotv1.VolumeSnapshot:
+		*ref = m.Object.(snapshotv1.VolumeSnapshot)
 	default:
 		panic(fmt.Errorf("unknown Object type: %T", m.ObjectList))
 	}
5 changes: 5 additions & 0 deletions internal/fullnode/pod_builder_test.go

@@ -37,6 +37,11 @@ func defaultCRD() cosmosv1.CosmosFullNode {
 				},
 			},
 		},
+		VolumeClaimTemplate: cosmosv1.PersistentVolumeClaimSpec{
+			Resources: corev1.ResourceRequirements{
+				Requests: corev1.ResourceList{corev1.ResourceStorage: resource.MustParse("100Gi")},
+			},
+		},
 		},
 	}
 }
88 changes: 51 additions & 37 deletions internal/fullnode/pvc_auto_scaler.go

@@ -3,11 +3,9 @@ package fullnode
 import (
 	"context"
 	"errors"
-	"fmt"
 	"math"
 	"time"
 
-	"github.com/samber/lo"
 	cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -45,52 +43,68 @@ func NewPVCAutoScaler(client StatusSyncer) *PVCAutoScaler {
 // Returns an error if patching unsuccessful.
 func (scaler PVCAutoScaler) SignalPVCResize(ctx context.Context, crd *cosmosv1.CosmosFullNode, results []PVCDiskUsage) (bool, error) {
 	var (
-		spec         = crd.Spec.SelfHeal.PVCAutoScale
-		trigger      = int(spec.UsedSpacePercentage)
-		pvcCandidate = lo.MaxBy(results, func(a PVCDiskUsage, b PVCDiskUsage) bool { return a.PercentUsed > b.PercentUsed })
+		spec    = crd.Spec.SelfHeal.PVCAutoScale
+		trigger = int(spec.UsedSpacePercentage)
 	)
 
-	// Calc new size first to catch errors with the increase quantity
-	newSize, err := scaler.calcNextCapacity(pvcCandidate.Capacity, spec.IncreaseQuantity)
-	if err != nil {
-		return false, fmt.Errorf("increaseQuantity must be a percentage string (e.g. 10%%) or a storage quantity (e.g. 100Gi): %w", err)
-	}
+	var joinedErr error
 
-	// Prevent patching if PVC size not at threshold
-	if pvcCandidate.PercentUsed < trigger {
-		return false, nil
-	}
+	status := crd.Status.SelfHealing.PVCAutoScale
+
+	patches := make(map[string]*cosmosv1.PVCAutoScaleStatus)
+
+	now := metav1.NewTime(scaler.now())
 
-	// Prevent continuous reconcile loops
-	if status := crd.Status.SelfHealing.PVCAutoScale; status != nil {
-		if status.RequestedSize.Value() == newSize.Value() {
-			return false, nil
+	for _, pvc := range results {
+		if pvc.PercentUsed < trigger {
+			// no need to expand
+			continue
 		}
-	}
 
-	// Handle max size
-	if max := spec.MaxSize; !max.IsZero() {
-		// If already reached max size, don't patch
-		if pvcCandidate.Capacity.Cmp(max) >= 0 {
-			return false, nil
+		newSize, err := scaler.calcNextCapacity(pvc.Capacity, spec.IncreaseQuantity)
+		if err != nil {
+			joinedErr = errors.Join(joinedErr, err)
+			continue
 		}
-		// Cap new size to the max size
-		if newSize.Cmp(max) >= 0 {
-			newSize = max
+
+		if status != nil {
+			if pvcStatus, ok := status[pvc.Name]; ok && pvcStatus.RequestedSize.Value() == newSize.Value() {
+				// already requested
+				continue
+			}
 		}
-	}
 
-	// Patch object status which will signal the CosmosFullNode controller to increase PVC size.
-	var patch cosmosv1.CosmosFullNode
-	patch.TypeMeta = crd.TypeMeta
-	patch.Namespace = crd.Namespace
-	patch.Name = crd.Name
-	return true, scaler.client.SyncUpdate(ctx, client.ObjectKeyFromObject(&patch), func(status *cosmosv1.FullNodeStatus) {
-		status.SelfHealing.PVCAutoScale = &cosmosv1.PVCAutoScaleStatus{
+		if max := spec.MaxSize; !max.IsZero() {
+			if pvc.Capacity.Cmp(max) >= 0 {
+				// already at max size
+				continue
+			}
+
+			if newSize.Cmp(max) >= 0 {
+				// Cap new size to the max size
+				newSize = max
+			}
+		}
+
+		patches[pvc.Name] = &cosmosv1.PVCAutoScaleStatus{
 			RequestedSize: newSize,
-			RequestedAt:   metav1.NewTime(scaler.now()),
+			RequestedAt:   now,
 		}
-	})
+	}
+
+	if len(patches) == 0 {
+		return false, joinedErr
+	}
+
+	return true, errors.Join(joinedErr, scaler.client.SyncUpdate(ctx, client.ObjectKeyFromObject(crd), func(status *cosmosv1.FullNodeStatus) {
+		if status.SelfHealing.PVCAutoScale == nil {
+			status.SelfHealing.PVCAutoScale = patches
+			return
+		}
+		for k, v := range patches {
+			status.SelfHealing.PVCAutoScale[k] = v
+		}
+	}))
 }
 
 func (scaler PVCAutoScaler) calcNextCapacity(current resource.Quantity, increase string) (resource.Quantity, error) {
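calcNextCapacity itself is unchanged by the rendered hunks. For readers, a hedged sketch of the behavior implied by the error message deleted above ("a percentage string (e.g. 10%) or a storage quantity (e.g. 100Gi)"); this is an illustrative reimplementation under those assumptions, not the repository's code:

    package fullnode

    import (
    	"math"
    	"strconv"
    	"strings"

    	"k8s.io/apimachinery/pkg/api/resource"
    )

    // calcNextCapacitySketch grows current by a percentage ("10%") or adds a
    // fixed quantity ("100Gi"). Hypothetical stand-in for calcNextCapacity.
    func calcNextCapacitySketch(current resource.Quantity, increase string) (resource.Quantity, error) {
    	if strings.HasSuffix(increase, "%") {
    		pct, err := strconv.ParseFloat(strings.TrimSuffix(increase, "%"), 64)
    		if err != nil {
    			return resource.Quantity{}, err
    		}
    		// Round up so a small percentage always produces growth.
    		next := int64(math.Ceil(float64(current.Value()) * (1 + pct/100)))
    		return *resource.NewQuantity(next, current.Format), nil
    	}
    	// Otherwise parse as an absolute quantity and add it to the current size.
    	add, err := resource.ParseQuantity(increase)
    	if err != nil {
    		return resource.Quantity{}, err
    	}
    	add.Add(current)
    	return add, nil
    }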
(The remaining changed files in this commit are not rendered here.)