From d053989be5eae6e65ce760a35a6004da94db88bd Mon Sep 17 00:00:00 2001 From: Dmitriy Matrenichev Date: Wed, 22 Jan 2025 20:28:38 +0300 Subject: [PATCH] chore: migrate uncompressed resources data Abusing generics to compress uncompressed data in `*omni.ConfigPatch`, `*omni.ClusterMachineConfig`, `*omni.RedactedClusterMachineConfig` and `*omni.ClusterMachineConfigPatches`. Fixes #853 Signed-off-by: Dmitriy Matrenichev --- .../backend/runtime/omni/migration/manager.go | 4 + .../runtime/omni/migration/migrations.go | 94 +++++++++++++++++++ 2 files changed, 98 insertions(+) diff --git a/internal/backend/runtime/omni/migration/manager.go b/internal/backend/runtime/omni/migration/manager.go index cd2836da..1f47ebe8 100644 --- a/internal/backend/runtime/omni/migration/manager.go +++ b/internal/backend/runtime/omni/migration/manager.go @@ -181,6 +181,10 @@ func NewManager(state state.State, logger *zap.Logger) *Manager { callback: removeMaintenanceConfigPatchFinalizers, name: "removeMaintenanceConfigPatchFinalizers", }, + { + callback: compressMachineAndPatches, + name: "compressMachineAndPatches", + }, }, } } diff --git a/internal/backend/runtime/omni/migration/migrations.go b/internal/backend/runtime/omni/migration/migrations.go index 69f8383e..5b07e9fa 100644 --- a/internal/backend/runtime/omni/migration/migrations.go +++ b/internal/backend/runtime/omni/migration/migrations.go @@ -11,8 +11,10 @@ import ( "slices" "strings" + "github.com/cosi-project/runtime/pkg/controller/generic" "github.com/cosi-project/runtime/pkg/resource" "github.com/cosi-project/runtime/pkg/resource/kvutils" + "github.com/cosi-project/runtime/pkg/resource/protobuf" "github.com/cosi-project/runtime/pkg/safe" "github.com/cosi-project/runtime/pkg/state" "github.com/google/uuid" @@ -1333,3 +1335,95 @@ func removeMaintenanceConfigPatchFinalizers(ctx context.Context, st state.State, return st.RemoveFinalizer(ctx, item.Metadata(), "MaintenanceConfigPatchController") }) } + +func compressMachineAndPatches(ctx context.Context, st state.State, l *zap.Logger) error { + doConfigPatch := updateSingle[string, specs.ConfigPatchSpec, *specs.ConfigPatchSpec] + doMachineConfig := updateSingle[[]byte, specs.ClusterMachineConfigSpec, *specs.ClusterMachineConfigSpec] + doRedactedMachineConfig := updateSingle[string, specs.RedactedClusterMachineConfigSpec, *specs.RedactedClusterMachineConfigSpec] + + for _, fn := range []func(context.Context, state.State, *zap.Logger) error{ + compressUncompressed[*omni.ConfigPatch](doConfigPatch), + compressUncompressed[*omni.ClusterMachineConfig](doMachineConfig), + compressUncompressed[*omni.RedactedClusterMachineConfig](doRedactedMachineConfig), + compressUncompressed[*omni.ClusterMachineConfigPatches](doClusterMachineConfigPatches), + } { + if err := fn(ctx, st, l); err != nil { + return err + } + } + + return nil +} + +func compressUncompressed[ + R interface { + generic.ResourceWithRD + TypedSpec() *protobuf.ResourceSpec[T, S] + }, + T any, + S protobuf.Spec[T], +](update func(spec *protobuf.ResourceSpec[T, S]) (bool, error)) func(context.Context, state.State, *zap.Logger) error { + return func(ctx context.Context, st state.State, _ *zap.Logger) error { + items, err := safe.ReaderListAll[R](ctx, st) + if err != nil { + return err + } + + for val := range items.All() { + spec := val.TypedSpec() + + ok, uerr := update(spec) + if uerr != nil { + return uerr + } else if !ok { + continue + } + + if _, err = safe.StateUpdateWithConflicts(ctx, st, val.Metadata(), func(res R) error { + res.TypedSpec().Value = spec.Value + + return nil + }); err != nil { + return fmt.Errorf("failed to update %s: %w", val.Metadata(), err) + } + } + + return nil + } +} + +func updateSingle[ + D string | []byte, + T any, + S interface { + GetData() D + SetUncompressedData(data []byte, opts ...specs.CompressionOption) error + protobuf.Spec[T] + }, +](spec *protobuf.ResourceSpec[T, S]) (bool, error) { + data := spec.Value.GetData() + if len(data) == 0 { + return false, nil + } + + err := spec.Value.SetUncompressedData([]byte(data)) + if err != nil { + return false, fmt.Errorf("failed to compress data during migration: %w", err) + } + + return true, nil +} + +func doClusterMachineConfigPatches(spec *protobuf.ResourceSpec[specs.ClusterMachineConfigPatchesSpec, *specs.ClusterMachineConfigPatchesSpec]) (bool, error) { + patches := spec.Value.GetPatches() + if len(patches) == 0 { + return false, nil + } + + err := spec.Value.SetUncompressedPatches(patches) + if err != nil { + return false, fmt.Errorf("failed to compress data during migration: %w", err) + } + + return true, nil +}