Skip to content

Commit

Permalink
Remove preStop LCH context timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Eddie Torres <[email protected]>
  • Loading branch information
torredil committed Jan 17, 2024
1 parent c1d5eff commit 670c683
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 73 deletions.
41 changes: 16 additions & 25 deletions cmd/hooks/prestop.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
"time"

v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
Expand All @@ -25,36 +24,33 @@ all VolumeAttachment objects associated with that node are removed, thereby indicating that it is safe to proceed with termination.
No unnecessary delay is added to the termination workflow, as the PreStop hook logic is only executed when the node is being drained
(thus preventing delays in termination where the node pod is killed due to a rolling restart, or during driver upgrades, but the workload pods are expected to be running).
If the PreStop hook hangs during its execution, the driver node pod will be forcefully terminated after 32 seconds (30s terminationGracePeriodSeconds + 2 second grace period extension from Kubelet).
If the PreStop hook hangs during its execution, the driver node pod will be forcefully terminated after terminationGracePeriodSeconds, defined in the pod spec.
*/

func PreStop(clientset kubernetes.Interface, timeout time.Duration) error {
klog.InfoS("PreStop: executing PreStop lifecycle hook", "timeout", timeout)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
func PreStop(clientset kubernetes.Interface) error {
klog.InfoS("PreStop: executing PreStop lifecycle hook")

nodeName := os.Getenv("CSI_NODE_NAME")
if nodeName == "" {
return fmt.Errorf("PreStop: CSI_NODE_NAME missing")
}

node, err := fetchNode(ctx, clientset, nodeName)
node, err := fetchNode(clientset, nodeName)
if err != nil {
return err
}

if isNodeBeingDrained(node) {
klog.InfoS("PreStop: node is being drained, checking for remaining VolumeAttachments", "node", nodeName)
return waitForVolumeAttachments(ctx, clientset, nodeName)
return waitForVolumeAttachments(clientset, nodeName)
}

klog.InfoS("PreStop: node is not being drained, skipping VolumeAttachments check", "node", nodeName)
return nil
}

func fetchNode(ctx context.Context, clientset kubernetes.Interface, nodeName string) (*v1.Node, error) {
node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
func fetchNode(clientset kubernetes.Interface, nodeName string) (*v1.Node, error) {
node, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("fetchNode: failed to retrieve node information: %w", err)
}
Expand All @@ -70,7 +66,7 @@ func isNodeBeingDrained(node *v1.Node) bool {
return false
}

func waitForVolumeAttachments(ctx context.Context, clientset kubernetes.Interface, nodeName string) error {
func waitForVolumeAttachments(clientset kubernetes.Interface, nodeName string) error {
allAttachmentsDeleted := make(chan struct{})

factory := informers.NewSharedInformerFactory(clientset, 0)
Expand All @@ -81,7 +77,7 @@ func waitForVolumeAttachments(ctx context.Context, clientset kubernetes.Interfac
klog.V(5).InfoS("DeleteFunc: VolumeAttachment deleted", "node", nodeName)
va := obj.(*storagev1.VolumeAttachment)
if va.Spec.NodeName == nodeName {
if err := checkVolumeAttachments(ctx, clientset, nodeName, allAttachmentsDeleted); err != nil {
if err := checkVolumeAttachments(clientset, nodeName, allAttachmentsDeleted); err != nil {
klog.ErrorS(err, "DeleteFunc: error checking VolumeAttachments")
}
}
Expand All @@ -90,7 +86,7 @@ func waitForVolumeAttachments(ctx context.Context, clientset kubernetes.Interfac
klog.V(5).InfoS("UpdateFunc: VolumeAttachment updated", "node", nodeName)
va := newObj.(*storagev1.VolumeAttachment)
if va.Spec.NodeName == nodeName {
if err := checkVolumeAttachments(ctx, clientset, nodeName, allAttachmentsDeleted); err != nil {
if err := checkVolumeAttachments(clientset, nodeName, allAttachmentsDeleted); err != nil {
klog.ErrorS(err, "UpdateFunc: error checking VolumeAttachments")
}
}
Expand All @@ -102,22 +98,17 @@ func waitForVolumeAttachments(ctx context.Context, clientset kubernetes.Interfac

go informer.Run(allAttachmentsDeleted)

if err := checkVolumeAttachments(ctx, clientset, nodeName, allAttachmentsDeleted); err != nil {
if err := checkVolumeAttachments(clientset, nodeName, allAttachmentsDeleted); err != nil {
klog.ErrorS(err, "waitForVolumeAttachments: error checking VolumeAttachments")
}

select {
case <-allAttachmentsDeleted:
klog.InfoS("waitForVolumeAttachments: finished waiting for VolumeAttachments to be deleted. preStopHook completed")
return nil

case <-ctx.Done():
return fmt.Errorf("waitForVolumeAttachments: timed out waiting for preStopHook to complete: %w", ctx.Err())
}
<-allAttachmentsDeleted
klog.InfoS("waitForVolumeAttachments: finished waiting for VolumeAttachments to be deleted. preStopHook completed")
return nil
}

func checkVolumeAttachments(ctx context.Context, clientset kubernetes.Interface, nodeName string, allAttachmentsDeleted chan struct{}) error {
allAttachments, err := clientset.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{})
func checkVolumeAttachments(clientset kubernetes.Interface, nodeName string, allAttachmentsDeleted chan struct{}) error {
allAttachments, err := clientset.StorageV1().VolumeAttachments().List(context.Background(), metav1.ListOptions{})
if err != nil {
return fmt.Errorf("checkVolumeAttachments: failed to list VolumeAttachments: %w", err)
}
Expand Down
43 changes: 1 addition & 42 deletions cmd/hooks/prestop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package hooks
import (
"fmt"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver"
Expand Down Expand Up @@ -153,46 +152,6 @@ func TestPreStopHook(t *testing.T) {
return nil
},
},
{
name: "TestPreStopHook: node is being drained, volume attachments remain -- timeout exceeded",
nodeName: "test-node",
expErr: fmt.Errorf("waitForVolumeAttachments: timed out waiting for preStopHook to complete: context deadline exceeded"),
mockFunc: func(nodeName string, mockClient *driver.MockKubernetesClient, mockCoreV1 *driver.MockCoreV1Interface, mockNode *driver.MockNodeInterface, mockVolumeAttachments *driver.MockVolumeAttachmentInterface, mockStorageV1Interface *driver.MockStorageV1Interface) error {

fakeNode := &v1.Node{
Spec: v1.NodeSpec{
Taints: []v1.Taint{
{
Key: v1.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
},
},
},
}

fakeVolumeAttachments := &storagev1.VolumeAttachmentList{
Items: []storagev1.VolumeAttachment{
{
Spec: storagev1.VolumeAttachmentSpec{
NodeName: "test-node",
},
},
},
}

mockClient.EXPECT().CoreV1().Return(mockCoreV1).AnyTimes()
mockClient.EXPECT().StorageV1().Return(mockStorageV1Interface).AnyTimes()

mockCoreV1.EXPECT().Nodes().Return(mockNode).AnyTimes()
mockNode.EXPECT().Get(gomock.Any(), gomock.Eq(nodeName), gomock.Any()).Return(fakeNode, nil).AnyTimes()

mockStorageV1Interface.EXPECT().VolumeAttachments().Return(mockVolumeAttachments).AnyTimes()
mockVolumeAttachments.EXPECT().List(gomock.Any(), gomock.Any()).Return(fakeVolumeAttachments, nil).AnyTimes()
mockVolumeAttachments.EXPECT().Watch(gomock.Any(), gomock.Any()).Return(watch.NewFake(), nil).AnyTimes()

return nil
},
},
{
name: "TestPreStopHook: Node is drained before timeout",
nodeName: "test-node",
Expand Down Expand Up @@ -274,7 +233,7 @@ func TestPreStopHook(t *testing.T) {
t.Setenv("CSI_NODE_NAME", tc.nodeName)
}

err := PreStop(mockClient, 5*time.Second)
err := PreStop(mockClient)

if tc.expErr != nil {
assert.Error(t, err)
Expand Down
7 changes: 1 addition & 6 deletions cmd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"os"
"strings"
"time"

flag "github.com/spf13/pflag"

Expand All @@ -34,10 +33,6 @@ import (
"k8s.io/klog/v2"
)

const (
preStopTimeout = 30 * time.Second
)

// Options is the combined set of options for all operating modes.
type Options struct {
DriverMode driver.Mode
Expand Down Expand Up @@ -107,7 +102,7 @@ func GetOptions(fs *flag.FlagSet) *Options {
if clientErr != nil {
klog.ErrorS(err, "unable to communicate with k8s API")
} else {
err = hooks.PreStop(clientset, preStopTimeout)
err = hooks.PreStop(clientset)
if err != nil {
klog.ErrorS(err, "failed to execute PreStop lifecycle hook")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
Expand Down

0 comments on commit 670c683

Please sign in to comment.