Skip to content

Commit

Permalink
fix: TestJoinerRedundancy failing flaky test (ethersphere#4726)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinconic authored Jul 19, 2024
1 parent bc3cdba commit 0099216
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 13 deletions.
35 changes: 25 additions & 10 deletions pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ func TestEncryptDecrypt(t *testing.T) {
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pipe := builder.NewPipelineBuilder(ctx, store, true, 0)
testDataReader := bytes.NewReader(testData)
resultAddress, err := builder.FeedPipeline(ctx, pipe, testDataReader)
Expand Down Expand Up @@ -336,7 +337,8 @@ func TestSeek(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

store := inmemchunkstore.New()
testutil.CleanupCloser(t, store)
Expand Down Expand Up @@ -613,7 +615,8 @@ func TestPrefetch(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

store := inmemchunkstore.New()
testutil.CleanupCloser(t, store)
Expand Down Expand Up @@ -917,7 +920,8 @@ func TestJoinerIterateChunkAddresses_Encrypted(t *testing.T) {
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pipe := builder.NewPipelineBuilder(ctx, store, true, 0)
testDataReader := bytes.NewReader(testData)
resultAddress, err := builder.FeedPipeline(ctx, pipe, testDataReader)
Expand Down Expand Up @@ -963,6 +967,7 @@ type mockPutter struct {
storage.ChunkStore
shards, parities chan swarm.Chunk
done chan struct{}
mu sync.Mutex
}

func newMockPutter(store storage.ChunkStore, shardCnt, parityCnt int) *mockPutter {
Expand All @@ -975,6 +980,8 @@ func newMockPutter(store storage.ChunkStore, shardCnt, parityCnt int) *mockPutte
}

func (m *mockPutter) Put(ctx context.Context, ch swarm.Chunk) error {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.shards) < cap(m.shards) {
m.shards <- ch
return nil
Expand All @@ -983,7 +990,7 @@ func (m *mockPutter) Put(ctx context.Context, ch swarm.Chunk) error {
m.parities <- ch
return nil
}
err := m.ChunkStore.Put(context.Background(), ch)
err := m.ChunkStore.Put(ctx, ch) // use passed context
select {
case m.done <- struct{}{}:
default:
Expand All @@ -996,12 +1003,16 @@ func (m *mockPutter) wait(ctx context.Context) {
case <-m.done:
case <-ctx.Done():
}
m.mu.Lock()
close(m.parities)
close(m.shards)
m.mu.Unlock()
}

func (m *mockPutter) store(cnt int) error {
n := 0
m.mu.Lock()
defer m.mu.Unlock()
for ch := range m.parities {
if err := m.ChunkStore.Put(context.Background(), ch); err != nil {
return err
Expand All @@ -1024,7 +1035,7 @@ func (m *mockPutter) store(cnt int) error {
}

// nolint:thelper
func TestJoinerRedundancy_FLAKY(t *testing.T) {
func TestJoinerRedundancy(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
rLevel redundancy.Level
Expand Down Expand Up @@ -1111,7 +1122,8 @@ func TestJoinerRedundancy_FLAKY(t *testing.T) {
}
// all data can be read back
readCheck := func(t *testing.T, expErr error) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

decodeTimeoutStr := time.Second.String()
fallback := true
Expand All @@ -1133,6 +1145,7 @@ func TestJoinerRedundancy_FLAKY(t *testing.T) {
}
i := 0
eg, ectx := errgroup.WithContext(ctx)

scnt:
for ; i < shardCnt; i++ {
select {
Expand Down Expand Up @@ -1235,7 +1248,8 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
t.Fatal(err)
}
dataReader := pseudorand.NewReader(seed, size*swarm.ChunkSize)
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// ctx = redundancy.SetLevelInContext(ctx, rLevel)
ctx = redundancy.SetLevelInContext(ctx, redundancy.NONE)
pipe := builder.NewPipelineBuilder(ctx, store, encrypt, rLevel)
Expand All @@ -1247,9 +1261,10 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
buf := make([]byte, expRead)
offset := mrand.Intn(size) * expRead
canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, canRead bool) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

decodingTimeoutStr := (200 * time.Millisecond).String()
decodingTimeoutStr := time.Second.String()

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, log.Noop)
if err != nil {
Expand Down
17 changes: 14 additions & 3 deletions pkg/storer/mock/forgetting.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ func (d *DelayedStore) Delay(addr swarm.Address, delay time.Duration) {

func (d *DelayedStore) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) {
d.mu.Lock()
defer d.mu.Unlock()
if delay, ok := d.cache[addr.String()]; ok && delay > 0 {
delay, ok := d.cache[addr.String()]
if ok && delay > 0 {
delete(d.cache, addr.String())
d.mu.Unlock()
select {
case <-time.After(delay):
delete(d.cache, addr.String())
case <-ctx.Done():
return nil, ctx.Err()
}
} else {
d.mu.Unlock()
}
return d.ChunkStore.Get(ctx, addr)
}
Expand All @@ -64,10 +67,14 @@ func (f *ForgettingStore) Stored() int64 {
}

func (f *ForgettingStore) Record() {
f.mu.Lock()
defer f.mu.Unlock()
f.record.Store(true)
}

func (f *ForgettingStore) Unrecord() {
f.mu.Lock()
defer f.mu.Unlock()
f.record.Store(false)
}

Expand Down Expand Up @@ -97,10 +104,14 @@ func (f *ForgettingStore) isMiss(addr swarm.Address) bool {
}

func (f *ForgettingStore) Reset() {
f.mu.Lock()
defer f.mu.Unlock()
f.missed = make(map[string]struct{})
}

func (f *ForgettingStore) Missed() int {
f.mu.Lock()
defer f.mu.Unlock()
return len(f.missed)
}

Expand Down

0 comments on commit 0099216

Please sign in to comment.