Skip to content

Commit

Permalink
Log stack trace when refresh cache sync recovers from panic (flyteorg…
Browse files Browse the repository at this point in the history
…#5623)

Signed-off-by: Vladyslav Libov <[email protected]>
  • Loading branch information
Sovietaced authored and VladyslavLibov committed Aug 16, 2024
1 parent 12a228c commit 1c8b060
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
5 changes: 3 additions & 2 deletions flytestdlib/cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
"context"
"fmt"
"runtime/debug"
"sync"
"time"

Expand Down Expand Up @@ -290,9 +291,9 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
}

if err, isErr = rVal.(error); isErr {
err = fmt.Errorf("worker panic'd and is shutting down. Error: %w", err)
err = fmt.Errorf("worker panic'd and is shutting down. Error: %w with Stack: %v", err, string(debug.Stack()))
} else {
err = fmt.Errorf("worker panic'd and is shutting down. Panic value: %v", rVal)
err = fmt.Errorf("worker panic'd and is shutting down. Panic value: %v with Stack: %v", rVal, string(debug.Stack()))
}

logger.Error(ctx, err)
Expand Down
37 changes: 37 additions & 0 deletions flytestdlib/cache/auto_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ func syncTerminalItem(_ context.Context, batch Batch) ([]ItemSyncResponse, error
panic("This should never be called")
}

type panickingSyncer struct {
callCount atomic.Int32
}

func (p *panickingSyncer) sync(_ context.Context, _ Batch) ([]ItemSyncResponse, error) {
p.callCount.Inc()
panic("testing")
}

func TestCacheFour(t *testing.T) {
testResyncPeriod := 10 * time.Millisecond
rateLimiter := workqueue.DefaultControllerRateLimiter()
Expand Down Expand Up @@ -172,6 +181,34 @@ func TestCacheFour(t *testing.T) {

cancel()
})

t.Run("Test panic on sync and shutdown", func(t *testing.T) {
syncer := &panickingSyncer{}
cache, err := NewAutoRefreshCache("fake3", syncer.sync, rateLimiter, testResyncPeriod, 10, 2, promutils.NewTestScope())
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, cache.Start(ctx))

itemID := "dummy_id"
_, err = cache.GetOrCreate(itemID, fakeCacheItem{
val: 0,
})
assert.NoError(t, err)

// wait for all workers to run
assert.Eventually(t, func() bool {
return syncer.callCount.Load() == int32(10)
}, 5*time.Second, time.Millisecond)

// wait some more time
time.Sleep(500 * time.Millisecond)

// all workers should have shut down.
assert.Equal(t, int32(10), syncer.callCount.Load())

cancel()
})
}

func TestQueueBuildUp(t *testing.T) {
Expand Down

0 comments on commit 1c8b060

Please sign in to comment.