Skip to content

Commit

Permalink
blockservice: move session handling as part of the interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Jan 15, 2024
1 parent b8ac21b commit 739e5e8
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 104 deletions.
145 changes: 59 additions & 86 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,23 @@ type BlockService interface {

// DeleteBlock deletes the given block from the blockservice.
DeleteBlock(ctx context.Context, o cid.Cid) error
}

// BoundedBlockService is a Blockservice bounded via strict multihash Allowlist.
type BoundedBlockService interface {
BlockService

Allowlist() verifcid.Allowlist
// NewSession creates a new session that allows for
// controlled exchange of wantlists to decrease the bandwidth overhead.
// If the current exchange is a [fetcher.SessionExchange], a new exchange
// session will be created. Otherwise, the current exchange will be used
// directly.
// Sessions are lazily setup, this is cheap.
NewSession(context.Context) BlockGetter

// ContextWithSession is creates a context with an embded session,
// future calls to [BlockService.GetBlock], [BlockService.GetBlocks] and [BlockService.NewSession]
// will be redirected to this same session instead.
// Sessions are lazily setup, this is cheap.
// It wont make a new session if one exists already in the context.
ContextWithSession(ctx context.Context) context.Context
}

var _ BoundedBlockService = (*blockService)(nil)

type blockService struct {
allowlist verifcid.Allowlist
blockstore blockstore.Blockstore
Expand Down Expand Up @@ -141,24 +147,25 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
return s.allowlist
}

// NewSession creates a new session that allows for
// controlled exchange of wantlists to decrease the bandwidth overhead.
// If the current exchange is a SessionExchange, a new exchange
// session will be created. Otherwise, the current exchange will be used
// directly.
// Sessions are lazily setup, this is cheap.
func NewSession(ctx context.Context, bs BlockService) *Session {
ses := grabSessionFromContext(ctx, bs)
func (s *blockService) NewSession(ctx context.Context) BlockGetter {
ses := s.grabSessionFromContext(ctx)
if ses != nil {
return ses
}

return newSession(ctx, bs)
return s.newSession(ctx)
}

// newSession is like [NewSession] but it does not attempt to reuse session from the existing context.
func newSession(ctx context.Context, bs BlockService) *Session {
return &Session{bs: bs, sesctx: ctx}
func (s *blockService) newSession(ctx context.Context) *session {
return &session{bs: s, sesctx: ctx}
}

func (s *blockService) ContextWithSession(ctx context.Context) context.Context {
if s.grabSessionFromContext(ctx) != nil {
return ctx
}
return context.WithValue(ctx, s, s.newSession(ctx))
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
Expand Down Expand Up @@ -240,30 +247,27 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
if ses := grabSessionFromContext(ctx, s); ses != nil {
if ses := s.grabSessionFromContext(ctx); ses != nil {
return ses.GetBlock(ctx, c)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

return getBlock(ctx, c, s, s.getExchangeFetcher)
return s.getBlock(ctx, c, s.getExchangeFetcher)
}

// Look at what I have to do, no interface covariance :'(
func (s *blockService) getExchangeFetcher() exchange.Fetcher {
return s.exchange
}

func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func() exchange.Fetcher) (blocks.Block, error) {
err := verifcid.ValidateCid(grabAllowlistFromBlockservice(bs), c) // hash security
func (s *blockService) getBlock(ctx context.Context, c cid.Cid, fetchFactory func() exchange.Fetcher) (blocks.Block, error) {
err := verifcid.ValidateCid(s.allowlist, c) // hash security
if err != nil {
return nil, err
}

blockstore := bs.Blockstore()

block, err := blockstore.Get(ctx, c)
block, err := s.blockstore.Get(ctx, c)
switch {
case err == nil:
return block, nil
Expand All @@ -285,12 +289,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
return nil, err
}
// also write in the blockstore for caching, inform the exchange that the block is available
err = blockstore.Put(ctx, blk)
err = s.blockstore.Put(ctx, blk)
if err != nil {
return nil, err
}
if ex := bs.Exchange(); ex != nil {
err = ex.NotifyNewBlocks(ctx, blk)
if s.exchange != nil {
err = s.exchange.NotifyNewBlocks(ctx, blk)
if err != nil {
return nil, err
}
Expand All @@ -303,28 +307,26 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
// the returned channel.
// NB: No guarantees are made about order.
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
if ses := grabSessionFromContext(ctx, s); ses != nil {
if ses := s.grabSessionFromContext(ctx); ses != nil {
return ses.GetBlocks(ctx, ks)
}

ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
defer span.End()

return getBlocks(ctx, ks, s, s.getExchangeFetcher)
return s.getBlocks(ctx, ks, s.getExchangeFetcher)
}

func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fetchFactory func() exchange.Fetcher) <-chan blocks.Block {
func (s *blockService) getBlocks(ctx context.Context, ks []cid.Cid, fetchFactory func() exchange.Fetcher) <-chan blocks.Block {
out := make(chan blocks.Block)

go func() {
defer close(out)

allowlist := grabAllowlistFromBlockservice(blockservice)

var lastAllValidIndex int
var c cid.Cid
for lastAllValidIndex, c = range ks {
if err := verifcid.ValidateCid(allowlist, c); err != nil {
if err := verifcid.ValidateCid(s.allowlist, c); err != nil {
break
}
}
Expand All @@ -335,7 +337,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
copy(ks2, ks[:lastAllValidIndex]) // fast path for already filtered elements
for _, c := range ks[lastAllValidIndex:] { // don't rescan already scanned elements
// hash security
if err := verifcid.ValidateCid(allowlist, c); err == nil {
if err := verifcid.ValidateCid(s.allowlist, c); err == nil {
ks2 = append(ks2, c)
} else {
logger.Errorf("unsafe CID (%s) passed to blockService.GetBlocks: %s", c, err)
Expand All @@ -344,11 +346,9 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
ks = ks2
}

bs := blockservice.Blockstore()

var misses []cid.Cid
for _, c := range ks {
hit, err := bs.Get(ctx, c)
hit, err := s.blockstore.Get(ctx, c)
if err != nil {
misses = append(misses, c)
continue
Expand All @@ -371,7 +371,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
return
}

ex := blockservice.Exchange()
var cache [1]blocks.Block // preallocate once for all iterations
for {
var b blocks.Block
Expand All @@ -386,16 +385,16 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
}

// write in the blockstore for caching
err = bs.Put(ctx, b)
err = s.blockstore.Put(ctx, b)
if err != nil {
logger.Errorf("could not write blocks from the network to the blockstore: %s", err)
return
}

if ex != nil {
if s.exchange != nil {
// inform the exchange that the blocks are available
cache[0] = b
err = ex.NotifyNewBlocks(ctx, cache[:]...)
err = s.exchange.NotifyNewBlocks(ctx, cache[:]...)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
Expand Down Expand Up @@ -433,16 +432,16 @@ func (s *blockService) Close() error {
return s.exchange.Close()
}

// Session is a helper type to provide higher level access to bitswap sessions
type Session struct {
// session is a helper type to provide higher level access to bitswap sessions
type session struct {
createSession sync.Once
bs BlockService
bs *blockService
ses exchange.Fetcher
sesctx context.Context
}

// grabSession is used to lazily create sessions.
func (s *Session) grabSession() exchange.Fetcher {
func (s *session) grabSession() exchange.Fetcher {
s.createSession.Do(func() {
defer func() {
s.sesctx = nil // early gc
Expand All @@ -465,64 +464,38 @@ func (s *Session) grabSession() exchange.Fetcher {
}

// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
func (s *session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

return getBlock(ctx, c, s.bs, s.grabSession)
return s.bs.getBlock(ctx, c, s.grabSession)
}

// GetBlocks gets blocks in the context of a request session
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
func (s *session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ctx, span := internal.StartSpan(ctx, "session.GetBlocks")
defer span.End()

return getBlocks(ctx, ks, s.bs, s.grabSession)
}

var _ BlockGetter = (*Session)(nil)

// ContextWithSession is a helper which creates a context with an embded session,
// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
// will be redirected to this same session instead.
// Sessions are lazily setup, this is cheap.
// It wont make a new session if one exists already in the context.
func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
if grabSessionFromContext(ctx, bs) != nil {
return ctx
}
return EmbedSessionInContext(ctx, newSession(ctx, bs))
return s.bs.getBlocks(ctx, ks, s.grabSession)
}

// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session.
func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context {
// use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
return context.WithValue(ctx, ses.bs, ses)
}
var _ BlockGetter = (*session)(nil)

// grabSessionFromContext returns nil if the session was not found
// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
// if this API is public it is too easy to forget to pass a [BlockService] or [session] object around in your app.
// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
func grabSessionFromContext(ctx context.Context, bs BlockService) *Session {
s := ctx.Value(bs)
func (s *blockService) grabSessionFromContext(ctx context.Context) *session {
ss := ctx.Value(s)
if s == nil {
return nil
}

ss, ok := s.(*Session)
sss, ok := ss.(*session)
if !ok {
// idk what to do here, that kinda sucks, giveup
return nil
}

return ss
}

// grabAllowlistFromBlockservice never returns nil
func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
if bbs, ok := bs.(BoundedBlockService); ok {
return bbs.Allowlist()
}
return verifcid.DefaultAllowlist
return sss
}
22 changes: 11 additions & 11 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestExchangeWrite(t *testing.T) {

for name, fetcher := range map[string]BlockGetter{
"blockservice": bserv,
"session": NewSession(context.Background(), bserv),
"session": bserv.NewSession(context.Background()),
} {
t.Run(name, func(t *testing.T) {
// GetBlock
Expand Down Expand Up @@ -133,9 +133,9 @@ func TestLazySessionInitialization(t *testing.T) {
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bstore3 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
session := offline.Exchange(bstore2)
ses := offline.Exchange(bstore2)
exch := offline.Exchange(bstore3)
sessionExch := &fakeSessionExchange{Interface: exch, session: session}
sessionExch := &fakeSessionExchange{Interface: exch, session: ses}
bservSessEx := New(bstore, sessionExch, WriteThrough())
bgen := butil.NewBlockGenerator()

Expand All @@ -149,12 +149,12 @@ func TestLazySessionInitialization(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = session.NotifyNewBlocks(ctx, block2)
err = ses.NotifyNewBlocks(ctx, block2)
if err != nil {
t.Fatal(err)
}

bsession := NewSession(ctx, bservSessEx)
bsession := bservSessEx.NewSession(ctx).(*session)
if bsession.ses != nil {
t.Fatal("Session exchange should not instantiated session immediately")
}
Expand All @@ -175,7 +175,7 @@ func TestLazySessionInitialization(t *testing.T) {
if returnedBlock.Cid() != block2.Cid() {
t.Fatal("Got incorrect block")
}
if bsession.ses != session {
if bsession.ses != ses {
t.Fatal("Should have initialized session to fetch block")
}
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestNilExchange(t *testing.T) {

bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bserv := New(bs, nil, WriteThrough())
sess := NewSession(ctx, bserv)
sess := bserv.NewSession(ctx)
_, err := sess.GetBlock(ctx, block.Cid())
if !ipld.IsNotFound(err) {
t.Fatal("expected block to not be found")
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestAllowlist(t *testing.T) {

blockservice := New(bs, nil, WithAllowlist(verifcid.NewAllowlist(map[uint64]bool{multihash.BLAKE3: true})))
check(blockservice.GetBlock)
check(NewSession(ctx, blockservice).GetBlock)
check(blockservice.NewSession(ctx).GetBlock)
}

type fakeIsNewSessionCreateExchange struct {
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestContextSession(t *testing.T) {

service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx)

ctx = ContextWithSession(ctx, service)
ctx = service.ContextWithSession(ctx)

b, err := service.GetBlock(ctx, block1.Cid())
a.NoError(err)
Expand All @@ -348,8 +348,8 @@ func TestContextSession(t *testing.T) {
a.False(sesEx.newSessionWasCalled, "session should be reused in context")

a.Equal(
NewSession(ctx, service),
NewSession(ContextWithSession(ctx, service), service),
service.NewSession(ctx),
service.NewSession(service.ContextWithSession(ctx)),
"session must be deduped in all invocations on the same context",
)
}
Loading

0 comments on commit 739e5e8

Please sign in to comment.