diff --git a/pkg/file/joiner/joiner_test.go b/pkg/file/joiner/joiner_test.go index 40858d8d278..24d7c4e0999 100644 --- a/pkg/file/joiner/joiner_test.go +++ b/pkg/file/joiner/joiner_test.go @@ -254,7 +254,8 @@ func TestEncryptDecrypt(t *testing.T) { if err != nil { t.Fatal(err) } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() pipe := builder.NewPipelineBuilder(ctx, store, true, 0) testDataReader := bytes.NewReader(testData) resultAddress, err := builder.FeedPipeline(ctx, pipe, testDataReader) @@ -336,7 +337,8 @@ func TestSeek(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() store := inmemchunkstore.New() testutil.CleanupCloser(t, store) @@ -613,7 +615,8 @@ func TestPrefetch(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() store := inmemchunkstore.New() testutil.CleanupCloser(t, store) @@ -917,7 +920,8 @@ func TestJoinerIterateChunkAddresses_Encrypted(t *testing.T) { if err != nil { t.Fatal(err) } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() pipe := builder.NewPipelineBuilder(ctx, store, true, 0) testDataReader := bytes.NewReader(testData) resultAddress, err := builder.FeedPipeline(ctx, pipe, testDataReader) @@ -963,6 +967,7 @@ type mockPutter struct { storage.ChunkStore shards, parities chan swarm.Chunk done chan struct{} + mu sync.Mutex } func newMockPutter(store storage.ChunkStore, shardCnt, parityCnt int) *mockPutter { @@ -975,6 +980,8 @@ func newMockPutter(store storage.ChunkStore, shardCnt, parityCnt int) *mockPutte } func (m *mockPutter) Put(ctx context.Context, ch swarm.Chunk) error { + m.mu.Lock() + defer m.mu.Unlock() if len(m.shards) < cap(m.shards) { m.shards <- ch return nil @@ -983,7 +990,7 @@ func (m *mockPutter) Put(ctx context.Context, ch swarm.Chunk) error { m.parities <- ch return nil } - err := m.ChunkStore.Put(context.Background(), ch) + err := m.ChunkStore.Put(ctx, ch) // use passed context select { case m.done <- struct{}{}: default: @@ -996,12 +1003,16 @@ func (m *mockPutter) wait(ctx context.Context) { case <-m.done: case <-ctx.Done(): } + m.mu.Lock() close(m.parities) close(m.shards) + m.mu.Unlock() } func (m *mockPutter) store(cnt int) error { n := 0 + m.mu.Lock() + defer m.mu.Unlock() for ch := range m.parities { if err := m.ChunkStore.Put(context.Background(), ch); err != nil { return err @@ -1024,7 +1035,7 @@ func (m *mockPutter) store(cnt int) error { } // nolint:thelper -func TestJoinerRedundancy_FLAKY(t *testing.T) { +func TestJoinerRedundancy(t *testing.T) { t.Parallel() for _, tc := range []struct { rLevel redundancy.Level @@ -1111,7 +1122,8 @@ func TestJoinerRedundancy_FLAKY(t *testing.T) { } // all data can be read back readCheck := func(t *testing.T, expErr error) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() decodeTimeoutStr := time.Second.String() fallback := true @@ -1133,6 +1145,7 @@ func TestJoinerRedundancy_FLAKY(t *testing.T) { } i := 0 eg, ectx := errgroup.WithContext(ctx) + scnt: for ; i < shardCnt; i++ { select { @@ -1235,7 +1248,8 @@ func TestJoinerRedundancyMultilevel(t *testing.T) { t.Fatal(err) } dataReader := pseudorand.NewReader(seed, size*swarm.ChunkSize) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // ctx = redundancy.SetLevelInContext(ctx, rLevel) ctx = redundancy.SetLevelInContext(ctx, redundancy.NONE) pipe := builder.NewPipelineBuilder(ctx, store, encrypt, rLevel) @@ -1247,9 +1261,10 @@ func TestJoinerRedundancyMultilevel(t *testing.T) { buf := make([]byte, expRead) offset := mrand.Intn(size) * expRead canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, canRead bool) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - decodingTimeoutStr := (200 * time.Millisecond).String() + decodingTimeoutStr := time.Second.String() ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, log.Noop) if err != nil { diff --git a/pkg/storer/mock/forgetting.go b/pkg/storer/mock/forgetting.go index e91560c60fb..22965ce9f38 100644 --- a/pkg/storer/mock/forgetting.go +++ b/pkg/storer/mock/forgetting.go @@ -35,14 +35,17 @@ func (d *DelayedStore) Delay(addr swarm.Address, delay time.Duration) { func (d *DelayedStore) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) { d.mu.Lock() - defer d.mu.Unlock() - if delay, ok := d.cache[addr.String()]; ok && delay > 0 { + delay, ok := d.cache[addr.String()] + if ok && delay > 0 { + delete(d.cache, addr.String()) + d.mu.Unlock() select { case <-time.After(delay): - delete(d.cache, addr.String()) case <-ctx.Done(): return nil, ctx.Err() } + } else { + d.mu.Unlock() } return d.ChunkStore.Get(ctx, addr) } @@ -64,10 +67,14 @@ func (f *ForgettingStore) Stored() int64 { } func (f *ForgettingStore) Record() { + f.mu.Lock() + defer f.mu.Unlock() f.record.Store(true) } func (f *ForgettingStore) Unrecord() { + f.mu.Lock() + defer f.mu.Unlock() f.record.Store(false) } @@ -97,10 +104,14 @@ func (f *ForgettingStore) isMiss(addr swarm.Address) bool { } func (f *ForgettingStore) Reset() { + f.mu.Lock() + defer f.mu.Unlock() f.missed = make(map[string]struct{}) } func (f *ForgettingStore) Missed() int { + f.mu.Lock() + defer f.mu.Unlock() return len(f.missed) }