diff --git a/charts/spark-operator-chart/Chart.yaml b/charts/spark-operator-chart/Chart.yaml index 7dcec3a52..417418170 100644 --- a/charts/spark-operator-chart/Chart.yaml +++ b/charts/spark-operator-chart/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v2 name: spark-operator description: A Helm chart for Spark on Kubernetes operator -version: 1.4.1 -appVersion: v1beta2-1.6.0-3.5.0 +version: 1.4.2 +appVersion: v1beta2-1.6.1-3.5.0 keywords: - spark home: https://github.com/kubeflow/spark-operator diff --git a/charts/spark-operator-chart/README.md b/charts/spark-operator-chart/README.md index 50af340de..8dfe591d0 100644 --- a/charts/spark-operator-chart/README.md +++ b/charts/spark-operator-chart/README.md @@ -1,6 +1,6 @@ # spark-operator -![Version: 1.4.0](https://img.shields.io/badge/Version-1.4.0-informational?style=flat-square) ![AppVersion: v1beta2-1.6.0-3.5.0](https://img.shields.io/badge/AppVersion-v1beta2--1.6.0--3.5.0-informational?style=flat-square) +![Version: 1.4.2](https://img.shields.io/badge/Version-1.4.2-informational?style=flat-square) ![AppVersion: v1beta2-1.6.1-3.5.0](https://img.shields.io/badge/AppVersion-v1beta2--1.6.1--3.5.0-informational?style=flat-square) A Helm chart for Spark on Kubernetes operator @@ -110,7 +110,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum | podMonitor.labels | object | `{}` | Pod monitor labels | | podMonitor.podMetricsEndpoint | object | `{"interval":"5s","scheme":"http"}` | Prometheus metrics endpoint properties. `metrics.portName` will be used as a port | | podSecurityContext | object | `{}` | Pod security context | -| priorityClassName | string | `""` | Priority class to be used for running spark-operator pod. This helps in managing the pods during preemption. | +| priorityClassName | string | `""` | A priority class to be used for running spark-operator pod. | | rbac.annotations | object | `{}` | Optional annotations for rbac | | rbac.create | bool | `false` | **DEPRECATED** use `createRole` and `createClusterRole` | | rbac.createClusterRole | bool | `true` | Create and use RBAC `ClusterRole` resources | @@ -134,6 +134,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum | volumes | list | `[]` | | | webhook.enable | bool | `false` | Enable webhook server | | webhook.namespaceSelector | string | `""` | The webhook server will only operate on namespaces with this label, specified in the form key1=value1,key2=value2. Empty string (default) will operate on all namespaces | +| webhook.objectSelector | string | `""` | The webhook will only operate on resources with this label/s, specified in the form key1=value1,key2=value2, OR key in (value1,value2). Empty string (default) will operate on all objects | | webhook.port | int | `8080` | Webhook service port | | webhook.portName | string | `"webhook"` | Webhook container port name and service target port name | | webhook.timeout | int | `30` | The annotations applied to init job, required to restore certs deleted by the cleanup job during upgrade | diff --git a/charts/spark-operator-chart/templates/deployment.yaml b/charts/spark-operator-chart/templates/deployment.yaml index 797ea998e..cf12fb2e8 100644 --- a/charts/spark-operator-chart/templates/deployment.yaml +++ b/charts/spark-operator-chart/templates/deployment.yaml @@ -100,6 +100,7 @@ spec: - -webhook-port={{ .Values.webhook.port }} - -webhook-timeout={{ .Values.webhook.timeout }} - -webhook-namespace-selector={{ .Values.webhook.namespaceSelector }} + - -webhook-object-selector={{ .Values.webhook.objectSelector }} {{- end }} - -enable-resource-quota-enforcement={{ .Values.resourceQuotaEnforcement.enable }} {{- if gt (int .Values.replicaCount) 1 }} diff --git a/charts/spark-operator-chart/values.yaml b/charts/spark-operator-chart/values.yaml index df08008df..d9f63b645 100644 --- a/charts/spark-operator-chart/values.yaml +++ b/charts/spark-operator-chart/values.yaml @@ -103,6 +103,9 @@ webhook: # -- The webhook server will only operate on namespaces with this label, specified in the form key1=value1,key2=value2. # Empty string (default) will operate on all namespaces namespaceSelector: "" + # -- The webhook will only operate on resources with this label/s, specified in the form key1=value1,key2=value2, OR key in (value1,value2). + # Empty string (default) will operate on all objects + objectSelector: "" # -- The annotations applied to init job, required to restore certs deleted by the cleanup job during upgrade timeout: 30 diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index a5b54e82c..2984e4641 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -78,6 +78,7 @@ type WebHook struct { serviceRef *arv1.ServiceReference failurePolicy arv1.FailurePolicyType selector *metav1.LabelSelector + objectSelector *metav1.LabelSelector sparkJobNamespace string deregisterOnExit bool enableResourceQuotaEnforcement bool @@ -96,6 +97,7 @@ type webhookFlags struct { webhookPort int webhookFailOnError bool webhookNamespaceSelector string + webhookObjectSelector string } var userConfig webhookFlags @@ -109,6 +111,7 @@ func init() { flag.IntVar(&userConfig.webhookPort, "webhook-port", 8080, "Service port of the webhook server.") flag.BoolVar(&userConfig.webhookFailOnError, "webhook-fail-on-error", false, "Whether Kubernetes should reject requests when the webhook fails.") flag.StringVar(&userConfig.webhookNamespaceSelector, "webhook-namespace-selector", "", "The webhook will only operate on namespaces with this label, specified in the form key1=value1,key2=value2. Required if webhook-fail-on-error is true.") + flag.StringVar(&userConfig.webhookObjectSelector, "webhook-object-selector", "", "The webhook will only operate on pods with this label/s, specified in the form key1=value1,key2=value2, OR key in (value1,value2).") } // New creates a new WebHook instance. @@ -119,8 +122,8 @@ func New( deregisterOnExit bool, enableResourceQuotaEnforcement bool, coreV1InformerFactory informers.SharedInformerFactory, - webhookTimeout *int) (*WebHook, error) { - + webhookTimeout *int, +) (*WebHook, error) { certProvider, err := NewCertProvider( userConfig.webhookServiceName, userConfig.webhookServiceNamespace, @@ -159,13 +162,21 @@ func New( return nil, fmt.Errorf("webhook-namespace-selector must be set when webhook-fail-on-error is true") } } else { - selector, err := parseNamespaceSelector(userConfig.webhookNamespaceSelector) + selector, err := parseSelector(userConfig.webhookNamespaceSelector) if err != nil { return nil, err } hook.selector = selector } + if userConfig.webhookObjectSelector != "" { + selector, err := metav1.ParseToLabelSelector(userConfig.webhookObjectSelector) + if err != nil { + return nil, err + } + hook.objectSelector = selector + } + if enableResourceQuotaEnforcement { hook.resourceQuotaEnforcer = resourceusage.NewResourceQuotaEnforcer(informerFactory, coreV1InformerFactory) } @@ -180,7 +191,7 @@ func New( return hook, nil } -func parseNamespaceSelector(selectorArg string) (*metav1.LabelSelector, error) { +func parseSelector(selectorArg string) (*metav1.LabelSelector, error) { selector := &metav1.LabelSelector{ MatchLabels: make(map[string]string), } @@ -189,7 +200,7 @@ func parseNamespaceSelector(selectorArg string) (*metav1.LabelSelector, error) { for _, selectorStr := range selectorStrs { kv := strings.SplitN(selectorStr, "=", 2) if len(kv) != 2 || kv[0] == "" || kv[1] == "" { - return nil, fmt.Errorf("webhook namespace selector must be in the form key1=value1,key2=value2") + return nil, fmt.Errorf("webhook selector must be in the form key1=value1,key2=value2") } selector.MatchLabels[kv[0]] = kv[1] } @@ -441,6 +452,7 @@ func (wh *WebHook) selfRegistration(webhookConfigName string) error { }, FailurePolicy: &wh.failurePolicy, NamespaceSelector: wh.selector, + ObjectSelector: wh.objectSelector, TimeoutSeconds: wh.timeoutSeconds, SideEffects: &sideEffect, AdmissionReviewVersions: []string{"v1"}, @@ -455,6 +467,7 @@ func (wh *WebHook) selfRegistration(webhookConfigName string) error { }, FailurePolicy: &wh.failurePolicy, NamespaceSelector: wh.selector, + ObjectSelector: wh.objectSelector, TimeoutSeconds: wh.timeoutSeconds, SideEffects: &sideEffect, AdmissionReviewVersions: []string{"v1"}, @@ -587,7 +600,8 @@ func admitScheduledSparkApplications(review *admissionv1.AdmissionReview, enforc func mutatePods( review *admissionv1.AdmissionReview, lister crdlisters.SparkApplicationLister, - sparkJobNs string) (*admissionv1.AdmissionResponse, error) { + sparkJobNs string, +) (*admissionv1.AdmissionResponse, error) { raw := review.Request.Object.Raw pod := &corev1.Pod{} if err := json.Unmarshal(raw, pod); err != nil { diff --git a/pkg/webhook/webhook_test.go b/pkg/webhook/webhook_test.go index 63e7b46f1..6f2e2f088 100644 --- a/pkg/webhook/webhook_test.go +++ b/pkg/webhook/webhook_test.go @@ -23,12 +23,15 @@ import ( "time" "github.com/stretchr/testify/assert" - admissionv1 "k8s.io/api/admission/v1" + arv1 "k8s.io/api/admissionregistration/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + gotest "k8s.io/client-go/testing" spov1beta2 "github.com/kubeflow/spark-operator/pkg/apis/sparkoperator.k8s.io/v1beta2" crdclientfake "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned/fake" @@ -183,8 +186,93 @@ func serializePod(pod *corev1.Pod) ([]byte, error) { return json.Marshal(pod) } +func TestSelfRegistrationWithObjectSelector(t *testing.T) { + clientset := fake.NewSimpleClientset() + informerFactory := crdinformers.NewSharedInformerFactory(nil, 0) + coreV1InformerFactory := informers.NewSharedInformerFactory(nil, 0) + + // Setup userConfig with object selector + userConfig.webhookObjectSelector = "spark-role in (driver,executor)" + webhookTimeout := 30 + + // Create webhook instance + webhook, err := New(clientset, informerFactory, "default", false, false, coreV1InformerFactory, &webhookTimeout) + assert.NoError(t, err) + + // Mock the clientset's Create function to capture the MutatingWebhookConfiguration object + var createdWebhookConfig *arv1.MutatingWebhookConfiguration + clientset.PrependReactor("create", "mutatingwebhookconfigurations", func(action gotest.Action) (handled bool, ret runtime.Object, err error) { + createAction := action.(gotest.CreateAction) + createdWebhookConfig = createAction.GetObject().(*arv1.MutatingWebhookConfiguration) + return true, createdWebhookConfig, nil + }) + + // Call the selfRegistration method + err = webhook.selfRegistration("test-webhook-config") + assert.NoError(t, err) + + // Verify the MutatingWebhookConfiguration was created with the expected object selector + assert.NotNil(t, createdWebhookConfig, "MutatingWebhookConfiguration should have been created") + + expectedSelector := &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "spark-role", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"driver", "executor"}, + }, + }, + } + actualSelector := createdWebhookConfig.Webhooks[0].ObjectSelector + + assert.True(t, labelSelectorsEqual(expectedSelector, actualSelector), "ObjectSelectors should be equal") +} + +func labelSelectorsEqual(expected, actual *metav1.LabelSelector) bool { + if expected == nil || actual == nil { + return expected == nil && actual == nil + } + + if len(expected.MatchLabels) != len(actual.MatchLabels) { + return false + } + + for k, v := range expected.MatchLabels { + if actual.MatchLabels[k] != v { + return false + } + } + + if len(expected.MatchExpressions) != len(actual.MatchExpressions) { + return false + } + + for i, expr := range expected.MatchExpressions { + if expr.Key != actual.MatchExpressions[i].Key || + expr.Operator != actual.MatchExpressions[i].Operator || + !equalStringSlices(expr.Values, actual.MatchExpressions[i].Values) { + return false + } + } + + return true +} + +func equalStringSlices(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + func testSelector(input string, expected *metav1.LabelSelector, t *testing.T) { - selector, err := parseNamespaceSelector(input) + selector, err := parseSelector(input) + if expected == nil { if err == nil { t.Errorf("Expected error parsing '%s', but got %v", input, selector)