Nd/deregister logs and reregister #4062

Draft
wants to merge 5 commits into
base: release/1.4.1
Choose a base branch
from
@@ -171,7 +171,7 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, err
}

r.Log.Info("retrieved", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
r.Log.Info("retrieved", "k8s-svc-endpoints-name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace, "full-service-endpoints", serviceEndpoints)

// If the endpoints object has the label "consul.hashicorp.com/service-ignore" set to true, deregister all instances in Consul for this service.
// It is possible that the endpoints object has never been registered, in which case deregistration is a no-op.
@@ -192,26 +192,32 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if address.TargetRef != nil && address.TargetRef.Kind == "Pod" {
var pod corev1.Pod
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
r.Log.Info("** attempting to get pod for endpoint address", "endpoint-address", address)
if err = r.Client.Get(ctx, objectKey, &pod); err != nil {
// If the pod doesn't exist anymore, set up the deregisterEndpointAddress map to deregister it.
if k8serrors.IsNotFound(err) {
deregisterEndpointAddress[address.IP] = true
r.Log.Info("** 1 deregister[] set to true for address", "endpoint-pod-name", address.TargetRef.Name, "endpoint-pod-ns", address.TargetRef.Namespace, "endpoint-ip", address.IP)
r.Log.Info("pod not found", "name", address.TargetRef.Name)
} else {
// If there was a different error fetching the pod, then log the error but don't deregister it
// since this could be a K8s API blip and we don't want to prematurely deregister.
deregisterEndpointAddress[address.IP] = false
r.Log.Info("** 2 deregister[] set to false for address", "endpoint-pod-name", address.TargetRef.Name, "endpoint-pod-ns", address.TargetRef.Namespace, "endpoint-ip", address.IP)
r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name)
errs = multierror.Append(errs, err)
}
continue
}

r.Log.Info("** got pod for registration", "pod", pod)

svcName, ok := pod.Annotations[constants.AnnotationKubernetesService]
if ok && serviceEndpoints.Name != svcName {
r.Log.Info("ignoring endpoint because it doesn't match explicit service annotation", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
// Set up the deregisterEndpointAddress to deregister service instances that don't match the annotation.
deregisterEndpointAddress[address.IP] = true
r.Log.Info("** 3 deregister[] set to true for address", "endpoint-pod-name", address.TargetRef.Name, "endpoint-pod-ns", address.TargetRef.Namespace, "endpoint-ip", address.IP)
continue
}

Expand All @@ -230,6 +236,7 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
// Build the deregisterEndpointAddress map up for deregistering service instances later.
deregisterEndpointAddress[pod.Status.PodIP] = false
r.Log.Info("** 4 deregister[] set to false for address", "pod-name", pod.Name, "pod-ns", pod.Namespace, "pod-ip", pod.Status.PodIP)
} else {
r.Log.Info("detected an update to pre-consul-dataplane service", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
nodeAgentClientCfg, err := r.consulClientCfgForNodeAgent(apiClient, pod, serverState)
@@ -258,6 +265,7 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
// Build the deregisterEndpointAddress map up for deregistering service instances later.
deregisterEndpointAddress[pod.Status.PodIP] = false
r.Log.Info("** 5 deregister[] set to false for address", "pod", address.TargetRef.Name, "pod-ns", address.TargetRef.Namespace, "ip", pod.Status.PodIP)
}
}
}
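
For reviewers, a minimal standalone sketch (not part of this diff) of the deregisterEndpointAddress semantics the hunks above rely on: true flags an address whose instance should be deregistered, false flags an address still backed by a live pod, and an address missing from the map no longer appears in the Endpoints object at all. The shouldDeregister helper and the example IPs below are assumptions for illustration; the controller's real deregister helper is not shown in this diff.

package main

import "fmt"

// shouldDeregister is a hypothetical stand-in for the controller's deregister
// helper, based on the map semantics in the hunks above:
//   true   -> pod is gone or doesn't match the service annotation; deregister
//   false  -> address is backed by a live pod; keep the instance
//   absent -> address no longer appears in the Endpoints object; deregister
func shouldDeregister(addr string, deregisterEndpointAddress map[string]bool) bool {
	if flagged, ok := deregisterEndpointAddress[addr]; ok {
		return flagged
	}
	return true
}

func main() {
	deregisterEndpointAddress := map[string]bool{
		"10.0.0.1": false, // pod fetched successfully; instance stays registered
		"10.0.0.2": true,  // pod not found or annotation mismatch; instance is removed
	}
	for _, ip := range []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"} {
		fmt.Printf("%s -> deregister=%v\n", ip, shouldDeregister(ip, deregisterEndpointAddress))
	}
}
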
@@ -302,29 +310,39 @@ func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod c
}

// Register the service instance with Consul.
r.Log.Info("registering service with Consul", "name", serviceRegistration.Service.Service,
"id", serviceRegistration.Service.ID)
r.Log.Info("registering service with Consul", "csl-svc-name", serviceRegistration.Service.Service,
"csl-svc-id", serviceRegistration.Service.ID, "csl-svc-addr", serviceRegistration.Service.Address, "full-csl-svc", serviceRegistration)
_, err = apiClient.Catalog().Register(serviceRegistration, nil)
if err != nil {
r.Log.Error(err, "failed to register service", "name", serviceRegistration.Service.Service)
return err
}

// Add manual ip to the VIP table
r.Log.Info("adding manual ip to virtual ip table in Consul", "name", serviceRegistration.Service.Service,
"id", serviceRegistration.ID)
r.Log.Info("adding manual ip to virtual ip table in Consul", "csl-svc-name", serviceRegistration.Service.Service,
"csl-svc-reg-id", serviceRegistration.ID)
err = assignServiceVirtualIP(r.Context, apiClient, serviceRegistration.Service)
if err != nil {
r.Log.Error(err, "failed to add ip to virtual ip table", "name", serviceRegistration.Service.Service)
}

// Register the proxy service instance with Consul.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Service.Service, "id", proxyServiceRegistration.Service.ID)
r.Log.Info("registering proxy service with Consul", "csl-svc-name", proxyServiceRegistration.Service.Service, "csl-svc-id", proxyServiceRegistration.Service.ID, "full-csl-svc", proxyServiceRegistration)
_, err = apiClient.Catalog().Register(proxyServiceRegistration, nil)
if err != nil {
r.Log.Error(err, "failed to register proxy service", "name", proxyServiceRegistration.Service.Service)
return err
}

// After registration, read back the service instances to confirm they are correct.
k8sSvcName := serviceEndpoints.Name
k8sSvcNamespace := serviceEndpoints.Namespace
serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
// don't return the error; this read-back is only for debug logging
}
r.Log.Info("** during register got back the following service instances from consul", "k8s-svc", k8sSvcName, "k8s-ns", k8sSvcNamespace, "services", serviceInstances)
}
return nil
}
@@ -951,43 +969,52 @@ func (r *Controller) deregisterService(
k8sSvcName string,
k8sSvcNamespace string,
deregisterEndpointAddress map[string]bool) (time.Duration, error) {
r.Log.Info("** deregister called for service", "k8s-svc-name", k8sSvcName, "k8s-svc-ns", k8sSvcNamespace)
r.Log.Info("** deregister with map", "deregister-ep-address-map", deregisterEndpointAddress)

// Get services matching metadata from Consul
serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
return 0, err
}
r.Log.Info("** during deregister got back the following service instances from consul", "k8s-svc", k8sSvcName, "k8s-ns", k8sSvcNamespace, "services", serviceInstances)

var errs error
var requeueAfter time.Duration
var shouldReregister bool
for _, svc := range serviceInstances {
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
var serviceDeregistered bool

r.Log.Info("** deciding whether to deregister", "csl-svc-name", svc.ServiceName, "csl-svc-id", svc.ServiceID, "csl-svc-address", svc.ServiceAddress, "full-csl-svc", svc)
if deregister(svc.ServiceAddress, deregisterEndpointAddress) {
r.Log.Info("** deregister was set to true on this address", "csl-svc-name", svc.ServiceName, "csl-svc-id", svc.ServiceID, "csl-svc-address", svc.ServiceAddress, "full-csl-svc", svc)
// If graceful shutdown is enabled, continue to the next service instance and
// mark that an event requeue is needed. We should requeue at the longest time interval
// to prevent excessive re-queues. Also, updating the health status in Consul to Critical
// should prevent routing during gracefulShutdown.
podShutdownDuration, err := r.getGracefulShutdownAndUpdatePodCheck(ctx, apiClient, svc, k8sSvcNamespace)
podShutdownDuration, reregister, err := r.getGracefulShutdownAndUpdatePodCheck(ctx, apiClient, svc, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get pod shutdown duration", "svc", svc.ServiceName)
errs = multierror.Append(errs, err)
}
shouldReregister = reregister

// set requeue response, then continue to the next service instance
if podShutdownDuration > requeueAfter {
requeueAfter = podShutdownDuration
r.Log.Info("** pod shutdown duration was longer than requeue time, resetting requeue time to be the shutdown duration", "csl-svc-name", svc.ServiceName, "csl-svc-id", svc.ServiceID, "csl-svc-address", svc.ServiceAddress)
}
if podShutdownDuration > 0 {
r.Log.Info("** pod shutdown duration was greater than zero, skipping this service instance", "csl-svc-name", svc.ServiceName, "csl-svc-id", svc.ServiceID, "csl-svc-address", svc.ServiceAddress)
continue
}

// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
r.Log.Info("deregistering service instance from consul", "csl-svc-id", svc.ServiceID)
_, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ServiceID,
@@ -1017,11 +1044,15 @@ func (r *Controller) deregisterService(
r.Log.Error(err, "failed to deregister node", "svc", svc.ServiceName)
errs = multierror.Append(errs, err)
}
if shouldReregister {
requeueAfter = 1 * time.Second
r.Log.Info("** re-queuing after 1 second for re-registration", "k8s-svc-name", k8sSvcName, "k8sNamespace", k8sSvcNamespace, "requeueAfter", requeueAfter)
}
}
}

if requeueAfter > 0 {
r.Log.Info("re-queueing event for graceful shutdown", "name", k8sSvcName, "k8sNamespace", k8sSvcNamespace, "requeueAfter", requeueAfter)
r.Log.Info("** re-queueing event for graceful shutdown or re-registration", "k8s-svc-name", k8sSvcName, "k8sNamespace", k8sSvcNamespace, "requeueAfter", requeueAfter)
}

return requeueAfter, errs
@@ -1030,28 +1061,40 @@ func (r *Controller) deregisterService(
// getGracefulShutdownAndUpdatePodCheck checks if the pod is in the process of being terminated and if so, updates the
// health status of the service to critical. It returns the duration for which the pod should be re-queued (the pod's
// gracefulShutdownPeriod setting) and whether the service instance should be re-registered because its pod has been replaced.
func (r *Controller) getGracefulShutdownAndUpdatePodCheck(ctx context.Context, apiClient *api.Client, svc *api.CatalogService, k8sNamespace string) (time.Duration, error) {
func (r *Controller) getGracefulShutdownAndUpdatePodCheck(ctx context.Context, apiClient *api.Client, svc *api.CatalogService, k8sNamespace string) (time.Duration, bool, error) {
// Get the pod, and check if it is still running. We do this to defer ACL/node cleanup for pods that are
// in graceful termination
podName := svc.ServiceMeta[constants.MetaKeyPodName]
if podName == "" {
return 0, nil
return 0, false, nil
}

var pod corev1.Pod
err := r.Client.Get(ctx, types.NamespacedName{Name: podName, Namespace: k8sNamespace}, &pod)
if k8serrors.IsNotFound(err) {
return 0, nil
r.Log.Info("** pod not found, already deleted", "pod-name", podName, "k8sNamespace", k8sNamespace)
return 0, false, nil
}
if err != nil {
r.Log.Error(err, "failed to get terminating pod", "name", podName, "k8sNamespace", k8sNamespace)
return 0, fmt.Errorf("failed to get terminating pod %s/%s: %w", k8sNamespace, podName, err)
return 0, false, fmt.Errorf("failed to get terminating pod %s/%s: %w", k8sNamespace, podName, err)
}

r.Log.Info("** pod found for graceful shutdown check", "pod", pod)

// In a statefulset rollout, pods can go down and a new pod will come back up with the same name but a different
// address. In that case, it's not the old pod that is gracefully shutting down; the old pod is gone and we
// should deregister that old instance from Consul.
if string(pod.UID) != svc.ServiceMeta[constants.MetaKeyPodUID] {
r.Log.Info("** pod uid does not match registered service instance meta pod uid, re-queueing", "pod-uid", pod.UID, "svc-meta-pod-uid", svc.ServiceMeta[constants.MetaKeyPodUID])
return 0, true, nil
}

r.Log.Info("** getting shutdown period seconds for pod", "pod-name", podName, "k8sNamespace", k8sNamespace)
shutdownSeconds, err := r.getGracefulShutdownPeriodSecondsForPod(pod)
if err != nil {
r.Log.Error(err, "failed to get graceful shutdown period for pod", "name", pod, "k8sNamespace", k8sNamespace)
return 0, fmt.Errorf("failed to get graceful shutdown period for pod %s/%s: %w", k8sNamespace, podName, err)
return 0, false, fmt.Errorf("failed to get graceful shutdown period for pod %s/%s: %w", k8sNamespace, podName, err)
}

if shutdownSeconds > 0 {
@@ -1073,19 +1116,19 @@ func (r *Controller) getGracefulShutdownAndUpdatePodCheck(ctx context.Context, a
SkipNodeUpdate: true,
}

r.Log.Info("updating health status of service with Consul to critical in order to drain inbound traffic", "name", svc.ServiceName,
"id", svc.ServiceID, "pod", podName, "k8sNamespace", pod.Namespace)
r.Log.Info("** updating health status of service with Consul to critical in order to drain inbound traffic", "csl-svc-name", svc.ServiceName,
"csl-svc-id", svc.ServiceID, "pod", podName, "k8sNamespace", pod.Namespace)
_, err = apiClient.Catalog().Register(serviceRegistration, nil)
if err != nil {
r.Log.Error(err, "failed to update service health status to critical", "name", svc.ServiceName, "pod", podName)
return 0, fmt.Errorf("failed to update service health status for pod %s/%s to critical: %w", pod.Namespace, podName, err)
return 0, false, fmt.Errorf("failed to update service health status for pod %s/%s to critical: %w", pod.Namespace, podName, err)
}

// Return the duration for which the pod should be re-queued. We add 20% to the shutdownSeconds to account for
// any potential delay in the pod being killed.
return time.Duration(shutdownSeconds+int(math.Ceil(float64(shutdownSeconds)*0.2))) * time.Second, nil
return time.Duration(shutdownSeconds+int(math.Ceil(float64(shutdownSeconds)*0.2))) * time.Second, false, nil
}
return 0, nil
return 0, false, nil
}

// getGracefulShutdownPeriodSecondsForPod returns the graceful shutdown period for the pod. If one is not specified,
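
A small worked example of the two requeue timings that appear above (the requeueAfterShutdown name and the 30-second shutdown period are illustrative assumptions): the pre-existing graceful-shutdown path pads the pod's shutdown period by 20%, while the re-registration path added in this PR requeues after a flat 1 second.

package main

import (
	"fmt"
	"math"
	"time"
)

// requeueAfterShutdown mirrors the padding used in getGracefulShutdownAndUpdatePodCheck
// above: the graceful shutdown period plus 20% (rounded up) to allow for delay in the
// pod actually being killed.
func requeueAfterShutdown(shutdownSeconds int) time.Duration {
	return time.Duration(shutdownSeconds+int(math.Ceil(float64(shutdownSeconds)*0.2))) * time.Second
}

func main() {
	// A 30s graceful shutdown period requeues the endpoints event after 36s,
	// while the pod-UID-mismatch path in this PR requeues after a flat 1s so the
	// replacement pod is re-registered promptly.
	fmt.Println(requeueAfterShutdown(30)) // 36s
	fmt.Println(1 * time.Second)          // 1s
}
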