Skip to content

Commit

Permalink
account for pods that go unready and revive themself
Browse files Browse the repository at this point in the history
  • Loading branch information
zac-nixon committed Oct 1, 2024
1 parent 43ea5c7 commit 37eb983
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 59 deletions.
14 changes: 4 additions & 10 deletions controllers/elbv2/targetgroupbinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C
return err
}

checkPoint, deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb)
deferred, err := r.tgbResourceManager.Reconcile(ctx, tgb)

if err != nil {
return err
Expand All @@ -124,7 +124,7 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C
return nil
}

if err := r.updateTargetGroupBindingStatus(ctx, tgb, checkPoint); err != nil {
if err := r.updateTargetGroupBindingStatus(ctx, tgb); err != nil {
r.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
}
Expand All @@ -147,18 +147,12 @@ func (r *targetGroupBindingReconciler) cleanupTargetGroupBinding(ctx context.Con
return nil
}

func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding, newCheckPoint string) error {
savedCheckPoint := targetgroupbinding.GetTGBReconcileCheckpoint(tgb)
if aws.ToInt64(tgb.Status.ObservedGeneration) == tgb.Generation && savedCheckPoint == newCheckPoint {
func (r *targetGroupBindingReconciler) updateTargetGroupBindingStatus(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error {
if aws.ToInt64(tgb.Status.ObservedGeneration) == tgb.Generation {
return nil
}

tgbOld := tgb.DeepCopy()
targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, newCheckPoint)

if err := r.k8sClient.Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {
return errors.Wrapf(err, "failed to update targetGroupBinding checkpoint: %v", k8s.NamespacedName(tgb))
}

tgb.Status.ObservedGeneration = aws.Int64(tgb.Generation)
if err := r.k8sClient.Status().Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/backend/endpoint_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,10 @@ func (r *defaultEndpointResolver) resolvePodEndpointsWithEndpointsData(ctx conte
containsPotentialReadyEndpoints = true
continue
}

podEndpoint := buildPodEndpoint(pod, epAddr, epPort)
if ep.Conditions.Ready != nil && *ep.Conditions.Ready {
// Recommendation from Kubernetes is to consider unknown ready status as ready (ready == nil)
if ep.Conditions.Ready == nil || *ep.Conditions.Ready {
readyPodEndpoints = append(readyPodEndpoints, podEndpoint)
continue
}
Expand Down
151 changes: 103 additions & 48 deletions pkg/targetgroupbinding/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const defaultTargetHealthRequeueDuration = 15 * time.Second
const defaultRequeueDuration = 15 * time.Second

// ResourceManager manages the TargetGroupBinding resource.
type ResourceManager interface {
Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error)
Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (bool, error)
Cleanup(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error
}

Expand Down Expand Up @@ -60,7 +60,7 @@ func NewDefaultResourceManager(k8sClient client.Client, elbv2Client services.ELB
vpcInfoProvider: vpcInfoProvider,
podInfoRepo: podInfoRepo,

targetHealthRequeueDuration: defaultTargetHealthRequeueDuration,
requeueDuration: defaultRequeueDuration,
}
}

Expand All @@ -78,17 +78,34 @@ type defaultResourceManager struct {
podInfoRepo k8s.PodInfoRepo
vpcID string

targetHealthRequeueDuration time.Duration
requeueDuration time.Duration
}

func (m *defaultResourceManager) Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) {
func (m *defaultResourceManager) Reconcile(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (bool, error) {
if tgb.Spec.TargetType == nil {
return "", false, errors.Errorf("targetType is not specified: %v", k8s.NamespacedName(tgb).String())
return false, errors.Errorf("targetType is not specified: %v", k8s.NamespacedName(tgb).String())
}

var newCheckPoint string
var oldCheckPoint string
var isDeferred bool
var err error

if *tgb.Spec.TargetType == elbv2api.TargetTypeIP {
return m.reconcileWithIPTargetType(ctx, tgb)
newCheckPoint, oldCheckPoint, isDeferred, err = m.reconcileWithIPTargetType(ctx, tgb)
} else {
newCheckPoint, oldCheckPoint, isDeferred, err = m.reconcileWithInstanceTargetType(ctx, tgb)
}

if err != nil {
return false, err
}
return m.reconcileWithInstanceTargetType(ctx, tgb)

if isDeferred {
return true, nil
}

return false, m.updateTGBCheckPoint(ctx, tgb, newCheckPoint, oldCheckPoint)
}

func (m *defaultResourceManager) Cleanup(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error {
Expand All @@ -104,7 +121,7 @@ func (m *defaultResourceManager) Cleanup(ctx context.Context, tgb *elbv2api.Targ
return nil
}

func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) {
func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, string, bool, error) {
svcKey := buildServiceReferenceKey(tgb, tgb.Spec.ServiceRef)

targetHealthCondType := BuildTargetHealthPodConditionType(tgb)
Expand All @@ -121,128 +138,152 @@ func (m *defaultResourceManager) reconcileWithIPTargetType(ctx context.Context,
if err != nil {
if errors.Is(err, backend.ErrNotFound) {
m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error())
return "", false, m.Cleanup(ctx, tgb)
return "", "", false, m.Cleanup(ctx, tgb)
}
return "", false, err
return "", "", false, err
}

currentCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb)
newCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb)

if err != nil {
return "", false, err
return "", "", false, err
}

savedCheckPoint := GetTGBReconcileCheckpoint(tgb)
oldCheckPoint := GetTGBReconcileCheckpoint(tgb)

if currentCheckPoint == savedCheckPoint {
m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", currentCheckPoint)
return currentCheckPoint, true, nil
if !containsPotentialReadyEndpoints && oldCheckPoint == newCheckPoint {
m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", newCheckPoint)
return newCheckPoint, oldCheckPoint, true, nil
}

tgARN := tgb.Spec.TargetGroupARN
vpcID := tgb.Spec.VpcID
targets, err := m.targetsManager.ListTargets(ctx, tgARN)
if err != nil {
return "", false, err
return "", "", false, err
}
notDrainingTargets, drainingTargets := partitionTargetsByDrainingStatus(targets)
notDrainingTargets, _ := partitionTargetsByDrainingStatus(targets)
matchedEndpointAndTargets, unmatchedEndpoints, unmatchedTargets := matchPodEndpointWithTargets(endpoints, notDrainingTargets)

needNetworkingRequeue := false
if err := m.networkingManager.ReconcileForPodEndpoints(ctx, tgb, endpoints); err != nil {
m.logger.Error(err, "Requesting network requeue due to error from ReconcileForPodEndpoints")
m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonFailedNetworkReconcile, err.Error())
needNetworkingRequeue = true
}

if len(unmatchedEndpoints) > 0 || len(unmatchedTargets) > 0 || needNetworkingRequeue {
// Set to an empty checkpoint, to ensure that no matter what we try to reconcile atleast one more time.
// Consider this ordering of events (without using this method of overriding the checkpoint)
// 1. Register some pod IP, don't update TGB checkpoint.
// 2. Before next invocation of reconcile happens, the pod is removed.
// 3. The next reconcile loop has no knowledge that it needs to deregister the pod ip, therefore it skips deregistering the removed pod ip.
err = m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint)
if err != nil {
m.logger.Error(err, "Unable to update checkpoint before mutating change")
return "", "", false, err
}
}

if len(unmatchedTargets) > 0 {
if err := m.deregisterTargets(ctx, tgARN, unmatchedTargets); err != nil {
return "", false, err
return "", "", false, err
}
}
if len(unmatchedEndpoints) > 0 {
if err := m.registerPodEndpoints(ctx, tgARN, vpcID, unmatchedEndpoints); err != nil {
return "", false, err
return "", "", false, err
}
}

anyPodNeedFurtherProbe, err := m.updateTargetHealthPodCondition(ctx, targetHealthCondType, matchedEndpointAndTargets, unmatchedEndpoints)
if err != nil {
return "", false, err
return "", "", false, err
}

if anyPodNeedFurtherProbe {
if containsTargetsInInitialState(matchedEndpointAndTargets) || len(unmatchedEndpoints) != 0 {
return "", false, runtime.NewRequeueNeededAfter("monitor targetHealth", m.targetHealthRequeueDuration)
}
return "", false, runtime.NewRequeueNeeded("monitor targetHealth")
m.logger.Info("Requeue for target monitor target health")
return "", "", false, runtime.NewRequeueNeededAfter("monitor targetHealth", m.requeueDuration)
}

if containsPotentialReadyEndpoints {
return "", false, runtime.NewRequeueNeeded("monitor potential ready endpoints")
m.logger.Info("Requeue for potentially ready endpoints")
return "", "", false, runtime.NewRequeueNeededAfter("monitor potential ready endpoints", m.requeueDuration)
}

_ = drainingTargets

if needNetworkingRequeue {
return "", false, runtime.NewRequeueNeeded("networking reconciliation")
m.logger.Info("Requeue for networking requeue")
return "", "", false, runtime.NewRequeueNeededAfter("networking reconciliation", m.requeueDuration)
}

return currentCheckPoint, false, nil
m.logger.Info("Successful reconcile", "checkpoint", newCheckPoint, "TGB", k8s.NamespacedName(tgb))
return newCheckPoint, oldCheckPoint, false, nil
}

func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, bool, error) {
func (m *defaultResourceManager) reconcileWithInstanceTargetType(ctx context.Context, tgb *elbv2api.TargetGroupBinding) (string, string, bool, error) {
svcKey := buildServiceReferenceKey(tgb, tgb.Spec.ServiceRef)
nodeSelector, err := backend.GetTrafficProxyNodeSelector(tgb)
if err != nil {
return "", false, err
return "", "", false, err
}

resolveOpts := []backend.EndpointResolveOption{backend.WithNodeSelector(nodeSelector)}
endpoints, err := m.endpointResolver.ResolveNodePortEndpoints(ctx, svcKey, tgb.Spec.ServiceRef.Port, resolveOpts...)
if err != nil {
if errors.Is(err, backend.ErrNotFound) {
m.eventRecorder.Event(tgb, corev1.EventTypeWarning, k8s.TargetGroupBindingEventReasonBackendNotFound, err.Error())
return "", false, m.Cleanup(ctx, tgb)
return "", "", false, m.Cleanup(ctx, tgb)
}
return "", false, err
return "", "", false, err
}

currentCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb)
newCheckPoint, err := calculateTGBReconcileCheckpoint(endpoints, tgb)

if err != nil {
return "", false, err
return "", "", false, err
}

savedCheckPoint := GetTGBReconcileCheckpoint(tgb)
oldCheckPoint := GetTGBReconcileCheckpoint(tgb)

if currentCheckPoint == savedCheckPoint {
m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", currentCheckPoint)
return currentCheckPoint, true, nil
if newCheckPoint == oldCheckPoint {
m.logger.Info("Skipping targetgroupbinding reconcile", "TGB", k8s.NamespacedName(tgb), "calculated hash", newCheckPoint)
return newCheckPoint, oldCheckPoint, true, nil
}

tgARN := tgb.Spec.TargetGroupARN
targets, err := m.targetsManager.ListTargets(ctx, tgARN)
if err != nil {
return "", false, err
return "", "", false, err
}
notDrainingTargets, drainingTargets := partitionTargetsByDrainingStatus(targets)
notDrainingTargets, _ := partitionTargetsByDrainingStatus(targets)
_, unmatchedEndpoints, unmatchedTargets := matchNodePortEndpointWithTargets(endpoints, notDrainingTargets)

if err := m.networkingManager.ReconcileForNodePortEndpoints(ctx, tgb, endpoints); err != nil {
return "", false, err
m.logger.Error(err, "Requesting network requeue due to error from ReconcileForNodePortEndpoints")
return "", "", false, err
}

if len(unmatchedEndpoints) > 0 || len(unmatchedTargets) > 0 {
// Same thought process, see the IP target registration code as to why we clear out the check point.
err = m.updateTGBCheckPoint(ctx, tgb, "", oldCheckPoint)
if err != nil {
m.logger.Error(err, "Unable to update checkpoint before mutating change")
return "", "", false, err
}
}

if len(unmatchedTargets) > 0 {
if err := m.deregisterTargets(ctx, tgARN, unmatchedTargets); err != nil {
return "", false, err
return "", "", false, err
}
}
if len(unmatchedEndpoints) > 0 {
if err := m.registerNodePortEndpoints(ctx, tgARN, unmatchedEndpoints); err != nil {
return "", false, err
return "", "", false, err
}
}
_ = drainingTargets
return currentCheckPoint, false, nil
m.logger.Info("Successful reconcile", "checkpoint", newCheckPoint)
return newCheckPoint, oldCheckPoint, false, nil
}

func (m *defaultResourceManager) cleanupTargets(ctx context.Context, tgb *elbv2api.TargetGroupBinding) error {
Expand Down Expand Up @@ -461,6 +502,20 @@ func (m *defaultResourceManager) registerNodePortEndpoints(ctx context.Context,
return m.targetsManager.RegisterTargets(ctx, tgARN, sdkTargets)
}

func (m *defaultResourceManager) updateTGBCheckPoint(ctx context.Context, tgb *elbv2api.TargetGroupBinding, newCheckPoint, previousCheckPoint string) error {
if newCheckPoint == previousCheckPoint {
return nil
}

tgbOld := tgb.DeepCopy()
SaveTGBReconcileCheckpoint(tgb, newCheckPoint)

if err := m.k8sClient.Patch(ctx, tgb, client.MergeFrom(tgbOld)); err != nil {
return errors.Wrapf(err, "failed to update targetGroupBinding checkpoint: %v", k8s.NamespacedName(tgb))
}
return nil
}

type podEndpointAndTargetPair struct {
endpoint backend.PodEndpoint
target TargetInfo
Expand Down

0 comments on commit 37eb983

Please sign in to comment.