[YUNIKORN-2253] Support retry when bind volume failed case instead of… #890

Closed
wants to merge 5 commits
Changes from 2 commits
72 changes: 63 additions & 9 deletions pkg/cache/context.go
@@ -29,6 +29,7 @@
"strings"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
@@ -754,20 +755,73 @@
// convert nil to empty array
volumes.DynamicProvisions = make([]*v1.PersistentVolumeClaim, 0)
}
err = ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(context.Background(), assumedPod, volumes)
if err != nil {
log.Log(log.ShimContext).Error("Failed to bind pod volumes",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Int("dynamicProvisions", len(volumes.DynamicProvisions)),
zap.Int("staticBindings", len(volumes.StaticBindings)))
return err
}
// Here we set the max retry count to 5 and use the default retry strategy.
// Details:
// the retry delays are 1s, 2s, 4s and 8s; the last attempt does not sleep
return ctx.bindPodVolumesWithRetry(assumedPod, volumes, 5, &DefaultRetryStrategy{})

Codecov warning: added line #L761 was not covered by tests.
}
}
return nil
}

type RetryStrategy interface {
// Sleep function used for retry delays
Sleep(duration time.Duration)
}

// DefaultRetryStrategy is a simple retry strategy that sleeps for a fixed duration
// We can extend this to support more advanced retry strategies in the future and also for testing purposes
type DefaultRetryStrategy struct{}

func (r *DefaultRetryStrategy) Sleep(duration time.Duration) {
time.Sleep(duration)

Codecov warning: added lines #L776-L777 were not covered by tests.
}
Contributor @pbacsko commented on Aug 14, 2024:

Lot of extra code, not needed

Contributor (PR author) replied:

Thanks @pbacsko for the review. I added this to give the testing code a way to mock the sleep interval, since we don't have a context mock class.

Contributor (PR author) replied:

Addressed this in the latest PR using the new API, and also removed the extra code.


func (ctx *Context) bindPodVolumesWithRetry(
assumedPod *v1.Pod,
volumes *volumebinding.PodVolumes,
maxRetries int,
retryStrategy RetryStrategy,
) error {
const baseDelay = time.Second
const maxDelay = 8 * time.Second

var err error
for i := 0; i < maxRetries; i++ {
err = ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(context.Background(), assumedPod, volumes)
if err == nil {
return nil
}

log.Log(log.ShimContext).Error("Failed to bind pod volumes",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Int("dynamicProvisions", len(volumes.DynamicProvisions)),
zap.Int("staticBindings", len(volumes.StaticBindings)),
zap.Int("retryCount", i+1),
zap.Error(err))

if i == maxRetries-1 {
log.Log(log.ShimContext).Error("Failed to bind pod volumes after retry",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Int("dynamicProvisions", len(volumes.DynamicProvisions)),
zap.Int("staticBindings", len(volumes.StaticBindings)),
zap.Error(err))
return err
}

delay := baseDelay * time.Duration(1<<uint(i))
if delay > maxDelay {
delay = maxDelay

Codecov warning: added line #L816 was not covered by tests.
}

retryStrategy.Sleep(delay) // Use the retry strategy
}
A contributor commented:

We have retry logic in the K8s codebase that we can reuse:

import (
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/retry"
)

backoff := wait.Backoff{
	Steps:    5,
	Duration: time.Second,
	Factor:   2.0,
	Jitter:   0,
}
err := retry.OnError(backoff, func(_ error) bool {
	return true // retry on all errors
}, func() error {
	return ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(context.Background(), assumedPod, volumes)
})

There's retry.DefaultRetry and retry.DefaultBackoff, but those don't look suitable for us. With no network delay this retries 5 times with a total wait time of 30 seconds.
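For reference, those client-go defaults look roughly like this (a sketch; the exact values are version-dependent), which shows why the 10 ms base duration is a poor fit for volume binding:

// Approximate definitions from k8s.io/client-go/util/retry (version-dependent):
var DefaultRetry = wait.Backoff{
	Steps:    5,
	Duration: 10 * time.Millisecond,
	Factor:   1.0,
	Jitter:   0.1,
}
var DefaultBackoff = wait.Backoff{
	Steps:    4,
	Duration: 10 * time.Millisecond,
	Factor:   5.0,
	Jitter:   0.1,
}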

A contributor added:

I was thinking about this; perhaps we're better off with a normal, non-exponential retry (Steps: 5, Factor: 1.0, Duration: 10*time.Second).
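A minimal sketch of that non-exponential variant, assuming the same retry.OnError wrapper from the snippet above (field names follow wait.Backoff):

backoff := wait.Backoff{
	Steps:    5,
	Duration: 10 * time.Second,
	Factor:   1.0,
	Jitter:   0,
}
err := retry.OnError(backoff, func(_ error) bool {
	return true // retry on all errors, sleeping a flat 10s between attempts
}, func() error {
	return ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(context.Background(), assumedPod, volumes)
})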

Contributor (PR author) replied:

This is a good wrapper, I will take a look at replacing my code with it.

Contributor (PR author) replied:

I think exponential retry is enough, because each retry already has an internal timeout within k8s itself.

Contributor (PR author) replied:

Addressed this in the latest PR using the new API.


return err

Codecov warning: added line #L822 was not covered by tests.
}

// assume a pod will be running on a node, in scheduler, we maintain
// a cache where stores info for each node what pods are supposed to
// be running on it. And we keep this cache in-sync between core and the shim.
108 changes: 108 additions & 0 deletions pkg/cache/context_test.go
@@ -2433,3 +2433,111 @@ func assertListerPods(pods []*v1.Pod, count int) bool {
}
return count == counted
}

type MockRetryStrategy struct {
totalSleep time.Duration
}

func (m *MockRetryStrategy) Sleep(duration time.Duration) {
m.totalSleep += duration
}
A contributor commented:

Extra code, not needed

Contributor (PR author) replied:

I added this to give the testing code a way to mock the sleep interval, since we don't have a context mock class.

Contributor (PR author) replied:

Addressed this in the latest PR using the new API, and also removed the extra code.


func TestBindPodVolumesWithRetry(t *testing.T) {
mockRetryStrategy := &MockRetryStrategy{}
mockVolumeBinder := test.NewVolumeBinderMock()
ctx, api := initContextAndAPIProviderForTest()
api.SetVolumeBinder(mockVolumeBinder)

pod := &v1.Pod{Spec: v1.PodSpec{NodeName: "test-node"}}
volumes := &volumebinding.PodVolumes{}

testCases := []struct {
name string
setupMock func()
maxRetries int
expectedErr bool
expectedCalls int
expectedSleeps time.Duration
}{
{
name: "success on first try",
setupMock: func() {
mockVolumeBinder.Reset()
mockRetryStrategy.totalSleep = 0
mockVolumeBinder.SetBindError("")
},
maxRetries: 3,
expectedErr: false,
expectedCalls: 1,
expectedSleeps: 0 * time.Second, // No sleep
},
{
name: "failure after max retries",
setupMock: func() {
mockVolumeBinder.Reset()
mockRetryStrategy.totalSleep = 0
mockVolumeBinder.SetBindError("bind error")
},
maxRetries: 3,
expectedErr: true,
expectedCalls: 3,
expectedSleeps: 3 * time.Second,
},
{
name: "failure after max retries with 4 attempts",
setupMock: func() {
mockVolumeBinder.Reset()
mockRetryStrategy.totalSleep = 0
mockVolumeBinder.SetBindError("bind error")
},
maxRetries: 4,
expectedErr: true,
expectedCalls: 4,
expectedSleeps: 7 * time.Second,
},
{
name: "retry successfully for the second time",
setupMock: func() {
mockVolumeBinder.Reset()
mockRetryStrategy.totalSleep = 0
mockVolumeBinder.SetBindError("")
// Use a custom implementation for BindPodVolumes
mockVolumeBinder.SetBindPodVolumesFunc(func(pod *v1.Pod, volumes *volumebinding.PodVolumes) error {
mockVolumeBinder.IncrementBindCallCount()
if mockVolumeBinder.GetBindCallCount() == 2 {
return nil
} else {
return fmt.Errorf("bind error")
}
})
},
maxRetries: 5,
expectedErr: false,
expectedCalls: 2,
expectedSleeps: 1 * time.Second,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set up the mock behavior for this test case
tc.setupMock()

// Execute the function with retry logic
err := ctx.bindPodVolumesWithRetry(pod, volumes, tc.maxRetries, mockRetryStrategy)

// Assert the result
if tc.expectedErr {
assert.Error(t, err, "bind error")
} else {
assert.NilError(t, err)
}

// Assert the number of calls made to BindPodVolumes
assert.Equal(t, tc.expectedCalls, mockVolumeBinder.GetBindCallCount())

// Use custom assertion
assert.Equal(t, tc.expectedSleeps, mockRetryStrategy.totalSleep)
})
}
}
46 changes: 45 additions & 1 deletion pkg/common/test/volumebinder_mock.go
@@ -40,6 +40,28 @@ type VolumeBinderMock struct {
podVolumeClaim *volumebinding.PodVolumeClaims
podVolumes *volumebinding.PodVolumes
allBound bool

bindCallCount int

// Function field to override BindPodVolumes behavior
bindPodVolumesFunc func(pod *v1.Pod, volumes *volumebinding.PodVolumes) error
}

// Set a custom implementation for BindPodVolumes
func (v *VolumeBinderMock) SetBindPodVolumesFunc(fn func(pod *v1.Pod, volumes *volumebinding.PodVolumes) error) {
v.bindPodVolumesFunc = fn
}

func (v *VolumeBinderMock) Reset() {
v.volumeClaimError = nil
v.findPodVolumesError = nil
v.assumeVolumeError = nil
v.bindError = nil
v.conflictReasons = nil
v.podVolumeClaim = nil
v.podVolumes = nil
v.allBound = true
v.bindCallCount = 0
}

func NewVolumeBinderMock() *VolumeBinderMock {
@@ -83,7 +105,13 @@ func (v *VolumeBinderMock) AssumePodVolumes(_ klog.Logger, _ *v1.Pod, _ string,
func (v *VolumeBinderMock) RevertAssumedPodVolumes(_ *volumebinding.PodVolumes) {
}

func (v *VolumeBinderMock) BindPodVolumes(_ context.Context, _ *v1.Pod, _ *volumebinding.PodVolumes) error {
// Default implementation of BindPodVolumes
func (v *VolumeBinderMock) BindPodVolumes(ctx context.Context, pod *v1.Pod, volumes *volumebinding.PodVolumes) error {
if v.bindPodVolumesFunc != nil {
return v.bindPodVolumesFunc(pod, volumes)
}
// Default behavior
v.bindCallCount++
return v.bindError
}

@@ -106,3 +134,19 @@ func (v *VolumeBinderMock) SetConflictReasons(reasons ...string) {
func (v *VolumeBinderMock) SetAssumePodVolumesError(message string) {
v.assumeVolumeError = errors.New(message)
}

func (v *VolumeBinderMock) GetBindCallCount() int {
return v.bindCallCount
}

func (v *VolumeBinderMock) IncrementBindCallCount() {
v.bindCallCount++
}

func (v *VolumeBinderMock) SetBindError(message string) {
if message == "" {
v.bindError = nil
return
}
v.bindError = errors.New(message)
}