From 2b1c552fcad027c5bdc7f938e45450a6f7d750e1 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 16 Feb 2024 14:46:21 +0100 Subject: [PATCH] blockservice: add session workaround to work with wrapped blockservices --- blockservice/blockservice.go | 23 ++++++++---- blockservice/blockservice_test.go | 56 ++++++++++++++++++++++------ blockservice/providing_blockstore.go | 37 ++++++++++++++++++ 3 files changed, 96 insertions(+), 20 deletions(-) create mode 100644 blockservice/providing_blockstore.go diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 6fff661cb..aac14ba84 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -140,6 +140,11 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) // Blockstore returns the blockstore behind this blockservice. func (s *blockService) Blockstore() blockstore.Blockstore { + if s.provider != nil { + // FIXME: this is a hack remove once ipfs/boxo#567 is solved. + return providingBlockstore{s.blockstore, s.provider} + } + return s.blockstore } @@ -275,7 +280,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func return nil, err } - blockstore := bs.Blockstore() + provider, blockstore := grabProviderAndBlockstoreFromBlockservice(bs) block, err := blockstore.Get(ctx, c) switch { @@ -309,7 +314,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func return nil, err } } - if provider := grabProviderFromBlockservice(bs); provider != nil { + if provider != nil { err = provider.Provide(blk.Cid()) if err != nil { return nil, err @@ -360,7 +365,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet ks = ks2 } - bs := blockservice.Blockstore() + provider, bs := grabProviderAndBlockstoreFromBlockservice(blockservice) var misses []cid.Cid for _, c := range ks { @@ -388,7 +393,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet } ex := blockservice.Exchange() - provider := grabProviderFromBlockservice(blockservice) var cache [1]blocks.Block // preallocate once for all iterations for { var b blocks.Block @@ -515,10 +519,13 @@ func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist { return verifcid.DefaultAllowlist } -// grabProviderFromBlockservice can return nil if no provider is used. -func grabProviderFromBlockservice(bs BlockService) provider.Provider { +// grabProviderAndBlockstoreFromBlockservice can return nil if no provider is used. +func grabProviderAndBlockstoreFromBlockservice(bs BlockService) (provider.Provider, blockstore.Blockstore) { + if bbs, ok := bs.(*blockService); ok { + return bbs.provider, bbs.blockstore + } if bbs, ok := bs.(ProvidingBlockService); ok { - return bbs.Provider() + return bbs.Provider(), bbs.Blockstore() } - return nil + return nil, bs.Blockstore() } diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 2a2cb831b..b04c3df8d 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -289,18 +289,26 @@ func TestAllowlist(t *testing.T) { check(NewSession(ctx, blockservice).GetBlock) } +type wrappedBlockservice struct { + BlockService +} + type mockProvider []cid.Cid func (p *mockProvider) Provide(c cid.Cid) error { *p = append(*p, c) return nil } + func TestProviding(t *testing.T) { t.Parallel() a := assert.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + bgen := butil.NewBlockGenerator() - blocks := bgen.Blocks(9) + blocks := bgen.Blocks(12) exchange := blockstore.NewBlockstore(ds.NewMapDatastore()) @@ -309,51 +317,75 @@ func TestProviding(t *testing.T) { var added []cid.Cid // Adding one block provide it. - a.NoError(blockservice.AddBlock(context.Background(), blocks[0])) + a.NoError(blockservice.AddBlock(ctx, blocks[0])) added = append(added, blocks[0].Cid()) blocks = blocks[1:] // Adding multiple blocks provide them. - a.NoError(blockservice.AddBlocks(context.Background(), blocks[0:2])) + a.NoError(blockservice.AddBlocks(ctx, blocks[0:2])) added = append(added, blocks[0].Cid(), blocks[1].Cid()) blocks = blocks[2:] // Downloading one block provide it. - a.NoError(exchange.Put(context.Background(), blocks[0])) - _, err := blockservice.GetBlock(context.Background(), blocks[0].Cid()) + a.NoError(exchange.Put(ctx, blocks[0])) + _, err := blockservice.GetBlock(ctx, blocks[0].Cid()) a.NoError(err) added = append(added, blocks[0].Cid()) blocks = blocks[1:] // Downloading multiple blocks provide them. - a.NoError(exchange.PutMany(context.Background(), blocks[0:2])) + a.NoError(exchange.PutMany(ctx, blocks[0:2])) cids := []cid.Cid{blocks[0].Cid(), blocks[1].Cid()} var got []cid.Cid - for b := range blockservice.GetBlocks(context.Background(), cids) { + for b := range blockservice.GetBlocks(ctx, cids) { got = append(got, b.Cid()) } added = append(added, cids...) a.ElementsMatch(cids, got) blocks = blocks[2:] - session := NewSession(context.Background(), blockservice) + session := NewSession(ctx, blockservice) // Downloading one block over a session provide it. - a.NoError(exchange.Put(context.Background(), blocks[0])) - _, err = session.GetBlock(context.Background(), blocks[0].Cid()) + a.NoError(exchange.Put(ctx, blocks[0])) + _, err = session.GetBlock(ctx, blocks[0].Cid()) a.NoError(err) added = append(added, blocks[0].Cid()) blocks = blocks[1:] // Downloading multiple blocks over a session provide them. - a.NoError(exchange.PutMany(context.Background(), blocks[0:2])) + a.NoError(exchange.PutMany(ctx, blocks[0:2])) + cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()} + got = nil + for b := range session.GetBlocks(ctx, cids) { + got = append(got, b.Cid()) + } + a.ElementsMatch(cids, got) + added = append(added, cids...) + blocks = blocks[2:] + + // Test wrapping the blockservice like nopfs does. + session = NewSession(ctx, wrappedBlockservice{blockservice}) + + // Downloading one block over a wrapped blockservice session provide it. + a.NoError(exchange.Put(ctx, blocks[0])) + _, err = session.GetBlock(ctx, blocks[0].Cid()) + a.NoError(err) + added = append(added, blocks[0].Cid()) + blocks = blocks[1:] + + // Downloading multiple blocks over a wrapped blockservice session provide them. + a.NoError(exchange.PutMany(ctx, blocks[0:2])) cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()} got = nil - for b := range session.GetBlocks(context.Background(), cids) { + for b := range session.GetBlocks(ctx, cids) { got = append(got, b.Cid()) } a.ElementsMatch(cids, got) added = append(added, cids...) + blocks = blocks[2:] + + a.Empty(blocks) a.ElementsMatch(added, []cid.Cid(prov)) } diff --git a/blockservice/providing_blockstore.go b/blockservice/providing_blockstore.go new file mode 100644 index 000000000..7435f8ae2 --- /dev/null +++ b/blockservice/providing_blockstore.go @@ -0,0 +1,37 @@ +package blockservice + +import ( + "context" + + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/provider" + blocks "github.com/ipfs/go-block-format" +) + +var _ blockstore.Blockstore = providingBlockstore{} + +type providingBlockstore struct { + blockstore.Blockstore + provider provider.Provider +} + +func (pbs providingBlockstore) Put(ctx context.Context, b blocks.Block) error { + if err := pbs.Blockstore.Put(ctx, b); err != nil { + return err + } + + return pbs.provider.Provide(b.Cid()) +} + +func (pbs providingBlockstore) PutMany(ctx context.Context, b []blocks.Block) error { + if err := pbs.Blockstore.PutMany(ctx, b); err != nil { + return err // what are the semantics here, did some blocks were put ? assume PutMany is atomic + } + + for _, b := range b { + if err := pbs.provider.Provide(b.Cid()); err != nil { + return err // this can only error if the whole provider is done for + } + } + return nil +}