diff --git a/controllers/elbv2/targetgroupbinding_controller.go b/controllers/elbv2/targetgroupbinding_controller.go index ff1856a4a..0f48d18cb 100644 --- a/controllers/elbv2/targetgroupbinding_controller.go +++ b/controllers/elbv2/targetgroupbinding_controller.go @@ -113,7 +113,7 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C return err } - checkPoint, deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb) + deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb) if err != nil { return err @@ -124,7 +124,7 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C return nil } - if err := r.updateTargetGroupBindingStatus(ctx, tgb, checkPoint); err != nil { + if err := r.updateTargetGroupBindingStatus(ctx, tgb); err != nil { r.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err)) return err } @@ -147,18 +147,12 @@ func (r *targetGroupBindingReconciler) cleanupTargetGroupBinding(ctx context.Con return nil } -func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding, newCheckPoint string) error { - savedCheckPoint := targetgroupbinding.GetTGBReconcileCheckpoint(tgb) - if aws.ToInt64(tgb.Status.ObservedGeneration) == tgb.Generation && savedCheckPoint == newCheckPoint { +func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { + if aws.ToInt64(tgb.Status.ObservedGeneration) == tgb.Generation { return nil } tgbOld := tgb.DeepCopy() - targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, newCheckPoint) - - if err := r.k8sClient.Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil { - return errors.Wrapf(err, "failed to update targetGroupBinding checkpoint: %v", k8s.NamespacedName(tgb)) - } tgb.Status.ObservedGeneration = aws.Int64(tgb.Generation) if err := 
r.k8sClient.Status().Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil { diff --git a/pkg/backend/endpoint_resolver.go b/pkg/backend/endpoint_resolver.go index 254b4ae9f..507ac16ca 100644 --- a/pkg/backend/endpoint_resolver.go +++ b/pkg/backend/endpoint_resolver.go @@ -170,8 +170,10 @@ func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx conte containsPotentialReadyEndpoints = true continue } + podEndpoint := buildPodEndpoint(pod, epAddr, epPort) - if ep.Conditions.Ready != nil && *ep.Conditions.Ready { + // Recommendation from Kubernetes is to consider unknown ready status as ready (ready == nil) + if ep.Conditions.Ready == nil || *ep.Conditions.Ready { readyPodEndpoints = append(readyPodEndpoints, podEndpoint) continue } diff --git a/pkg/targetgroupbinding/resource_manager.go b/pkg/targetgroupbinding/resource_manager.go index 32cefa1c4..1b5425392 100644 --- a/pkg/targetgroupbinding/resource_manager.go +++ b/pkg/targetgroupbinding/resource_manager.go @@ -26,11 +26,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -const defaultTargetHealthRequeueDuration = 15 * time.Second +const defaultRequeueDuration = 15 * time.Second // ResourceManager manages the TargetGroupBinding resource. 
type ResourceManager interface { - Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) + Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (bool, error) Cleanup(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error } @@ -60,7 +60,7 @@ func NewDefaultResourceManager(k8sClient client.Client, elbv2Client services.ELB vpcInfoProvider: vpcInfoProvider, podInfoRepo: podInfoRepo, - targetHealthRequeueDuration: defaultTargetHealthRequeueDuration, + requeueDuration: defaultRequeueDuration, } } @@ -78,17 +78,34 @@ type defaultResourceManager struct { podInfoRepo k8s.PodInfoRepo vpcID string - targetHealthRequeueDuration time.Duration + requeueDuration time.Duration } -func (m *defaultResourceManager) Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) { +func (m *defaultResourceManager) Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (bool, error) { if tgb.Spec.TargetType == nil { - return "", false, errors.Errorf("targetType is not specified: %v", k8s.NamespacedName(tgb).String()) + return false, errors.Errorf("targetType is not specified: %v", k8s.NamespacedName(tgb).String()) } + + var newCheckPoint string + var oldCheckPoint string + var isDeferred bool + var err error + if *tgb.Spec.TargetType == elbv2api.TargetTypeIP { - return m.reconcileWithIPTargetType(ctx, tgb) + newCheckPoint, oldCheckPoint, isDeferred, err = m.reconcileWithIPTargetType(ctx, tgb) + } else { + newCheckPoint, oldCheckPoint, isDeferred, err = m.reconcileWithInstanceTargetType(ctx, tgb) + } + + if err != nil { + return false, err } - return m.reconcileWithInstanceTargetType(ctx, tgb) + + if isDeferred { + return true, nil + } + + return false, m.updateTGBCheckPoint(ctx, tgb, newCheckPoint, oldCheckPoint) } func (m *defaultResourceManager) Cleanup(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { @@ -104,7 +121,7 @@ func (m *defaultResourceManager) Cleanup(ctx context.Context, tgb 
*elbv2api.Targ return nil } -func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) { +func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, string, bool, error) { svcKey := buildServiceReferenceKey(tgb, tgb.Spec.ServiceRef) targetHealthCondType := BuildTargetHealthPodConditionType(tgb) @@ -121,79 +138,93 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, if err != nil { if errors.Is(err, backend.ErrNotFound) { m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error()) - return "", false, m.Cleanup(ctx, tgb) + return "", "", false, m.Cleanup(ctx, tgb) } - return "", false, err + return "", "", false, err } - currentCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb) + newCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb) if err != nil { - return "", false, err + return "", "", false, err } - savedCheckPoint := GetTGBReconcileCheckpoint(tgb) + oldCheckPoint := GetTGBReconcileCheckpoint(tgb) - if currentCheckPoint == savedCheckPoint { - m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", currentCheckPoint) - return currentCheckPoint, true, nil + if !containsPotentialReadyEndpoints && oldCheckPoint == newCheckPoint { + m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", newCheckPoint) + return newCheckPoint, oldCheckPoint, true, nil } tgARN := tgb.Spec.TargetGroupARN vpcID := tgb.Spec.VpcID targets, err := m.targetsManager.ListTargets(ctx, tgARN) if err != nil { - return "", false, err + return "", "", false, err } - notDrainingTargets, drainingTargets := partitionTargetsByDrainingStatus(targets) + notDrainingTargets, _ := partitionTargetsByDrainingStatus(targets) matchedEndpointAndTargets, 
unmatchedEndpoints, unmatchedTargets := matchPodEndpointWithTargets(endpoints, notDrainingTargets) needNetworkingRequeue := false if err := m.networkingManager.ReconcileForPodEndpoints(ctx, tgb, endpoints); err != nil { + m.logger.Error(err, "Requesting network requeue due to error from ReconcileForPodEndpoints") m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedNetworkReconcile, err.Error()) needNetworkingRequeue = true } + + if len(unmatchedEndpoints) > 0 || len(unmatchedTargets) > 0 || needNetworkingRequeue { + // Set to an empty checkpoint, to ensure that no matter what we try to reconcile at least one more time. + // Consider this ordering of events (without using this method of overriding the checkpoint) + // 1. Register some pod IP, don't update TGB checkpoint. + // 2. Before next invocation of reconcile happens, the pod is removed. + // 3. The next reconcile loop has no knowledge that it needs to deregister the pod ip, therefore it skips deregistering the removed pod ip. 
+ err = m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint) + if err != nil { + m.logger.Error(err, "Unable to update checkpoint before mutating change") + return "", "", false, err + } + } + if len(unmatchedTargets) > 0 { if err := m.deregisterTargets(ctx, tgARN, unmatchedTargets); err != nil { - return "", false, err + return "", "", false, err } } if len(unmatchedEndpoints) > 0 { if err := m.registerPodEndpoints(ctx, tgARN, vpcID, unmatchedEndpoints); err != nil { - return "", false, err + return "", "", false, err } } anyPodNeedFurtherProbe, err := m.updateTargetHealthPodCondition(ctx, targetHealthCondType, matchedEndpointAndTargets, unmatchedEndpoints) if err != nil { - return "", false, err + return "", "", false, err } if anyPodNeedFurtherProbe { - if containsTargetsInInitialState(matchedEndpointAndTargets) || len(unmatchedEndpoints) != 0 { - return "", false, runtime.NewRequeueNeededAfter("monitor targetHealth", m.targetHealthRequeueDuration) - } - return "", false, runtime.NewRequeueNeeded("monitor targetHealth") + m.logger.Info("Requeue for target monitor target health") + return "", "", false, runtime.NewRequeueNeededAfter("monitor targetHealth", m.requeueDuration) } if containsPotentialReadyEndpoints { - return "", false, runtime.NewRequeueNeeded("monitor potential ready endpoints") + m.logger.Info("Requeue for potentially ready endpoints") + return "", "", false, runtime.NewRequeueNeededAfter("monitor potential ready endpoints", m.requeueDuration) } - _ = drainingTargets - if needNetworkingRequeue { - return "", false, runtime.NewRequeueNeeded("networking reconciliation") + m.logger.Info("Requeue for networking requeue") + return "", "", false, runtime.NewRequeueNeededAfter("networking reconciliation", m.requeueDuration) } - return currentCheckPoint, false, nil + m.logger.Info("Successful reconcile", "checkpoint", newCheckPoint) + return newCheckPoint, oldCheckPoint, false, nil } -func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx 
context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) { +func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, string, bool, error) { svcKey := buildServiceReferenceKey(tgb, tgb.Spec.ServiceRef) nodeSelector, err := backend.GetTrafficProxyNodeSelector(tgb) if err != nil { - return "", false, err + return "", "", false, err } resolveOpts := []backend.EndpointResolveOption{backend.WithNodeSelector(nodeSelector)} @@ -201,48 +232,58 @@ func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Con if err != nil { if errors.Is(err, backend.ErrNotFound) { m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error()) - return "", false, m.Cleanup(ctx, tgb) + return "", "", false, m.Cleanup(ctx, tgb) } - return "", false, err + return "", "", false, err } - currentCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb) + newCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb) if err != nil { - return "", false, err + return "", "", false, err } - savedCheckPoint := GetTGBReconcileCheckpoint(tgb) + oldCheckPoint := GetTGBReconcileCheckpoint(tgb) - if currentCheckPoint == savedCheckPoint { - m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", currentCheckPoint) - return currentCheckPoint, true, nil + if newCheckPoint == oldCheckPoint { + m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", newCheckPoint) + return newCheckPoint, oldCheckPoint, true, nil } tgARN := tgb.Spec.TargetGroupARN targets, err := m.targetsManager.ListTargets(ctx, tgARN) if err != nil { - return "", false, err + return "", "", false, err } - notDrainingTargets, drainingTargets := partitionTargetsByDrainingStatus(targets) + notDrainingTargets, _ := partitionTargetsByDrainingStatus(targets) _, unmatchedEndpoints, 
unmatchedTargets := matchNodePortEndpointWithTargets(endpoints, notDrainingTargets) if err := m.networkingManager.ReconcileForNodePortEndpoints(ctx, tgb, endpoints); err != nil { - return "", false, err + m.logger.Error(err, "Requesting network requeue due to error from ReconcileForNodePortEndpoints") + return "", "", false, err + } + + if len(unmatchedEndpoints) > 0 || len(unmatchedTargets) > 0 { + // Same thought process, see the IP target registration code as to why we clear out the check point. + err = m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint) + if err != nil { + m.logger.Error(err, "Unable to update checkpoint before mutating change") + return "", "", false, err + } } if len(unmatchedTargets) > 0 { if err := m.deregisterTargets(ctx, tgARN, unmatchedTargets); err != nil { - return "", false, err + return "", "", false, err } } if len(unmatchedEndpoints) > 0 { if err := m.registerNodePortEndpoints(ctx, tgARN, unmatchedEndpoints); err != nil { - return "", false, err + return "", "", false, err } } - _ = drainingTargets - return currentCheckPoint, false, nil + m.logger.Info("Successful reconcile", "checkpoint", newCheckPoint) + return newCheckPoint, oldCheckPoint, false, nil } func (m *defaultResourceManager) cleanupTargets(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error { @@ -461,6 +502,20 @@ func (m *defaultResourceManager) registerNodePortEndpoints(ctx context.Context, return m.targetsManager.RegisterTargets(ctx, tgARN, sdkTargets) } +func (m *defaultResourceManager) updateTGBCheckPoint(ctx context.Context, tgb *elbv2api.TargetGroupBinding, newCheckPoint, previousCheckPoint string) error { + if newCheckPoint == previousCheckPoint { + return nil + } + + tgbOld := tgb.DeepCopy() + SaveTGBReconcileCheckpoint(tgb, newCheckPoint) + + if err := m.k8sClient.Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil { + return errors.Wrapf(err, "failed to update targetGroupBinding checkpoint: %v", k8s.NamespacedName(tgb)) + } + return nil +} + type 
podEndpointAndTargetPair struct { endpoint backend.PodEndpoint target TargetInfo