diff --git a/pkg/cache/context.go b/pkg/cache/context.go
index 6b7919237..c46880930 100644
--- a/pkg/cache/context.go
+++ b/pkg/cache/context.go
@@ -29,13 +29,16 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"go.uber.org/zap"
 	v1 "k8s.io/api/core/v1"
 	schedulingv1 "k8s.io/api/scheduling/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/retry"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
@@ -754,13 +757,15 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
 				// convert nil to empty array
 				volumes.DynamicProvisions = make([]*v1.PersistentVolumeClaim, 0)
 			}
-			err = ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(context.Background(), assumedPod, volumes)
-			if err != nil {
-				log.Log(log.ShimContext).Error("Failed to bind pod volumes",
+
+			if err = ctx.bindPodVolumesWithRetry(assumedPod, volumes, 5, time.Second); err != nil {
+				// all 5 attempts failed (waits of 1s, 2s, 4s and 8s in between)
+				log.Log(log.ShimContext).Error("Failed to bind pod volumes after max 5 retries",
 					zap.String("podName", assumedPod.Name),
 					zap.String("nodeName", assumedPod.Spec.NodeName),
 					zap.Int("dynamicProvisions", len(volumes.DynamicProvisions)),
-					zap.Int("staticBindings", len(volumes.StaticBindings)))
+					zap.Int("staticBindings", len(volumes.StaticBindings)),
+					zap.Error(err))
 				return err
 			}
 		}
@@ -768,6 +773,50 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
 	return nil
 }
 
+// bindPodVolumesWithRetry calls the volume binder's BindPodVolumes for the
+// assumed pod and retries on any error using exponential backoff.
+//
+// maxRetries is the total number of attempts, including the first one; values
+// below 1 are treated as 1 so the bind is always attempted. duration is the
+// wait before the second attempt and doubles after each further failure, with
+// no wait after the final attempt. Returns nil once an attempt succeeds,
+// otherwise the error of the last attempt.
+func (ctx *Context) bindPodVolumesWithRetry(
+	assumedPod *v1.Pod,
+	volumes *volumebinding.PodVolumes,
+	maxRetries int,
+	duration time.Duration,
+) error {
+	// With Steps <= 0 retry.OnError never runs the callback and returns nil,
+	// which would report success without binding anything.
+	if maxRetries < 1 {
+		maxRetries = 1
+	}
+	backoff := wait.Backoff{
+		Steps:    maxRetries,
+		Duration: duration,
+		Factor:   2.0,
+		Jitter:   0,
+	}
+	attempt := 0
+	return retry.OnError(backoff, func(_ error) bool {
+		return true // retry on all errors
+	}, func() error {
+		attempt++
+		if attempt > 1 {
+			// the first call is not a retry, only log the later attempts
+			log.Log(log.ShimContext).Debug("Retrying to bind pod volumes",
+				zap.String("podName", assumedPod.Name),
+				zap.String("nodeName", assumedPod.Spec.NodeName),
+				zap.Int("dynamicProvisions", len(volumes.DynamicProvisions)),
+				zap.Int("staticBindings", len(volumes.StaticBindings)),
+				zap.Int("attempt", attempt))
+		}
+		// call volume binder to bind pod volumes
+		return ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(context.Background(), assumedPod, volumes)
+	})
+}
+
 // assume a pod will be running on a node, in scheduler, we maintain
 // a cache where stores info for each node what pods are supposed to
 // be running on it. And we keep this cache in-sync between core and the shim.
diff --git a/pkg/cache/context_test.go b/pkg/cache/context_test.go
index 6d7bfafa4..1394af30c 100644
--- a/pkg/cache/context_test.go
+++ b/pkg/cache/context_test.go
@@ -2433,3 +2433,89 @@ func assertListerPods(pods []*v1.Pod, count int) bool {
 	}
 	return count == counted
 }
+
+func TestBindPodVolumesWithRetry(t *testing.T) {
+	mockVolumeBinder := test.NewVolumeBinderMock()
+	ctx, api := initContextAndAPIProviderForTest()
+	api.SetVolumeBinder(mockVolumeBinder)
+
+	pod := &v1.Pod{Spec: v1.PodSpec{NodeName: "test-node"}}
+	volumes := &volumebinding.PodVolumes{}
+
+	testCases := []struct {
+		name          string
+		setupMock     func()
+		maxRetries    int
+		expectedErr   bool
+		expectedCalls int
+	}{
+		{
+			name: "success on first try",
+			setupMock: func() {
+				mockVolumeBinder.Reset()
+				mockVolumeBinder.SetBindError("")
+			},
+			maxRetries:    3,
+			expectedErr:   false,
+			expectedCalls: 1,
+		},
+		{
+			name: "failure after max retries",
+			setupMock: func() {
+				mockVolumeBinder.Reset()
+				mockVolumeBinder.SetBindError("bind error")
+			},
+			maxRetries:    3,
+			expectedErr:   true,
+			expectedCalls: 3,
+		},
+		{
+			name: "retry successfully for the second time",
+			setupMock: func() {
+				mockVolumeBinder.Reset()
+				mockVolumeBinder.SetBindError("")
+				// fail every call except the second one; the mock counts a call before delegating
+				mockVolumeBinder.SetBindPodVolumesFunc(func(_ *v1.Pod, _ *volumebinding.PodVolumes) error {
+					if mockVolumeBinder.GetBindCallCount() == 2 {
+						return nil
+					}
+					return fmt.Errorf("bind error")
+				})
+			},
+			maxRetries:    5,
+			expectedErr:   false,
+			expectedCalls: 2,
+		},
+		{
+			name: "non-positive max retries still binds once",
+			setupMock: func() {
+				mockVolumeBinder.Reset()
+				mockVolumeBinder.SetBindError("")
+			},
+			maxRetries:    0,
+			expectedErr:   false,
+			expectedCalls: 1,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Set up the mock behavior for this test case
+			tc.setupMock()
+
+			// Execute the function with retry logic
+			// make the mock interval shorter for testing
+			err := ctx.bindPodVolumesWithRetry(pod, volumes, tc.maxRetries, 1*time.Millisecond)
+
+			// Assert the result
+			if tc.expectedErr {
+				assert.Error(t, err, "bind error")
+			} else {
+				assert.NilError(t, err)
+			}
+
+			// Assert the number of calls made to BindPodVolumes
+			assert.Equal(t, tc.expectedCalls, mockVolumeBinder.GetBindCallCount())
+		})
+	}
+}
diff --git a/pkg/common/test/volumebinder_mock.go b/pkg/common/test/volumebinder_mock.go
index 3bc382768..565f1230c 100644
--- a/pkg/common/test/volumebinder_mock.go
+++ b/pkg/common/test/volumebinder_mock.go
@@ -40,6 +40,33 @@ type VolumeBinderMock struct {
 	podVolumeClaim      *volumebinding.PodVolumeClaims
 	podVolumes          *volumebinding.PodVolumes
 	allBound            bool
+
+	// number of BindPodVolumes calls since creation or the last Reset
+	bindCallCount int
+
+	// Function field to override BindPodVolumes behavior
+	bindPodVolumesFunc func(pod *v1.Pod, volumes *volumebinding.PodVolumes) error
+}
+
+// SetBindPodVolumesFunc installs a custom implementation for BindPodVolumes.
+// Passing nil restores the default behavior of returning the configured bind error.
+func (v *VolumeBinderMock) SetBindPodVolumesFunc(fn func(pod *v1.Pod, volumes *volumebinding.PodVolumes) error) {
+	v.bindPodVolumesFunc = fn
+}
+
+// Reset clears all configured errors, results and overrides, and zeroes the
+// bind call counter, so test cases sharing the mock do not affect each other.
+func (v *VolumeBinderMock) Reset() {
+	v.volumeClaimError = nil
+	v.findPodVolumesError = nil
+	v.assumeVolumeError = nil
+	v.bindError = nil
+	v.conflictReasons = nil
+	v.podVolumeClaim = nil
+	v.podVolumes = nil
+	v.allBound = true
+	v.bindCallCount = 0
+	v.bindPodVolumesFunc = nil
 }
 
 func NewVolumeBinderMock() *VolumeBinderMock {
@@ -83,7 +110,14 @@ func (v *VolumeBinderMock) AssumePodVolumes(_ klog.Logger, _ *v1.Pod, _ string,
 func (v *VolumeBinderMock) RevertAssumedPodVolumes(_ *volumebinding.PodVolumes) {
 }
 
-func (v *VolumeBinderMock) BindPodVolumes(_ context.Context, _ *v1.Pod, _ *volumebinding.PodVolumes) error {
+// BindPodVolumes records the call and then either delegates to the override
+// installed via SetBindPodVolumesFunc or returns the configured bind error.
+func (v *VolumeBinderMock) BindPodVolumes(_ context.Context, pod *v1.Pod, volumes *volumebinding.PodVolumes) error {
+	// count first so an override sees the current call in GetBindCallCount
+	v.bindCallCount++
+	if v.bindPodVolumesFunc != nil {
+		return v.bindPodVolumesFunc(pod, volumes)
+	}
 	return v.bindError
 }
 
@@ -106,3 +140,25 @@ func (v *VolumeBinderMock) SetConflictReasons(reasons ...string) {
 func (v *VolumeBinderMock) SetAssumePodVolumesError(message string) {
 	v.assumeVolumeError = errors.New(message)
 }
+
+// GetBindCallCount returns the number of BindPodVolumes calls recorded since
+// the mock was created or last Reset.
+func (v *VolumeBinderMock) GetBindCallCount() int {
+	return v.bindCallCount
+}
+
+// IncrementBindCallCount bumps the call counter by one. BindPodVolumes already
+// counts every call, so this is only needed for manual adjustments in tests.
+func (v *VolumeBinderMock) IncrementBindCallCount() {
+	v.bindCallCount++
+}
+
+// SetBindError makes BindPodVolumes fail with the given message; an empty
+// message clears the error so that BindPodVolumes succeeds.
+func (v *VolumeBinderMock) SetBindError(message string) {
+	if message == "" {
+		v.bindError = nil
+		return
+	}
+	v.bindError = errors.New(message)
+}