diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 7733788ecd..31f4e820ec 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -62,17 +62,23 @@ type BlockService interface { // DeleteBlock deletes the given block from the blockservice. DeleteBlock(ctx context.Context, o cid.Cid) error -} - -// BoundedBlockService is a Blockservice bounded via strict multihash Allowlist. -type BoundedBlockService interface { - BlockService - Allowlist() verifcid.Allowlist + // NewSession creates a new session that allows for + // controlled exchange of wantlists to decrease the bandwidth overhead. + // If the current exchange is a [fetcher.SessionExchange], a new exchange + // session will be created. Otherwise, the current exchange will be used + // directly. + // Sessions are lazily setup, this is cheap. + NewSession(context.Context) BlockGetter + + // ContextWithSession is creates a context with an embded session, + // future calls to [BlockService.GetBlock], [BlockService.GetBlocks] and [BlockService.NewSession] + // will be redirected to this same session instead. + // Sessions are lazily setup, this is cheap. + // It wont make a new session if one exists already in the context. + ContextWithSession(ctx context.Context) context.Context } -var _ BoundedBlockService = (*blockService)(nil) - type blockService struct { allowlist verifcid.Allowlist blockstore blockstore.Blockstore @@ -141,24 +147,25 @@ func (s *blockService) Allowlist() verifcid.Allowlist { return s.allowlist } -// NewSession creates a new session that allows for -// controlled exchange of wantlists to decrease the bandwidth overhead. -// If the current exchange is a SessionExchange, a new exchange -// session will be created. Otherwise, the current exchange will be used -// directly. -// Sessions are lazily setup, this is cheap. -func NewSession(ctx context.Context, bs BlockService) *Session { - ses := grabSessionFromContext(ctx, bs) +func (s *blockService) NewSession(ctx context.Context) BlockGetter { + ses := s.grabSessionFromContext(ctx) if ses != nil { return ses } - return newSession(ctx, bs) + return s.newSession(ctx) } // newSession is like [NewSession] but it does not attempt to reuse session from the existing context. -func newSession(ctx context.Context, bs BlockService) *Session { - return &Session{bs: bs, sesctx: ctx} +func (s *blockService) newSession(ctx context.Context) *session { + return &session{bs: s, sesctx: ctx} +} + +func (s *blockService) ContextWithSession(ctx context.Context) context.Context { + if s.grabSessionFromContext(ctx) != nil { + return ctx + } + return context.WithValue(ctx, s, s.newSession(ctx)) } // AddBlock adds a particular block to the service, Putting it into the datastore. @@ -240,30 +247,27 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - if ses := grabSessionFromContext(ctx, s); ses != nil { + if ses := s.grabSessionFromContext(ctx); ses != nil { return ses.GetBlock(ctx, c) } ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - return getBlock(ctx, c, s, s.getExchangeFetcher) + return s.getBlock(ctx, c, s.getExchangeFetcher) } -// Look at what I have to do, no interface covariance :'( func (s *blockService) getExchangeFetcher() exchange.Fetcher { return s.exchange } -func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func() exchange.Fetcher) (blocks.Block, error) { - err := verifcid.ValidateCid(grabAllowlistFromBlockservice(bs), c) // hash security +func (s *blockService) getBlock(ctx context.Context, c cid.Cid, fetchFactory func() exchange.Fetcher) (blocks.Block, error) { + err := verifcid.ValidateCid(s.allowlist, c) // hash security if err != nil { return nil, err } - blockstore := bs.Blockstore() - - block, err := blockstore.Get(ctx, c) + block, err := s.blockstore.Get(ctx, c) switch { case err == nil: return block, nil @@ -285,12 +289,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func return nil, err } // also write in the blockstore for caching, inform the exchange that the block is available - err = blockstore.Put(ctx, blk) + err = s.blockstore.Put(ctx, blk) if err != nil { return nil, err } - if ex := bs.Exchange(); ex != nil { - err = ex.NotifyNewBlocks(ctx, blk) + if s.exchange != nil { + err = s.exchange.NotifyNewBlocks(ctx, blk) if err != nil { return nil, err } @@ -303,28 +307,26 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func // the returned channel. // NB: No guarantees are made about order. func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { - if ses := grabSessionFromContext(ctx, s); ses != nil { + if ses := s.grabSessionFromContext(ctx); ses != nil { return ses.GetBlocks(ctx, ks) } ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks") defer span.End() - return getBlocks(ctx, ks, s, s.getExchangeFetcher) + return s.getBlocks(ctx, ks, s.getExchangeFetcher) } -func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fetchFactory func() exchange.Fetcher) <-chan blocks.Block { +func (s *blockService) getBlocks(ctx context.Context, ks []cid.Cid, fetchFactory func() exchange.Fetcher) <-chan blocks.Block { out := make(chan blocks.Block) go func() { defer close(out) - allowlist := grabAllowlistFromBlockservice(blockservice) - var lastAllValidIndex int var c cid.Cid for lastAllValidIndex, c = range ks { - if err := verifcid.ValidateCid(allowlist, c); err != nil { + if err := verifcid.ValidateCid(s.allowlist, c); err != nil { break } } @@ -335,7 +337,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet copy(ks2, ks[:lastAllValidIndex]) // fast path for already filtered elements for _, c := range ks[lastAllValidIndex:] { // don't rescan already scanned elements // hash security - if err := verifcid.ValidateCid(allowlist, c); err == nil { + if err := verifcid.ValidateCid(s.allowlist, c); err == nil { ks2 = append(ks2, c) } else { logger.Errorf("unsafe CID (%s) passed to blockService.GetBlocks: %s", c, err) @@ -344,11 +346,9 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet ks = ks2 } - bs := blockservice.Blockstore() - var misses []cid.Cid for _, c := range ks { - hit, err := bs.Get(ctx, c) + hit, err := s.blockstore.Get(ctx, c) if err != nil { misses = append(misses, c) continue @@ -371,7 +371,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet return } - ex := blockservice.Exchange() var cache [1]blocks.Block // preallocate once for all iterations for { var b blocks.Block @@ -386,16 +385,16 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet } // write in the blockstore for caching - err = bs.Put(ctx, b) + err = s.blockstore.Put(ctx, b) if err != nil { logger.Errorf("could not write blocks from the network to the blockstore: %s", err) return } - if ex != nil { + if s.exchange != nil { // inform the exchange that the blocks are available cache[0] = b - err = ex.NotifyNewBlocks(ctx, cache[:]...) + err = s.exchange.NotifyNewBlocks(ctx, cache[:]...) if err != nil { logger.Errorf("could not tell the exchange about new blocks: %s", err) return @@ -433,16 +432,16 @@ func (s *blockService) Close() error { return s.exchange.Close() } -// Session is a helper type to provide higher level access to bitswap sessions -type Session struct { +// session is a helper type to provide higher level access to bitswap sessions +type session struct { createSession sync.Once - bs BlockService + bs *blockService ses exchange.Fetcher sesctx context.Context } // grabSession is used to lazily create sessions. -func (s *Session) grabSession() exchange.Fetcher { +func (s *session) grabSession() exchange.Fetcher { s.createSession.Do(func() { defer func() { s.sesctx = nil // early gc @@ -465,64 +464,38 @@ func (s *Session) grabSession() exchange.Fetcher { } // GetBlock gets a block in the context of a request session -func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) +func (s *session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + ctx, span := internal.StartSpan(ctx, "session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - return getBlock(ctx, c, s.bs, s.grabSession) + return s.bs.getBlock(ctx, c, s.grabSession) } // GetBlocks gets blocks in the context of a request session -func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { - ctx, span := internal.StartSpan(ctx, "Session.GetBlocks") +func (s *session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { + ctx, span := internal.StartSpan(ctx, "session.GetBlocks") defer span.End() - return getBlocks(ctx, ks, s.bs, s.grabSession) -} - -var _ BlockGetter = (*Session)(nil) - -// ContextWithSession is a helper which creates a context with an embded session, -// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService] -// will be redirected to this same session instead. -// Sessions are lazily setup, this is cheap. -// It wont make a new session if one exists already in the context. -func ContextWithSession(ctx context.Context, bs BlockService) context.Context { - if grabSessionFromContext(ctx, bs) != nil { - return ctx - } - return EmbedSessionInContext(ctx, newSession(ctx, bs)) + return s.bs.getBlocks(ctx, ks, s.grabSession) } -// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session. -func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context { - // use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice. - return context.WithValue(ctx, ses.bs, ses) -} +var _ BlockGetter = (*session)(nil) // grabSessionFromContext returns nil if the session was not found // This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety, -// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app. +// if this API is public it is too easy to forget to pass a [BlockService] or [session] object around in your app. // By having this private we allow consumers to follow the trace of where the blockservice is passed and used. -func grabSessionFromContext(ctx context.Context, bs BlockService) *Session { - s := ctx.Value(bs) +func (s *blockService) grabSessionFromContext(ctx context.Context) *session { + ss := ctx.Value(s) if s == nil { return nil } - ss, ok := s.(*Session) + sss, ok := ss.(*session) if !ok { // idk what to do here, that kinda sucks, giveup return nil } - return ss -} - -// grabAllowlistFromBlockservice never returns nil -func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist { - if bbs, ok := bs.(BoundedBlockService); ok { - return bbs.Allowlist() - } - return verifcid.DefaultAllowlist + return sss } diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 6591529d28..85d97124b8 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -67,7 +67,7 @@ func TestExchangeWrite(t *testing.T) { for name, fetcher := range map[string]BlockGetter{ "blockservice": bserv, - "session": NewSession(context.Background(), bserv), + "session": bserv.NewSession(context.Background()), } { t.Run(name, func(t *testing.T) { // GetBlock @@ -133,9 +133,9 @@ func TestLazySessionInitialization(t *testing.T) { bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) bstore3 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - session := offline.Exchange(bstore2) + ses := offline.Exchange(bstore2) exch := offline.Exchange(bstore3) - sessionExch := &fakeSessionExchange{Interface: exch, session: session} + sessionExch := &fakeSessionExchange{Interface: exch, session: ses} bservSessEx := NewWriteThrough(bstore, sessionExch) bgen := butil.NewBlockGenerator() @@ -149,12 +149,12 @@ func TestLazySessionInitialization(t *testing.T) { if err != nil { t.Fatal(err) } - err = session.NotifyNewBlocks(ctx, block2) + err = ses.NotifyNewBlocks(ctx, block2) if err != nil { t.Fatal(err) } - bsession := NewSession(ctx, bservSessEx) + bsession := bservSessEx.NewSession(ctx).(*session) if bsession.ses != nil { t.Fatal("Session exchange should not instantiated session immediately") } @@ -175,7 +175,7 @@ func TestLazySessionInitialization(t *testing.T) { if returnedBlock.Cid() != block2.Cid() { t.Fatal("Got incorrect block") } - if bsession.ses != session { + if bsession.ses != ses { t.Fatal("Should have initialized session to fetch block") } } @@ -235,7 +235,7 @@ func TestNilExchange(t *testing.T) { bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) bserv := NewWriteThrough(bs, nil) - sess := NewSession(ctx, bserv) + sess := bserv.NewSession(ctx) _, err := sess.GetBlock(ctx, block.Cid()) if !ipld.IsNotFound(err) { t.Fatal("expected block to not be found") @@ -286,7 +286,7 @@ func TestAllowlist(t *testing.T) { blockservice := New(bs, nil, WithAllowlist(verifcid.NewAllowlist(map[uint64]bool{multihash.BLAKE3: true}))) check(blockservice.GetBlock) - check(NewSession(ctx, blockservice).GetBlock) + check(blockservice.NewSession(ctx).GetBlock) } type fakeIsNewSessionCreateExchange struct { @@ -335,7 +335,7 @@ func TestContextSession(t *testing.T) { service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx) - ctx = ContextWithSession(ctx, service) + ctx = service.ContextWithSession(ctx) b, err := service.GetBlock(ctx, block1.Cid()) a.NoError(err) @@ -348,8 +348,8 @@ func TestContextSession(t *testing.T) { a.False(sesEx.newSessionWasCalled, "session should be reused in context") a.Equal( - NewSession(ctx, service), - NewSession(ContextWithSession(ctx, service), service), + service.NewSession(ctx), + service.NewSession(service.ContextWithSession(ctx)), "session must be deduped in all invocations on the same context", ) } diff --git a/fetcher/impl/blockservice/fetcher.go b/fetcher/impl/blockservice/fetcher.go index a02e6ebbf6..b0fd4d9234 100644 --- a/fetcher/impl/blockservice/fetcher.go +++ b/fetcher/impl/blockservice/fetcher.go @@ -39,10 +39,10 @@ func NewFetcherConfig(blockService blockservice.BlockService) FetcherConfig { // NewSession creates a session from which nodes may be retrieved. // The session ends when the provided context is canceled. func (fc FetcherConfig) NewSession(ctx context.Context) fetcher.Fetcher { - return fc.FetcherWithSession(ctx, blockservice.NewSession(ctx, fc.blockService)) + return fc.FetcherWithSession(ctx, fc.blockService.NewSession(ctx)) } -func (fc FetcherConfig) FetcherWithSession(ctx context.Context, s *blockservice.Session) fetcher.Fetcher { +func (fc FetcherConfig) FetcherWithSession(ctx context.Context, s blockservice.BlockGetter) fetcher.Fetcher { ls := cidlink.DefaultLinkSystem() // while we may be loading blocks remotely, they are already hash verified by the time they load // into ipld-prime @@ -131,7 +131,7 @@ var DefaultPrototypeChooser = func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld return basicnode.Prototype.Any, nil } -func blockOpener(ctx context.Context, bs *blockservice.Session) ipld.BlockReadOpener { +func blockOpener(ctx context.Context, bs blockservice.BlockGetter) ipld.BlockReadOpener { return func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { cidLink, ok := lnk.(cidlink.Link) if !ok { diff --git a/gateway/blocks_backend.go b/gateway/blocks_backend.go index fe188ae717..47e2db697a 100644 --- a/gateway/blocks_backend.go +++ b/gateway/blocks_backend.go @@ -687,7 +687,7 @@ func (bb *BlocksBackend) IsCached(ctx context.Context, p path.Path) bool { var _ WithContextHint = (*BlocksBackend)(nil) func (bb *BlocksBackend) WrapContextForRequest(ctx context.Context) context.Context { - return blockservice.ContextWithSession(ctx, bb.blockService) + return bb.blockService.ContextWithSession(ctx) } func (bb *BlocksBackend) ResolvePath(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, error) { diff --git a/ipld/merkledag/merkledag.go b/ipld/merkledag/merkledag.go index a227780ff2..cf361aa95e 100644 --- a/ipld/merkledag/merkledag.go +++ b/ipld/merkledag/merkledag.go @@ -133,7 +133,7 @@ func GetLinksDirect(serv format.NodeGetter) GetLinks { } type sesGetter struct { - bs *bserv.Session + bs bserv.BlockGetter decoder *legacy.Decoder } @@ -153,7 +153,7 @@ func (sg *sesGetter) GetMany(ctx context.Context, keys []cid.Cid) <-chan *format } // WrapSession wraps a blockservice session to satisfy the format.NodeGetter interface -func WrapSession(s *bserv.Session) format.NodeGetter { +func WrapSession(s bserv.BlockGetter) format.NodeGetter { return &sesGetter{ bs: s, decoder: ipldLegacyDecoder, @@ -162,7 +162,7 @@ func WrapSession(s *bserv.Session) format.NodeGetter { // Session returns a NodeGetter using a new session for block fetches. func (n *dagService) Session(ctx context.Context) format.NodeGetter { - session := bserv.NewSession(ctx, n.Blocks) + session := n.Blocks.NewSession(ctx) return &sesGetter{ bs: session, decoder: n.decoder,