Skip to content

Commit

Permalink
Move headless-service-specific functions to cluster_watcher_headless.…
Browse files Browse the repository at this point in the history
…go (linkerd#7559)

This just unclutters a bit `cluster_watcher.go` by moving all the
functions specific to headless services into the new
`cluster_watch_headless.go` file.
  • Loading branch information
alpeb authored Jan 4, 2022
1 parent 75903af commit 8a9e4de
Show file tree
Hide file tree
Showing 2 changed files with 349 additions and 335 deletions.
335 changes: 0 additions & 335 deletions multicluster/service-mirror/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,298 +1066,6 @@ func (rcsw *RemoteClusterServiceWatcher) handleCreateOrUpdateEndpoints(
return err
}

// createOrUpdateHeadlessEndpoints processes endpoints objects for exported
// headless services. When an endpoints object is created or updated in the
// remote cluster, it will be processed here in order to reconcile the local
// cluster state with the remote cluster state.
//
// createOrUpdateHeadlessEndpoints is also responsible for creating the service
// mirror in the source cluster. In order for an exported headless service to be
// mirrored as headless, it must have at least one port defined and at least one
// named address in its endpoints object (e.g a deployment would not work since
// pods may not have arbitrary hostnames). As such, when an endpoints object is
// first processed, if there is no mirror service, we create one, by looking at
// the endpoints object itself. If the exported service is deemed to be valid
// for headless mirroring, then the function will create the headless mirror and
// then create an endpoints object for it in the source cluster. If it is not
// valid, the exported service will be mirrored as clusterIP and its endpoints
// will point to the gateway.
//
// When creating endpoints for a headless mirror, we also create an endpoint
// mirror (clusterIP) service for each of the endpoints' named addresses. If the
// headless mirror exists and has an endpoints object, we simply update by
// either creating or deleting endpoint mirror services.
func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx context.Context, exportedEndpoints *corev1.Endpoints) error {
exportedService, err := rcsw.remoteAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(exportedEndpoints.Name)
if err != nil {
rcsw.log.Debugf("failed to retrieve exported service %s/%s when updating its headless mirror endpoints: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
return fmt.Errorf("error retrieving exported service %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
}

// Check whether the endpoints should be processed for a headless exported
// service. If the exported service does not have any ports exposed, then
// neither will its corresponding endpoint mirrors, it should not be created
// as a headless mirror. If the service does not have any named addresses in
// its Endpoints object, then the endpoints should not be processed.
if len(exportedService.Spec.Ports) == 0 {
rcsw.recorder.Event(exportedService, v1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: object spec has no exposed ports")
rcsw.log.Infof("Skipped creating headless mirror for %s/%s: service object spec has no exposed ports", exportedService.Namespace, exportedService.Name)
return nil
}

mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name)
mirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedService.Namespace).Get(mirrorServiceName)
if err != nil {
if !kerrors.IsNotFound(err) {
return err
}

// If the mirror service does not exist, create it, either as clusterIP
// or as headless.
mirrorService, err = rcsw.createRemoteHeadlessService(ctx, exportedService, exportedEndpoints)
if err != nil {
return err
}
}

headlessMirrorEpName := rcsw.mirroredResourceName(exportedEndpoints.Name)
headlessMirrorEndpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(headlessMirrorEpName)
if err != nil {
if !kerrors.IsNotFound(err) {
return err
}

if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone {
return rcsw.createGatewayEndpoints(ctx, exportedService)
}

// Create endpoint mirrors for headless mirror
if err := rcsw.createHeadlessMirrorEndpoints(ctx, exportedService, exportedEndpoints); err != nil {
rcsw.log.Debugf("failed to create headless mirrors for endpoints %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
return err
}

return nil
}

// Past this point, we do not want to process a mirror service that is not
// headless. We want to process only endpoints for headless mirrors; before
// this point it would have been possible to have a clusterIP mirror, since
// we are creating the mirror service in the scope of the function.
if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone {
return nil
}

mirrorEndpoints := headlessMirrorEndpoints.DeepCopy()
endpointMirrors := make(map[string]struct{})
newSubsets := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets))
for _, subset := range exportedEndpoints.Subsets {
newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses))
for _, address := range subset.Addresses {
if address.Hostname == "" {
continue
}

endpointMirrorName := rcsw.mirroredResourceName(address.Hostname)
endpointMirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(endpointMirrorName)
if err != nil {
if !kerrors.IsNotFound(err) {
return err
}
// If the error is 'NotFound' then the Endpoint Mirror service
// does not exist, so create it.
endpointMirrorService, err = rcsw.createEndpointMirrorService(ctx, address.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService)
if err != nil {
return err
}
}

endpointMirrors[endpointMirrorName] = struct{}{}
newAddresses = append(newAddresses, corev1.EndpointAddress{
Hostname: address.Hostname,
IP: endpointMirrorService.Spec.ClusterIP,
})
}

if len(newAddresses) == 0 {
continue
}

// copy ports, create subset
newSubsets = append(newSubsets, corev1.EndpointSubset{
Addresses: newAddresses,
Ports: subset.DeepCopy().Ports,
})
}

headlessMirrorName := rcsw.mirroredResourceName(exportedService.Name)
matchLabels := map[string]string{
consts.MirroredHeadlessSvcNameLabel: headlessMirrorName,
}

// Fetch all Endpoint Mirror services that belong to the same Headless Mirror
endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
if err != nil {
return err
}

var errors []error
for _, service := range endpointMirrorServices {
// If the service's name does not show up in the up-to-date map of
// Endpoint Mirror names, then we should delete it.
if _, found := endpointMirrors[service.Name]; found {
continue
}
err := rcsw.localAPIClient.Client.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{})
if err != nil {
if !kerrors.IsNotFound(err) {
errors = append(errors, fmt.Errorf("error deleting Endpoint Mirror service %s/%s: %v", service.Namespace, service.Name, err))
}
}
}

if len(errors) > 0 {
return RetryableError{errors}
}

// Update
mirrorEndpoints.Subsets = newSubsets
_, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(mirrorEndpoints.Namespace).Update(ctx, mirrorEndpoints, metav1.UpdateOptions{})
if err != nil {
return RetryableError{[]error{err}}
}

return nil
}

// createRemoteHeadlessService creates a mirror service for an exported headless
// service. Whether the mirror will be created as a headless or clusterIP
// service depends on the endpoints object associated with the exported service.
// If there is at least one named address, then the service will be mirrored as
// headless.
//
// Note: we do not check for any exposed ports because it was previously done
// when the service was picked up by the service mirror. We also do not need to
// check if the exported service is headless; its endpoints will be processed
// only if it is headless so we are certain at this point that is the case.
func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessService(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) (*corev1.Service, error) {
// If we don't have any subsets to process then avoid creating the service.
// We need at least one address to be make a decision (whether we should
// create as clusterIP or headless), rely on the endpoints being eventually
// consistent, will likely receive an update with subsets.
if len(exportedEndpoints.Subsets) == 0 {
return &corev1.Service{}, nil
}

remoteService := exportedService.DeepCopy()
serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name)
localServiceName := rcsw.mirroredResourceName(remoteService.Name)

if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil {
return &corev1.Service{}, err
}

serviceToCreate := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: localServiceName,
Namespace: remoteService.Namespace,
Annotations: rcsw.getMirroredServiceAnnotations(remoteService),
Labels: rcsw.getMirroredServiceLabels(remoteService),
},
Spec: corev1.ServiceSpec{
Ports: remapRemoteServicePorts(remoteService.Spec.Ports),
},
}

if shouldExportAsHeadlessService(exportedEndpoints, rcsw.log) {
serviceToCreate.Spec.ClusterIP = corev1.ClusterIPNone
rcsw.log.Infof("Creating a new headless service mirror for %s", serviceInfo)
} else {
rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo)
}

svc, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{})
if err != nil {
if !kerrors.IsAlreadyExists(err) {
// we might have created it during earlier attempt, if that is not the case, we retry
return &corev1.Service{}, RetryableError{[]error{err}}
}
}

return svc, err
}

// createHeadlessMirrorEndpoints creates an endpoints object for a Headless
// Mirror service. The endpoints object will contain the same subsets and hosts
// as the endpoints object of the exported headless service. Each host in the
// Headless Mirror's endpoints object will point to an Endpoint Mirror service.
func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpoints(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) error {
exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name)
endpointsHostnames := make(map[string]struct{})
subsetsToCreate := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets))
for _, subset := range exportedEndpoints.Subsets {
newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses))
for _, addr := range subset.Addresses {
if addr.Hostname == "" {
continue
}

endpointMirrorName := rcsw.mirroredResourceName(addr.Hostname)
createdService, err := rcsw.createEndpointMirrorService(ctx, addr.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService)
if err != nil {
rcsw.log.Errorf("error creating endpoint mirror service %s/%s for exported headless service %s: %v", endpointMirrorName, exportedService.Namespace, exportedServiceInfo, err)
continue
}

endpointsHostnames[addr.Hostname] = struct{}{}
newAddresses = append(newAddresses, corev1.EndpointAddress{
Hostname: addr.TargetRef.Name,
IP: createdService.Spec.ClusterIP,
})

}

if len(newAddresses) == 0 {
continue
}

subsetsToCreate = append(subsetsToCreate, corev1.EndpointSubset{
Addresses: newAddresses,
Ports: subset.DeepCopy().Ports,
})
}

headlessMirrorServiceName := rcsw.mirroredResourceName(exportedService.Name)
headlessMirrorEndpoints := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: headlessMirrorServiceName,
Namespace: exportedService.Namespace,
Labels: map[string]string{
consts.MirroredResourceLabel: "true",
consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
},
Annotations: map[string]string{
consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain),
},
},
Subsets: subsetsToCreate,
}

if rcsw.link.GatewayIdentity != "" {
headlessMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
}

rcsw.log.Infof("Creating a new headless mirror endpoints object for headless mirror %s/%s", headlessMirrorServiceName, exportedService.Namespace)
if _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(exportedService.Namespace).Create(ctx, headlessMirrorEndpoints, metav1.CreateOptions{}); err != nil {
// we clean up after ourselves
rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, headlessMirrorServiceName, metav1.DeleteOptions{})
// and retry
return RetryableError{[]error{err}}
}

return nil
}

// createEndpointMirrorService creates a new Endpoint Mirror service and its
// corresponding endpoints object. It returns the newly created Endpoint Mirror
// service object. When a headless service is exported, we create a Headless
Expand Down Expand Up @@ -1435,30 +1143,6 @@ func (rcsw *RemoteClusterServiceWatcher) createEndpointMirrorService(ctx context
return createdService, nil
}

// shouldExportAsHeadlessService checks if an exported service should be
// mirrored as a headless service or as a clusterIP service, based on its
// endpoints object. For an exported service to be a headless mirror, it needs
// to have at least one named address in its endpoints (that is, a pod with a
// `hostname`). If the endpoints object does not contain at least one named
// address, it should be exported as clusterIP.
func shouldExportAsHeadlessService(endpoints *corev1.Endpoints, log *logging.Entry) bool {
for _, subset := range endpoints.Subsets {
for _, addr := range subset.Addresses {
if addr.Hostname != "" {
return true
}
}

for _, addr := range subset.NotReadyAddresses {
if addr.Hostname != "" {
return true
}
}
}
log.Infof("Service %s/%s should not be exported as headless: no named addresses in its endpoints object", endpoints.Namespace, endpoints.Name)
return false
}

func isExportedEndpoints(obj interface{}, log *logging.Entry) bool {
ep, ok := obj.(*corev1.Endpoints)
if !ok {
Expand All @@ -1473,22 +1157,3 @@ func isExportedEndpoints(obj interface{}, log *logging.Entry) bool {

return true
}

// isHeadlessEndpoints checks if an endpoints object belongs to a
// headless service.
func isHeadlessEndpoints(obj interface{}, log *logging.Entry) bool {
ep, ok := obj.(*corev1.Endpoints)
if !ok {
log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep)
return false
}

if _, found := ep.Labels[corev1.IsHeadlessService]; !found {
// Not an Endpoints object for a headless service? Then we likely don't want
// to update anything.
log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, corev1.IsHeadlessService)
return false
}

return true
}
Loading

0 comments on commit 8a9e4de

Please sign in to comment.