Skip to content

Commit

Permalink
Send 64+64+1 ULIDs to process will clog up the ConcurrentLister
Browse files Browse the repository at this point in the history
Signed-off-by: Miklós Földényi <[email protected]>
  • Loading branch information
mfoldenyi committed Dec 7, 2024
1 parent d0d93db commit cc86791
Showing 1 changed file with 79 additions and 0 deletions.
79 changes: 79 additions & 0 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"io"
"math/rand"
"os"
"path"
Expand Down Expand Up @@ -1211,3 +1213,80 @@ func Test_ParseRelabelConfig(t *testing.T) {
testutil.NotOk(t, err)
testutil.Equals(t, "unsupported relabel action: labelmap", err.Error())
}

func Test_ConcurrentLister_channel_deadlock(t *testing.T) {
lister := ConcurrentLister{bkt: InstrumentedBucketReaderMock{},
logger: nil,
}

outputChannel := make(chan ulid.ULID)
defer close(outputChannel)

timeout, _ := context.WithTimeout(context.Background(), time.Second*5)

_, err := lister.GetActiveAndPartialBlockIDs(timeout, outputChannel)

assert.NoError(t, err)
}

type InstrumentedBucketReaderMock struct{}

func (InstrumentedBucketReaderMock) Iter(ctx context.Context, dir string, f func(name string) error, options ...objstore.IterOption) error {
// Concurrency is 64 and the queue has capacity of 64
// Sending 64+64+1 ulids is enough
// 64 to terminate all 64 workers
// 64 more to fill the 64 capacity channel
// 1 extra for the channel writer to block on
for i := 1; i <= 129; i++ {
err := f(ULID(i).String())
if err != nil {
return err
}
}
return nil
}

func (InstrumentedBucketReaderMock) Exists(ctx context.Context, name string) (bool, error) {
// simulating an objstore error
return false, errors.New("Simulated")
}

func (InstrumentedBucketReaderMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
panic("not required")
}

func (InstrumentedBucketReaderMock) Close() error { panic("not required") }

func (InstrumentedBucketReaderMock) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
panic("not required")
}

func (InstrumentedBucketReaderMock) SupportedIterOptions() []objstore.IterOptionType {
panic("not required")
}

func (InstrumentedBucketReaderMock) Get(ctx context.Context, name string) (io.ReadCloser, error) {
panic("not required")
}

func (InstrumentedBucketReaderMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
panic("not required")
}

func (InstrumentedBucketReaderMock) IsObjNotFoundErr(err error) bool { panic("not required") }

func (InstrumentedBucketReaderMock) IsAccessDeniedErr(err error) bool { panic("not required") }

func (InstrumentedBucketReaderMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
panic("not required")
}

func (InstrumentedBucketReaderMock) Upload(ctx context.Context, name string, r io.Reader) error {
panic("not required")
}

func (InstrumentedBucketReaderMock) Delete(ctx context.Context, name string) error {
panic("not required")
}

func (InstrumentedBucketReaderMock) Name() string { panic("not required") }

0 comments on commit cc86791

Please sign in to comment.