diff --git a/pkg/common/constants/constants.go b/pkg/common/constants/constants.go index b9783b7b8..af2b57ae0 100644 --- a/pkg/common/constants/constants.go +++ b/pkg/common/constants/constants.go @@ -112,3 +112,6 @@ const AnnotationEnableYuniKorn = "yunikorn.apache.org/namespace.enableYuniKorn" // Admission Controller pod label update constants const AutoGenAppPrefix = "yunikorn" const AutoGenAppSuffix = "autogen" + +// Compression Algorithms for schedulerConfig +const GzipSuffix = "gzip" diff --git a/pkg/common/utils/utils_test.go b/pkg/common/utils/utils_test.go index a386a50a6..e921f3806 100644 --- a/pkg/common/utils/utils_test.go +++ b/pkg/common/utils/utils_test.go @@ -19,6 +19,9 @@ package utils import ( + "bytes" + "compress/gzip" + "encoding/base64" "fmt" "testing" "time" @@ -29,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-k8shim/pkg/common" "github.com/apache/yunikorn-k8shim/pkg/common/constants" "github.com/apache/yunikorn-k8shim/pkg/conf" @@ -1137,6 +1141,25 @@ func TestGetCoreSchedulerConfigFromConfigMap(t *testing.T) { assert.Equal(t, "test", GetCoreSchedulerConfigFromConfigMap(cm)) } +func TestGzipCompressedConfigMap(t *testing.T) { + var b bytes.Buffer + gzWriter := gzip.NewWriter(&b) + if _, err := gzWriter.Write([]byte(configs.DefaultSchedulerConfig)); err != nil { + t.Fatal("expected nil, got error while compressing test schedulerConfig") + } + if err := gzWriter.Close(); err != nil { + t.Fatal("expected nil, got error") + } + encodedConfigString := make([]byte, base64.StdEncoding.EncodedLen(len(b.Bytes()))) + base64.StdEncoding.Encode(encodedConfigString, b.Bytes()) + confMap := conf.FlattenConfigMaps([]*v1.ConfigMap{ + {Data: map[string]string{}}, + {Data: map[string]string{conf.CMSvcClusterID: "new"}, BinaryData: map[string][]byte{"queues.yaml.gzip": encodedConfigString}}, + }) + config := GetCoreSchedulerConfigFromConfigMap(confMap) + assert.DeepEqual(t, configs.DefaultSchedulerConfig, config) +} + func TestGetExtraConfigFromConfigMapNil(t *testing.T) { res := GetExtraConfigFromConfigMap(nil) assert.Equal(t, 0, len(res)) diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go index 7b92cefdd..f156ed165 100644 --- a/pkg/conf/schedulerconf.go +++ b/pkg/conf/schedulerconf.go @@ -19,10 +19,14 @@ package conf import ( + "bytes" + "compress/gzip" + "encoding/base64" "encoding/json" "errors" "flag" "fmt" + "io" "os" "strconv" "strings" @@ -456,6 +460,40 @@ func DumpConfiguration() { } } +func Decompress(key string, value []byte) (string, string) { + var uncompressedData string + decodedValue := make([]byte, base64.StdEncoding.DecodedLen(len(value))) + n, err := base64.StdEncoding.Decode(decodedValue, value) + if err != nil { + log.Log(log.ShimConfig).Error("failed to decode schedulerConfig entry", zap.Error(err)) + return "", "" + } + decodedValue = decodedValue[:n] + splitKey := strings.Split(key, ".") + compressionAlgo := splitKey[len(splitKey)-1] + if strings.EqualFold(compressionAlgo, constants.GzipSuffix) { + reader := bytes.NewReader(decodedValue) + gzReader, err := gzip.NewReader(reader) + if err != nil { + log.Log(log.ShimConfig).Error("failed to decompress decoded schedulerConfig entry", zap.Error(err)) + return "", "" + } + defer func() { + if err = gzReader.Close(); err != nil { + log.Log(log.ShimConfig).Debug("gzip Reader could not be closed ", zap.Error(err)) + } + }() + decompressedBytes, err := io.ReadAll(gzReader) + if err != nil { + log.Log(log.ShimConfig).Error("failed to decompress decoded schedulerConfig entry", zap.Error(err)) + return "", "" + } + uncompressedData = string(decompressedBytes) + } + strippedKey, _ := strings.CutSuffix(key, "."+compressionAlgo) + return strippedKey, uncompressedData +} + func FlattenConfigMaps(configMaps []*v1.ConfigMap) map[string]string { result := make(map[string]string) for _, configMap := range configMaps { @@ -463,6 +501,10 @@ func FlattenConfigMaps(configMaps []*v1.ConfigMap) map[string]string { for k, v := range configMap.Data { result[k] = v } + for k, v := range configMap.BinaryData { + strippedKey, uncompressedData := Decompress(k, v) + result[strippedKey] = uncompressedData + } } } return result diff --git a/pkg/conf/schedulerconf_test.go b/pkg/conf/schedulerconf_test.go index 096e3796d..f79c108a1 100644 --- a/pkg/conf/schedulerconf_test.go +++ b/pkg/conf/schedulerconf_test.go @@ -18,14 +18,19 @@ limitations under the License. package conf import ( + "bytes" + "compress/gzip" + "encoding/base64" "fmt" "reflect" + "strings" "testing" "time" "gotest.tools/v3/assert" v1 "k8s.io/api/core/v1" + "github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-k8shim/pkg/common/constants" ) @@ -72,6 +77,39 @@ func assertDefaults(t *testing.T, conf *SchedulerConf) { assert.Equal(t, conf.UserLabelKey, constants.DefaultUserLabel) } +func TestDecompress(t *testing.T) { + var b bytes.Buffer + gzWriter := gzip.NewWriter(&b) + if _, err := gzWriter.Write([]byte(configs.DefaultSchedulerConfig)); err != nil { + assert.NilError(t, err, "expected nil, got error while compressing test schedulerConfig") + } + if err := gzWriter.Close(); err != nil { + assert.NilError(t, err, "expected nil, got error") + t.Fatal("expected nil, got error") + } + encodedConfigString := make([]byte, base64.StdEncoding.EncodedLen(len(b.Bytes()))) + base64.StdEncoding.Encode(encodedConfigString, b.Bytes()) + key, decodedConfigString := Decompress("queues.yaml."+constants.GzipSuffix, encodedConfigString) + assert.Equal(t, "queues.yaml", key) + assert.Equal(t, configs.DefaultSchedulerConfig, decodedConfigString) +} + +func TestDecompressUnkownKey(t *testing.T) { + encodedConfigString := make([]byte, base64.StdEncoding.EncodedLen(len([]byte(configs.DefaultSchedulerConfig)))) + base64.StdEncoding.Encode(encodedConfigString, []byte(configs.DefaultSchedulerConfig)) + key, decodedConfigString := Decompress("queues.yaml.bin", encodedConfigString) + assert.Equal(t, "queues.yaml", key) + assert.Assert(t, len(decodedConfigString) == 0, "expected decodedConfigString to be nil") +} + +func TestDecompressBadCompression(t *testing.T) { + encodedConfigString := make([]byte, base64.StdEncoding.EncodedLen(len([]byte(configs.DefaultSchedulerConfig)))) + base64.StdEncoding.Encode(encodedConfigString, []byte(configs.DefaultSchedulerConfig)) + key, decodedConfigString := Decompress("queues.yaml."+constants.GzipSuffix, encodedConfigString) + assert.Equal(t, "", key) + assert.Assert(t, !strings.EqualFold(configs.DefaultSchedulerConfig, decodedConfigString), "expected decodedConfigString to be nil") +} + func TestParseConfigMap(t *testing.T) { testCases := []struct { name string