diff --git a/flytestdlib/cache/auto_refresh.go b/flytestdlib/cache/auto_refresh.go index f2257813a7..58490669f4 100644 --- a/flytestdlib/cache/auto_refresh.go +++ b/flytestdlib/cache/auto_refresh.go @@ -252,7 +252,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, batch := range batches { b := batch - w.workqueue.Add(&b) + w.workqueue.AddRateLimited(&b) } return nil diff --git a/flytestdlib/cache/auto_refresh_test.go b/flytestdlib/cache/auto_refresh_test.go index 4535e8c465..7707b593ff 100644 --- a/flytestdlib/cache/auto_refresh_test.go +++ b/flytestdlib/cache/auto_refresh_test.go @@ -19,11 +19,12 @@ import ( const fakeCacheItemValueLimit = 10 type fakeCacheItem struct { - val int + val int + isTerminal bool } func (f fakeCacheItem) IsTerminal() bool { - return false + return f.isTerminal } type terminalCacheItem struct { @@ -42,11 +43,15 @@ func syncFakeItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error) { // After the item has gone through ten update cycles, leave it unchanged continue } - + isTerminal := false + if item.val == fakeCacheItemValueLimit-1 { + isTerminal = true + } items = append(items, ItemSyncResponse{ ID: obj.GetID(), Item: fakeCacheItem{ - val: item.val + 1, + val: item.val + 1, + isTerminal: isTerminal, }, Action: Update, }) @@ -60,7 +65,7 @@ func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error } func TestCacheFour(t *testing.T) { - testResyncPeriod := time.Millisecond + testResyncPeriod := 10 * time.Millisecond rateLimiter := workqueue.DefaultControllerRateLimiter() t.Run("normal operation", func(t *testing.T) {