diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index 5082080217129..5b0693ef336b1 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -1066,298 +1066,6 @@ func (rcsw *RemoteClusterServiceWatcher) handleCreateOrUpdateEndpoints( return err } -// createOrUpdateHeadlessEndpoints processes endpoints objects for exported -// headless services. When an endpoints object is created or updated in the -// remote cluster, it will be processed here in order to reconcile the local -// cluster state with the remote cluster state. -// -// createOrUpdateHeadlessEndpoints is also responsible for creating the service -// mirror in the source cluster. In order for an exported headless service to be -// mirrored as headless, it must have at least one port defined and at least one -// named address in its endpoints object (e.g a deployment would not work since -// pods may not have arbitrary hostnames). As such, when an endpoints object is -// first processed, if there is no mirror service, we create one, by looking at -// the endpoints object itself. If the exported service is deemed to be valid -// for headless mirroring, then the function will create the headless mirror and -// then create an endpoints object for it in the source cluster. If it is not -// valid, the exported service will be mirrored as clusterIP and its endpoints -// will point to the gateway. -// -// When creating endpoints for a headless mirror, we also create an endpoint -// mirror (clusterIP) service for each of the endpoints' named addresses. If the -// headless mirror exists and has an endpoints object, we simply update by -// either creating or deleting endpoint mirror services. -func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx context.Context, exportedEndpoints *corev1.Endpoints) error { - exportedService, err := rcsw.remoteAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(exportedEndpoints.Name) - if err != nil { - rcsw.log.Debugf("failed to retrieve exported service %s/%s when updating its headless mirror endpoints: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err) - return fmt.Errorf("error retrieving exported service %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err) - } - - // Check whether the endpoints should be processed for a headless exported - // service. If the exported service does not have any ports exposed, then - // neither will its corresponding endpoint mirrors, it should not be created - // as a headless mirror. If the service does not have any named addresses in - // its Endpoints object, then the endpoints should not be processed. - if len(exportedService.Spec.Ports) == 0 { - rcsw.recorder.Event(exportedService, v1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: object spec has no exposed ports") - rcsw.log.Infof("Skipped creating headless mirror for %s/%s: service object spec has no exposed ports", exportedService.Namespace, exportedService.Name) - return nil - } - - mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name) - mirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedService.Namespace).Get(mirrorServiceName) - if err != nil { - if !kerrors.IsNotFound(err) { - return err - } - - // If the mirror service does not exist, create it, either as clusterIP - // or as headless. - mirrorService, err = rcsw.createRemoteHeadlessService(ctx, exportedService, exportedEndpoints) - if err != nil { - return err - } - } - - headlessMirrorEpName := rcsw.mirroredResourceName(exportedEndpoints.Name) - headlessMirrorEndpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(headlessMirrorEpName) - if err != nil { - if !kerrors.IsNotFound(err) { - return err - } - - if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone { - return rcsw.createGatewayEndpoints(ctx, exportedService) - } - - // Create endpoint mirrors for headless mirror - if err := rcsw.createHeadlessMirrorEndpoints(ctx, exportedService, exportedEndpoints); err != nil { - rcsw.log.Debugf("failed to create headless mirrors for endpoints %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err) - return err - } - - return nil - } - - // Past this point, we do not want to process a mirror service that is not - // headless. We want to process only endpoints for headless mirrors; before - // this point it would have been possible to have a clusterIP mirror, since - // we are creating the mirror service in the scope of the function. - if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone { - return nil - } - - mirrorEndpoints := headlessMirrorEndpoints.DeepCopy() - endpointMirrors := make(map[string]struct{}) - newSubsets := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets)) - for _, subset := range exportedEndpoints.Subsets { - newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses)) - for _, address := range subset.Addresses { - if address.Hostname == "" { - continue - } - - endpointMirrorName := rcsw.mirroredResourceName(address.Hostname) - endpointMirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(endpointMirrorName) - if err != nil { - if !kerrors.IsNotFound(err) { - return err - } - // If the error is 'NotFound' then the Endpoint Mirror service - // does not exist, so create it. - endpointMirrorService, err = rcsw.createEndpointMirrorService(ctx, address.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService) - if err != nil { - return err - } - } - - endpointMirrors[endpointMirrorName] = struct{}{} - newAddresses = append(newAddresses, corev1.EndpointAddress{ - Hostname: address.Hostname, - IP: endpointMirrorService.Spec.ClusterIP, - }) - } - - if len(newAddresses) == 0 { - continue - } - - // copy ports, create subset - newSubsets = append(newSubsets, corev1.EndpointSubset{ - Addresses: newAddresses, - Ports: subset.DeepCopy().Ports, - }) - } - - headlessMirrorName := rcsw.mirroredResourceName(exportedService.Name) - matchLabels := map[string]string{ - consts.MirroredHeadlessSvcNameLabel: headlessMirrorName, - } - - // Fetch all Endpoint Mirror services that belong to the same Headless Mirror - endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector()) - if err != nil { - return err - } - - var errors []error - for _, service := range endpointMirrorServices { - // If the service's name does not show up in the up-to-date map of - // Endpoint Mirror names, then we should delete it. - if _, found := endpointMirrors[service.Name]; found { - continue - } - err := rcsw.localAPIClient.Client.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{}) - if err != nil { - if !kerrors.IsNotFound(err) { - errors = append(errors, fmt.Errorf("error deleting Endpoint Mirror service %s/%s: %v", service.Namespace, service.Name, err)) - } - } - } - - if len(errors) > 0 { - return RetryableError{errors} - } - - // Update - mirrorEndpoints.Subsets = newSubsets - _, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(mirrorEndpoints.Namespace).Update(ctx, mirrorEndpoints, metav1.UpdateOptions{}) - if err != nil { - return RetryableError{[]error{err}} - } - - return nil -} - -// createRemoteHeadlessService creates a mirror service for an exported headless -// service. Whether the mirror will be created as a headless or clusterIP -// service depends on the endpoints object associated with the exported service. -// If there is at least one named address, then the service will be mirrored as -// headless. -// -// Note: we do not check for any exposed ports because it was previously done -// when the service was picked up by the service mirror. We also do not need to -// check if the exported service is headless; its endpoints will be processed -// only if it is headless so we are certain at this point that is the case. -func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessService(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) (*corev1.Service, error) { - // If we don't have any subsets to process then avoid creating the service. - // We need at least one address to be make a decision (whether we should - // create as clusterIP or headless), rely on the endpoints being eventually - // consistent, will likely receive an update with subsets. - if len(exportedEndpoints.Subsets) == 0 { - return &corev1.Service{}, nil - } - - remoteService := exportedService.DeepCopy() - serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name) - localServiceName := rcsw.mirroredResourceName(remoteService.Name) - - if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil { - return &corev1.Service{}, err - } - - serviceToCreate := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: localServiceName, - Namespace: remoteService.Namespace, - Annotations: rcsw.getMirroredServiceAnnotations(remoteService), - Labels: rcsw.getMirroredServiceLabels(remoteService), - }, - Spec: corev1.ServiceSpec{ - Ports: remapRemoteServicePorts(remoteService.Spec.Ports), - }, - } - - if shouldExportAsHeadlessService(exportedEndpoints, rcsw.log) { - serviceToCreate.Spec.ClusterIP = corev1.ClusterIPNone - rcsw.log.Infof("Creating a new headless service mirror for %s", serviceInfo) - } else { - rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo) - } - - svc, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{}) - if err != nil { - if !kerrors.IsAlreadyExists(err) { - // we might have created it during earlier attempt, if that is not the case, we retry - return &corev1.Service{}, RetryableError{[]error{err}} - } - } - - return svc, err -} - -// createHeadlessMirrorEndpoints creates an endpoints object for a Headless -// Mirror service. The endpoints object will contain the same subsets and hosts -// as the endpoints object of the exported headless service. Each host in the -// Headless Mirror's endpoints object will point to an Endpoint Mirror service. -func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpoints(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) error { - exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name) - endpointsHostnames := make(map[string]struct{}) - subsetsToCreate := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets)) - for _, subset := range exportedEndpoints.Subsets { - newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses)) - for _, addr := range subset.Addresses { - if addr.Hostname == "" { - continue - } - - endpointMirrorName := rcsw.mirroredResourceName(addr.Hostname) - createdService, err := rcsw.createEndpointMirrorService(ctx, addr.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService) - if err != nil { - rcsw.log.Errorf("error creating endpoint mirror service %s/%s for exported headless service %s: %v", endpointMirrorName, exportedService.Namespace, exportedServiceInfo, err) - continue - } - - endpointsHostnames[addr.Hostname] = struct{}{} - newAddresses = append(newAddresses, corev1.EndpointAddress{ - Hostname: addr.TargetRef.Name, - IP: createdService.Spec.ClusterIP, - }) - - } - - if len(newAddresses) == 0 { - continue - } - - subsetsToCreate = append(subsetsToCreate, corev1.EndpointSubset{ - Addresses: newAddresses, - Ports: subset.DeepCopy().Ports, - }) - } - - headlessMirrorServiceName := rcsw.mirroredResourceName(exportedService.Name) - headlessMirrorEndpoints := &corev1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: headlessMirrorServiceName, - Namespace: exportedService.Namespace, - Labels: map[string]string{ - consts.MirroredResourceLabel: "true", - consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName, - }, - Annotations: map[string]string{ - consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain), - }, - }, - Subsets: subsetsToCreate, - } - - if rcsw.link.GatewayIdentity != "" { - headlessMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity - } - - rcsw.log.Infof("Creating a new headless mirror endpoints object for headless mirror %s/%s", headlessMirrorServiceName, exportedService.Namespace) - if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(exportedService.Namespace).Create(ctx, headlessMirrorEndpoints, metav1.CreateOptions{}); err != nil { - // we clean up after ourselves - rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, headlessMirrorServiceName, metav1.DeleteOptions{}) - // and retry - return RetryableError{[]error{err}} - } - - return nil -} - // createEndpointMirrorService creates a new Endpoint Mirror service and its // corresponding endpoints object. It returns the newly created Endpoint Mirror // service object. When a headless service is exported, we create a Headless @@ -1435,30 +1143,6 @@ func (rcsw *RemoteClusterServiceWatcher) createEndpointMirrorService(ctx context return createdService, nil } -// shouldExportAsHeadlessService checks if an exported service should be -// mirrored as a headless service or as a clusterIP service, based on its -// endpoints object. For an exported service to be a headless mirror, it needs -// to have at least one named address in its endpoints (that is, a pod with a -// `hostname`). If the endpoints object does not contain at least one named -// address, it should be exported as clusterIP. -func shouldExportAsHeadlessService(endpoints *corev1.Endpoints, log *logging.Entry) bool { - for _, subset := range endpoints.Subsets { - for _, addr := range subset.Addresses { - if addr.Hostname != "" { - return true - } - } - - for _, addr := range subset.NotReadyAddresses { - if addr.Hostname != "" { - return true - } - } - } - log.Infof("Service %s/%s should not be exported as headless: no named addresses in its endpoints object", endpoints.Namespace, endpoints.Name) - return false -} - func isExportedEndpoints(obj interface{}, log *logging.Entry) bool { ep, ok := obj.(*corev1.Endpoints) if !ok { @@ -1473,22 +1157,3 @@ func isExportedEndpoints(obj interface{}, log *logging.Entry) bool { return true } - -// isHeadlessEndpoints checks if an endpoints object belongs to a -// headless service. -func isHeadlessEndpoints(obj interface{}, log *logging.Entry) bool { - ep, ok := obj.(*corev1.Endpoints) - if !ok { - log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep) - return false - } - - if _, found := ep.Labels[corev1.IsHeadlessService]; !found { - // Not an Endpoints object for a headless service? Then we likely don't want - // to update anything. - log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, corev1.IsHeadlessService) - return false - } - - return true -} diff --git a/multicluster/service-mirror/cluster_watcher_headless.go b/multicluster/service-mirror/cluster_watcher_headless.go new file mode 100644 index 0000000000000..ad19b72bf8f74 --- /dev/null +++ b/multicluster/service-mirror/cluster_watcher_headless.go @@ -0,0 +1,349 @@ +package servicemirror + +import ( + "context" + "fmt" + + consts "github.com/linkerd/linkerd2/pkg/k8s" + logging "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// createOrUpdateHeadlessEndpoints processes endpoints objects for exported +// headless services. When an endpoints object is created or updated in the +// remote cluster, it will be processed here in order to reconcile the local +// cluster state with the remote cluster state. +// +// createOrUpdateHeadlessEndpoints is also responsible for creating the service +// mirror in the source cluster. In order for an exported headless service to be +// mirrored as headless, it must have at least one port defined and at least one +// named address in its endpoints object (e.g a deployment would not work since +// pods may not have arbitrary hostnames). As such, when an endpoints object is +// first processed, if there is no mirror service, we create one, by looking at +// the endpoints object itself. If the exported service is deemed to be valid +// for headless mirroring, then the function will create the headless mirror and +// then create an endpoints object for it in the source cluster. If it is not +// valid, the exported service will be mirrored as clusterIP and its endpoints +// will point to the gateway. +// +// When creating endpoints for a headless mirror, we also create an endpoint +// mirror (clusterIP) service for each of the endpoints' named addresses. If the +// headless mirror exists and has an endpoints object, we simply update by +// either creating or deleting endpoint mirror services. +func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx context.Context, exportedEndpoints *corev1.Endpoints) error { + exportedService, err := rcsw.remoteAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(exportedEndpoints.Name) + if err != nil { + rcsw.log.Debugf("failed to retrieve exported service %s/%s when updating its headless mirror endpoints: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err) + return fmt.Errorf("error retrieving exported service %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err) + } + + // Check whether the endpoints should be processed for a headless exported + // service. If the exported service does not have any ports exposed, then + // neither will its corresponding endpoint mirrors, it should not be created + // as a headless mirror. If the service does not have any named addresses in + // its Endpoints object, then the endpoints should not be processed. + if len(exportedService.Spec.Ports) == 0 { + rcsw.recorder.Event(exportedService, v1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: object spec has no exposed ports") + rcsw.log.Infof("Skipped creating headless mirror for %s/%s: service object spec has no exposed ports", exportedService.Namespace, exportedService.Name) + return nil + } + + mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name) + mirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedService.Namespace).Get(mirrorServiceName) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + + // If the mirror service does not exist, create it, either as clusterIP + // or as headless. + mirrorService, err = rcsw.createRemoteHeadlessService(ctx, exportedService, exportedEndpoints) + if err != nil { + return err + } + } + + headlessMirrorEpName := rcsw.mirroredResourceName(exportedEndpoints.Name) + headlessMirrorEndpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(headlessMirrorEpName) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + + if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone { + return rcsw.createGatewayEndpoints(ctx, exportedService) + } + + // Create endpoint mirrors for headless mirror + if err := rcsw.createHeadlessMirrorEndpoints(ctx, exportedService, exportedEndpoints); err != nil { + rcsw.log.Debugf("failed to create headless mirrors for endpoints %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err) + return err + } + + return nil + } + + // Past this point, we do not want to process a mirror service that is not + // headless. We want to process only endpoints for headless mirrors; before + // this point it would have been possible to have a clusterIP mirror, since + // we are creating the mirror service in the scope of the function. + if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone { + return nil + } + + mirrorEndpoints := headlessMirrorEndpoints.DeepCopy() + endpointMirrors := make(map[string]struct{}) + newSubsets := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets)) + for _, subset := range exportedEndpoints.Subsets { + newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses)) + for _, address := range subset.Addresses { + if address.Hostname == "" { + continue + } + + endpointMirrorName := rcsw.mirroredResourceName(address.Hostname) + endpointMirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(endpointMirrorName) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + // If the error is 'NotFound' then the Endpoint Mirror service + // does not exist, so create it. + endpointMirrorService, err = rcsw.createEndpointMirrorService(ctx, address.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService) + if err != nil { + return err + } + } + + endpointMirrors[endpointMirrorName] = struct{}{} + newAddresses = append(newAddresses, corev1.EndpointAddress{ + Hostname: address.Hostname, + IP: endpointMirrorService.Spec.ClusterIP, + }) + } + + if len(newAddresses) == 0 { + continue + } + + // copy ports, create subset + newSubsets = append(newSubsets, corev1.EndpointSubset{ + Addresses: newAddresses, + Ports: subset.DeepCopy().Ports, + }) + } + + headlessMirrorName := rcsw.mirroredResourceName(exportedService.Name) + matchLabels := map[string]string{ + consts.MirroredHeadlessSvcNameLabel: headlessMirrorName, + } + + // Fetch all Endpoint Mirror services that belong to the same Headless Mirror + endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector()) + if err != nil { + return err + } + + var errors []error + for _, service := range endpointMirrorServices { + // If the service's name does not show up in the up-to-date map of + // Endpoint Mirror names, then we should delete it. + if _, found := endpointMirrors[service.Name]; found { + continue + } + err := rcsw.localAPIClient.Client.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{}) + if err != nil { + if !kerrors.IsNotFound(err) { + errors = append(errors, fmt.Errorf("error deleting Endpoint Mirror service %s/%s: %v", service.Namespace, service.Name, err)) + } + } + } + + if len(errors) > 0 { + return RetryableError{errors} + } + + // Update + mirrorEndpoints.Subsets = newSubsets + _, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(mirrorEndpoints.Namespace).Update(ctx, mirrorEndpoints, metav1.UpdateOptions{}) + if err != nil { + return RetryableError{[]error{err}} + } + + return nil +} + +// createRemoteHeadlessService creates a mirror service for an exported headless +// service. Whether the mirror will be created as a headless or clusterIP +// service depends on the endpoints object associated with the exported service. +// If there is at least one named address, then the service will be mirrored as +// headless. +// +// Note: we do not check for any exposed ports because it was previously done +// when the service was picked up by the service mirror. We also do not need to +// check if the exported service is headless; its endpoints will be processed +// only if it is headless so we are certain at this point that is the case. +func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessService(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) (*corev1.Service, error) { + // If we don't have any subsets to process then avoid creating the service. + // We need at least one address to be make a decision (whether we should + // create as clusterIP or headless), rely on the endpoints being eventually + // consistent, will likely receive an update with subsets. + if len(exportedEndpoints.Subsets) == 0 { + return &corev1.Service{}, nil + } + + remoteService := exportedService.DeepCopy() + serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name) + localServiceName := rcsw.mirroredResourceName(remoteService.Name) + + if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil { + return &corev1.Service{}, err + } + + serviceToCreate := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: localServiceName, + Namespace: remoteService.Namespace, + Annotations: rcsw.getMirroredServiceAnnotations(remoteService), + Labels: rcsw.getMirroredServiceLabels(remoteService), + }, + Spec: corev1.ServiceSpec{ + Ports: remapRemoteServicePorts(remoteService.Spec.Ports), + }, + } + + if shouldExportAsHeadlessService(exportedEndpoints, rcsw.log) { + serviceToCreate.Spec.ClusterIP = corev1.ClusterIPNone + rcsw.log.Infof("Creating a new headless service mirror for %s", serviceInfo) + } else { + rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo) + } + + svc, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{}) + if err != nil { + if !kerrors.IsAlreadyExists(err) { + // we might have created it during earlier attempt, if that is not the case, we retry + return &corev1.Service{}, RetryableError{[]error{err}} + } + } + + return svc, err +} + +// createHeadlessMirrorEndpoints creates an endpoints object for a Headless +// Mirror service. The endpoints object will contain the same subsets and hosts +// as the endpoints object of the exported headless service. Each host in the +// Headless Mirror's endpoints object will point to an Endpoint Mirror service. +func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpoints(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) error { + exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name) + endpointsHostnames := make(map[string]struct{}) + subsetsToCreate := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets)) + for _, subset := range exportedEndpoints.Subsets { + newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses)) + for _, addr := range subset.Addresses { + if addr.Hostname == "" { + continue + } + + endpointMirrorName := rcsw.mirroredResourceName(addr.Hostname) + createdService, err := rcsw.createEndpointMirrorService(ctx, addr.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService) + if err != nil { + rcsw.log.Errorf("error creating endpoint mirror service %s/%s for exported headless service %s: %v", endpointMirrorName, exportedService.Namespace, exportedServiceInfo, err) + continue + } + + endpointsHostnames[addr.Hostname] = struct{}{} + newAddresses = append(newAddresses, corev1.EndpointAddress{ + Hostname: addr.TargetRef.Name, + IP: createdService.Spec.ClusterIP, + }) + + } + + if len(newAddresses) == 0 { + continue + } + + subsetsToCreate = append(subsetsToCreate, corev1.EndpointSubset{ + Addresses: newAddresses, + Ports: subset.DeepCopy().Ports, + }) + } + + headlessMirrorServiceName := rcsw.mirroredResourceName(exportedService.Name) + headlessMirrorEndpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: headlessMirrorServiceName, + Namespace: exportedService.Namespace, + Labels: map[string]string{ + consts.MirroredResourceLabel: "true", + consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName, + }, + Annotations: map[string]string{ + consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain), + }, + }, + Subsets: subsetsToCreate, + } + + if rcsw.link.GatewayIdentity != "" { + headlessMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity + } + + rcsw.log.Infof("Creating a new headless mirror endpoints object for headless mirror %s/%s", headlessMirrorServiceName, exportedService.Namespace) + if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(exportedService.Namespace).Create(ctx, headlessMirrorEndpoints, metav1.CreateOptions{}); err != nil { + // we clean up after ourselves + rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, headlessMirrorServiceName, metav1.DeleteOptions{}) + // and retry + return RetryableError{[]error{err}} + } + + return nil +} + +// shouldExportAsHeadlessService checks if an exported service should be +// mirrored as a headless service or as a clusterIP service, based on its +// endpoints object. For an exported service to be a headless mirror, it needs +// to have at least one named address in its endpoints (that is, a pod with a +// `hostname`). If the endpoints object does not contain at least one named +// address, it should be exported as clusterIP. +func shouldExportAsHeadlessService(endpoints *corev1.Endpoints, log *logging.Entry) bool { + for _, subset := range endpoints.Subsets { + for _, addr := range subset.Addresses { + if addr.Hostname != "" { + return true + } + } + + for _, addr := range subset.NotReadyAddresses { + if addr.Hostname != "" { + return true + } + } + } + log.Infof("Service %s/%s should not be exported as headless: no named addresses in its endpoints object", endpoints.Namespace, endpoints.Name) + return false +} + +// isHeadlessEndpoints checks if an endpoints object belongs to a +// headless service. +func isHeadlessEndpoints(obj interface{}, log *logging.Entry) bool { + ep, ok := obj.(*corev1.Endpoints) + if !ok { + log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep) + return false + } + + if _, found := ep.Labels[corev1.IsHeadlessService]; !found { + // Not an Endpoints object for a headless service? Then we likely don't want + // to update anything. + log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, corev1.IsHeadlessService) + return false + } + + return true +}