From 3ff62583d40d21ed8cfa889b490b9c926a93cbba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikl=C3=B3s=20F=C3=B6ld=C3=A9nyi?= Date: Sat, 7 Dec 2024 17:02:31 +0100 Subject: [PATCH] Send 64+64+1 ULIDs to process will clog up the ConcurrentLister MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miklós Földényi --- pkg/block/fetcher_test.go | 80 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 5e24e26538..c7a06e18b7 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -8,6 +8,8 @@ import ( "context" "encoding/json" "fmt" + "github.com/stretchr/testify/assert" + "io" "math/rand" "os" "path" @@ -1211,3 +1213,81 @@ func Test_ParseRelabelConfig(t *testing.T) { testutil.NotOk(t, err) testutil.Equals(t, "unsupported relabel action: labelmap", err.Error()) } + +func Test_ConcurrentLister_channel_deadlock(t *testing.T) { + lister := ConcurrentLister{ + bkt: InstrumentedBucketReaderMock{}, + logger: nil, + } + + outputChannel := make(chan ulid.ULID) + defer close(outputChannel) + + timeout, _ := context.WithTimeout(context.Background(), time.Second*5) + + _, err := lister.GetActiveAndPartialBlockIDs(timeout, outputChannel) + + assert.NoError(t, err) +} + +type InstrumentedBucketReaderMock struct{} + +func (InstrumentedBucketReaderMock) Iter(ctx context.Context, dir string, f func(name string) error, options ...objstore.IterOption) error { + // Concurrency is 64 and the queue has capacity of 64 + // Sending 64+64+1 ulids is enough + // 64 to terminate all 64 workers + // 64 more to fill the 64 capacity channel + // 1 extra for the channel writer to block on + for i := 1; i <= 129; i++ { + err := f(ULID(i).String()) + if err != nil { + return err + } + } + return nil +} + +func (InstrumentedBucketReaderMock) Exists(ctx context.Context, name string) (bool, error) { + // simulating an objstore error + return false, errors.New("Simulated") +} + +func (InstrumentedBucketReaderMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Close() error { panic("not required") } + +func (InstrumentedBucketReaderMock) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) SupportedIterOptions() []objstore.IterOptionType { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Get(ctx context.Context, name string) (io.ReadCloser, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) IsObjNotFoundErr(err error) bool { panic("not required") } + +func (InstrumentedBucketReaderMock) IsAccessDeniedErr(err error) bool { panic("not required") } + +func (InstrumentedBucketReaderMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Upload(ctx context.Context, name string, r io.Reader) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Delete(ctx context.Context, name string) error { + panic("not required") +} + +func (InstrumentedBucketReaderMock) Name() string { panic("not required") }