Skip to content

Commit

Permalink
chore: migrate uncompressed resources data
Browse files Browse the repository at this point in the history
Abusing generics to compress uncompressed data in `*omni.ConfigPatch`, `*omni.ClusterMachineConfig`, `*omni.RedactedClusterMachineConfig` and `*omni.ClusterMachineConfigPatches`.

Fixes #853

Signed-off-by: Dmitriy Matrenichev <[email protected]>
  • Loading branch information
DmitriyMV committed Jan 23, 2025
1 parent 4a436b4 commit d053989
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
4 changes: 4 additions & 0 deletions internal/backend/runtime/omni/migration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func NewManager(state state.State, logger *zap.Logger) *Manager {
callback: removeMaintenanceConfigPatchFinalizers,
name: "removeMaintenanceConfigPatchFinalizers",
},
{
callback: compressMachineAndPatches,
name: "compressMachineAndPatches",
},
},
}
}
Expand Down
94 changes: 94 additions & 0 deletions internal/backend/runtime/omni/migration/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"slices"
"strings"

"github.com/cosi-project/runtime/pkg/controller/generic"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/kvutils"
"github.com/cosi-project/runtime/pkg/resource/protobuf"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/google/uuid"
Expand Down Expand Up @@ -1333,3 +1335,95 @@ func removeMaintenanceConfigPatchFinalizers(ctx context.Context, st state.State,
return st.RemoveFinalizer(ctx, item.Metadata(), "MaintenanceConfigPatchController")
})
}

func compressMachineAndPatches(ctx context.Context, st state.State, l *zap.Logger) error {
doConfigPatch := updateSingle[string, specs.ConfigPatchSpec, *specs.ConfigPatchSpec]
doMachineConfig := updateSingle[[]byte, specs.ClusterMachineConfigSpec, *specs.ClusterMachineConfigSpec]
doRedactedMachineConfig := updateSingle[string, specs.RedactedClusterMachineConfigSpec, *specs.RedactedClusterMachineConfigSpec]

for _, fn := range []func(context.Context, state.State, *zap.Logger) error{
compressUncompressed[*omni.ConfigPatch](doConfigPatch),
compressUncompressed[*omni.ClusterMachineConfig](doMachineConfig),
compressUncompressed[*omni.RedactedClusterMachineConfig](doRedactedMachineConfig),
compressUncompressed[*omni.ClusterMachineConfigPatches](doClusterMachineConfigPatches),
} {
if err := fn(ctx, st, l); err != nil {
return err
}
}

return nil
}

func compressUncompressed[
R interface {
generic.ResourceWithRD
TypedSpec() *protobuf.ResourceSpec[T, S]
},
T any,
S protobuf.Spec[T],
](update func(spec *protobuf.ResourceSpec[T, S]) (bool, error)) func(context.Context, state.State, *zap.Logger) error {
return func(ctx context.Context, st state.State, _ *zap.Logger) error {
items, err := safe.ReaderListAll[R](ctx, st)
if err != nil {
return err
}

for val := range items.All() {
spec := val.TypedSpec()

ok, uerr := update(spec)
if uerr != nil {
return uerr
} else if !ok {
continue
}

if _, err = safe.StateUpdateWithConflicts(ctx, st, val.Metadata(), func(res R) error {
res.TypedSpec().Value = spec.Value

return nil
}); err != nil {
return fmt.Errorf("failed to update %s: %w", val.Metadata(), err)
}
}

return nil
}
}

func updateSingle[
D string | []byte,
T any,
S interface {
GetData() D
SetUncompressedData(data []byte, opts ...specs.CompressionOption) error
protobuf.Spec[T]
},
](spec *protobuf.ResourceSpec[T, S]) (bool, error) {
data := spec.Value.GetData()
if len(data) == 0 {
return false, nil
}

err := spec.Value.SetUncompressedData([]byte(data))
if err != nil {
return false, fmt.Errorf("failed to compress data during migration: %w", err)
}

return true, nil
}

func doClusterMachineConfigPatches(spec *protobuf.ResourceSpec[specs.ClusterMachineConfigPatchesSpec, *specs.ClusterMachineConfigPatchesSpec]) (bool, error) {
patches := spec.Value.GetPatches()
if len(patches) == 0 {
return false, nil
}

err := spec.Value.SetUncompressedPatches(patches)
if err != nil {
return false, fmt.Errorf("failed to compress data during migration: %w", err)
}

return true, nil
}

0 comments on commit d053989

Please sign in to comment.