diff --git a/pkg/file/joiner/joiner_test.go b/pkg/file/joiner/joiner_test.go index ce00ac8d782..15eddc23e3b 100644 --- a/pkg/file/joiner/joiner_test.go +++ b/pkg/file/joiner/joiner_test.go @@ -1405,7 +1405,7 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error { return nil } -func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error { +func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk, emplace bool) error { c.mu.Lock() defer c.mu.Unlock() c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp()) diff --git a/pkg/storage/chunkstore.go b/pkg/storage/chunkstore.go index 72f8b9ba784..68a9d10652a 100644 --- a/pkg/storage/chunkstore.go +++ b/pkg/storage/chunkstore.go @@ -42,7 +42,7 @@ type Hasser interface { // Replacer is the interface that wraps the basic Replace method. type Replacer interface { // Replace a chunk in the store. - Replace(context.Context, swarm.Chunk) error + Replace(context.Context, swarm.Chunk, bool) error } // PutterFunc type is an adapter to allow the use of diff --git a/pkg/storage/inmemchunkstore/inmemchunkstore.go b/pkg/storage/inmemchunkstore/inmemchunkstore.go index 7d49e63c279..3ec2b8e8a6d 100644 --- a/pkg/storage/inmemchunkstore/inmemchunkstore.go +++ b/pkg/storage/inmemchunkstore/inmemchunkstore.go @@ -77,13 +77,17 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error { return nil } -func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk) error { +func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk, emplace bool) error { c.mu.Lock() defer c.mu.Unlock() chunkCount := c.chunks[ch.Address().ByteString()] chunkCount.chunk = ch + if emplace { + chunkCount.count++ + } c.chunks[ch.Address().ByteString()] = chunkCount + return nil } diff --git a/pkg/storer/internal/chunkstamp/chunkstamp.go b/pkg/storer/internal/chunkstamp/chunkstamp.go index 2f3857d2ed3..5334983c13a 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp.go @@ -181,6 +181,44 @@ func LoadWithBatchID(s storage.Reader, scope string, addr swarm.Address, batchID return stamp, nil } +// LoadWithBatchID returns swarm.Stamp related to the given address and batchID. +func LoadWithStampHash(s storage.Reader, scope string, addr swarm.Address, hash []byte) (swarm.Stamp, error) { + var stamp swarm.Stamp + + found := false + err := s.Iterate( + storage.Query{ + Factory: func() storage.Item { + return &Item{ + scope: []byte(scope), + address: addr, + } + }, + }, + func(res storage.Result) (bool, error) { + item := res.Entry.(*Item) + h, err := item.stamp.Hash() + if err != nil { + return false, err + } + if bytes.Equal(hash, h) { + stamp = item.stamp + found = true + return true, nil + } + return false, nil + }, + ) + if err != nil { + return nil, err + } + if !found { + return nil, fmt.Errorf("stamp not found for hash %x: %w", hash, storage.ErrNotFound) + } + + return stamp, nil +} + // Store creates new or updated an existing stamp index // record related to the given scope and chunk. func Store(s storage.IndexStore, scope string, chunk swarm.Chunk) error { diff --git a/pkg/storer/internal/chunkstamp/chunkstamp_test.go b/pkg/storer/internal/chunkstamp/chunkstamp_test.go index 1167a56f10a..a000cfb8694 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp_test.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp_test.go @@ -185,7 +185,7 @@ func TestStoreLoadDelete(t *testing.T) { } }) - t.Run("load stored chunk stamp with batch id", func(t *testing.T) { + t.Run("load stored chunk stamp with batch id and hash", func(t *testing.T) { want := chunk.Stamp() have, err := chunkstamp.LoadWithBatchID(ts.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID()) @@ -196,6 +196,20 @@ func TestStoreLoadDelete(t *testing.T) { if diff := cmp.Diff(want, have, cmp.AllowUnexported(postage.Stamp{})); diff != "" { t.Fatalf("LoadWithBatchID(...): mismatch (-want +have):\n%s", diff) } + + h, err := want.Hash() + if err != nil { + t.Fatal(err) + } + + have, err = chunkstamp.LoadWithStampHash(ts.IndexStore(), ns, chunk.Address(), h) + if err != nil { + t.Fatalf("LoadWithBatchID(...): unexpected error: %v", err) + } + + if diff := cmp.Diff(want, have, cmp.AllowUnexported(postage.Stamp{})); diff != "" { + t.Fatalf("LoadWithBatchID(...): mismatch (-want +have):\n%s", diff) + } }) t.Run("delete stored stamp", func(t *testing.T) { diff --git a/pkg/storer/internal/chunkstore/chunkstore.go b/pkg/storer/internal/chunkstore/chunkstore.go index 67fee1e6d77..cc2743b97c4 100644 --- a/pkg/storer/internal/chunkstore/chunkstore.go +++ b/pkg/storer/internal/chunkstore/chunkstore.go @@ -94,7 +94,7 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm. return s.Put(rIdx) } -func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error { +func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk, emplace bool) error { rIdx := &RetrievalIndexItem{Address: ch.Address()} err := s.Get(rIdx) if err != nil { @@ -112,6 +112,9 @@ func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch sw } rIdx.Location = loc rIdx.Timestamp = uint64(time.Now().Unix()) + if emplace { + rIdx.RefCnt++ + } return s.Put(rIdx) } diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index f45df03ea41..92fa3f188fd 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -212,7 +212,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { } r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address()) - return s.ChunkStore().Replace(ctx, chunk) + return s.ChunkStore().Replace(ctx, chunk, false) } } @@ -282,7 +282,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { } if sameAddressSoc { - err = s.ChunkStore().Replace(ctx, chunk) + err = s.ChunkStore().Replace(ctx, chunk, true) } else { err = s.ChunkStore().Put(ctx, chunk) } @@ -321,7 +321,7 @@ func (r *Reserve) Get(ctx context.Context, addr swarm.Address, batchID []byte, s return nil, err } - stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, addr, item.BatchID) + stamp, err := chunkstamp.LoadWithStampHash(r.st.IndexStore(), reserveScope, addr, stampHash) if err != nil { return nil, err } @@ -423,7 +423,7 @@ func RemoveChunkWithItem( ) error { var errs error - stamp, _ := chunkstamp.LoadWithBatchID(trx.IndexStore(), reserveScope, item.Address, item.BatchID) + stamp, _ := chunkstamp.LoadWithStampHash(trx.IndexStore(), reserveScope, item.Address, item.StampHash) if stamp != nil { errs = errors.Join( stampindex.Delete(trx.IndexStore(), reserveScope, stamp), @@ -473,7 +473,7 @@ func (r *Reserve) IterateChunks(startBin uint8, cb func(swarm.Chunk) (bool, erro return false, err } - stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, item.Address, item.BatchID) + stamp, err := chunkstamp.LoadWithStampHash(r.st.IndexStore(), reserveScope, item.Address, item.StampHash) if err != nil { return false, err } diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 15993fd7de8..8e83ae38c03 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -603,6 +603,73 @@ func TestEvict(t *testing.T) { } } +func TestEvictSOC(t *testing.T) { + t.Parallel() + + baseAddr := swarm.RandAddress(t) + ts := internal.NewInmemStorage() + + r, err := reserve.New( + baseAddr, + ts, + 0, kademlia.NewTopologyDriver(), + log.Noop, + ) + if err != nil { + t.Fatal(err) + } + + batch := postagetesting.MustNewBatch() + signer := getSigner(t) + + var chunks []swarm.Chunk + + for i := 0; i < 10; i++ { + ch := soctesting.GenerateMockSocWithSigner(t, []byte{byte(i)}, signer).Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, uint64(i), uint64(i))) + chunks = append(chunks, ch) + err := r.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + } + + bin := swarm.Proximity(baseAddr.Bytes(), chunks[0].Address().Bytes()) + + for i, ch := range chunks { + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: uint64(i + 1), StampHash: stampHash}, false) + checkChunk(t, ts, ch, false) + } + + _, err = r.EvictBatchBin(context.Background(), batch.ID, 1, swarm.MaxBins) + if err != nil { + t.Fatal(err) + } + checkChunk(t, ts, chunks[9], false) // chunk should still persist, eg refCnt > 0 + + evicted, err := r.EvictBatchBin(context.Background(), batch.ID, 10, swarm.MaxBins) + if err != nil { + t.Fatal(err) + } + if evicted != 9 { + t.Fatalf("wanted evicted count 10, got %d", evicted) + } + + for i, ch := range chunks { + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: uint64(i + 1), StampHash: stampHash}, true) + checkChunk(t, ts, ch, true) + } +} + func TestEvictMaxCount(t *testing.T) { t.Parallel() diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index 2dc0c7c4bcd..7fb247da391 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -242,11 +242,11 @@ func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) return chunkstore.Iterate(ctx, c.indexStore, c.sharkyTrx, fn) } -func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk) (err error) { +func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk, emplace bool) (err error) { defer handleMetric("chunkstore_replace", c.metrics)(&err) unlock := c.lock(ch.Address()) defer unlock() - return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch) + return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch, emplace) } func (c *chunkStoreTrx) lock(addr swarm.Address) func() {