Skip to content

Commit

Permalink
[YUNIKORN-2287] Decompress function doesn't need to decode base64
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Dec 22, 2023
1 parent 24c699f commit f04525e
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 13 deletions.
10 changes: 1 addition & 9 deletions pkg/conf/schedulerconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package conf
import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"errors"
"flag"
Expand Down Expand Up @@ -452,17 +451,10 @@ func DumpConfiguration() {

func Decompress(key string, value []byte) (string, string) {
var uncompressedData string
decodedValue := make([]byte, base64.StdEncoding.DecodedLen(len(value)))
n, err := base64.StdEncoding.Decode(decodedValue, value)
if err != nil {
log.Log(log.ShimConfig).Error("failed to decode schedulerConfig entry", zap.Error(err))
return "", ""
}
decodedValue = decodedValue[:n]
splitKey := strings.Split(key, ".")
compressionAlgo := splitKey[len(splitKey)-1]
if strings.EqualFold(compressionAlgo, constants.GzipSuffix) {
reader := bytes.NewReader(decodedValue)
reader := bytes.NewReader(value)
gzReader, err := gzip.NewReader(reader)
if err != nil {
log.Log(log.ShimConfig).Error("failed to decompress decoded schedulerConfig entry", zap.Error(err))
Expand Down
6 changes: 2 additions & 4 deletions pkg/conf/schedulerconf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,15 @@ func TestDecompress(t *testing.T) {
assert.NilError(t, err, "expected nil, got error")
t.Fatal("expected nil, got error")
}
encodedConfigString := make([]byte, base64.StdEncoding.EncodedLen(len(b.Bytes())))
base64.StdEncoding.Encode(encodedConfigString, b.Bytes())
key, decodedConfigString := Decompress("queues.yaml."+constants.GzipSuffix, encodedConfigString)
key, decodedConfigString := Decompress("queues.yaml."+constants.GzipSuffix, b.Bytes())
assert.Equal(t, "queues.yaml", key)
assert.Equal(t, configs.DefaultSchedulerConfig, decodedConfigString)
}

func TestDecompressUnknownKey(t *testing.T) {
encodedConfigString := make([]byte, base64.StdEncoding.EncodedLen(len([]byte(configs.DefaultSchedulerConfig))))
base64.StdEncoding.Encode(encodedConfigString, []byte(configs.DefaultSchedulerConfig))
key, decodedConfigString := Decompress("queues.yaml.bin", encodedConfigString)
key, decodedConfigString := Decompress("queues.yaml.bin", []byte(configs.DefaultSchedulerConfig))
assert.Equal(t, "queues.yaml", key)
assert.Assert(t, len(decodedConfigString) == 0, "expected decodedConfigString to be nil")
}
Expand Down
93 changes: 93 additions & 0 deletions test/e2e/configmap/configmap_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package configmap

import (
"path/filepath"
"testing"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/ginkgo/v2/reporters"
"github.com/onsi/gomega"

"github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
)

func init() {
configmanager.YuniKornTestConfig.ParseFlags()
}

func TestConfigMap(t *testing.T) {
ginkgo.ReportAfterSuite("TestConfigMap", func(report ginkgo.Report) {
err := common.CreateJUnitReportDir()
Ω(err).NotTo(gomega.HaveOccurred())
err = reporters.GenerateJUnitReportWithConfig(
report,
filepath.Join(configmanager.YuniKornTestConfig.LogDir, "TEST-configmap_junit.xml"),
reporters.JunitReportConfig{OmitSpecLabels: true},
)
Ω(err).NotTo(HaveOccurred())
})
gomega.RegisterFailHandler(ginkgo.Fail)
ginkgo.RunSpecs(t, "TestConfigMap", ginkgo.Label("TestConfigMap"))
}

var (
kClient k8s.KubeCtl
restClient yunikorn.RClient
)

var _ = BeforeSuite(func() {

kClient = k8s.KubeCtl{}
Ω(kClient.SetClient()).To(BeNil())

restClient = yunikorn.RClient{}
Ω(restClient).NotTo(BeNil())

yunikorn.EnsureYuniKornConfigsPresent()

// Restart yunikorn and port-forward
// Required to change node sort policy.
ginkgo.By("Restart the scheduler pod")
yunikorn.RestartYunikorn(&kClient)

ginkgo.By("Port-forward scheduler pod after restart")
yunikorn.RestorePortForwarding(&kClient)
})

var _ = AfterSuite(func() {})

var Describe = ginkgo.Describe
var It = ginkgo.It
var By = ginkgo.By
var BeforeEach = ginkgo.BeforeEach
var AfterEach = ginkgo.AfterEach
var BeforeSuite = ginkgo.BeforeSuite
var AfterSuite = ginkgo.AfterSuite

// Declarations for Gomega Matchers
var Equal = gomega.Equal
var Ω = gomega.Expect
var BeNil = gomega.BeNil
var BeNumerically = gomega.BeNumerically
var HaveOccurred = gomega.HaveOccurred
106 changes: 106 additions & 0 deletions test/e2e/configmap/configmap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package configmap

import (
"bytes"
"compress/gzip"
"time"

v1 "k8s.io/api/core/v1"

"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
)

var _ = Describe("ConfigMap", func() {
It("Verify_Compressed_ConfigMap", func() {
schedulerConfig := &configs.SchedulerConfig{
Partitions: []configs.PartitionConfig{{
Name: "default",
PlacementRules: []configs.PlacementRule{{
Name: "tag",
Value: "namespace",
Create: true,
}},
Queues: []configs.QueueConfig{{
Name: "root",
SubmitACL: "*",
Queues: []configs.QueueConfig{{
Name: "queue1",
SubmitACL: "*",
}},
}},
}},
}

// Wait for 1 second to set a new timestamp. If we don't wait for it, we may get a same timestamp.
time.Sleep(1 * time.Second)
ts, err := common.SetQueueTimestamp(schedulerConfig, "default", "root")
Ω(err).NotTo(HaveOccurred())

schedulerConfigStr, err := common.ToYAML(schedulerConfig)
Ω(err).NotTo(HaveOccurred())

// use gzip to compress
var b bytes.Buffer
gzWriter := gzip.NewWriter(&b)
_, err = gzWriter.Write([]byte(schedulerConfigStr))
Ω(err).NotTo(HaveOccurred())

err = gzWriter.Close()
Ω(err).NotTo(HaveOccurred())

// use base64 to encode
// encodedSchedulerConfigString := make([]byte, base64.StdEncoding.EncodedLen(len(b.Bytes())))
// base64.StdEncoding.Encode(encodedSchedulerConfigString, b.Bytes())

var configMap *v1.ConfigMap
oldConfigMap := &v1.ConfigMap{}
configMap, err = kClient.GetConfigMaps(configmanager.YuniKornTestConfig.YkNamespace,
configmanager.DefaultYuniKornConfigMap)
Ω(err).NotTo(HaveOccurred())
Ω(configMap).NotTo(BeNil())

configMap.DeepCopyInto(oldConfigMap)
delete(configMap.Data, configmanager.DefaultPolicyGroup)
if configMap.BinaryData == nil {
configMap.BinaryData = map[string][]byte{}
}
configMap.BinaryData[configmanager.DefaultPolicyGroup+".gz"] = b.Bytes()

_, err = kClient.UpdateConfigMap(configMap, configmanager.YuniKornTestConfig.YkNamespace)
Ω(err).NotTo(HaveOccurred())

err = yunikorn.WaitForQueueTS("root", ts, 30*time.Second)
Ω(err).NotTo(HaveOccurred())

queueDAOInfo, err := restClient.GetQueue(yunikorn.DefaultPartition, "root")
Ω(err).NotTo(HaveOccurred())
Ω(queueDAOInfo).NotTo(BeNil())

Ω(len(queueDAOInfo.Children)).To(Equal(1))
Ω(queueDAOInfo.Children[0].QueueName).To(Equal("root.queue1"))

// Restore the old config maps
yunikorn.RestoreConfigMapWrapper(oldConfigMap, "")
})
})
1 change: 1 addition & 0 deletions test/e2e/framework/helpers/yunikorn/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func RestoreConfigMapWrapper(oldConfigMap *v1.ConfigMap, annotation string) {
ts, tsErr := common.SetQueueTimestamp(oldSC, "default", "root")
Ω(tsErr).NotTo(HaveOccurred())
c.Data = oldConfigMap.Data
c.BinaryData = oldConfigMap.BinaryData
c.Data[configmanager.DefaultPolicyGroup], err = common.ToYAML(oldSC)
Ω(err).NotTo(HaveOccurred())

Expand Down

0 comments on commit f04525e

Please sign in to comment.