Skip to content

Commit

Permalink
Prestop lifecycle hook feature
Browse files Browse the repository at this point in the history
Signed-off-by: Eddie Torres <[email protected]>
  • Loading branch information
torredil committed Sep 11, 2023
1 parent ff266e6 commit b5b3d7a
Show file tree
Hide file tree
Showing 9 changed files with 776 additions and 0 deletions.
3 changes: 3 additions & 0 deletions charts/aws-ebs-csi-driver/templates/clusterrole-csi-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "patch"]
- apiGroups: ["storage.k8s.io"]
resources: ["volumeattachments"]
verbs: ["list"]
4 changes: 4 additions & 0 deletions charts/aws-ebs-csi-driver/templates/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ spec:
securityContext:
{{- toYaml . | nindent 12 }}
{{- end }}
lifecycle:
preStop:
exec:
command: ["/bin/aws-ebs-csi-driver", "pre-stop-hook"]
- name: node-driver-registrar
image: {{ printf "%s%s:%s" (default "" .Values.image.containerRegistry) .Values.sidecars.nodeDriverRegistrar.image.repository .Values.sidecars.nodeDriverRegistrar.image.tag }}
imagePullPolicy: {{ default .Values.image.pullPolicy .Values.sidecars.nodeDriverRegistrar.image.pullPolicy }}
Expand Down
113 changes: 113 additions & 0 deletions cmd/hooks/prestop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package hooks

import (
"context"
"fmt"
"os"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

func PreStop(clientset kubernetes.Interface, timeout time.Duration) error {
klog.InfoS("PreStop: executing PreStop lifecycle hook")

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

nodeName := os.Getenv("CSI_NODE_NAME")
if nodeName == "" {
return fmt.Errorf("PreStop: CSI_NODE_NAME missing")
}

node, err := fetchNodeInfo(ctx, clientset, nodeName)
if err != nil {
return err
}

if isNodeBeingDrained(node) {
klog.InfoS("PreStop: node is being drained, checking for remaining VolumeAttachments", "node", nodeName)
return waitForVolumeAttachments(ctx, clientset, nodeName)
}

klog.InfoS("PreStop: node is not being drained, skipping VolumeAttachments check", "node", nodeName)
return nil
}

func fetchNodeInfo(ctx context.Context, clientset kubernetes.Interface, nodeName string) (*v1.Node, error) {
node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("fetchNodeInfo: failed to retrieve node information: %w", err)
}
return node, nil
}

func isNodeBeingDrained(node *v1.Node) bool {
for _, taint := range node.Spec.Taints {
if taint.Key == v1.TaintNodeUnschedulable && taint.Effect == v1.TaintEffectNoSchedule {
return true
}
}
return false
}

func waitForVolumeAttachments(ctx context.Context, clientset kubernetes.Interface, nodeName string) error {
stopCh := make(chan struct{})
defer close(stopCh)

allAttachmentsDeleted := make(chan struct{})

factory := informers.NewSharedInformerFactory(clientset, 0)
informer := factory.Storage().V1().VolumeAttachments().Informer()

_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
klog.V(4).InfoS("DeleteFunc: VolumeAttachment deleted", "node", nodeName)
if err := checkVolumeAttachments(ctx, clientset, nodeName, allAttachmentsDeleted); err != nil {
klog.ErrorS(err, "DeleteFunc: error checking VolumeAttachments")
return
}
},
})

if err != nil {
return fmt.Errorf("failed to add event handler to VolumeAttachment informer: %w", err)
}

go informer.Run(stopCh)

if err := checkVolumeAttachments(ctx, clientset, nodeName, allAttachmentsDeleted); err != nil {
return err
}

select {
case <-allAttachmentsDeleted:
klog.InfoS("waitForVolumeAttachments: finished waiting for VolumeAttachments to be deleted. preStopHook completed")
return nil

case <-ctx.Done():
return fmt.Errorf("waitForVolumeAttachments: timed out waiting for preStopHook to complete: %w", ctx.Err())
}
}

func checkVolumeAttachments(ctx context.Context, clientset kubernetes.Interface, nodeName string, allAttachmentsDeleted chan struct{}) error {
allAttachments, err := clientset.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("checkVolumeAttachments: failed to list VolumeAttachments: %w", err)
}

for _, attachment := range allAttachments.Items {
if attachment.Spec.NodeName == nodeName {
klog.InfoS("isVolumeAttachmentEmpty: found VolumeAttachment", "attachment", attachment, "node", nodeName)
return nil
}
}

close(allAttachmentsDeleted)
return nil
}
Loading

0 comments on commit b5b3d7a

Please sign in to comment.