diff --git a/flytestdlib/cache/auto_refresh.go b/flytestdlib/cache/auto_refresh.go index bb23ef9369..8218e577a8 100644 --- a/flytestdlib/cache/auto_refresh.go +++ b/flytestdlib/cache/auto_refresh.go @@ -3,6 +3,7 @@ package cache import ( "context" "fmt" + "runtime/debug" "sync" "time" @@ -290,9 +291,9 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { } if err, isErr = rVal.(error); isErr { - err = fmt.Errorf("worker panic'd and is shutting down. Error: %w", err) + err = fmt.Errorf("worker panic'd and is shutting down. Error: %w with Stack: %v", err, string(debug.Stack())) } else { - err = fmt.Errorf("worker panic'd and is shutting down. Panic value: %v", rVal) + err = fmt.Errorf("worker panic'd and is shutting down. Panic value: %v with Stack: %v", rVal, string(debug.Stack())) } logger.Error(ctx, err) diff --git a/flytestdlib/cache/auto_refresh_test.go b/flytestdlib/cache/auto_refresh_test.go index e798300f5d..5e1c49777e 100644 --- a/flytestdlib/cache/auto_refresh_test.go +++ b/flytestdlib/cache/auto_refresh_test.go @@ -64,6 +64,15 @@ func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error panic("This should never be called") } +type panickingSyncer struct { + callCount atomic.Int32 +} + +func (p *panickingSyncer) sync(_ context.Context, _ Batch) ([]ItemSyncResponse, error) { + p.callCount.Inc() + panic("testing") +} + func TestCacheFour(t *testing.T) { testResyncPeriod := 10 * time.Millisecond rateLimiter := workqueue.DefaultControllerRateLimiter() @@ -172,6 +181,34 @@ func TestCacheFour(t *testing.T) { cancel() }) + + t.Run("Test panic on sync and shutdown", func(t *testing.T) { + syncer := &panickingSyncer{} + cache, err := NewAutoRefreshCache("fake3", syncer.sync, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope()) + assert.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + assert.NoError(t, cache.Start(ctx)) + + itemID := "dummy_id" + _, err = cache.GetOrCreate(itemID, fakeCacheItem{ + val: 0, + }) + assert.NoError(t, err) + + // wait for all workers to run + assert.Eventually(t, func() bool { + return syncer.callCount.Load() == int32(10) + }, 5*time.Second, time.Millisecond) + + // wait some more time + time.Sleep(500 * time.Millisecond) + + // all workers should have shut down. + assert.Equal(t, int32(10), syncer.callCount.Load()) + + cancel() + }) } func TestQueueBuildUp(t *testing.T) {