diff --git a/flytepropeller/pkg/controller/interfaces/rate_limiter.go b/flytepropeller/pkg/controller/interfaces/rate_limiter.go new file mode 100644 index 0000000000..576d9736c4 --- /dev/null +++ b/flytepropeller/pkg/controller/interfaces/rate_limiter.go @@ -0,0 +1,36 @@ +package interfaces + +import ( + "context" + "time" + + "golang.org/x/time/rate" +) + +//go:generate mockery-v2 --name Limiter --output ../mocks --case=snake --with-expecter +//go:generate mockery-v2 --name Reservation --output ../mocks --case=snake --with-expecter + +type Limiter interface { + Allow() bool + AllowN(t time.Time, n int) bool + Burst() int + Limit() rate.Limit + Reserve() Reservation + ReserveN(t time.Time, n int) Reservation + SetBurst(newBurst int) + SetBurstAt(t time.Time, newBurst int) + SetLimit(newLimit rate.Limit) + SetLimitAt(t time.Time, newLimit rate.Limit) + Tokens() float64 + TokensAt(t time.Time) float64 + Wait(ctx context.Context) (err error) + WaitN(ctx context.Context, n int) (err error) +} + +type Reservation interface { + Cancel() + CancelAt(t time.Time) + Delay() time.Duration + DelayFrom(t time.Time) time.Duration + OK() bool +} diff --git a/flytepropeller/pkg/controller/mocks/limiter.go b/flytepropeller/pkg/controller/mocks/limiter.go new file mode 100644 index 0000000000..709cdd4d65 --- /dev/null +++ b/flytepropeller/pkg/controller/mocks/limiter.go @@ -0,0 +1,637 @@ +// Code generated by mockery v2.40.3. DO NOT EDIT. + +package mocks + +import ( + context "context" + + interfaces "github.com/flyteorg/flyte/flytepropeller/pkg/controller/interfaces" + mock "github.com/stretchr/testify/mock" + + rate "golang.org/x/time/rate" + + time "time" +) + +// Limiter is an autogenerated mock type for the Limiter type +type Limiter struct { + mock.Mock +} + +type Limiter_Expecter struct { + mock *mock.Mock +} + +func (_m *Limiter) EXPECT() *Limiter_Expecter { + return &Limiter_Expecter{mock: &_m.Mock} +} + +// Allow provides a mock function with given fields: +func (_m *Limiter) Allow() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Allow") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Limiter_Allow_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Allow' +type Limiter_Allow_Call struct { + *mock.Call +} + +// Allow is a helper method to define mock.On call +func (_e *Limiter_Expecter) Allow() *Limiter_Allow_Call { + return &Limiter_Allow_Call{Call: _e.mock.On("Allow")} +} + +func (_c *Limiter_Allow_Call) Run(run func()) *Limiter_Allow_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limiter_Allow_Call) Return(_a0 bool) *Limiter_Allow_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_Allow_Call) RunAndReturn(run func() bool) *Limiter_Allow_Call { + _c.Call.Return(run) + return _c +} + +// AllowN provides a mock function with given fields: t, n +func (_m *Limiter) AllowN(t time.Time, n int) bool { + ret := _m.Called(t, n) + + if len(ret) == 0 { + panic("no return value specified for AllowN") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(time.Time, int) bool); ok { + r0 = rf(t, n) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Limiter_AllowN_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllowN' +type Limiter_AllowN_Call struct { + *mock.Call +} + +// AllowN is a helper method to define mock.On call +// - t time.Time +// - n int +func (_e *Limiter_Expecter) AllowN(t interface{}, n interface{}) *Limiter_AllowN_Call { + return &Limiter_AllowN_Call{Call: _e.mock.On("AllowN", t, n)} +} + +func (_c *Limiter_AllowN_Call) Run(run func(t time.Time, n int)) *Limiter_AllowN_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time), args[1].(int)) + }) + return _c +} + +func (_c *Limiter_AllowN_Call) Return(_a0 bool) *Limiter_AllowN_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_AllowN_Call) RunAndReturn(run func(time.Time, int) bool) *Limiter_AllowN_Call { + _c.Call.Return(run) + return _c +} + +// Burst provides a mock function with given fields: +func (_m *Limiter) Burst() int { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Burst") + } + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// Limiter_Burst_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Burst' +type Limiter_Burst_Call struct { + *mock.Call +} + +// Burst is a helper method to define mock.On call +func (_e *Limiter_Expecter) Burst() *Limiter_Burst_Call { + return &Limiter_Burst_Call{Call: _e.mock.On("Burst")} +} + +func (_c *Limiter_Burst_Call) Run(run func()) *Limiter_Burst_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limiter_Burst_Call) Return(_a0 int) *Limiter_Burst_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_Burst_Call) RunAndReturn(run func() int) *Limiter_Burst_Call { + _c.Call.Return(run) + return _c +} + +// Limit provides a mock function with given fields: +func (_m *Limiter) Limit() rate.Limit { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Limit") + } + + var r0 rate.Limit + if rf, ok := ret.Get(0).(func() rate.Limit); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(rate.Limit) + } + + return r0 +} + +// Limiter_Limit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Limit' +type Limiter_Limit_Call struct { + *mock.Call +} + +// Limit is a helper method to define mock.On call +func (_e *Limiter_Expecter) Limit() *Limiter_Limit_Call { + return &Limiter_Limit_Call{Call: _e.mock.On("Limit")} +} + +func (_c *Limiter_Limit_Call) Run(run func()) *Limiter_Limit_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limiter_Limit_Call) Return(_a0 rate.Limit) *Limiter_Limit_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_Limit_Call) RunAndReturn(run func() rate.Limit) *Limiter_Limit_Call { + _c.Call.Return(run) + return _c +} + +// Reserve provides a mock function with given fields: +func (_m *Limiter) Reserve() interfaces.Reservation { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Reserve") + } + + var r0 interfaces.Reservation + if rf, ok := ret.Get(0).(func() interfaces.Reservation); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interfaces.Reservation) + } + } + + return r0 +} + +// Limiter_Reserve_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reserve' +type Limiter_Reserve_Call struct { + *mock.Call +} + +// Reserve is a helper method to define mock.On call +func (_e *Limiter_Expecter) Reserve() *Limiter_Reserve_Call { + return &Limiter_Reserve_Call{Call: _e.mock.On("Reserve")} +} + +func (_c *Limiter_Reserve_Call) Run(run func()) *Limiter_Reserve_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limiter_Reserve_Call) Return(_a0 interfaces.Reservation) *Limiter_Reserve_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_Reserve_Call) RunAndReturn(run func() interfaces.Reservation) *Limiter_Reserve_Call { + _c.Call.Return(run) + return _c +} + +// ReserveN provides a mock function with given fields: t, n +func (_m *Limiter) ReserveN(t time.Time, n int) interfaces.Reservation { + ret := _m.Called(t, n) + + if len(ret) == 0 { + panic("no return value specified for ReserveN") + } + + var r0 interfaces.Reservation + if rf, ok := ret.Get(0).(func(time.Time, int) interfaces.Reservation); ok { + r0 = rf(t, n) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(interfaces.Reservation) + } + } + + return r0 +} + +// Limiter_ReserveN_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReserveN' +type Limiter_ReserveN_Call struct { + *mock.Call +} + +// ReserveN is a helper method to define mock.On call +// - t time.Time +// - n int +func (_e *Limiter_Expecter) ReserveN(t interface{}, n interface{}) *Limiter_ReserveN_Call { + return &Limiter_ReserveN_Call{Call: _e.mock.On("ReserveN", t, n)} +} + +func (_c *Limiter_ReserveN_Call) Run(run func(t time.Time, n int)) *Limiter_ReserveN_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time), args[1].(int)) + }) + return _c +} + +func (_c *Limiter_ReserveN_Call) Return(_a0 interfaces.Reservation) *Limiter_ReserveN_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_ReserveN_Call) RunAndReturn(run func(time.Time, int) interfaces.Reservation) *Limiter_ReserveN_Call { + _c.Call.Return(run) + return _c +} + +// SetBurst provides a mock function with given fields: newBurst +func (_m *Limiter) SetBurst(newBurst int) { + _m.Called(newBurst) +} + +// Limiter_SetBurst_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetBurst' +type Limiter_SetBurst_Call struct { + *mock.Call +} + +// SetBurst is a helper method to define mock.On call +// - newBurst int +func (_e *Limiter_Expecter) SetBurst(newBurst interface{}) *Limiter_SetBurst_Call { + return &Limiter_SetBurst_Call{Call: _e.mock.On("SetBurst", newBurst)} +} + +func (_c *Limiter_SetBurst_Call) Run(run func(newBurst int)) *Limiter_SetBurst_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int)) + }) + return _c +} + +func (_c *Limiter_SetBurst_Call) Return() *Limiter_SetBurst_Call { + _c.Call.Return() + return _c +} + +func (_c *Limiter_SetBurst_Call) RunAndReturn(run func(int)) *Limiter_SetBurst_Call { + _c.Call.Return(run) + return _c +} + +// SetBurstAt provides a mock function with given fields: t, newBurst +func (_m *Limiter) SetBurstAt(t time.Time, newBurst int) { + _m.Called(t, newBurst) +} + +// Limiter_SetBurstAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetBurstAt' +type Limiter_SetBurstAt_Call struct { + *mock.Call +} + +// SetBurstAt is a helper method to define mock.On call +// - t time.Time +// - newBurst int +func (_e *Limiter_Expecter) SetBurstAt(t interface{}, newBurst interface{}) *Limiter_SetBurstAt_Call { + return &Limiter_SetBurstAt_Call{Call: _e.mock.On("SetBurstAt", t, newBurst)} +} + +func (_c *Limiter_SetBurstAt_Call) Run(run func(t time.Time, newBurst int)) *Limiter_SetBurstAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time), args[1].(int)) + }) + return _c +} + +func (_c *Limiter_SetBurstAt_Call) Return() *Limiter_SetBurstAt_Call { + _c.Call.Return() + return _c +} + +func (_c *Limiter_SetBurstAt_Call) RunAndReturn(run func(time.Time, int)) *Limiter_SetBurstAt_Call { + _c.Call.Return(run) + return _c +} + +// SetLimit provides a mock function with given fields: newLimit +func (_m *Limiter) SetLimit(newLimit rate.Limit) { + _m.Called(newLimit) +} + +// Limiter_SetLimit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetLimit' +type Limiter_SetLimit_Call struct { + *mock.Call +} + +// SetLimit is a helper method to define mock.On call +// - newLimit rate.Limit +func (_e *Limiter_Expecter) SetLimit(newLimit interface{}) *Limiter_SetLimit_Call { + return &Limiter_SetLimit_Call{Call: _e.mock.On("SetLimit", newLimit)} +} + +func (_c *Limiter_SetLimit_Call) Run(run func(newLimit rate.Limit)) *Limiter_SetLimit_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(rate.Limit)) + }) + return _c +} + +func (_c *Limiter_SetLimit_Call) Return() *Limiter_SetLimit_Call { + _c.Call.Return() + return _c +} + +func (_c *Limiter_SetLimit_Call) RunAndReturn(run func(rate.Limit)) *Limiter_SetLimit_Call { + _c.Call.Return(run) + return _c +} + +// SetLimitAt provides a mock function with given fields: t, newLimit +func (_m *Limiter) SetLimitAt(t time.Time, newLimit rate.Limit) { + _m.Called(t, newLimit) +} + +// Limiter_SetLimitAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetLimitAt' +type Limiter_SetLimitAt_Call struct { + *mock.Call +} + +// SetLimitAt is a helper method to define mock.On call +// - t time.Time +// - newLimit rate.Limit +func (_e *Limiter_Expecter) SetLimitAt(t interface{}, newLimit interface{}) *Limiter_SetLimitAt_Call { + return &Limiter_SetLimitAt_Call{Call: _e.mock.On("SetLimitAt", t, newLimit)} +} + +func (_c *Limiter_SetLimitAt_Call) Run(run func(t time.Time, newLimit rate.Limit)) *Limiter_SetLimitAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time), args[1].(rate.Limit)) + }) + return _c +} + +func (_c *Limiter_SetLimitAt_Call) Return() *Limiter_SetLimitAt_Call { + _c.Call.Return() + return _c +} + +func (_c *Limiter_SetLimitAt_Call) RunAndReturn(run func(time.Time, rate.Limit)) *Limiter_SetLimitAt_Call { + _c.Call.Return(run) + return _c +} + +// Tokens provides a mock function with given fields: +func (_m *Limiter) Tokens() float64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Tokens") + } + + var r0 float64 + if rf, ok := ret.Get(0).(func() float64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(float64) + } + + return r0 +} + +// Limiter_Tokens_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Tokens' +type Limiter_Tokens_Call struct { + *mock.Call +} + +// Tokens is a helper method to define mock.On call +func (_e *Limiter_Expecter) Tokens() *Limiter_Tokens_Call { + return &Limiter_Tokens_Call{Call: _e.mock.On("Tokens")} +} + +func (_c *Limiter_Tokens_Call) Run(run func()) *Limiter_Tokens_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Limiter_Tokens_Call) Return(_a0 float64) *Limiter_Tokens_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_Tokens_Call) RunAndReturn(run func() float64) *Limiter_Tokens_Call { + _c.Call.Return(run) + return _c +} + +// TokensAt provides a mock function with given fields: t +func (_m *Limiter) TokensAt(t time.Time) float64 { + ret := _m.Called(t) + + if len(ret) == 0 { + panic("no return value specified for TokensAt") + } + + var r0 float64 + if rf, ok := ret.Get(0).(func(time.Time) float64); ok { + r0 = rf(t) + } else { + r0 = ret.Get(0).(float64) + } + + return r0 +} + +// Limiter_TokensAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TokensAt' +type Limiter_TokensAt_Call struct { + *mock.Call +} + +// TokensAt is a helper method to define mock.On call +// - t time.Time +func (_e *Limiter_Expecter) TokensAt(t interface{}) *Limiter_TokensAt_Call { + return &Limiter_TokensAt_Call{Call: _e.mock.On("TokensAt", t)} +} + +func (_c *Limiter_TokensAt_Call) Run(run func(t time.Time)) *Limiter_TokensAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time)) + }) + return _c +} + +func (_c *Limiter_TokensAt_Call) Return(_a0 float64) *Limiter_TokensAt_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Limiter_TokensAt_Call) RunAndReturn(run func(time.Time) float64) *Limiter_TokensAt_Call { + _c.Call.Return(run) + return _c +} + +// Wait provides a mock function with given fields: ctx +func (_m *Limiter) Wait(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Wait") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Limiter_Wait_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Wait' +type Limiter_Wait_Call struct { + *mock.Call +} + +// Wait is a helper method to define mock.On call +// - ctx context.Context +func (_e *Limiter_Expecter) Wait(ctx interface{}) *Limiter_Wait_Call { + return &Limiter_Wait_Call{Call: _e.mock.On("Wait", ctx)} +} + +func (_c *Limiter_Wait_Call) Run(run func(ctx context.Context)) *Limiter_Wait_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *Limiter_Wait_Call) Return(err error) *Limiter_Wait_Call { + _c.Call.Return(err) + return _c +} + +func (_c *Limiter_Wait_Call) RunAndReturn(run func(context.Context) error) *Limiter_Wait_Call { + _c.Call.Return(run) + return _c +} + +// WaitN provides a mock function with given fields: ctx, n +func (_m *Limiter) WaitN(ctx context.Context, n int) error { + ret := _m.Called(ctx, n) + + if len(ret) == 0 { + panic("no return value specified for WaitN") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int) error); ok { + r0 = rf(ctx, n) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Limiter_WaitN_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WaitN' +type Limiter_WaitN_Call struct { + *mock.Call +} + +// WaitN is a helper method to define mock.On call +// - ctx context.Context +// - n int +func (_e *Limiter_Expecter) WaitN(ctx interface{}, n interface{}) *Limiter_WaitN_Call { + return &Limiter_WaitN_Call{Call: _e.mock.On("WaitN", ctx, n)} +} + +func (_c *Limiter_WaitN_Call) Run(run func(ctx context.Context, n int)) *Limiter_WaitN_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int)) + }) + return _c +} + +func (_c *Limiter_WaitN_Call) Return(err error) *Limiter_WaitN_Call { + _c.Call.Return(err) + return _c +} + +func (_c *Limiter_WaitN_Call) RunAndReturn(run func(context.Context, int) error) *Limiter_WaitN_Call { + _c.Call.Return(run) + return _c +} + +// NewLimiter creates a new instance of Limiter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewLimiter(t interface { + mock.TestingT + Cleanup(func()) +}) *Limiter { + mock := &Limiter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/flytepropeller/pkg/controller/mocks/reservation.go b/flytepropeller/pkg/controller/mocks/reservation.go new file mode 100644 index 0000000000..d609c0b034 --- /dev/null +++ b/flytepropeller/pkg/controller/mocks/reservation.go @@ -0,0 +1,237 @@ +// Code generated by mockery v2.40.3. DO NOT EDIT. + +package mocks + +import ( + time "time" + + mock "github.com/stretchr/testify/mock" +) + +// Reservation is an autogenerated mock type for the Reservation type +type Reservation struct { + mock.Mock +} + +type Reservation_Expecter struct { + mock *mock.Mock +} + +func (_m *Reservation) EXPECT() *Reservation_Expecter { + return &Reservation_Expecter{mock: &_m.Mock} +} + +// Cancel provides a mock function with given fields: +func (_m *Reservation) Cancel() { + _m.Called() +} + +// Reservation_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel' +type Reservation_Cancel_Call struct { + *mock.Call +} + +// Cancel is a helper method to define mock.On call +func (_e *Reservation_Expecter) Cancel() *Reservation_Cancel_Call { + return &Reservation_Cancel_Call{Call: _e.mock.On("Cancel")} +} + +func (_c *Reservation_Cancel_Call) Run(run func()) *Reservation_Cancel_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Reservation_Cancel_Call) Return() *Reservation_Cancel_Call { + _c.Call.Return() + return _c +} + +func (_c *Reservation_Cancel_Call) RunAndReturn(run func()) *Reservation_Cancel_Call { + _c.Call.Return(run) + return _c +} + +// CancelAt provides a mock function with given fields: t +func (_m *Reservation) CancelAt(t time.Time) { + _m.Called(t) +} + +// Reservation_CancelAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CancelAt' +type Reservation_CancelAt_Call struct { + *mock.Call +} + +// CancelAt is a helper method to define mock.On call +// - t time.Time +func (_e *Reservation_Expecter) CancelAt(t interface{}) *Reservation_CancelAt_Call { + return &Reservation_CancelAt_Call{Call: _e.mock.On("CancelAt", t)} +} + +func (_c *Reservation_CancelAt_Call) Run(run func(t time.Time)) *Reservation_CancelAt_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time)) + }) + return _c +} + +func (_c *Reservation_CancelAt_Call) Return() *Reservation_CancelAt_Call { + _c.Call.Return() + return _c +} + +func (_c *Reservation_CancelAt_Call) RunAndReturn(run func(time.Time)) *Reservation_CancelAt_Call { + _c.Call.Return(run) + return _c +} + +// Delay provides a mock function with given fields: +func (_m *Reservation) Delay() time.Duration { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Delay") + } + + var r0 time.Duration + if rf, ok := ret.Get(0).(func() time.Duration); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// Reservation_Delay_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delay' +type Reservation_Delay_Call struct { + *mock.Call +} + +// Delay is a helper method to define mock.On call +func (_e *Reservation_Expecter) Delay() *Reservation_Delay_Call { + return &Reservation_Delay_Call{Call: _e.mock.On("Delay")} +} + +func (_c *Reservation_Delay_Call) Run(run func()) *Reservation_Delay_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Reservation_Delay_Call) Return(_a0 time.Duration) *Reservation_Delay_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Reservation_Delay_Call) RunAndReturn(run func() time.Duration) *Reservation_Delay_Call { + _c.Call.Return(run) + return _c +} + +// DelayFrom provides a mock function with given fields: t +func (_m *Reservation) DelayFrom(t time.Time) time.Duration { + ret := _m.Called(t) + + if len(ret) == 0 { + panic("no return value specified for DelayFrom") + } + + var r0 time.Duration + if rf, ok := ret.Get(0).(func(time.Time) time.Duration); ok { + r0 = rf(t) + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// Reservation_DelayFrom_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DelayFrom' +type Reservation_DelayFrom_Call struct { + *mock.Call +} + +// DelayFrom is a helper method to define mock.On call +// - t time.Time +func (_e *Reservation_Expecter) DelayFrom(t interface{}) *Reservation_DelayFrom_Call { + return &Reservation_DelayFrom_Call{Call: _e.mock.On("DelayFrom", t)} +} + +func (_c *Reservation_DelayFrom_Call) Run(run func(t time.Time)) *Reservation_DelayFrom_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(time.Time)) + }) + return _c +} + +func (_c *Reservation_DelayFrom_Call) Return(_a0 time.Duration) *Reservation_DelayFrom_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Reservation_DelayFrom_Call) RunAndReturn(run func(time.Time) time.Duration) *Reservation_DelayFrom_Call { + _c.Call.Return(run) + return _c +} + +// OK provides a mock function with given fields: +func (_m *Reservation) OK() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for OK") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Reservation_OK_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OK' +type Reservation_OK_Call struct { + *mock.Call +} + +// OK is a helper method to define mock.On call +func (_e *Reservation_Expecter) OK() *Reservation_OK_Call { + return &Reservation_OK_Call{Call: _e.mock.On("OK")} +} + +func (_c *Reservation_OK_Call) Run(run func()) *Reservation_OK_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Reservation_OK_Call) Return(_a0 bool) *Reservation_OK_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Reservation_OK_Call) RunAndReturn(run func() bool) *Reservation_OK_Call { + _c.Call.Return(run) + return _c +} + +// NewReservation creates a new instance of Reservation. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReservation(t interface { + mock.TestingT + Cleanup(func()) +}) *Reservation { + mock := &Reservation{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/flytepropeller/pkg/controller/rate_limiter.go b/flytepropeller/pkg/controller/rate_limiter.go new file mode 100644 index 0000000000..100d6aa82c --- /dev/null +++ b/flytepropeller/pkg/controller/rate_limiter.go @@ -0,0 +1,116 @@ +package controller + +import ( + "context" + "sync" + "time" + + "golang.org/x/time/rate" + "k8s.io/client-go/util/workqueue" + + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/interfaces" +) + +// limiterAdapter adapts rate.NewLimiter to use the Reservation interface so that it can be used in unittests. +type limiterAdapter struct { + limiter *rate.Limiter +} + +func NewLimiter(r rate.Limit, b int) interfaces.Limiter { + return &limiterAdapter{rate.NewLimiter(r, b)} +} + +func (l *limiterAdapter) Allow() bool { + return l.limiter.Allow() +} + +func (l *limiterAdapter) AllowN(t time.Time, n int) bool { + return l.limiter.AllowN(t, n) +} + +func (l *limiterAdapter) Burst() int { + return l.limiter.Burst() +} + +func (l *limiterAdapter) Limit() rate.Limit { + return l.limiter.Limit() +} + +func (l *limiterAdapter) Reserve() interfaces.Reservation { + return l.limiter.Reserve() +} + +func (l *limiterAdapter) ReserveN(t time.Time, n int) interfaces.Reservation { + return l.limiter.ReserveN(t, n) +} +func (l *limiterAdapter) SetBurst(newBurst int) { + l.limiter.SetBurst(newBurst) +} + +func (l *limiterAdapter) SetBurstAt(t time.Time, newBurst int) { + l.limiter.SetBurstAt(t, newBurst) +} + +func (l *limiterAdapter) SetLimit(newLimit rate.Limit) { + l.limiter.SetLimit(newLimit) +} + +func (l *limiterAdapter) SetLimitAt(t time.Time, newLimit rate.Limit) { + l.limiter.SetLimitAt(t, newLimit) +} + +func (l *limiterAdapter) Tokens() float64 { + return l.limiter.Tokens() +} + +func (l *limiterAdapter) TokensAt(t time.Time) float64 { + return l.limiter.TokensAt(t) +} + +func (l *limiterAdapter) Wait(ctx context.Context) (err error) { + return l.limiter.Wait(ctx) +} + +func (l *limiterAdapter) WaitN(ctx context.Context, n int) (err error) { + return l.limiter.WaitN(ctx, n) +} + +// Similar to the standard BucketRateLimiter but dedupes items in order to avoid reserving token slots for the +// same item multiple times. Intened to be used with a DelayingQueue, which dedupes items on insertion. +type dedupingBucketRateLimiter struct { + Limiter interfaces.Limiter + mu sync.Mutex + reservations map[interface{}]interfaces.Reservation +} + +func NewDedupingBucketRateLimiter(limiter interfaces.Limiter) workqueue.RateLimiter { + return &dedupingBucketRateLimiter{ + Limiter: limiter, + reservations: make(map[interface{}]interfaces.Reservation), + } +} + +var _ workqueue.RateLimiter = &dedupingBucketRateLimiter{} + +func (r *dedupingBucketRateLimiter) When(item interface{}) time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + // Check if this item has an outstanding reservation. If so, use it to avoid a duplicate reservation. + if res, ok := r.reservations[item]; ok && res.Delay() > 0 { + return res.Delay() + } + r.reservations[item] = r.Limiter.Reserve() + return r.reservations[item].Delay() +} + +func (r *dedupingBucketRateLimiter) NumRequeues(item interface{}) int { + return 0 +} + +func (r *dedupingBucketRateLimiter) Forget(item interface{}) { + r.mu.Lock() + defer r.mu.Unlock() + if res, ok := r.reservations[item]; ok && res.Delay() <= 0 { + delete(r.reservations, item) + } +} diff --git a/flytepropeller/pkg/controller/rate_limiter_test.go b/flytepropeller/pkg/controller/rate_limiter_test.go new file mode 100644 index 0000000000..16e5bae417 --- /dev/null +++ b/flytepropeller/pkg/controller/rate_limiter_test.go @@ -0,0 +1,98 @@ +package controller + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/mocks" +) + +type rateLimiterTests struct { + suite.Suite + limiter *mocks.Limiter + deduping *dedupingBucketRateLimiter +} + +func TestDedupingBucketRateLimiter(t *testing.T) { + suite.Run(t, &rateLimiterTests{}) +} + +func (s *rateLimiterTests) SetupTest() { + s.limiter = mocks.NewLimiter(s.T()) + s.deduping = NewDedupingBucketRateLimiter(s.limiter).(*dedupingBucketRateLimiter) +} + +func (s *rateLimiterTests) TearDownTest() { + s.limiter.AssertExpectations(s.T()) +} + +func (s *rateLimiterTests) Test_When_NotFound() { + newReservation := mocks.NewReservation(s.T()) + defer newReservation.AssertExpectations(s.T()) + newReservation.EXPECT().Delay().Return(time.Minute).Once() + s.limiter.EXPECT().Reserve().Return(newReservation).Once() + + d := s.deduping.When("item1") + + assert.Equal(s.T(), newReservation, s.deduping.reservations["item1"]) + assert.Equal(s.T(), time.Minute, d) +} + +func (s *rateLimiterTests) Test_When_FoundPast() { + pastReservation := mocks.NewReservation(s.T()) + defer pastReservation.AssertExpectations(s.T()) + pastReservation.EXPECT().Delay().Return(-time.Minute).Once() + s.deduping.reservations["item1"] = pastReservation + newReservation := mocks.NewReservation(s.T()) + defer newReservation.AssertExpectations(s.T()) + newReservation.EXPECT().Delay().Return(time.Minute).Once() + s.limiter.EXPECT().Reserve().Return(newReservation).Once() + + d := s.deduping.When("item1") + + assert.Equal(s.T(), newReservation, s.deduping.reservations["item1"]) + assert.Equal(s.T(), time.Minute, d) +} + +func (s *rateLimiterTests) Test_When_FoundFuture() { + futureReservation := mocks.NewReservation(s.T()) + defer futureReservation.AssertExpectations(s.T()) + futureReservation.EXPECT().Delay().Return(time.Minute).Twice() + s.deduping.reservations["item1"] = futureReservation + + d := s.deduping.When("item1") + + assert.Equal(s.T(), futureReservation, s.deduping.reservations["item1"]) + assert.Equal(s.T(), time.Minute, d) +} + +func (s *rateLimiterTests) Test_Forget_NotFound() { + s.deduping.Forget("item1") + + assert.NotContains(s.T(), s.deduping.reservations, "item1") +} + +func (s *rateLimiterTests) Test_Forget_PastReservation() { + pastReservation := mocks.NewReservation(s.T()) + defer pastReservation.AssertExpectations(s.T()) + pastReservation.EXPECT().Delay().Return(-time.Minute).Once() + s.deduping.reservations["item1"] = pastReservation + + s.deduping.Forget("item1") + + assert.NotContains(s.T(), s.deduping.reservations, "item1") +} + +func (s *rateLimiterTests) Test_Forget_FutureReservation() { + futureReservation := mocks.NewReservation(s.T()) + defer futureReservation.AssertExpectations(s.T()) + futureReservation.EXPECT().Delay().Return(time.Minute).Once() + s.deduping.reservations["item1"] = futureReservation + + s.deduping.Forget("item1") + + assert.Equal(s.T(), futureReservation, s.deduping.reservations["item1"]) +} diff --git a/flytepropeller/pkg/controller/workqueue.go b/flytepropeller/pkg/controller/workqueue.go index 1d10cb5f2a..fcf97c3e1c 100644 --- a/flytepropeller/pkg/controller/workqueue.go +++ b/flytepropeller/pkg/controller/workqueue.go @@ -18,10 +18,8 @@ func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) case config.WorkqueueTypeBucketRateLimiter: logger.Infof(ctx, "Using Bucket Ratelimited Workqueue, Rate [%v] Capacity [%v]", cfg.Rate, cfg.Capacity) return workqueue.NewNamedRateLimitingQueue( - // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) - &workqueue.BucketRateLimiter{ - Limiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Capacity), - }, name), nil + NewDedupingBucketRateLimiter(NewLimiter(rate.Limit(cfg.Rate), cfg.Capacity)), + name), nil case config.WorkqueueTypeExponentialFailureRateLimiter: logger.Infof(ctx, "Using Exponential failure backoff Ratelimited Workqueue, Base Delay [%v], max Delay [%v]", cfg.BaseDelay, cfg.MaxDelay) return workqueue.NewNamedRateLimitingQueue( @@ -31,9 +29,7 @@ func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) logger.Infof(ctx, "Using Max-of Ratelimited Workqueue, Bucket {Rate [%v] Capacity [%v]} | FailureBackoff {Base Delay [%v], max Delay [%v]}", cfg.Rate, cfg.Capacity, cfg.BaseDelay, cfg.MaxDelay) return workqueue.NewNamedRateLimitingQueue( workqueue.NewMaxOfRateLimiter( - &workqueue.BucketRateLimiter{ - Limiter: rate.NewLimiter(rate.Limit(cfg.Rate), cfg.Capacity), - }, + NewDedupingBucketRateLimiter(NewLimiter(rate.Limit(cfg.Rate), cfg.Capacity)), workqueue.NewItemExponentialFailureRateLimiter(cfg.BaseDelay.Duration, cfg.MaxDelay.Duration), ), name), nil