diff --git a/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go b/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go index 2903f09329..8ec293e964 100644 --- a/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go +++ b/deploy/kubernetes/operator/api/uniffle/v1alpha1/remoteshuffleservice_types.go @@ -106,6 +106,14 @@ type CoordinatorConfig struct { // HTTPNodePort defines http port of node port service used for coordinators' external access. // +optional HTTPNodePort []int32 `json:"httpNodePort,omitempty"` + + // NodePortServiceAnnotations is a list of annotations for the NodePort service. + // +optional + NodePortServiceAnnotations []map[string]string `json:"nodePortServiceAnnotations,omitempty"` + + // HeadlessServiceAnnotations is a list of annotations for the headless service. + // +optional + HeadlessServiceAnnotations []map[string]string `json:"headlessServiceAnnotations,omitempty"` } // ShuffleServerConfig records configuration used to generate workload of shuffle servers. diff --git a/deploy/kubernetes/operator/api/uniffle/v1alpha1/zz_generated.deepcopy.go b/deploy/kubernetes/operator/api/uniffle/v1alpha1/zz_generated.deepcopy.go index c9e15d7a5e..da70877ba3 100644 --- a/deploy/kubernetes/operator/api/uniffle/v1alpha1/zz_generated.deepcopy.go +++ b/deploy/kubernetes/operator/api/uniffle/v1alpha1/zz_generated.deepcopy.go @@ -99,6 +99,32 @@ func (in *CoordinatorConfig) DeepCopyInto(out *CoordinatorConfig) { *out = make([]int32, len(*in)) copy(*out, *in) } + if in.NodePortServiceAnnotations != nil { + in, out := &in.NodePortServiceAnnotations, &out.NodePortServiceAnnotations + *out = make([]map[string]string, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + } + } + if in.HeadlessServiceAnnotations != nil { + in, out := &in.HeadlessServiceAnnotations, &out.HeadlessServiceAnnotations + *out = make([]map[string]string, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CoordinatorConfig. diff --git a/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml b/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml index 46e3e23b83..e36e46b406 100644 --- a/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml +++ b/deploy/kubernetes/operator/config/crd/bases/uniffle.apache.org_remoteshuffleservices.yaml @@ -1786,6 +1786,14 @@ spec: description: ExcludeNodesFilePath indicates exclude nodes file path in coordinators' containers. type: string + headlessServiceAnnotations: + description: HeadlessServiceAnnotations is a list of annotations + for the headless service. + items: + additionalProperties: + type: string + type: object + type: array hostNetwork: default: true description: HostNetwork indicates whether we need to enable host @@ -1827,6 +1835,14 @@ spec: description: LogHostPath represents host path used to save logs of shuffle servers. type: string + nodePortServiceAnnotations: + description: NodePortServiceAnnotations is a list of annotations + for the NodePort service. + items: + additionalProperties: + type: string + type: object + type: array nodeSelector: additionalProperties: type: string diff --git a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go index 84b6c84a68..b134d6ca44 100644 --- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go +++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go @@ -105,10 +105,19 @@ func GenerateHeadlessSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) * name := GenerateNameByIndex(rss, index) serviceName := appendHeadless(name) + annotations := map[string]string{} + + if len(rss.Spec.Coordinator.HeadlessServiceAnnotations) > index { + for key, value := range rss.Spec.Coordinator.HeadlessServiceAnnotations[index] { + annotations[key] = value + } + } + svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: serviceName, - Namespace: rss.Namespace, + Name: serviceName, + Namespace: rss.Namespace, + Annotations: annotations, }, Spec: corev1.ServiceSpec{ ClusterIP: corev1.ClusterIPNone, @@ -140,10 +149,20 @@ func GenerateHeadlessSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) * // this function is skipped. func GenerateSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) *corev1.Service { name := GenerateNameByIndex(rss, index) + + annotations := map[string]string{} + + if len(rss.Spec.Coordinator.NodePortServiceAnnotations) > index { + for key, value := range rss.Spec.Coordinator.NodePortServiceAnnotations[index] { + annotations[key] = value + } + } + svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: rss.Namespace, + Name: name, + Namespace: rss.Namespace, + Annotations: annotations, }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeNodePort, diff --git a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go index 22caf5fc41..ebfdb3cf53 100644 --- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go +++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go @@ -138,8 +138,24 @@ var ( "key1": "value1", "key2": "value2", } + + testSvcAnnotationsList = []map[string]string{ + { + "annotation1": "value1", + }, + { + "annotation2": "value2", + }, + } ) +func buildRssWithSvcAnnotations() *uniffleapi.RemoteShuffleService { + rss := utils.BuildRSSWithDefaultValue() + rss.Spec.Coordinator.NodePortServiceAnnotations = testSvcAnnotationsList + rss.Spec.Coordinator.HeadlessServiceAnnotations = testSvcAnnotationsList + return rss +} + func buildRssWithLabels() *uniffleapi.RemoteShuffleService { rss := utils.BuildRSSWithDefaultValue() rss.Spec.Coordinator.Labels = testLabels @@ -546,6 +562,35 @@ func TestGenerateSvcForCoordinator(t *testing.T) { } } +func TestGenerateSvcWithAnnotationsForCoordinator(t *testing.T) { + for _, tt := range []struct { + name string + rss *uniffleapi.RemoteShuffleService + expectedAnnotations []map[string]string + }{ + { + name: "nodeport and headless services with annotations", + rss: buildRssWithSvcAnnotations(), + expectedAnnotations: []map[string]string{ + {"annotation1": "value1"}, + {"annotation1": "value1"}, + {"annotation2": "value2"}, + {"annotation2": "value2"}}, + }, + } { + t.Run(tt.name, func(tc *testing.T) { + _, _, services, _ := GenerateCoordinators(tt.rss) + + for i, svc := range services { + match := reflect.DeepEqual(tt.expectedAnnotations[i], svc.Annotations) + if !match { + tc.Errorf("unexpected annotations: %v, expected: %v", svc.Annotations, tt.expectedAnnotations[i]) + } + } + }) + } +} + func TestGenerateAddresses(t *testing.T) { assertion := assert.New(t) rss := buildRssWithLabels()