From 0c0ce3b0549fb139ce852186c83f045411d57363 Mon Sep 17 00:00:00 2001 From: Nahshon Unna-Tsameret Date: Tue, 31 Dec 2024 18:45:49 +0200 Subject: [PATCH 1/2] add stream utils for iterators Signed-off-by: Nahshon Unna-Tsameret --- controllers/operands/cdi_test.go | 9 ++-- controllers/operands/dashboard_test.go | 14 ++--- controllers/operands/kubevirt.go | 44 +++++++-------- pkg/stream/stream.go | 68 +++++++++++++++++++++++ pkg/stream/stream_test.go | 74 ++++++++++++++++++++++++++ 5 files changed, 178 insertions(+), 31 deletions(-) create mode 100644 pkg/stream/stream.go create mode 100644 pkg/stream/stream_test.go diff --git a/controllers/operands/cdi_test.go b/controllers/operands/cdi_test.go index b572daf157..c30f8c794c 100644 --- a/controllers/operands/cdi_test.go +++ b/controllers/operands/cdi_test.go @@ -2,6 +2,7 @@ package operands import ( "context" + "github.com/kubevirt/hyperconverged-cluster-operator/pkg/stream" "maps" "time" @@ -103,9 +104,11 @@ var _ = Describe("CDI Operand", func() { outdatedResource, err := NewCDI(hco) Expect(err).ToNot(HaveOccurred()) expectedLabels := maps.Clone(outdatedResource.Labels) - for k, v := range expectedLabels { - outdatedResource.Labels[k] = "wrong_" + v - } + + outdatedResource.Labels = maps.Collect(stream.Transform22(maps.All(outdatedResource.Labels), func(k, v string) (string, string) { + return k, "wrong_" + v + })) + outdatedResource.Labels[userLabelKey] = userLabelValue cl := commontestutils.InitClient([]client.Object{hco, outdatedResource}) diff --git a/controllers/operands/dashboard_test.go b/controllers/operands/dashboard_test.go index 8ae0f39873..69cb6f071f 100644 --- a/controllers/operands/dashboard_test.go +++ b/controllers/operands/dashboard_test.go @@ -3,9 +3,11 @@ package operands import ( "context" "fmt" + "github.com/kubevirt/hyperconverged-cluster-operator/pkg/stream" "maps" "os" "path" + "slices" "strings" "time" @@ -192,13 +194,11 @@ var _ = Describe("Dashboard tests", func() { Expect(cmList.Items[0].Name).To(Equal("grafana-dashboard-kubevirt-top-consumers")) }) - expectedLabels := make(map[string]map[string]string) - - By("getting opinionated labels", func() { - for _, cm := range cmList.Items { - expectedLabels[cm.Name] = maps.Clone(cm.Labels) - } - }) + By("getting opinionated labels") + mapNameLabels := func(cm corev1.ConfigMap) (string, map[string]string) { + return cm.Name, maps.Clone(cm.Labels) + } + expectedLabels := maps.Collect(stream.Transform2(slices.Values(cmList.Items), mapNameLabels)) By("altering the cm objects", func() { for _, foundResource := range cmList.Items { diff --git a/controllers/operands/kubevirt.go b/controllers/operands/kubevirt.go index 4d4cb66299..18519a5429 100644 --- a/controllers/operands/kubevirt.go +++ b/controllers/operands/kubevirt.go @@ -9,6 +9,7 @@ import ( "os" "path" "reflect" + "slices" "strconv" "strings" @@ -26,6 +27,7 @@ import ( hcov1beta1 "github.com/kubevirt/hyperconverged-cluster-operator/api/v1beta1" "github.com/kubevirt/hyperconverged-cluster-operator/controllers/common" + "github.com/kubevirt/hyperconverged-cluster-operator/pkg/stream" hcoutil "github.com/kubevirt/hyperconverged-cluster-operator/pkg/util" kubevirtcorev1 "kubevirt.io/api/core/v1" ) @@ -397,16 +399,17 @@ func hcWorkloadUpdateStrategyToKv(hcObject *hcov1beta1.HyperConvergedWorkloadUpd } if hcObject.BatchEvictionSize != nil { - kvObject.BatchEvictionSize = new(int) - *kvObject.BatchEvictionSize = *hcObject.BatchEvictionSize + kvObject.BatchEvictionSize = ptr.To(*hcObject.BatchEvictionSize) } - if size := len(hcObject.WorkloadUpdateMethods); size > 0 { - kvObject.WorkloadUpdateMethods = make([]kubevirtcorev1.WorkloadUpdateMethod, size) - for i, updateMethod := range hcObject.WorkloadUpdateMethods { - kvObject.WorkloadUpdateMethods[i] = kubevirtcorev1.WorkloadUpdateMethod(updateMethod) - } - } + kvObject.WorkloadUpdateMethods = slices.Collect( + stream.Transform( + slices.Values(hcObject.WorkloadUpdateMethods), + func(method string) kubevirtcorev1.WorkloadUpdateMethod { + return kubevirtcorev1.WorkloadUpdateMethod(method) + }, + ), + ) } return kvObject @@ -651,21 +654,20 @@ func toKvUSBHostDevices(hcoUSBHostdevices []hcov1beta1.USBHostDevice) []kubevirt } func toKvMediatedDevices(hcoMediatedDevices []hcov1beta1.MediatedHostDevice) []kubevirtcorev1.MediatedHostDevice { - if len(hcoMediatedDevices) > 0 { - mediatedDevices := make([]kubevirtcorev1.MediatedHostDevice, 0, len(hcoMediatedDevices)) - for _, hcoMediatedHostDevice := range hcoMediatedDevices { - if !hcoMediatedHostDevice.Disabled { - mediatedDevices = append(mediatedDevices, kubevirtcorev1.MediatedHostDevice{ - MDEVNameSelector: hcoMediatedHostDevice.MDEVNameSelector, - ResourceName: hcoMediatedHostDevice.ResourceName, - ExternalResourceProvider: hcoMediatedHostDevice.ExternalResourceProvider, - }) - } - } - return mediatedDevices + filter := func(dev hcov1beta1.MediatedHostDevice) bool { + return !dev.Disabled } - return nil + + mapHCO2KV := func(hcoDev hcov1beta1.MediatedHostDevice) kubevirtcorev1.MediatedHostDevice { + return kubevirtcorev1.MediatedHostDevice{ + MDEVNameSelector: hcoDev.MDEVNameSelector, + ResourceName: hcoDev.ResourceName, + ExternalResourceProvider: hcoDev.ExternalResourceProvider, + } + } + + return slices.Collect(stream.Transform(stream.Filter(slices.Values(hcoMediatedDevices), filter), mapHCO2KV)) } func hcLiveMigrationToKv(lm hcov1beta1.LiveMigrationConfigurations) (*kubevirtcorev1.MigrationConfiguration, error) { diff --git a/pkg/stream/stream.go b/pkg/stream/stream.go new file mode 100644 index 0000000000..a062ddab78 --- /dev/null +++ b/pkg/stream/stream.go @@ -0,0 +1,68 @@ +package stream + +import "iter" + +// Filter creates an iterator that filters the elements of a sequence (iterator), by a given function filterFn +// that for each element of type V1, returns true to include the element in the result sequence, or false, to +// skip the element. +// The Filter function returns an iter.Seq sequence. +func Filter[T any](seq iter.Seq[T], filterFn func(T) bool) iter.Seq[T] { + return func(yield func(T) bool) { + for v := range seq { + if filterFn(v) { + if !yield(v) { + return + } + } + } + } +} + +// Transform creates an iterator that modifies the elements of a sequence (iterator), by a given function transFn +// that for each element of type V1, returns a value V2. +// The Transform function returns an iter.Seq sequence. +func Transform[V1, V2 any](seq iter.Seq[V1], transFn func(V1) V2) iter.Seq[V2] { + return func(yield func(V2) bool) { + for v1 := range seq { + if !yield(transFn(v1)) { + return + } + } + } +} + +// Transform2 creates an iterator that modifies the elements of a sequence (iterator), by a given function transFn +// that for each element of type V1, returns a key K and a value V2. +// The Transform2 function returns an iter.Seq2 sequence. +func Transform2[K, V1, V2 any](seq iter.Seq[V1], transFn func(V1) (K, V2)) iter.Seq2[K, V2] { + return func(yield func(K, V2) bool) { + for v1 := range seq { + if !yield(transFn(v1)) { + return + } + } + } +} + +// Transform22 creates an iterator that modifies the elements of a sequence (iterator), by a given function transFn +// that for each pair of K1, V1, returns a pair of K2, V2. +// The Transform22 function returns an iter.Seq2 sequence. +func Transform22[K1, V1, K2, V2 any](seq iter.Seq2[K1, V1], transFn func(K1, V1) (K2, V2)) iter.Seq2[K2, V2] { + return func(yield func(K2, V2) bool) { + for k1, v1 := range seq { + if !yield(transFn(k1, v1)) { + return + } + } + } +} + +func Iter2Values[K, V any](seq2 iter.Seq2[K, V]) iter.Seq[V] { + return func(yield func(V) bool) { + for _, v := range seq2 { + if !yield(v) { + return + } + } + } +} diff --git a/pkg/stream/stream_test.go b/pkg/stream/stream_test.go new file mode 100644 index 0000000000..2ad14b7df0 --- /dev/null +++ b/pkg/stream/stream_test.go @@ -0,0 +1,74 @@ +package stream + +import ( + "github.com/onsi/gomega" + "maps" + "slices" + "strconv" + "testing" +) + +func TestTransform(t *testing.T) { + g := gomega.NewGomegaWithT(t) + var s []int + g.Expect(slices.Collect(Transform(slices.Values(s), strconv.Itoa))).To(gomega.BeNil()) + + s = []int{1, 2, 3} + + res := slices.Collect(Transform(slices.Values(s), strconv.Itoa)) + g.Expect(res).To(gomega.HaveLen(3)) + g.Expect(res).To(gomega.ContainElements("1", "2", "3")) +} + +func TestTransform2(t *testing.T) { + g := gomega.NewGomegaWithT(t) + var s []int + trans2Func := func(i int) (int, string) { + return i, strconv.Itoa(i) + } + + g.Expect(maps.Collect(Transform2(slices.Values(s), trans2Func))).To(gomega.BeEmpty()) + + s = []int{1, 2, 3} + + res := maps.Collect(Transform2(slices.Values(s), trans2Func)) + g.Expect(res).To(gomega.HaveLen(3)) + g.Expect(res).To(gomega.HaveKeyWithValue(1, "1")) + g.Expect(res).To(gomega.HaveKeyWithValue(2, "2")) + g.Expect(res).To(gomega.HaveKeyWithValue(3, "3")) +} + +func TestTransform22(t *testing.T) { + g := gomega.NewGomegaWithT(t) + var s []int + trans22Func := func(i, v int) (int, string) { + return i * i, strconv.Itoa(v) + } + + g.Expect(maps.Collect(Transform22(slices.All(s), trans22Func))).To(gomega.BeEmpty()) + + s = []int{11, 22, 33, 44} + + res := maps.Collect(Transform22(slices.All(s), trans22Func)) + g.Expect(res).To(gomega.HaveLen(4)) + g.Expect(res).To(gomega.HaveKeyWithValue(0, "11")) + g.Expect(res).To(gomega.HaveKeyWithValue(1, "22")) + g.Expect(res).To(gomega.HaveKeyWithValue(4, "33")) + g.Expect(res).To(gomega.HaveKeyWithValue(9, "44")) + + res2 := slices.Collect(Iter2Values(Transform22(slices.All(s), trans22Func))) + g.Expect(res2).To(gomega.HaveLen(4)) + g.Expect(res2).To(gomega.ContainElements("11", "22", "33", "44")) +} + +func TestFilter(t *testing.T) { + g := gomega.NewGomegaWithT(t) + var s []int + + isEven := func(i int) bool { return i&1 == 0 } + g.Expect(slices.Collect(Filter(slices.Values(s), isEven))).To(gomega.BeNil()) + s = []int{1, 2, 3, 4, 5, 6, 7, 8, 9} + res := slices.Collect(Filter(slices.Values(s), isEven)) + g.Expect(res).To(gomega.HaveLen(4)) + g.Expect(res).To(gomega.ContainElements(2, 4, 6, 8)) +} From dede4749ae5bb2cadc02657273f0a406b29bb35d Mon Sep 17 00:00:00 2001 From: Nahshon Unna-Tsameret Date: Thu, 2 Jan 2025 11:57:33 +0200 Subject: [PATCH 2/2] some example of using streams Signed-off-by: Nahshon Unna-Tsameret --- controllers/operands/cdi_test.go | 2 +- controllers/operands/dashboard_test.go | 2 +- controllers/operands/imageStream.go | 10 ++++--- controllers/operands/kubevirt.go | 15 ++++++---- controllers/operands/operand.go | 13 ++------ pkg/stream/stream_test.go | 3 +- tools/csv-merger/csv-merger.go | 30 +++++++++---------- .../manifest-templator/manifest-templator.go | 7 ++--- 8 files changed, 40 insertions(+), 42 deletions(-) diff --git a/controllers/operands/cdi_test.go b/controllers/operands/cdi_test.go index c30f8c794c..4255c181d0 100644 --- a/controllers/operands/cdi_test.go +++ b/controllers/operands/cdi_test.go @@ -2,7 +2,6 @@ package operands import ( "context" - "github.com/kubevirt/hyperconverged-cluster-operator/pkg/stream" "maps" "time" @@ -20,6 +19,7 @@ import ( "k8s.io/client-go/tools/reference" "k8s.io/utils/ptr" + "github.com/kubevirt/hyperconverged-cluster-operator/pkg/stream" cdiv1beta1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" "sigs.k8s.io/controller-runtime/pkg/client" diff --git a/controllers/operands/dashboard_test.go b/controllers/operands/dashboard_test.go index 69cb6f071f..380703d434 100644 --- a/controllers/operands/dashboard_test.go +++ b/controllers/operands/dashboard_test.go @@ -3,7 +3,6 @@ package operands import ( "context" "fmt" - "github.com/kubevirt/hyperconverged-cluster-operator/pkg/stream" "maps" "os" "path" @@ -19,6 +18,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" "github.com/kubevirt/hyperconverged-cluster-operator/controllers/commontestutils" + "github.com/kubevirt/hyperconverged-cluster-operator/pkg/stream" hcoutil "github.com/kubevirt/hyperconverged-cluster-operator/pkg/util" ) diff --git a/controllers/operands/imageStream.go b/controllers/operands/imageStream.go index 3961defd04..67e62b809f 100644 --- a/controllers/operands/imageStream.go +++ b/controllers/operands/imageStream.go @@ -3,9 +3,11 @@ package operands import ( "errors" "fmt" + "maps" "os" "path/filepath" "reflect" + "slices" "strings" log "github.com/go-logr/logr" @@ -18,6 +20,7 @@ import ( hcov1beta1 "github.com/kubevirt/hyperconverged-cluster-operator/api/v1beta1" "github.com/kubevirt/hyperconverged-cluster-operator/controllers/common" + "github.com/kubevirt/hyperconverged-cluster-operator/pkg/stream" "github.com/kubevirt/hyperconverged-cluster-operator/pkg/util" ) @@ -138,10 +141,9 @@ type isHooks struct { } func newIsHook(required *imagev1.ImageStream, origNS string) *isHooks { - tags := make(map[string]imagev1.TagReference) - for _, tag := range required.Spec.Tags { - tags[tag.Name] = tag - } + tags := maps.Collect(stream.Transform22(slices.All(required.Spec.Tags), func(_ int, tag imagev1.TagReference) (string, imagev1.TagReference) { + return tag.Name, tag + })) return &isHooks{required: required, tags: tags, originalNS: origNS} } diff --git a/controllers/operands/kubevirt.go b/controllers/operands/kubevirt.go index 18519a5429..4c99a6643a 100644 --- a/controllers/operands/kubevirt.go +++ b/controllers/operands/kubevirt.go @@ -540,16 +540,19 @@ func getNetworkBindings(hcoNetworkBindings map[string]kubevirtcorev1.InterfaceBi } func getObsoleteCPUConfig(hcObsoleteCPUConf *hcov1beta1.HyperConvergedObsoleteCPUs) (map[string]bool, string) { - obsoleteCPUModels := make(map[string]bool) - for _, cpu := range hardcodedObsoleteCPUModels { - obsoleteCPUModels[cpu] = true + transformFunc := func(cpu string) (string, bool) { + return cpu, true } + + obsoleteCPUModels := maps.Collect(stream.Transform2(slices.Values(hardcodedObsoleteCPUModels), transformFunc)) + minCPUModel := "" if hcObsoleteCPUConf != nil { - for _, cpu := range hcObsoleteCPUConf.CPUModels { - obsoleteCPUModels[cpu] = true - } + maps.Insert( + obsoleteCPUModels, + stream.Transform2(slices.Values(hcObsoleteCPUConf.CPUModels), transformFunc), + ) minCPUModel = hcObsoleteCPUConf.MinCPUModel } diff --git a/controllers/operands/operand.go b/controllers/operands/operand.go index f3dcbbe13f..90919b21f2 100644 --- a/controllers/operands/operand.go +++ b/controllers/operands/operand.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "slices" "strings" jsonpatch "github.com/evanphx/json-patch/v5" @@ -18,6 +19,7 @@ import ( hcov1beta1 "github.com/kubevirt/hyperconverged-cluster-operator/api/v1beta1" "github.com/kubevirt/hyperconverged-cluster-operator/controllers/common" + "github.com/kubevirt/hyperconverged-cluster-operator/pkg/stream" hcoutil "github.com/kubevirt/hyperconverged-cluster-operator/pkg/util" ) @@ -443,14 +445,5 @@ func osConditionToK8s(condition conditionsv1.Condition) metav1.Condition { } func osConditionsToK8s(conditions []conditionsv1.Condition) []metav1.Condition { - if len(conditions) == 0 { - return nil - } - - newCond := make([]metav1.Condition, len(conditions)) - for i, c := range conditions { - newCond[i] = osConditionToK8s(c) - } - - return newCond + return slices.Collect(stream.Transform(slices.Values(conditions), osConditionToK8s)) } diff --git a/pkg/stream/stream_test.go b/pkg/stream/stream_test.go index 2ad14b7df0..a7f02e4b6f 100644 --- a/pkg/stream/stream_test.go +++ b/pkg/stream/stream_test.go @@ -1,11 +1,12 @@ package stream import ( - "github.com/onsi/gomega" "maps" "slices" "strconv" "testing" + + "github.com/onsi/gomega" ) func TestTransform(t *testing.T) { diff --git a/tools/csv-merger/csv-merger.go b/tools/csv-merger/csv-merger.go index 0d78d4d48d..9b107e4b14 100644 --- a/tools/csv-merger/csv-merger.go +++ b/tools/csv-merger/csv-merger.go @@ -42,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "github.com/kubevirt/hyperconverged-cluster-operator/pkg/components" + "github.com/kubevirt/hyperconverged-cluster-operator/pkg/stream" hcoutil "github.com/kubevirt/hyperconverged-cluster-operator/pkg/util" "github.com/kubevirt/hyperconverged-cluster-operator/tools/util" ) @@ -65,10 +66,13 @@ var ( type EnvVarFlags []corev1.EnvVar func (i *EnvVarFlags) String() string { - es := make([]string, 0) - for _, ev := range *i { - es = append(es, fmt.Sprintf("%s=%s", ev.Name, ev.Value)) - } + es := slices.Collect(stream.Transform( + slices.Values(*i), + func(ev corev1.EnvVar) string { + return fmt.Sprintf("%s=%s", ev.Name, ev.Value) + }, + )) + return strings.Join(es, ",") } @@ -136,16 +140,17 @@ var ( ) func IOReadDir(root string) ([]string, error) { - var files []string fileInfo, err := os.ReadDir(root) if err != nil { - return files, err + return nil, err } - for _, file := range fileInfo { - files = append(files, filepath.Join(root, file.Name())) - } - return files, nil + return slices.Collect(stream.Transform( + slices.Values(fileInfo), + func(file os.DirEntry) string { + return filepath.Join(root, file.Name()) + }, + )), nil } func validateNoAPIOverlap(crdDir string) error { @@ -182,11 +187,6 @@ func detectAPIOverlap(crdMap map[string][]string) map[string]sets.Set[string] { overlapsMap := make(map[string]sets.Set[string]) for operator, groups := range crdMap { for _, apiGroup := range groups { - // We work on replacement for current v2v. Remove this check when vmware import is removed - if apiGroup == "v2v.kubevirt.io" { - continue - } - compareMapWithEntry(crdMap, operator, apiGroup, overlapsMap) } } diff --git a/tools/manifest-templator/manifest-templator.go b/tools/manifest-templator/manifest-templator.go index 21c6ba92df..e53ddb350c 100644 --- a/tools/manifest-templator/manifest-templator.go +++ b/tools/manifest-templator/manifest-templator.go @@ -22,8 +22,10 @@ package main import ( "flag" "log" + "maps" "os" "path" + "slices" "sort" "strconv" "strings" @@ -470,10 +472,7 @@ func writeOperatorDeploymentsAndServices(deployments []appsv1.Deployment, servic } func writeServiceAccounts(serviceAccounts map[string]v1.ServiceAccount) { - var keys []string - for saName := range serviceAccounts { - keys = append(keys, saName) - } + keys := slices.Collect(maps.Keys(serviceAccounts)) // since maps are not ordered we must enforce one before writing sort.Strings(keys)