diff --git a/.github/workflows/beekeeper.yml b/.github/workflows/beekeeper.yml index d2cbab72e5d..ea348d29064 100644 --- a/.github/workflows/beekeeper.yml +++ b/.github/workflows/beekeeper.yml @@ -143,6 +143,9 @@ jobs: - name: Test soc id: soc run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-soc + - name: Test gsoc + id: gsoc + run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-gsoc - name: Test pushsync (chunks) id: pushsync-chunks-1 run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-pushsync-chunks diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index b545f27d746..5aca29f9e22 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -818,6 +818,28 @@ paths: default: description: Default response + "/gsoc/subscribe/{address}": + get: + summary: Subscribe to GSOC payloads + tags: + - GSOC + - Subscribe + - Websocket + parameters: + - in: path + name: reference + schema: + $ref: "SwarmCommon.yaml#/components/schemas/SwarmReference" + required: true + description: "Single Owner Chunk address (which may have multiple payloads)" + responses: + "200": + description: Returns a WebSocket with a subscription for incoming message data on the requested SOC address. + "500": + $ref: "SwarmCommon.yaml#/components/responses/500" + default: + description: Default response + "/soc/{owner}/{id}": post: summary: Upload single owner chunk @@ -847,8 +869,6 @@ paths: schema: $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" required: true - - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter" - required: false - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" diff --git a/pkg/api/accesscontrol_test.go b/pkg/api/accesscontrol_test.go index 509a2cea1b8..c00318c1afb 100644 --- a/pkg/api/accesscontrol_test.go +++ b/pkg/api/accesscontrol_test.go @@ -100,6 +100,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) { resp struct { Reference swarm.Address `json:"reference"` } + direct bool }{ { name: "bzz", @@ -159,6 +160,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) { data: bytes.NewReader(sch.WrappedChunk.Data()), expdata: sch.Chunk().Data(), contenttype: "binary/octet-stream", + direct: true, }, } @@ -183,13 +185,24 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) { upTestOpts = append(upTestOpts, jsonhttptest.WithRequestHeader(api.SwarmCollectionHeader, "True")) } t.Run(v.name, func(t *testing.T) { - client, _, _, _ := newTestServer(t, testServerOptions{ + client, _, _, chanStore := newTestServer(t, testServerOptions{ Storer: storerMock, Logger: logger, Post: mockpost.New(mockpost.WithAcceptAll()), PublicKey: pk.PublicKey, AccessControl: mockac.New(), + DirectUpload: v.direct, }) + + if chanStore != nil { + chanStore.Subscribe(func(chunk swarm.Chunk) { + err := storerMock.Put(context.Background(), chunk) + if err != nil { + t.Fatal(err) + } + }) + } + header := jsonhttptest.Request(t, client, http.MethodPost, v.upurl, http.StatusCreated, upTestOpts..., ) diff --git a/pkg/api/api.go b/pkg/api/api.go index ce90a1004f6..b226a5cff24 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -33,6 +33,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/file/pipeline" "github.com/ethersphere/bee/v2/pkg/file/pipeline/builder" "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/gsoc" "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/p2p" @@ -151,6 +152,7 @@ type Service struct { storer Storer resolver resolver.Interface pss pss.Interface + gsoc gsoc.Listener steward steward.Interface logger log.Logger loggerV1 log.Logger @@ -254,6 +256,7 @@ type ExtraOptions struct { Storer Storer Resolver resolver.Interface Pss pss.Interface + Gsoc gsoc.Listener FeedFactory feeds.Factory Post postage.Service AccessControl accesscontrol.Controller @@ -337,6 +340,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti s.storer = e.Storer s.resolver = e.Resolver s.pss = e.Pss + s.gsoc = e.Gsoc s.feedFactory = e.FeedFactory s.post = e.Post s.accesscontrol = e.AccessControl @@ -682,7 +686,12 @@ type putterSessionWrapper struct { } func (p *putterSessionWrapper) Put(ctx context.Context, chunk swarm.Chunk) error { - stamp, err := p.stamper.Stamp(chunk.Address()) + idAddress, err := storage.IdentityAddress(chunk) + if err != nil { + return err + } + + stamp, err := p.stamper.Stamp(chunk.Address(), idAddress) if err != nil { return err } diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index e480fd0d862..63f366615f3 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -32,6 +32,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/file/pipeline" "github.com/ethersphere/bee/v2/pkg/file/pipeline/builder" "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/gsoc" "github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/v2/pkg/log" p2pmock "github.com/ethersphere/bee/v2/pkg/p2p/mock" @@ -93,6 +94,7 @@ type testServerOptions struct { StateStorer storage.StateStorer Resolver resolver.Interface Pss pss.Interface + Gsoc gsoc.Listener WsPath string WsPingPeriod time.Duration Logger log.Logger @@ -194,6 +196,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. Storer: o.Storer, Resolver: o.Resolver, Pss: o.Pss, + Gsoc: o.Gsoc, FeedFactory: o.Feeds, Post: o.Post, AccessControl: o.AccessControl, @@ -630,6 +633,7 @@ type chanStorer struct { lock sync.Mutex chunks map[string]struct{} quit chan struct{} + subs []func(chunk swarm.Chunk) } func newChanStore(cc <-chan *pusher.Op) *chanStorer { @@ -647,6 +651,9 @@ func (c *chanStorer) drain(cc <-chan *pusher.Op) { case op := <-cc: c.lock.Lock() c.chunks[op.Chunk.Address().ByteString()] = struct{}{} + for _, h := range c.subs { + h(op.Chunk) + } c.lock.Unlock() op.Err <- nil case <-c.quit: @@ -667,6 +674,12 @@ func (c *chanStorer) Has(addr swarm.Address) bool { return ok } +func (c *chanStorer) Subscribe(f func(chunk swarm.Chunk)) { + c.lock.Lock() + defer c.lock.Unlock() + c.subs = append(c.subs, f) +} + func createRedistributionAgentService( t *testing.T, addr swarm.Address, diff --git a/pkg/api/envelope.go b/pkg/api/envelope.go index 6a99b98f61c..ea12283ac4d 100644 --- a/pkg/api/envelope.go +++ b/pkg/api/envelope.go @@ -59,7 +59,7 @@ func (s *Service) envelopePostHandler(w http.ResponseWriter, r *http.Request) { return } - stamp, err := stamper.Stamp(paths.Address) + stamp, err := stamper.Stamp(paths.Address, paths.Address) if err != nil { logger.Debug("split write all failed", "error", err) logger.Error(nil, "split write all failed") diff --git a/pkg/api/gsoc.go b/pkg/api/gsoc.go new file mode 100644 index 00000000000..60d048ffdc0 --- /dev/null +++ b/pkg/api/gsoc.go @@ -0,0 +1,120 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package api + +import ( + "net/http" + "time" + + "github.com/ethersphere/bee/v2/pkg/jsonhttp" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/gorilla/mux" + "github.com/gorilla/websocket" +) + +func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) { + logger := s.logger.WithName("gsoc_subscribe").Build() + + paths := struct { + Address swarm.Address `map:"address,resolve" validate:"required"` + }{} + + if response := s.mapStructure(mux.Vars(r), &paths); response != nil { + response("invalid path params", logger, w) + return + } + + upgrader := websocket.Upgrader{ + ReadBufferSize: swarm.ChunkSize, + WriteBufferSize: swarm.ChunkSize, + CheckOrigin: s.checkOrigin, + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + logger.Debug("upgrade failed", "error", err) + logger.Error(nil, "upgrade failed") + jsonhttp.InternalServerError(w, "upgrade failed") + return + } + + s.wsWg.Add(1) + go s.gsocListeningWs(conn, paths.Address) +} + +func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) { + defer s.wsWg.Done() + + var ( + dataC = make(chan []byte) + gone = make(chan struct{}) + ticker = time.NewTicker(s.WsPingPeriod) + err error + ) + defer func() { + ticker.Stop() + _ = conn.Close() + }() + cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) { + select { + case dataC <- m: + case <-gone: + return + case <-s.quit: + return + } + }) + + defer cleanup() + + conn.SetCloseHandler(func(code int, text string) error { + s.logger.Debug("gsoc ws: client gone", "code", code, "message", text) + close(gone) + return nil + }) + + for { + select { + case b := <-dataC: + err = conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + if err != nil { + s.logger.Debug("gsoc ws: set write deadline failed", "error", err) + return + } + + err = conn.WriteMessage(websocket.BinaryMessage, b) + if err != nil { + s.logger.Debug("gsoc ws: write message failed", "error", err) + return + } + + case <-s.quit: + // shutdown + err = conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + if err != nil { + s.logger.Debug("gsoc ws: set write deadline failed", "error", err) + return + } + err = conn.WriteMessage(websocket.CloseMessage, []byte{}) + if err != nil { + s.logger.Debug("gsoc ws: write close message failed", "error", err) + } + return + case <-gone: + // client gone + return + case <-ticker.C: + err = conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + if err != nil { + s.logger.Debug("gsoc ws: set write deadline failed", "error", err) + return + } + if err = conn.WriteMessage(websocket.PingMessage, nil); err != nil { + // error encountered while pinging client. client probably gone + return + } + } + } +} diff --git a/pkg/api/gsoc_test.go b/pkg/api/gsoc_test.go new file mode 100644 index 00000000000..edef7a39842 --- /dev/null +++ b/pkg/api/gsoc_test.go @@ -0,0 +1,171 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package api_test + +import ( + "encoding/hex" + "fmt" + "net/url" + "strings" + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/crypto" + "github.com/ethersphere/bee/v2/pkg/gsoc" + "github.com/ethersphere/bee/v2/pkg/log" + mockbatchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock" + "github.com/ethersphere/bee/v2/pkg/soc" + mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/bee/v2/pkg/util/testutil" + "github.com/gorilla/websocket" +) + +// TestGsocWebsocketSingleHandler creates a single websocket handler on a chunk address, and receives a message +func TestGsocWebsocketSingleHandler(t *testing.T) { + t.Parallel() + + var ( + id = make([]byte, 32) + g, cl, signer, _ = newGsocTest(t, id, 0) + respC = make(chan error, 1) + payload = []byte("hello there!") + ) + + err := cl.SetReadDeadline(time.Now().Add(2 * time.Second)) + if err != nil { + t.Fatal(err) + } + cl.SetReadLimit(swarm.ChunkSize) + + ch, _ := cac.New(payload) + socCh := soc.New(id, ch) + ch, _ = socCh.Sign(signer) + socCh, _ = soc.FromChunk(ch) + g.Handle(socCh) + + go expectMessage(t, cl, respC, payload) + if err := <-respC; err != nil { + t.Fatal(err) + } +} + +func TestGsocWebsocketMultiHandler(t *testing.T) { + t.Parallel() + + var ( + id = make([]byte, 32) + g, cl, signer, listener = newGsocTest(t, make([]byte, 32), 0) + owner, _ = signer.EthereumAddress() + chunkAddr, _ = soc.CreateAddress(id, owner.Bytes()) + u = url.URL{Scheme: "ws", Host: listener, Path: fmt.Sprintf("/gsoc/subscribe/%s", hex.EncodeToString(chunkAddr.Bytes()))} + cl2, _, err = websocket.DefaultDialer.Dial(u.String(), nil) + respC = make(chan error, 2) + ) + if err != nil { + t.Fatalf("dial: %v. url %v", err, u.String()) + } + testutil.CleanupCloser(t, cl2) + + err = cl.SetReadDeadline(time.Now().Add(2 * time.Second)) + if err != nil { + t.Fatal(err) + } + cl.SetReadLimit(swarm.ChunkSize) + + ch, _ := cac.New(payload) + socCh := soc.New(id, ch) + ch, _ = socCh.Sign(signer) + socCh, _ = soc.FromChunk(ch) + + // close the websocket before calling GSOC with the message + err = cl.WriteMessage(websocket.CloseMessage, []byte{}) + if err != nil { + t.Fatal(err) + } + + g.Handle(socCh) + + go expectMessage(t, cl, respC, payload) + go expectMessage(t, cl2, respC, payload) + if err := <-respC; err != nil { + t.Fatal(err) + } + if err := <-respC; err != nil { + t.Fatal(err) + } +} + +// TestGsocPong tests that the websocket api adheres to the websocket standard +// and sends ping-pong messages to keep the connection alive. +// The test opens a websocket, keeps it alive for 500ms, then receives a GSOC message. +func TestGsocPong(t *testing.T) { + t.Parallel() + id := make([]byte, 32) + + var ( + g, cl, signer, _ = newGsocTest(t, id, 90*time.Millisecond) + + respC = make(chan error, 1) + pongWait = 1 * time.Millisecond + ) + + cl.SetReadLimit(swarm.ChunkSize) + err := cl.SetReadDeadline(time.Now().Add(pongWait)) + if err != nil { + t.Fatal(err) + } + + time.Sleep(500 * time.Millisecond) // wait to see that the websocket is kept alive + ch, _ := cac.New([]byte("hello there!")) + socCh := soc.New(id, ch) + ch, _ = socCh.Sign(signer) + socCh, _ = soc.FromChunk(ch) + + g.Handle(socCh) + + go expectMessage(t, cl, respC, nil) + if err := <-respC; err == nil || !strings.Contains(err.Error(), "i/o timeout") { + // note: error has *websocket.netError type so we need to check error by checking message + t.Fatal("want timeout error") + } +} + +func newGsocTest(t *testing.T, socId []byte, pingPeriod time.Duration) (gsoc.Listener, *websocket.Conn, crypto.Signer, string) { + t.Helper() + if pingPeriod == 0 { + pingPeriod = 10 * time.Second + } + var ( + batchStore = mockbatchstore.New() + storer = mockstorer.New() + ) + + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + owner, err := signer.EthereumAddress() + if err != nil { + t.Fatal(err) + } + chunkAddr, _ := soc.CreateAddress(socId, owner.Bytes()) + + gsoc := gsoc.New(log.NewLogger("test")) + testutil.CleanupCloser(t, gsoc) + + _, cl, listener, _ := newTestServer(t, testServerOptions{ + Gsoc: gsoc, + WsPath: fmt.Sprintf("/gsoc/subscribe/%s", hex.EncodeToString(chunkAddr.Bytes())), + Storer: storer, + BatchStore: batchStore, + Logger: log.Noop, + WsPingPeriod: pingPeriod, + }) + + return gsoc, cl, signer, listener +} diff --git a/pkg/api/router.go b/pkg/api/router.go index 14c9ab05ee6..8cffd8ac32e 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -340,6 +340,10 @@ func (s *Service) mountAPI() { handle("/pss/subscribe/{topic}", http.HandlerFunc(s.pssWsHandler)) + handle("/gsoc/subscribe/{address}", web.ChainHandlers( + web.FinalHandlerFunc(s.gsocWsHandler), + )) + handle("/tags", jsonhttp.MethodHandler{ "GET": http.HandlerFunc(s.listTagsHandler), "POST": web.ChainHandlers( diff --git a/pkg/api/soc.go b/pkg/api/soc.go index 09d4f1d3ae2..a30cc1174f6 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -18,8 +18,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/soc" - storage "github.com/ethersphere/bee/v2/pkg/storage" - storer "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/gorilla/mux" ) @@ -51,7 +50,6 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { headers := struct { BatchID []byte `map:"Swarm-Postage-Batch-Id"` StampSig []byte `map:"Swarm-Postage-Stamp"` - Pin bool `map:"Swarm-Pin"` Act bool `map:"Swarm-Act"` HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` }{} @@ -66,30 +64,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { return } - // if pinning header is set we do a deferred upload, else we do a direct upload var ( - tag uint64 - err error + putter storer.PutterSession + err error ) - if headers.Pin { - session, err := s.storer.NewSession() - if err != nil { - logger.Debug("get or create tag failed", "error", err) - logger.Error(nil, "get or create tag failed") - switch { - case errors.Is(err, storage.ErrNotFound): - jsonhttp.NotFound(w, "tag not found") - default: - jsonhttp.InternalServerError(w, "cannot get or create tag") - } - return - } - tag = session.TagID - } - - deferred := tag != 0 - var putter storer.PutterSession if len(headers.StampSig) != 0 { stamp := postage.Stamp{} if err := stamp.UnmarshalBinary(headers.StampSig); err != nil { @@ -102,16 +81,16 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { putter, err = s.newStampedPutter(r.Context(), putterOptions{ BatchID: stamp.BatchID(), - TagID: tag, - Pin: headers.Pin, - Deferred: deferred, + TagID: 0, + Pin: false, + Deferred: false, }, &stamp) } else { putter, err = s.newStamperPutter(r.Context(), putterOptions{ BatchID: headers.BatchID, - TagID: tag, - Pin: headers.Pin, - Deferred: deferred, + TagID: 0, + Pin: false, + Deferred: false, }) } if err != nil { @@ -124,6 +103,8 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { jsonhttp.NotFound(w, "batch with id not found") case errors.Is(err, errInvalidPostageBatch): jsonhttp.BadRequest(w, "invalid batch id") + case errors.Is(err, errUnsupportedDevNodeOperation): + jsonhttp.NotImplemented(w, "operation is not supported in dev mode") default: jsonhttp.BadRequest(w, nil) } diff --git a/pkg/api/soc_test.go b/pkg/api/soc_test.go index 344204f06c6..6c0d6fa0449 100644 --- a/pkg/api/soc_test.go +++ b/pkg/api/soc_test.go @@ -6,6 +6,7 @@ package api_test import ( "bytes" + "context" "encoding/hex" "fmt" "io" @@ -74,13 +75,20 @@ func TestSOC(t *testing.T) { t.Run("ok", func(t *testing.T) { s := testingsoc.GenerateMockSOC(t, testData) - client, _, _, _ := newTestServer(t, testServerOptions{ + client, _, _, chanStore := newTestServer(t, testServerOptions{ Storer: mockStorer, Post: newTestPostService(), DirectUpload: true, }) + + chanStore.Subscribe(func(ch swarm.Chunk) { + err := mockStorer.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + }) + jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated, - jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "true"), jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())), jsonhttptest.WithExpectedJSONResponse(api.SocPostResponse{ diff --git a/pkg/file/joiner/joiner_test.go b/pkg/file/joiner/joiner_test.go index ce00ac8d782..15eddc23e3b 100644 --- a/pkg/file/joiner/joiner_test.go +++ b/pkg/file/joiner/joiner_test.go @@ -1405,7 +1405,7 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error { return nil } -func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error { +func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk, emplace bool) error { c.mu.Lock() defer c.mu.Unlock() c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp()) diff --git a/pkg/gsoc/gsoc.go b/pkg/gsoc/gsoc.go new file mode 100644 index 00000000000..6497c9b0c63 --- /dev/null +++ b/pkg/gsoc/gsoc.go @@ -0,0 +1,96 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package gsoc + +import ( + "sync" + + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// Handler defines code to be executed upon reception of a GSOC sub message. +// it is used as a parameter definition. +type Handler func([]byte) + +type Listener interface { + Subscribe(address swarm.Address, handler Handler) (cleanup func()) + Handle(c *soc.SOC) + Close() error +} + +type listener struct { + handlers map[string][]*Handler + handlersMu sync.Mutex + quit chan struct{} + logger log.Logger +} + +// New returns a new GSOC listener service. +func New(logger log.Logger) Listener { + return &listener{ + logger: logger, + handlers: make(map[string][]*Handler), + quit: make(chan struct{}), + } +} + +// Subscribe allows the definition of a Handler func on a specific GSOC address. +func (l *listener) Subscribe(address swarm.Address, handler Handler) (cleanup func()) { + l.handlersMu.Lock() + defer l.handlersMu.Unlock() + + l.handlers[address.ByteString()] = append(l.handlers[address.ByteString()], &handler) + + return func() { + l.handlersMu.Lock() + defer l.handlersMu.Unlock() + + h := l.handlers[address.ByteString()] + for i := 0; i < len(h); i++ { + if h[i] == &handler { + l.handlers[address.ByteString()] = append(h[:i], h[i+1:]...) + return + } + } + } +} + +// Handle is called by push/pull sync and passes the chunk its registered handler +func (l *listener) Handle(c *soc.SOC) { + addr, err := c.Address() + if err != nil { + return // no handler + } + h := l.getHandlers(addr) + if h == nil { + return // no handler + } + l.logger.Debug("new incoming GSOC message", "GSOC Address", addr, "wrapped chunk address", c.WrappedChunk().Address()) + + for _, hh := range h { + go func(hh Handler) { + hh(c.WrappedChunk().Data()[swarm.SpanSize:]) + }(*hh) + } +} + +func (p *listener) getHandlers(address swarm.Address) []*Handler { + p.handlersMu.Lock() + defer p.handlersMu.Unlock() + + return p.handlers[address.ByteString()] +} + +func (l *listener) Close() error { + close(l.quit) + l.handlersMu.Lock() + defer l.handlersMu.Unlock() + + l.handlers = make(map[string][]*Handler) //unset handlers on shutdown + + return nil +} diff --git a/pkg/gsoc/gsoc_test.go b/pkg/gsoc/gsoc_test.go new file mode 100644 index 00000000000..dfbe8e03a5c --- /dev/null +++ b/pkg/gsoc/gsoc_test.go @@ -0,0 +1,124 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package gsoc_test + +import ( + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/crypto" + "github.com/ethersphere/bee/v2/pkg/gsoc" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/util/testutil" +) + +// TestRegister verifies that handler funcs are able to be registered correctly in pss +func TestRegister(t *testing.T) { + t.Parallel() + + var ( + g = gsoc.New(log.Noop) + h1Calls = 0 + h2Calls = 0 + h3Calls = 0 + msgChan = make(chan struct{}) + + payload1 = []byte("Hello there!") + payload2 = []byte("General Kenobi. You are a bold one. Kill him!") + socId1 = testutil.RandBytes(t, 32) + socId2 = append([]byte{socId1[0] + 1}, socId1[1:]...) + privKey, _ = crypto.GenerateSecp256k1Key() + signer = crypto.NewDefaultSigner(privKey) + owner, _ = signer.EthereumAddress() + address1, _ = soc.CreateAddress(socId1, owner.Bytes()) + address2, _ = soc.CreateAddress(socId2, owner.Bytes()) + + h1 = func(m []byte) { + h1Calls++ + msgChan <- struct{}{} + } + + h2 = func(m []byte) { + h2Calls++ + msgChan <- struct{}{} + } + + h3 = func(m []byte) { + h3Calls++ + msgChan <- struct{}{} + } + ) + _ = g.Subscribe(address1, h1) + _ = g.Subscribe(address2, h2) + + ch1, _ := cac.New(payload1) + socCh1 := soc.New(socId1, ch1) + ch1, _ = socCh1.Sign(signer) + socCh1, _ = soc.FromChunk(ch1) + + ch2, _ := cac.New(payload2) + socCh2 := soc.New(socId2, ch2) + ch2, _ = socCh2.Sign(signer) + socCh2, _ = soc.FromChunk(ch2) + + // trigger soc upload on address1, check that only h1 is called + g.Handle(socCh1) + + waitHandlerCallback(t, &msgChan, 1) + + ensureCalls(t, &h1Calls, 1) + ensureCalls(t, &h2Calls, 0) + + // register another handler on the first address + cleanup := g.Subscribe(address1, h3) + + g.Handle(socCh1) + + waitHandlerCallback(t, &msgChan, 2) + + ensureCalls(t, &h1Calls, 2) + ensureCalls(t, &h2Calls, 0) + ensureCalls(t, &h3Calls, 1) + + cleanup() // remove the last handler + + g.Handle(socCh1) + + waitHandlerCallback(t, &msgChan, 1) + + ensureCalls(t, &h1Calls, 3) + ensureCalls(t, &h2Calls, 0) + ensureCalls(t, &h3Calls, 1) + + g.Handle(socCh2) + + waitHandlerCallback(t, &msgChan, 1) + + ensureCalls(t, &h1Calls, 3) + ensureCalls(t, &h2Calls, 1) + ensureCalls(t, &h3Calls, 1) +} + +func ensureCalls(t *testing.T, calls *int, exp int) { + t.Helper() + + if exp != *calls { + t.Fatalf("expected %d calls, found %d", exp, *calls) + } +} + +func waitHandlerCallback(t *testing.T, msgChan *chan struct{}, count int) { + t.Helper() + + for received := 0; received < count; received++ { + select { + case <-*msgChan: + case <-time.After(1 * time.Second): + t.Fatal("reached timeout while waiting for handler message") + } + } +} diff --git a/pkg/node/devnode.go b/pkg/node/devnode.go index 9d34f1aaa57..9ff5363c455 100644 --- a/pkg/node/devnode.go +++ b/pkg/node/devnode.go @@ -23,6 +23,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/bzz" "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/feeds/factory" + "github.com/ethersphere/bee/v2/pkg/gsoc" "github.com/ethersphere/bee/v2/pkg/log" mockP2P "github.com/ethersphere/bee/v2/pkg/p2p/mock" mockPingPong "github.com/ethersphere/bee/v2/pkg/pingpong/mock" @@ -340,6 +341,7 @@ func NewDevBee(logger log.Logger, o *DevOptions) (b *DevBee, err error) { Storer: localStore, Resolver: mockResolver, Pss: pssService, + Gsoc: gsoc.New(logger), FeedFactory: mockFeeds, Post: post, AccessControl: accesscontrol, diff --git a/pkg/node/node.go b/pkg/node/node.go index 8c2d98b334a..2cda81da54f 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -32,6 +32,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/config" "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/feeds/factory" + "github.com/ethersphere/bee/v2/pkg/gsoc" "github.com/ethersphere/bee/v2/pkg/hive" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/metrics" @@ -103,6 +104,7 @@ type Bee struct { accountingCloser io.Closer pullSyncCloser io.Closer pssCloser io.Closer + gsocCloser io.Closer ethClientCloser func() transactionMonitorCloser io.Closer transactionCloser io.Closer @@ -898,7 +900,9 @@ func NewBee( pricing.SetPaymentThresholdObserver(acc) pssService := pss.New(pssPrivateKey, logger) + gsocService := gsoc.New(logger) b.pssCloser = pssService + b.gsocCloser = gsocService validStamp := postage.ValidStamp(batchStore) @@ -951,7 +955,7 @@ func NewBee( } } - pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, validStamp, logger, acc, pricer, signer, tracer, warmupTime) + pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, warmupTime) b.pushSyncCloser = pushSyncProtocol // set the pushSyncer in the PSS @@ -965,7 +969,7 @@ func NewBee( pusherService.AddFeed(localStore.PusherFeed()) - pullSyncProtocol := pullsync.New(p2ps, localStore, pssService.TryUnwrap, validStamp, logger, pullsync.DefaultMaxPage) + pullSyncProtocol := pullsync.New(p2ps, localStore, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, pullsync.DefaultMaxPage) b.pullSyncCloser = pullSyncProtocol retrieveProtocolSpec := retrieval.Protocol() @@ -1118,6 +1122,7 @@ func NewBee( Storer: localStore, Resolver: multiResolver, Pss: pssService, + Gsoc: gsocService, FeedFactory: feedFactory, Post: post, AccessControl: accesscontrol, @@ -1253,11 +1258,15 @@ func (b *Bee) Shutdown() error { } var wg sync.WaitGroup - wg.Add(7) + wg.Add(8) go func() { defer wg.Done() tryClose(b.pssCloser, "pss") }() + go func() { + defer wg.Done() + tryClose(b.gsocCloser, "gsoc") + }() go func() { defer wg.Done() tryClose(b.pusherCloser, "pusher") diff --git a/pkg/p2p/streamtest/streamtest.go b/pkg/p2p/streamtest/streamtest.go index a9892687240..ae312624149 100644 --- a/pkg/p2p/streamtest/streamtest.go +++ b/pkg/p2p/streamtest/streamtest.go @@ -96,6 +96,13 @@ func New(opts ...Option) *Recorder { return r } +func (r *Recorder) Reset() { + r.recordsMu.Lock() + defer r.recordsMu.Unlock() + + r.records = make(map[string][]*Record) +} + func (r *Recorder) SetProtocols(protocols ...p2p.ProtocolSpec) { r.protocols = append(r.protocols, protocols...) } diff --git a/pkg/postage/mock/stamper.go b/pkg/postage/mock/stamper.go index f95700eb5e5..9fbc5268b1c 100644 --- a/pkg/postage/mock/stamper.go +++ b/pkg/postage/mock/stamper.go @@ -17,6 +17,11 @@ func NewStamper() postage.Stamper { } // Stamp implements the Stamper interface. It returns an empty postage stamp. -func (mockStamper) Stamp(_ swarm.Address) (*postage.Stamp, error) { +func (mockStamper) Stamp(_, _ swarm.Address) (*postage.Stamp, error) { return &postage.Stamp{}, nil } + +// Stamp implements the Stamper interface. It returns an empty postage stamp. +func (mockStamper) BatchId() []byte { + return nil +} diff --git a/pkg/postage/stamp_test.go b/pkg/postage/stamp_test.go index 8704cf91b65..948e5311686 100644 --- a/pkg/postage/stamp_test.go +++ b/pkg/postage/stamp_test.go @@ -14,6 +14,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock" postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/inmemstore" chunktesting "github.com/ethersphere/bee/v2/pkg/storage/testing" ) @@ -103,7 +104,12 @@ func TestValidStamp(t *testing.T) { // stamp on execution ch := chunktesting.GenerateTestRandomChunk() - st, err := stamper.Stamp(ch.Address()) + idAddress, err := storage.IdentityAddress(ch) + if err != nil { + t.Fatal(err) + } + + st, err := stamper.Stamp(ch.Address(), idAddress) if err != nil { t.Fatal(err) } diff --git a/pkg/postage/stamper.go b/pkg/postage/stamper.go index d4f3b8187fa..bd9ce86a390 100644 --- a/pkg/postage/stamper.go +++ b/pkg/postage/stamper.go @@ -21,7 +21,9 @@ var ( // Stamper can issue stamps from the given address of chunk. type Stamper interface { - Stamp(swarm.Address) (*Stamp, error) + // addr is the request address of the chunk and idAddr is the identity address of the chunk. + Stamp(addr, idAddr swarm.Address) (*Stamp, error) + BatchId() []byte } // stamper connects a stampissuer with a signer. @@ -39,13 +41,13 @@ func NewStamper(store storage.Store, issuer *StampIssuer, signer crypto.Signer) // Stamp takes chunk, see if the chunk can be included in the batch and // signs it with the owner of the batch of this Stamp issuer. -func (st *stamper) Stamp(addr swarm.Address) (*Stamp, error) { +func (st *stamper) Stamp(addr, idAddr swarm.Address) (*Stamp, error) { st.issuer.mtx.Lock() defer st.issuer.mtx.Unlock() item := &StampItem{ BatchID: st.issuer.data.BatchID, - chunkAddress: addr, + chunkAddress: idAddr, } switch err := st.store.Get(item); { case err == nil: @@ -81,6 +83,11 @@ func (st *stamper) Stamp(addr swarm.Address) (*Stamp, error) { return NewStamp(st.issuer.data.BatchID, item.BatchIndex, item.BatchTimestamp, sig), nil } +// BatchId gives back batch id of stamper +func (st *stamper) BatchId() []byte { + return st.issuer.data.BatchID +} + type presignedStamper struct { stamp *Stamp owner []byte @@ -90,7 +97,7 @@ func NewPresignedStamper(stamp *Stamp, owner []byte) Stamper { return &presignedStamper{stamp, owner} } -func (st *presignedStamper) Stamp(addr swarm.Address) (*Stamp, error) { +func (st *presignedStamper) Stamp(addr, _ swarm.Address) (*Stamp, error) { // check stored stamp is against the chunk address // Recover the public key from the signature signerAddr, err := RecoverBatchOwner(addr, st.stamp) @@ -104,3 +111,7 @@ func (st *presignedStamper) Stamp(addr swarm.Address) (*Stamp, error) { return st.stamp, nil } + +func (st *presignedStamper) BatchId() []byte { + return st.stamp.BatchID() +} diff --git a/pkg/postage/stamper_test.go b/pkg/postage/stamper_test.go index a1c589b145c..4069a6daac6 100644 --- a/pkg/postage/stamper_test.go +++ b/pkg/postage/stamper_test.go @@ -33,7 +33,7 @@ func TestStamperStamping(t *testing.T) { t.Helper() chunkAddr := swarm.RandAddress(t) - stamp, err := stamper.Stamp(chunkAddr) + stamp, err := stamper.Stamp(chunkAddr, chunkAddr) if err != nil { t.Fatal(err) } @@ -71,12 +71,14 @@ func TestStamperStamping(t *testing.T) { // issue another 15 // collision depth is 8, committed batch depth is 12, bucket volume 2^4 for i := 0; i < 14; i++ { - _, err = stamper.Stamp(swarm.RandAddressAt(t, chunkAddr, 8)) + randAddr := swarm.RandAddressAt(t, chunkAddr, 8) + _, err = stamper.Stamp(randAddr, randAddr) if err != nil { t.Fatalf("error adding stamp at step %d: %v", i, err) } } - stamp, err := stamper.Stamp(swarm.RandAddressAt(t, chunkAddr, 8)) + randAddr := swarm.RandAddressAt(t, chunkAddr, 8) + stamp, err := stamper.Stamp(randAddr, randAddr) if err != nil { t.Fatalf("error adding last stamp: %v", err) } @@ -95,13 +97,15 @@ func TestStamperStamping(t *testing.T) { // issue another 15 // collision depth is 8, committed batch depth is 12, bucket volume 2^4 for i := 0; i < 15; i++ { - _, err = stamper.Stamp(swarm.RandAddressAt(t, chunkAddr, 8)) + randAddr := swarm.RandAddressAt(t, chunkAddr, 8) + _, err = stamper.Stamp(randAddr, randAddr) if err != nil { t.Fatalf("error adding stamp at step %d: %v", i, err) } } + randAddr := swarm.RandAddressAt(t, chunkAddr, 8) // the bucket should now be full, not allowing a stamp for the pivot chunk - if _, err = stamper.Stamp(swarm.RandAddressAt(t, chunkAddr, 8)); !errors.Is(err, postage.ErrBucketFull) { + if _, err = stamper.Stamp(randAddr, randAddr); !errors.Is(err, postage.ErrBucketFull) { t.Fatalf("expected ErrBucketFull, got %v", err) } }) @@ -117,7 +121,7 @@ func TestStamperStamping(t *testing.T) { WithBatchIndex(index) testSt := &testStore{Store: inmemstore.New(), stampItem: testItem} stamper := postage.NewStamper(testSt, st, signer) - stamp, err := stamper.Stamp(chunkAddr) + stamp, err := stamper.Stamp(chunkAddr, chunkAddr) if err != nil { t.Fatal(err) } diff --git a/pkg/pss/pss.go b/pkg/pss/pss.go index 3a09db0b633..454f5960129 100644 --- a/pkg/pss/pss.go +++ b/pkg/pss/pss.go @@ -99,7 +99,7 @@ func (p *pss) Send(ctx context.Context, topic Topic, payload []byte, stamper pos return err } - stamp, err := stamper.Stamp(tc.Address()) + stamp, err := stamper.Stamp(tc.Address(), tc.Address()) if err != nil { return err } diff --git a/pkg/pss/pss_test.go b/pkg/pss/pss_test.go index 1d04237bbaa..685adb59fb9 100644 --- a/pkg/pss/pss_test.go +++ b/pkg/pss/pss_test.go @@ -236,8 +236,15 @@ func ensureCalls(t *testing.T, calls *int, exp int) { } } -type stamper struct{} +type stamper struct { + stamp *postage.Stamp +} + +func (s *stamper) Stamp(_, _ swarm.Address) (*postage.Stamp, error) { + stamp := postagetesting.MustNewStamp() + return stamp, nil +} -func (s *stamper) Stamp(_ swarm.Address) (*postage.Stamp, error) { - return postagetesting.MustNewStamp(), nil +func (s *stamper) BatchId() []byte { + return s.stamp.BatchID() } diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index afd2ee17fed..1f8d55d4495 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -71,6 +71,7 @@ type Syncer struct { store storer.Reserve quit chan struct{} unwrap func(swarm.Chunk) + gsocHandler func(*soc.SOC) validStamp postage.ValidStampFn intervalsSF singleflight.Group[string, *collectAddrsResult] syncInProgress atomic.Int32 @@ -87,21 +88,23 @@ func New( streamer p2p.Streamer, store storer.Reserve, unwrap func(swarm.Chunk), + gsocHandler func(*soc.SOC), validStamp postage.ValidStampFn, logger log.Logger, maxPage uint64, ) *Syncer { return &Syncer{ - streamer: streamer, - store: store, - metrics: newMetrics(), - unwrap: unwrap, - validStamp: validStamp, - logger: logger.WithName(loggerName).Register(), - quit: make(chan struct{}), - maxPage: maxPage, - limiter: ratelimit.New(handleRequestsLimitRate, int(maxPage)), + streamer: streamer, + store: store, + metrics: newMetrics(), + unwrap: unwrap, + gsocHandler: gsocHandler, + validStamp: validStamp, + logger: logger.WithName(loggerName).Register(), + quit: make(chan struct{}), + maxPage: maxPage, + limiter: ratelimit.New(handleRequestsLimitRate, int(maxPage)), } } @@ -356,7 +359,9 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start if cac.Valid(chunk) { go s.unwrap(chunk) - } else if !soc.Valid(chunk) { + } else if chunk, err := soc.FromChunk(chunk); err == nil { + s.gsocHandler(chunk) + } else { s.logger.Debug("invalid cac/soc chunk", "error", swarm.ErrInvalidChunk, "peer_address", peer, "chunk", chunk) chunkErr = errors.Join(chunkErr, swarm.ErrInvalidChunk) s.metrics.ReceivedInvalidChunk.Inc() diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index e77f54705ed..fc80bae137f 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -17,6 +17,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/postage" postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing" "github.com/ethersphere/bee/v2/pkg/pullsync" + "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storage" testingc "github.com/ethersphere/bee/v2/pkg/storage/testing" "github.com/ethersphere/bee/v2/pkg/storer" @@ -353,10 +354,12 @@ func newPullSyncWithStamperValidator( storage := mock.NewReserve(o...) logger := log.Noop unwrap := func(swarm.Chunk) {} + socHandler := func(*soc.SOC) {} ps := pullsync.New( s, storage, unwrap, + socHandler, validStamp, logger, maxPage, diff --git a/pkg/pusher/inflight.go b/pkg/pusher/inflight.go index 788872d2652..99ec53c96ff 100644 --- a/pkg/pusher/inflight.go +++ b/pkg/pusher/inflight.go @@ -12,28 +12,33 @@ import ( type inflight struct { mtx sync.Mutex - inflight map[string]struct{} + inflight map[[64]byte]struct{} } func newInflight() *inflight { return &inflight{ - inflight: make(map[string]struct{}), + inflight: make(map[[64]byte]struct{}), } } -func (i *inflight) delete(ch swarm.Chunk) { - key := ch.Address().ByteString() + string(ch.Stamp().BatchID()) +func (i *inflight) delete(idAddress swarm.Address, batchID []byte) { + var key [64]byte + copy(key[:32], idAddress.Bytes()) + copy(key[32:], batchID) + i.mtx.Lock() delete(i.inflight, key) i.mtx.Unlock() } -func (i *inflight) set(ch swarm.Chunk) bool { +func (i *inflight) set(idAddress swarm.Address, batchID []byte) bool { + var key [64]byte + copy(key[:32], idAddress.Bytes()) + copy(key[32:], batchID) i.mtx.Lock() defer i.mtx.Unlock() - key := ch.Address().ByteString() + string(ch.Stamp().BatchID()) if _, ok := i.inflight[key]; ok { return true } @@ -50,16 +55,16 @@ type attempts struct { // try to log a chunk sync attempt. returns false when // maximum amount of attempts have been reached. -func (a *attempts) try(ch swarm.Address) bool { +func (a *attempts) try(idAddress swarm.Address) bool { a.mtx.Lock() defer a.mtx.Unlock() - key := ch.ByteString() + key := idAddress.ByteString() a.attempts[key]++ return a.attempts[key] < a.retryCount } -func (a *attempts) delete(ch swarm.Address) { +func (a *attempts) delete(idAddress swarm.Address) { a.mtx.Lock() - delete(a.attempts, ch.ByteString()) + delete(a.attempts, idAddress.ByteString()) a.mtx.Unlock() } diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index 7defcfb37cd..71ffed77ac3 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -35,6 +35,8 @@ type Op struct { Err chan error Direct bool Span opentracing.Span + + identityAddress swarm.Address } type OpChan <-chan *Op @@ -214,7 +216,13 @@ func (s *Service) chunksWorker(warmupTime time.Duration) { for { select { case op := <-cc: - if s.inflight.set(op.Chunk) { + idAddress, err := storage.IdentityAddress(op.Chunk) + if err != nil { + op.Err <- err + continue + } + op.identityAddress = idAddress + if s.inflight.set(idAddress, op.Chunk.Stamp().BatchID()) { if op.Direct { select { case op.Err <- nil: @@ -241,7 +249,7 @@ func (s *Service) chunksWorker(warmupTime time.Duration) { func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (bool, error) { loggerV1 := logger.V(1).Build() - defer s.inflight.delete(op.Chunk) + defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID()) if _, err := s.validStamp(op.Chunk); err != nil { loggerV1.Warning( @@ -254,7 +262,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( return false, errors.Join(err, s.storer.Report(ctx, op.Chunk, storage.ChunkCouldNotSync)) } - switch receipt, err := s.pushSyncer.PushChunkToClosest(ctx, op.Chunk); { + switch _, err := s.pushSyncer.PushChunkToClosest(ctx, op.Chunk); { case errors.Is(err, topology.ErrWantSelf): // store the chunk loggerV1.Debug("chunk stays here, i'm the closest node", "chunk_address", op.Chunk.Address()) @@ -269,7 +277,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( return true, err } case errors.Is(err, pushsync.ErrShallowReceipt): - if retry := s.shallowReceipt(receipt); retry { + if retry := s.shallowReceipt(op.identityAddress); retry { return true, err } if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil { @@ -295,7 +303,7 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err var err error defer func() { - s.inflight.delete(op.Chunk) + s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID()) select { case op.Err <- err: default: @@ -329,11 +337,11 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err return err } -func (s *Service) shallowReceipt(receipt *pushsync.Receipt) bool { - if s.attempts.try(receipt.Address) { +func (s *Service) shallowReceipt(idAddress swarm.Address) bool { + if s.attempts.try(idAddress) { return true } - s.attempts.delete(receipt.Address) + s.attempts.delete(idAddress) return false } diff --git a/pkg/pusher/pusher_test.go b/pkg/pusher/pusher_test.go index 3925e80ec31..fbdfb3a9acd 100644 --- a/pkg/pusher/pusher_test.go +++ b/pkg/pusher/pusher_test.go @@ -333,7 +333,7 @@ func TestPusherRetryShallow(t *testing.T) { // generate a chunk at PO 1 with closestPeer, meaning that we get a // receipt which is shallower than the pivot peer's depth, resulting // in retries - chunk := testingc.GenerateTestRandomChunkAt(t, closestPeer, 1) + chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, 1) storer.chunks <- chunk diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index c687a544727..94f3c8868af 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -85,6 +85,7 @@ type PushSync struct { store Storer topologyDriver topology.Driver unwrap func(swarm.Chunk) + gsocHandler func(*soc.SOC) logger log.Logger accounting accounting.Interface pricer pricer.Interface @@ -114,6 +115,7 @@ func New( topology topology.Driver, fullNode bool, unwrap func(swarm.Chunk), + gsocHandler func(*soc.SOC), validStamp postage.ValidStampFn, logger log.Logger, accounting accounting.Interface, @@ -132,6 +134,7 @@ func New( topologyDriver: topology, fullNode: fullNode, unwrap: unwrap, + gsocHandler: gsocHandler, logger: logger.WithName(loggerName).Register(), accounting: accounting, pricer: pricer, @@ -225,7 +228,9 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) if cac.Valid(chunk) { go ps.unwrap(chunk) - } else if !soc.Valid(chunk) { + } else if chunk, err := soc.FromChunk(chunk); err == nil { + ps.gsocHandler(chunk) + } else { return swarm.ErrInvalidChunk } @@ -319,7 +324,6 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re Nonce: r.Nonce, }, err } - if err != nil { return nil, err } @@ -357,6 +361,11 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo sentErrorsLeft = maxPushErrors } + idAddress, err := storage.IdentityAddress(ch) + if err != nil { + return nil, err + } + resultChan := make(chan receiptResult) retryC := make(chan struct{}, max(1, parallelForwards)) @@ -393,10 +402,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo // If no peer can be found from an origin peer, the origin peer may store the chunk. // Non-origin peers store the chunk if the chunk is within depth. // For non-origin peers, if the chunk is not within depth, they may store the chunk if they are the closest peer to the chunk. - fullSkip := append(skip.ChunkPeers(ch.Address()), ps.errSkip.ChunkPeers(ch.Address())...) + fullSkip := append(skip.ChunkPeers(idAddress), ps.errSkip.ChunkPeers(idAddress)...) peer, err := ps.closestPeer(ch.Address(), origin, fullSkip) if errors.Is(err, topology.ErrNotFound) { - if skip.PruneExpiresAfter(ch.Address(), overDraftRefresh) == 0 { //no overdraft peers, we have depleted ALL peers + if skip.PruneExpiresAfter(idAddress, overDraftRefresh) == 0 { //no overdraft peers, we have depleted ALL peers if inflight == 0 { if ps.fullNode { if cac.Valid(ch) { @@ -433,7 +442,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo // all future requests should land directly into the neighborhood if neighborsOnly && peerPO < rad { - skip.Forever(ch.Address(), peer) + skip.Forever(idAddress, peer) continue } @@ -450,10 +459,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo action, err := ps.prepareCredit(ctx, peer, ch, origin) if err != nil { retry() - skip.Add(ch.Address(), peer, overDraftRefresh) + skip.Add(idAddress, peer, overDraftRefresh) continue } - skip.Forever(ch.Address(), peer) + skip.Forever(idAddress, peer) ps.metrics.TotalSendAttempts.Inc() inflight++ @@ -461,7 +470,6 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo go ps.push(ctx, resultChan, peer, ch, action) case result := <-resultChan: - inflight-- ps.measurePushPeer(result.pushTime, result.err) @@ -471,16 +479,16 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo case err == nil: return result.receipt, nil case errors.Is(err, ErrShallowReceipt): - ps.errSkip.Add(ch.Address(), result.peer, skiplistDur) + ps.errSkip.Add(idAddress, result.peer, skiplistDur) return result.receipt, err } } ps.metrics.TotalFailedSendAttempts.Inc() - ps.logger.Debug("could not push to peer", "chunk_address", ch.Address(), "peer_address", result.peer, "error", result.err) + ps.logger.Debug("could not push to peer", "chunk_address", ch.Address(), "id_address", idAddress, "peer_address", result.peer, "error", result.err) sentErrorsLeft-- - ps.errSkip.Add(ch.Address(), result.peer, skiplistDur) + ps.errSkip.Add(idAddress, result.peer, skiplistDur) retry() } diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index 9b1ee648d3f..8773c693de0 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -25,6 +25,7 @@ import ( pricermock "github.com/ethersphere/bee/v2/pkg/pricer/mock" "github.com/ethersphere/bee/v2/pkg/pushsync" "github.com/ethersphere/bee/v2/pkg/pushsync/pb" + "github.com/ethersphere/bee/v2/pkg/soc" storage "github.com/ethersphere/bee/v2/pkg/storage" testingc "github.com/ethersphere/bee/v2/pkg/storage/testing" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -110,6 +111,96 @@ func TestPushClosest(t *testing.T) { } } +// TestSocListener listens all payload of a SOC. This triggers sending a chunk to the closest node +// and expects a receipt. The message is intercepted in the outgoing stream to check for correctness. +func TestSocListener(t *testing.T) { + t.Parallel() + defaultSigner := cryptomock.New(cryptomock.WithSignFunc(func(addr []byte) ([]byte, error) { + key, _ := crypto.GenerateSecp256k1Key() + signer := crypto.NewDefaultSigner(key) + signature, _ := signer.Sign(addr) + + return signature, nil + })) + + // chunk data to upload + privKey, err := crypto.DecodeSecp256k1PrivateKey(swarm.MustParseHexAddress("b0baf37700000000000000000000000000000000000000000000000000000000").Bytes()) + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + chunk1 := testingc.FixtureChunk("7000") + chunk2 := testingc.FixtureChunk("0033") + id := make([]byte, swarm.HashSize) + s1 := soc.New(id, chunk1) + s2 := soc.New(id, chunk2) + sch1, err := s1.Sign(signer) + if err != nil { + t.Fatal(err) + } + sch1 = sch1.WithStamp(chunk1.Stamp()) + sch2, err := s2.Sign(signer) + if err != nil { + t.Fatal(err) + } + sch2 = sch2.WithStamp(chunk2.Stamp()) + expectedPayload := chunk1.Data() + gsocListener := func(soc *soc.SOC) { + if !bytes.Equal(soc.WrappedChunk().Data(), expectedPayload) { + t.Fatalf("unexpected SOC payload on GSOC listener. got %s, want %s", soc.WrappedChunk().Data(), expectedPayload) + } + } + + // create a pivot node and a mocked closest node + pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000 + closestPeer := swarm.MustParseHexAddress("8000000000000000000000000000000000000000000000000000000000000000") // binary 1000 -> po 1 + + // peer is the node responding to the chunk receipt message + // mock should return ErrWantSelf since there's no one to forward to + psPeer, _, _ := createGsocPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) + + recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) + + // pivot node needs the streamer since the chunk is intercepted by + // the chunk worker, then gets sent by opening a new stream + psPivot, _, _ := createGsocPushSyncNode(t, pivotNode, defaultPrices, recorder, gsocListener, defaultSigner, mock.WithClosestPeer(closestPeer)) + + // Trigger the sending of chunk to the closest node + receipt, err := psPivot.PushChunkToClosest(context.Background(), sch1) + if err != nil { + t.Fatal(err) + } + + if !sch1.Address().Equal(receipt.Address) { + t.Fatal("invalid receipt") + } + + // this intercepts the outgoing delivery message + waitOnRecordAndTest(t, closestPeer, recorder, sch1.Address(), sch1.Data()) + + // this intercepts the incoming receipt message + waitOnRecordAndTest(t, closestPeer, recorder, sch1.Address(), nil) + + recorder.Reset() + expectedPayload = chunk2.Data() + + // Trigger the sending of chunk to the closest node + receipt, err = psPivot.PushChunkToClosest(context.Background(), sch2) + if err != nil { + t.Fatal(err) + } + + if !sch2.Address().Equal(receipt.Address) { + t.Fatal("invalid receipt") + } + + // this intercepts the outgoing delivery message + waitOnRecordAndTest(t, closestPeer, recorder, sch2.Address(), sch2.Data()) + + // this intercepts the incoming receipt message + waitOnRecordAndTest(t, closestPeer, recorder, sch2.Address(), nil) +} + // TestShallowReceipt forces the peer to send back a shallow receipt to a pushsync request. In return, the origin node returns the error along with the received receipt. func TestShallowReceipt(t *testing.T) { t.Parallel() @@ -377,7 +468,7 @@ func TestPushChunkToClosestErrorAttemptRetry(t *testing.T) { }), ) - psPivot, pivotStorer := createPushSyncNodeWithAccounting(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), pivotAccounting, log.Noop, mock.WithPeers(peer1, peer2, peer3, peer4)) + psPivot, pivotStorer := createPushSyncNodeWithAccounting(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), pivotAccounting, log.Noop, func(*soc.SOC) {}, mock.WithPeers(peer1, peer2, peer3, peer4)) // Trigger the sending of chunk to the closest node receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk) @@ -554,15 +645,15 @@ func TestPropagateErrMsg(t *testing.T) { captureLogger := log.NewLogger("test", log.WithSink(buf)) // Create the closest peer - psClosestPeer, _ := createPushSyncNodeWithAccounting(t, closestPeer, defaultPrices, nil, nil, faultySigner, accountingmock.NewAccounting(), log.Noop, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psClosestPeer, _ := createPushSyncNodeWithAccounting(t, closestPeer, defaultPrices, nil, nil, faultySigner, accountingmock.NewAccounting(), log.Noop, func(*soc.SOC) {}, mock.WithClosestPeerErr(topology.ErrWantSelf)) // creating the pivot peer - psPivot, _ := createPushSyncNodeWithAccounting(t, pivotPeer, defaultPrices, nil, nil, defaultSigner(chunk), accountingmock.NewAccounting(), log.Noop, mock.WithPeers(closestPeer)) + psPivot, _ := createPushSyncNodeWithAccounting(t, pivotPeer, defaultPrices, nil, nil, defaultSigner(chunk), accountingmock.NewAccounting(), log.Noop, func(*soc.SOC) {}, mock.WithPeers(closestPeer)) combinedRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol(), psClosestPeer.Protocol()), streamtest.WithBaseAddr(triggerPeer)) // Creating the trigger peer - psTriggerPeer, _ := createPushSyncNodeWithAccounting(t, triggerPeer, defaultPrices, combinedRecorder, nil, defaultSigner(chunk), accountingmock.NewAccounting(), captureLogger, mock.WithPeers(pivotPeer)) + psTriggerPeer, _ := createPushSyncNodeWithAccounting(t, triggerPeer, defaultPrices, combinedRecorder, nil, defaultSigner(chunk), accountingmock.NewAccounting(), captureLogger, func(*soc.SOC) {}, mock.WithPeers(pivotPeer)) _, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk) if err == nil { @@ -738,7 +829,22 @@ func createPushSyncNode( ) (*pushsync.PushSync, *testStorer, accounting.Interface) { t.Helper() mockAccounting := accountingmock.NewAccounting() - ps, mstorer := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, log.Noop, mockOpts...) + ps, mstorer := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, log.Noop, func(*soc.SOC) {}, mockOpts...) + return ps, mstorer, mockAccounting +} + +func createGsocPushSyncNode( + t *testing.T, + addr swarm.Address, + prices pricerParameters, + recorder *streamtest.Recorder, + gsocListener func(*soc.SOC), + signer crypto.Signer, + mockOpts ...mock.Option, +) (*pushsync.PushSync, *testStorer, accounting.Interface) { + t.Helper() + mockAccounting := accountingmock.NewAccounting() + ps, mstorer := createPushSyncNodeWithAccounting(t, addr, prices, recorder, nil, signer, mockAccounting, log.Noop, gsocListener, mockOpts...) return ps, mstorer, mockAccounting } @@ -772,7 +878,7 @@ func createPushSyncNodeWithRadius( radiusFunc := func() (uint8, error) { return radius, nil } - ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, validStamp, log.Noop, accountingmock.NewAccounting(), mockPricer, signer, nil, -1) + ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, func(*soc.SOC) {}, validStamp, log.Noop, accountingmock.NewAccounting(), mockPricer, signer, nil, -1) t.Cleanup(func() { ps.Close() }) return ps, storer @@ -787,6 +893,7 @@ func createPushSyncNodeWithAccounting( signer crypto.Signer, acct accounting.Interface, logger log.Logger, + gsocListener func(*soc.SOC), mockOpts ...mock.Option, ) (*pushsync.PushSync, *testStorer) { t.Helper() @@ -802,6 +909,9 @@ func createPushSyncNodeWithAccounting( if unwrap == nil { unwrap = func(swarm.Chunk) {} } + if gsocListener == nil { + gsocListener = func(*soc.SOC) {} + } validStamp := func(ch swarm.Chunk) (swarm.Chunk, error) { return ch, nil @@ -809,7 +919,7 @@ func createPushSyncNodeWithAccounting( radiusFunc := func() (uint8, error) { return 0, nil } - ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil, -1) + ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, gsocListener, validStamp, logger, acct, mockPricer, signer, nil, -1) t.Cleanup(func() { ps.Close() }) return ps, storer diff --git a/pkg/steward/steward.go b/pkg/steward/steward.go index 9726ed1baa3..f318711d8ff 100644 --- a/pkg/steward/steward.go +++ b/pkg/steward/steward.go @@ -61,7 +61,7 @@ func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper post return err } - stamp, err := stamper.Stamp(c.Address()) + stamp, err := stamper.Stamp(c.Address(), c.Address()) if err != nil { return fmt.Errorf("stamping chunk %s: %w", c.Address(), err) } diff --git a/pkg/storage/chunkstore.go b/pkg/storage/chunkstore.go index 72f8b9ba784..68a9d10652a 100644 --- a/pkg/storage/chunkstore.go +++ b/pkg/storage/chunkstore.go @@ -42,7 +42,7 @@ type Hasser interface { // Replacer is the interface that wraps the basic Replace method. type Replacer interface { // Replace a chunk in the store. - Replace(context.Context, swarm.Chunk) error + Replace(context.Context, swarm.Chunk, bool) error } // PutterFunc type is an adapter to allow the use of diff --git a/pkg/storage/inmemchunkstore/inmemchunkstore.go b/pkg/storage/inmemchunkstore/inmemchunkstore.go index 7d49e63c279..3ec2b8e8a6d 100644 --- a/pkg/storage/inmemchunkstore/inmemchunkstore.go +++ b/pkg/storage/inmemchunkstore/inmemchunkstore.go @@ -77,13 +77,17 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error { return nil } -func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk) error { +func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk, emplace bool) error { c.mu.Lock() defer c.mu.Unlock() chunkCount := c.chunks[ch.Address().ByteString()] chunkCount.chunk = ch + if emplace { + chunkCount.count++ + } c.chunks[ch.Address().ByteString()] = chunkCount + return nil } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 1373e23e4e9..312364e2676 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -18,6 +18,7 @@ import ( var ( ErrOverwriteNewerChunk = errors.New("overwriting chunk with newer timestamp") + ErrUnknownChunkType = errors.New("unknown chunk type") ) // Result represents the item returned by the read operation, which returns @@ -293,3 +294,35 @@ func ChunkType(ch swarm.Chunk) swarm.ChunkType { } return swarm.ChunkTypeUnspecified } + +// IdentityAddress returns the internally used address for the chunk +// since the single owner chunk address is not a unique identifier for the chunk, +// but hashing the soc address and the wrapped chunk address is. +// it is used in the reserve sampling and other places where a key is needed to represent a chunk. +func IdentityAddress(chunk swarm.Chunk) (swarm.Address, error) { + + if cac.Valid(chunk) { + return chunk.Address(), nil + } + + // check the chunk is single owner chunk or cac + if sch, err := soc.FromChunk(chunk); err == nil { + socAddress, err := sch.Address() + if err != nil { + return swarm.ZeroAddress, err + } + h := swarm.NewHasher() + _, err = h.Write(socAddress.Bytes()) + if err != nil { + return swarm.ZeroAddress, err + } + _, err = h.Write(sch.WrappedChunk().Address().Bytes()) + if err != nil { + return swarm.ZeroAddress, err + } + + return swarm.NewAddress(h.Sum(nil)), nil + } + + return swarm.ZeroAddress, fmt.Errorf("identity address failed on chunk %s: %w", chunk, ErrUnknownChunkType) +} diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go new file mode 100644 index 00000000000..6ca04a8342f --- /dev/null +++ b/pkg/storage/storage_test.go @@ -0,0 +1,79 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +package storage_test + +import ( + "encoding/hex" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +func TestIdentityAddress(t *testing.T) { + t.Run("single owner chunk", func(t *testing.T) { + // Create a single owner chunk (SOC) + owner := common.HexToAddress("8d3766440f0d7b949a5e32995d09619a7f86e632") + // signature of hash(id + chunk address of foo) + sig, err := hex.DecodeString("5acd384febc133b7b245e5ddc62d82d2cded9182d2716126cd8844509af65a053deb418208027f548e3e88343af6f84a8772fb3cebc0a1833a0ea7ec0c1348311b") + if err != nil { + t.Fatal(err) + } + id := make([]byte, swarm.HashSize) + copy(id, []byte("id")) + payload := []byte("foo") + ch, err := cac.New(payload) + if err != nil { + t.Fatal(err) + } + sch, err := soc.NewSigned(id, ch, owner.Bytes(), sig) + if err != nil { + t.Fatal(err) + } + schChunk, err := sch.Chunk() + if err != nil { + t.Fatal(err) + } + schAddress, err := sch.Address() + if err != nil { + t.Fatal(err) + } + + idAddr, err := storage.IdentityAddress(schChunk) + if err != nil { + t.Fatalf("IdentityAddress returned error: %v", err) + } + + if idAddr.IsZero() { + t.Fatalf("expected non-zero address, got zero address") + } + + if idAddr.Equal(schAddress) { + t.Fatalf("expected identity address to be different from SOC address") + } + }) + + t.Run("content addressed chunk", func(t *testing.T) { + // Create a content addressed chunk (CAC) + data := []byte("data") + cacChunk, err := cac.New(data) + if err != nil { + t.Fatalf("failed to create content addressed chunk: %v", err) + } + + // Call IdentityAddress with the CAC + addr, err := storage.IdentityAddress(cacChunk) + if err != nil { + t.Fatalf("IdentityAddress returned error: %v", err) + } + + // Verify the address matches the CAC address + if !addr.Equal(cacChunk.Address()) { + t.Fatalf("expected address %s, got %s", cacChunk.Address(), addr) + } + }) +} diff --git a/pkg/storage/testing/chunk.go b/pkg/storage/testing/chunk.go index f51c1079974..c5e83f845aa 100644 --- a/pkg/storage/testing/chunk.go +++ b/pkg/storage/testing/chunk.go @@ -91,10 +91,13 @@ func GenerateTestRandomChunkAt(tb testing.TB, target swarm.Address, po int) swar addr := swarm.RandAddressAt(tb, target, po) stamp := postagetesting.MustNewStamp() return swarm.NewChunk(addr, data).WithStamp(stamp) + } // GenerateTestRandomChunkAt generates an invalid (!) chunk with address of proximity order po wrt target. -func GenerateValidRandomChunkAt(target swarm.Address, po int) swarm.Chunk { +func GenerateValidRandomChunkAt(tb testing.TB, target swarm.Address, po int) swarm.Chunk { + tb.Helper() + data := make([]byte, swarm.ChunkSize) var ch swarm.Chunk diff --git a/pkg/storer/internal/chunkstamp/chunkstamp.go b/pkg/storer/internal/chunkstamp/chunkstamp.go index 2f3857d2ed3..16b7f011c3e 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp.go @@ -181,6 +181,44 @@ func LoadWithBatchID(s storage.Reader, scope string, addr swarm.Address, batchID return stamp, nil } +// LoadWithStampHash returns swarm.Stamp related to the given address and stamphash. +func LoadWithStampHash(s storage.Reader, scope string, addr swarm.Address, hash []byte) (swarm.Stamp, error) { + var stamp swarm.Stamp + + found := false + err := s.Iterate( + storage.Query{ + Factory: func() storage.Item { + return &Item{ + scope: []byte(scope), + address: addr, + } + }, + }, + func(res storage.Result) (bool, error) { + item := res.Entry.(*Item) + h, err := item.stamp.Hash() + if err != nil { + return false, err + } + if bytes.Equal(hash, h) { + stamp = item.stamp + found = true + return true, nil + } + return false, nil + }, + ) + if err != nil { + return nil, err + } + if !found { + return nil, fmt.Errorf("stamp not found for hash %x: %w", hash, storage.ErrNotFound) + } + + return stamp, nil +} + // Store creates new or updated an existing stamp index // record related to the given scope and chunk. func Store(s storage.IndexStore, scope string, chunk swarm.Chunk) error { diff --git a/pkg/storer/internal/chunkstamp/chunkstamp_test.go b/pkg/storer/internal/chunkstamp/chunkstamp_test.go index 1167a56f10a..a000cfb8694 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp_test.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp_test.go @@ -185,7 +185,7 @@ func TestStoreLoadDelete(t *testing.T) { } }) - t.Run("load stored chunk stamp with batch id", func(t *testing.T) { + t.Run("load stored chunk stamp with batch id and hash", func(t *testing.T) { want := chunk.Stamp() have, err := chunkstamp.LoadWithBatchID(ts.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID()) @@ -196,6 +196,20 @@ func TestStoreLoadDelete(t *testing.T) { if diff := cmp.Diff(want, have, cmp.AllowUnexported(postage.Stamp{})); diff != "" { t.Fatalf("LoadWithBatchID(...): mismatch (-want +have):\n%s", diff) } + + h, err := want.Hash() + if err != nil { + t.Fatal(err) + } + + have, err = chunkstamp.LoadWithStampHash(ts.IndexStore(), ns, chunk.Address(), h) + if err != nil { + t.Fatalf("LoadWithBatchID(...): unexpected error: %v", err) + } + + if diff := cmp.Diff(want, have, cmp.AllowUnexported(postage.Stamp{})); diff != "" { + t.Fatalf("LoadWithBatchID(...): mismatch (-want +have):\n%s", diff) + } }) t.Run("delete stored stamp", func(t *testing.T) { diff --git a/pkg/storer/internal/chunkstore/chunkstore.go b/pkg/storer/internal/chunkstore/chunkstore.go index 67fee1e6d77..cc2743b97c4 100644 --- a/pkg/storer/internal/chunkstore/chunkstore.go +++ b/pkg/storer/internal/chunkstore/chunkstore.go @@ -94,7 +94,7 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm. return s.Put(rIdx) } -func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error { +func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk, emplace bool) error { rIdx := &RetrievalIndexItem{Address: ch.Address()} err := s.Get(rIdx) if err != nil { @@ -112,6 +112,9 @@ func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch sw } rIdx.Location = loc rIdx.Timestamp = uint64(time.Now().Unix()) + if emplace { + rIdx.RefCnt++ + } return s.Put(rIdx) } diff --git a/pkg/storer/internal/chunkstore/chunkstore_test.go b/pkg/storer/internal/chunkstore/chunkstore_test.go index 9e30c1af876..6787cf826ab 100644 --- a/pkg/storer/internal/chunkstore/chunkstore_test.go +++ b/pkg/storer/internal/chunkstore/chunkstore_test.go @@ -5,6 +5,7 @@ package chunkstore_test import ( + "bytes" "context" "errors" "fmt" @@ -13,7 +14,9 @@ import ( "os" "testing" + "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/sharky" + soctesting "github.com/ethersphere/bee/v2/pkg/soc/testing" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/storage" @@ -336,6 +339,77 @@ func TestChunkStore(t *testing.T) { } }) + t.Run("replace chunk", func(t *testing.T) { + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + ctx := context.Background() + + ch1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer).Chunk() + err = st.Run(context.Background(), func(s transaction.Store) error { + return s.ChunkStore().Put(ctx, ch1) + }) + if err != nil { + t.Fatal(err) + } + + tests := []struct { + data string + emplace bool + wantRefCount uint32 + }{ + { + data: "data1", + emplace: true, + wantRefCount: 2, + }, + { + data: "data2", + emplace: false, + wantRefCount: 2, + }, + { + data: "data3", + emplace: true, + wantRefCount: 3, + }, + } + + for _, tt := range tests { + ch2 := soctesting.GenerateMockSocWithSigner(t, []byte(tt.data), signer).Chunk() + if !ch1.Address().Equal(ch2.Address()) { + t.Fatal("chunk addresses don't match") + } + + err = st.Run(ctx, func(s transaction.Store) error { + return s.ChunkStore().Replace(ctx, ch2, tt.emplace) + }) + if err != nil { + t.Fatal(err) + } + + ch, err := st.ChunkStore().Get(ctx, ch2.Address()) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(ch.Data(), ch2.Data()) { + t.Fatalf("expected data override") + } + + rIdx := &chunkstore.RetrievalIndexItem{Address: ch2.Address()} + err = st.IndexStore().Get(rIdx) + if err != nil { + t.Fatal(err) + } + + if rIdx.RefCnt != tt.wantRefCount { + t.Fatalf("expected ref count %d, got %d", tt.wantRefCount, rIdx.RefCnt) + } + } + }) + t.Run("close store", func(t *testing.T) { err := st.Close() if err != nil { diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index d5d001e3440..da020784832 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -91,13 +91,14 @@ func New( } // Reserve Put has to handle multiple possible scenarios. -// 1. Since the same chunk may belong to different postage batches, the reserve will support one chunk to many postage -// batches relationship. +// 1. Since the same chunk may belong to different postage stamp indices, the reserve will support one chunk to many postage +// stamp indices relationship. // 2. A new chunk that shares the same stamp index belonging to the same batch with an already stored chunk will overwrite // the existing chunk if the new chunk has a higher stamp timestamp (regardless of batch type). -// 3. A new chunk that has the same address belonging to the same batch with an already stored chunk will overwrite the existing chunk +// 3. A new chunk that has the same address belonging to the same stamp index with an already stored chunk will overwrite the existing chunk // if the new chunk has a higher stamp timestamp (regardless of batch type and chunk type, eg CAC & SOC). func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { + // batchID lock, Put vs Eviction r.multx.Lock(string(chunk.Stamp().BatchID())) defer r.multx.Unlock(string(chunk.Stamp().BatchID())) @@ -117,126 +118,106 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { } chunkType := storage.ChunkType(chunk) + bin := swarm.Proximity(r.baseAddr.Bytes(), chunk.Address().Bytes()) // bin lock r.multx.Lock(strconv.Itoa(int(bin))) defer r.multx.Unlock(strconv.Itoa(int(bin))) - var shouldIncReserveSize, shouldDecrReserveSize bool + var shouldIncReserveSize bool err = r.st.Run(ctx, func(s transaction.Store) error { + oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk) if err != nil { return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) } - sameAddressOldStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveScope, chunk.Address(), chunk.Stamp().BatchID()) - if err != nil && !errors.Is(err, storage.ErrNotFound) { - return err - } + // index collision + if loadedStampIndex { - // same chunk address, same batch - if sameAddressOldStamp != nil { - sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp) - if err != nil { - return err - } - prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp) + prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) if prev >= curr { return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) } - // index collision with another chunk - if loadedStampIndex { - prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp) - if prev >= curr { - return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) + r.logger.Debug( + "replacing chunk stamp index", + "old_chunk", oldStampIndex.ChunkAddress, + "new_chunk", chunk.Address(), + "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), + ) + + // same chunk address + if oldStampIndex.ChunkAddress.Equal(chunk.Address()) { + + oldStamp, err := chunkstamp.LoadWithStampHash(s.IndexStore(), reserveScope, oldStampIndex.ChunkAddress, oldStampIndex.StampHash) + if err != nil { + return err + } + + oldBatchRadiusItem := &BatchRadiusItem{ + Bin: bin, + Address: oldStampIndex.ChunkAddress, + BatchID: oldStampIndex.BatchID, + StampHash: oldStampIndex.StampHash, } - if !chunk.Address().Equal(oldStampIndex.ChunkAddress) { - r.logger.Debug( - "replacing chunk stamp index", - "old_chunk", oldStampIndex.ChunkAddress, - "new_chunk", chunk.Address(), - "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), - ) - // remove index items and chunk data - err = r.removeChunk(ctx, s, oldStampIndex.ChunkAddress, oldStampIndex.BatchID, oldStampIndex.StampHash) - if err != nil { - return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err) - } - shouldDecrReserveSize = true + // load item to get the binID + err = s.IndexStore().Get(oldBatchRadiusItem) + if err != nil { + return err } - } - oldBatchRadiusItem := &BatchRadiusItem{ - Bin: bin, - Address: chunk.Address(), - BatchID: sameAddressOldStampIndex.BatchID, - StampHash: sameAddressOldStampIndex.StampHash, - } - // load item to get the binID - err = s.IndexStore().Get(oldBatchRadiusItem) - if err != nil { - return err - } + // delete old chunk index items + err = errors.Join( + s.IndexStore().Delete(oldBatchRadiusItem), + s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}), + stampindex.Delete(s.IndexStore(), reserveScope, oldStamp), + chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, oldStamp), + ) + if err != nil { + return err + } - // delete old chunk index items - err = errors.Join( - s.IndexStore().Delete(oldBatchRadiusItem), - s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}), - stampindex.Delete(s.IndexStore(), reserveScope, sameAddressOldStamp), - chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, sameAddressOldStamp), - ) - if err != nil { - return err - } + binID, err := r.IncBinID(s.IndexStore(), bin) + if err != nil { + return err + } - binID, err := r.IncBinID(s.IndexStore(), bin) - if err != nil { - return err - } + err = errors.Join( + stampindex.Store(s.IndexStore(), reserveScope, chunk), + chunkstamp.Store(s.IndexStore(), reserveScope, chunk), + s.IndexStore().Put(&BatchRadiusItem{ + Bin: bin, + BinID: binID, + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + StampHash: stampHash, + }), + s.IndexStore().Put(&ChunkBinItem{ + Bin: bin, + BinID: binID, + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + ChunkType: chunkType, + StampHash: stampHash, + }), + ) + if err != nil { + return err + } - err = errors.Join( - stampindex.Store(s.IndexStore(), reserveScope, chunk), - chunkstamp.Store(s.IndexStore(), reserveScope, chunk), - s.IndexStore().Put(&BatchRadiusItem{ - Bin: bin, - BinID: binID, - Address: chunk.Address(), - BatchID: chunk.Stamp().BatchID(), - StampHash: stampHash, - }), - s.IndexStore().Put(&ChunkBinItem{ - Bin: bin, - BinID: binID, - Address: chunk.Address(), - BatchID: chunk.Stamp().BatchID(), - ChunkType: chunkType, - StampHash: stampHash, - }), - ) - if err != nil { - return err - } + if chunkType == swarm.ChunkTypeSingleOwner { + r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address()) + return s.ChunkStore().Replace(ctx, chunk, false) + } - if chunkType != swarm.ChunkTypeSingleOwner { return nil } - r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address()) - return s.ChunkStore().Replace(ctx, chunk) - } - - // different address, same batch, index collision - if loadedStampIndex { - prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp) - curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) - if prev >= curr { - return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) - } - // An older (same or different) chunk with the same batchID and stamp index has been previously + // An older and different chunk with the same batchID and stamp index has been previously // saved to the reserve. We must do the below before saving the new chunk: // 1. Delete the old chunk from the chunkstore. // 2. Delete the old chunk's stamp data. @@ -248,13 +229,6 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err) } - r.logger.Warning( - "replacing chunk stamp index", - "old_chunk", oldStampIndex.ChunkAddress, - "new_chunk", chunk.Address(), - "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), - ) - // replace old stamp index. err = stampindex.Store(s.IndexStore(), reserveScope, chunk) if err != nil { @@ -284,12 +258,31 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { ChunkType: chunkType, StampHash: stampHash, }), - s.ChunkStore().Put(ctx, chunk), ) if err != nil { return err } + var has bool + if chunkType == swarm.ChunkTypeSingleOwner { + has, err = s.ChunkStore().Has(ctx, chunk.Address()) + if err != nil { + return err + } + if has { + r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address()) + err = s.ChunkStore().Replace(ctx, chunk, true) + } else { + err = s.ChunkStore().Put(ctx, chunk) + } + } else { + err = s.ChunkStore().Put(ctx, chunk) + } + + if err != nil { + return err + } + if !loadedStampIndex { shouldIncReserveSize = true } @@ -302,9 +295,6 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { if shouldIncReserveSize { r.size.Add(1) } - if shouldDecrReserveSize { - r.size.Add(-1) - } return nil } @@ -323,7 +313,7 @@ func (r *Reserve) Get(ctx context.Context, addr swarm.Address, batchID []byte, s return nil, err } - stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, addr, item.BatchID) + stamp, err := chunkstamp.LoadWithStampHash(r.st.IndexStore(), reserveScope, addr, stampHash) if err != nil { return nil, err } @@ -425,7 +415,7 @@ func RemoveChunkWithItem( ) error { var errs error - stamp, _ := chunkstamp.LoadWithBatchID(trx.IndexStore(), reserveScope, item.Address, item.BatchID) + stamp, _ := chunkstamp.LoadWithStampHash(trx.IndexStore(), reserveScope, item.Address, item.StampHash) if stamp != nil { errs = errors.Join( stampindex.Delete(trx.IndexStore(), reserveScope, stamp), @@ -475,7 +465,7 @@ func (r *Reserve) IterateChunks(startBin uint8, cb func(swarm.Chunk) (bool, erro return false, err } - stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, item.Address, item.BatchID) + stamp, err := chunkstamp.LoadWithStampHash(r.st.IndexStore(), reserveScope, item.Address, item.StampHash) if err != nil { return false, err } diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index d13bb47ab75..65ba6d55822 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -197,12 +197,14 @@ func TestSameChunkAddress(t *testing.T) { bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) binBinIDs[bin] += 1 err = r.Put(ctx, ch2) - if !errors.Is(err, storage.ErrOverwriteNewerChunk) { - t.Fatal("expected error") + if err != nil { + t.Fatal(err) } + bin2 := swarm.Proximity(baseAddr.Bytes(), ch2.Address().Bytes()) + binBinIDs[bin2] += 1 size2 := r.Size() - if size2-size1 != 1 { - t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) + if size2-size1 != 2 { + t.Fatalf("expected reserve size to increase by 2, got %d", size2-size1) } }) @@ -269,11 +271,20 @@ func TestSameChunkAddress(t *testing.T) { s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 6)) bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) + err := r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + err = r.Put(ctx, ch2) + if err != nil { + t.Fatal(err) + } binBinIDs[bin] += 2 - replace(t, ch1, ch2, binBinIDs[bin]-1, binBinIDs[bin]) + checkChunkInIndexStore(t, ts.IndexStore(), bin, binBinIDs[bin]-1, ch1) + checkChunkInIndexStore(t, ts.IndexStore(), bin, binBinIDs[bin], ch2) size2 := r.Size() - if size2-size1 != 1 { - t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) + if size2-size1 != 2 { + t.Fatalf("expected reserve size to increase by 2, got %d", size2-size1) } }) @@ -314,7 +325,7 @@ func TestSameChunkAddress(t *testing.T) { }) t.Run("chunk with different batchID remains untouched", func(t *testing.T) { - noReplace := func(ch1, ch2 swarm.Chunk) { + checkReplace := func(ch1, ch2 swarm.Chunk, replace bool) { t.Helper() err = r.Put(ctx, ch1) if err != nil { @@ -344,13 +355,12 @@ func TestSameChunkAddress(t *testing.T) { checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: binBinIDs[bin] - 1}, false) checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: binBinIDs[bin]}, false) - // expect new chunk to NOT replace old one ch, err := ts.ChunkStore().Get(ctx, ch2.Address()) if err != nil { t.Fatal(err) } - if !bytes.Equal(ch.Data(), ch1.Data()) { - t.Fatalf("expected chunk data to not be updated") + if replace && bytes.Equal(ch.Data(), ch1.Data()) { + t.Fatalf("expected chunk data to be updated") } } @@ -368,7 +378,7 @@ func TestSameChunkAddress(t *testing.T) { if !bytes.Equal(ch1.Address().Bytes(), ch2.Address().Bytes()) { t.Fatalf("expected chunk addresses to be the same") } - noReplace(ch1, ch2) + checkReplace(ch1, ch2, true) // cac batch = postagetesting.MustNewBatch() @@ -378,7 +388,7 @@ func TestSameChunkAddress(t *testing.T) { if !bytes.Equal(ch1.Address().Bytes(), ch2.Address().Bytes()) { t.Fatalf("expected chunk addresses to be the same") } - noReplace(ch1, ch2) + checkReplace(ch1, ch2, false) size2 := r.Size() if size2-size1 != 4 { t.Fatalf("expected reserve size to increase by 4, got %d", size2-size1) @@ -388,7 +398,7 @@ func TestSameChunkAddress(t *testing.T) { t.Run("same address but index collision with different chunk", func(t *testing.T) { size1 := r.Size() batch := postagetesting.MustNewBatch() - ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) err = r.Put(ctx, ch1) if err != nil { t.Fatal(err) @@ -403,7 +413,7 @@ func TestSameChunkAddress(t *testing.T) { signer := getSigner(t) s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) - ch2 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1)) + ch2 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 2)) err = r.Put(ctx, ch2) if err != nil { t.Fatal(err) @@ -421,8 +431,16 @@ func TestSameChunkAddress(t *testing.T) { checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin1, BinID: binBinIDs[bin1], StampHash: ch1StampHash}, false) checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: binBinIDs[bin2], StampHash: ch2StampHash}, false) + // attempt to replace existing (unrelated) chunk that has timestamp s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) - ch3 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 2)) + ch3 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + err = r.Put(ctx, ch3) + if !errors.Is(err, storage.ErrOverwriteNewerChunk) { + t.Fatal("expected error") + } + + s2 = soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) + ch3 = s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 3)) err = r.Put(ctx, ch3) if err != nil { t.Fatal(err) @@ -435,17 +453,18 @@ func TestSameChunkAddress(t *testing.T) { ch3BinID := binBinIDs[bin2] checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin1, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true) - checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, true) + // different index, same batch + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false) checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch3.Stamp().BatchID(), Address: ch3.Address(), StampHash: ch3StampHash}, false) checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin1, BinID: ch1BinID, StampHash: ch1StampHash}, true) - checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch2BinID, StampHash: ch2StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch2BinID, StampHash: ch2StampHash}, false) checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch3BinID, StampHash: ch3StampHash}, false) size2 := r.Size() - // (ch1 + ch2) == 2 and then ch3 reduces reserve size by 1 - if size2-size1 != 1 { - t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) + // (ch1 + ch2) == 2 + if size2-size1 != 2 { + t.Fatalf("expected reserve size to increase by 2, got %d", size2-size1) } }) @@ -583,6 +602,73 @@ func TestEvict(t *testing.T) { } } +func TestEvictSOC(t *testing.T) { + t.Parallel() + + baseAddr := swarm.RandAddress(t) + ts := internal.NewInmemStorage() + + r, err := reserve.New( + baseAddr, + ts, + 0, kademlia.NewTopologyDriver(), + log.Noop, + ) + if err != nil { + t.Fatal(err) + } + + batch := postagetesting.MustNewBatch() + signer := getSigner(t) + + var chunks []swarm.Chunk + + for i := 0; i < 10; i++ { + ch := soctesting.GenerateMockSocWithSigner(t, []byte{byte(i)}, signer).Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, uint64(i), uint64(i))) + chunks = append(chunks, ch) + err := r.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + } + + bin := swarm.Proximity(baseAddr.Bytes(), chunks[0].Address().Bytes()) + + for i, ch := range chunks { + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: uint64(i + 1), StampHash: stampHash}, false) + checkChunk(t, ts, ch, false) + } + + _, err = r.EvictBatchBin(context.Background(), batch.ID, 1, swarm.MaxBins) + if err != nil { + t.Fatal(err) + } + checkChunk(t, ts, chunks[9], false) // chunk should still persist, eg refCnt > 0 + + evicted, err := r.EvictBatchBin(context.Background(), batch.ID, 10, swarm.MaxBins) + if err != nil { + t.Fatal(err) + } + if evicted != 9 { + t.Fatalf("wanted evicted count 10, got %d", evicted) + } + + for i, ch := range chunks { + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: uint64(i + 1), StampHash: stampHash}, true) + checkChunk(t, ts, ch, true) + } +} + func TestEvictMaxCount(t *testing.T) { t.Parallel() @@ -899,7 +985,12 @@ func checkChunk(t *testing.T, s transaction.ReadOnlyStore, ch swarm.Chunk, gone t.Fatal(err) } - _, err = chunkstamp.LoadWithBatchID(s.IndexStore(), "reserve", ch.Address(), ch.Stamp().BatchID()) + hash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + _, err = chunkstamp.LoadWithStampHash(s.IndexStore(), "reserve", ch.Address(), hash) if !gone && err != nil { t.Fatal(err) } @@ -923,3 +1014,14 @@ func getSigner(t *testing.T) crypto.Signer { } return crypto.NewDefaultSigner(privKey) } + +func checkChunkInIndexStore(t *testing.T, s storage.Reader, bin uint8, binId uint64, ch swarm.Chunk) { + t.Helper() + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + checkStore(t, s, &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, false) + checkStore(t, s, &reserve.ChunkBinItem{Bin: bin, BinID: binId, StampHash: stampHash}, false) +} diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index 2dc0c7c4bcd..7fb247da391 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -242,11 +242,11 @@ func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) return chunkstore.Iterate(ctx, c.indexStore, c.sharkyTrx, fn) } -func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk) (err error) { +func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk, emplace bool) (err error) { defer handleMetric("chunkstore_replace", c.metrics)(&err) unlock := c.lock(ch.Address()) defer unlock() - return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch) + return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch, emplace) } func (c *chunkStoreTrx) lock(addr swarm.Address) func() { diff --git a/pkg/storer/migration/reserveRepair.go b/pkg/storer/migration/reserveRepair.go index 8a45774e704..ae6838fc1e7 100644 --- a/pkg/storer/migration/reserveRepair.go +++ b/pkg/storer/migration/reserveRepair.go @@ -199,7 +199,7 @@ func ReserveRepairer( item.BinID = newID(int(item.Bin)) if bytes.Equal(item.StampHash, swarm.EmptyAddress.Bytes()) { - stamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), "reserve", item.Address, item.BatchID) + stamp, err := chunkstamp.LoadWithStampHash(s.IndexStore(), "reserve", item.Address, item.StampHash) if err != nil { return err } diff --git a/pkg/storer/mock/mockstorer.go b/pkg/storer/mock/mockstorer.go index 6ab457ab759..d0b5b7e6ad0 100644 --- a/pkg/storer/mock/mockstorer.go +++ b/pkg/storer/mock/mockstorer.go @@ -10,9 +10,9 @@ import ( "time" "github.com/ethersphere/bee/v2/pkg/pusher" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore" - storer "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "go.uber.org/atomic" ) @@ -231,3 +231,7 @@ func (m *mockStorer) DebugInfo(_ context.Context) (storer.Info, error) { func (m *mockStorer) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) { return nil, nil } + +func (m *mockStorer) Put(ctx context.Context, ch swarm.Chunk) error { + return m.chunkStore.Put(ctx, ch) +} diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go index 79b38ac2ecd..c354f92cdbe 100644 --- a/pkg/storer/reserve_test.go +++ b/pkg/storer/reserve_test.go @@ -695,7 +695,7 @@ func TestNeighborhoodStats(t *testing.T) { putChunks := func(addr swarm.Address, startingRadius int, st *storer.DB) { putter := st.ReservePutter() for i := 0; i < chunkCountPerPO; i++ { - ch := chunk.GenerateValidRandomChunkAt(addr, startingRadius) + ch := chunk.GenerateValidRandomChunkAt(t, addr, startingRadius) err := putter.Put(context.Background(), ch) if err != nil { t.Fatal(err) diff --git a/pkg/storer/sample_test.go b/pkg/storer/sample_test.go index 2f97aaab13f..07f4e36cd6d 100644 --- a/pkg/storer/sample_test.go +++ b/pkg/storer/sample_test.go @@ -27,7 +27,7 @@ func TestReserveSampler(t *testing.T) { var chs []swarm.Chunk for po := 0; po < maxPO; po++ { for i := 0; i < chunkCountPerPO; i++ { - ch := chunk.GenerateValidRandomChunkAt(baseAddr, po).WithBatch(3, 2, false) + ch := chunk.GenerateValidRandomChunkAt(t, baseAddr, po).WithBatch(3, 2, false) if rand.Intn(2) == 0 { // 50% chance to wrap CAC into SOC ch = chunk.GenerateTestRandomSoChunk(t, ch) } @@ -156,7 +156,7 @@ func TestReserveSamplerSisterNeighborhood(t *testing.T) { var chs []swarm.Chunk for po := startingRadius; po < maxPO; po++ { for i := 0; i < chunkCountPerPO; i++ { - ch := chunk.GenerateValidRandomChunkAt(baseAddr, po).WithBatch(3, 2, false) + ch := chunk.GenerateValidRandomChunkAt(t, baseAddr, po).WithBatch(3, 2, false) if rand.Intn(2) == 0 { // 50% chance to wrap CAC into SOC ch = chunk.GenerateTestRandomSoChunk(t, ch) }