Skip to content

Commit

Permalink
blockservice: add WithContentBlocker option
Browse files Browse the repository at this point in the history
The goal is to help with ipfs-shipyard/nopfs#34.
  • Loading branch information
Jorropo committed Jan 15, 2024
1 parent 4d08813 commit ec8bcb6
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The following emojis are used to highlight certain changes:
### Added

- `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context.
- `blockservice` now has `WithContentBlocker` option which allows to filter Add and Get requests by CID.

### Changed

Expand Down
76 changes: 72 additions & 4 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,24 @@ type BoundedBlockService interface {
Allowlist() verifcid.Allowlist
}

// Blocker returns err != nil if the CID is disallowed to be fetched or stored in blockservice.
// It returns an error so error messages could be passed.
type Blocker func(cid.Cid) error

// BlockedBlockService is a Blockservice bounded via an arbitrary cid [Blocker].
type BlockedBlockService interface {
BlockService

// Blocker might return [nil], then no blocking is to be done.
Blocker() Blocker
}

var _ BoundedBlockService = (*blockService)(nil)
var _ BlockedBlockService = (*blockService)(nil)

type blockService struct {
allowlist verifcid.Allowlist
blocker Blocker
blockstore blockstore.Blockstore
exchange exchange.Interface
// If checkFirst is true then first check that a block doesn't
Expand All @@ -99,6 +113,13 @@ func WithAllowlist(allowlist verifcid.Allowlist) Option {
}
}

// WithContentBlocker allows to filter what blocks can be fetched or added to the blockservice.
func WithContentBlocker(blocker Blocker) Option {
return func(bs *blockService) {
bs.blocker = blocker
}
}

// New creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService {
if exchange == nil {
Expand Down Expand Up @@ -141,6 +162,10 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
return s.allowlist
}

func (s *blockService) Blocker() Blocker {
return s.blocker
}

// NewSession creates a new session that allows for
// controlled exchange of wantlists to decrease the bandwidth overhead.
// If the current exchange is a SessionExchange, a new exchange
Expand Down Expand Up @@ -171,6 +196,13 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
if err != nil {
return err
}

if s.blocker != nil {
if err := s.blocker(c); err != nil {
return err
}
}

if s.checkFirst {
if has, err := s.blockstore.Has(ctx, c); has || err != nil {
return err
Expand Down Expand Up @@ -198,10 +230,17 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {

// hash security
for _, b := range bs {
err := verifcid.ValidateCid(s.allowlist, b.Cid())
c := b.Cid()
err := verifcid.ValidateCid(s.allowlist, c)
if err != nil {
return err
}

if s.blocker != nil {
if err := s.blocker(c); err != nil {
return err
}
}
}
var toput []blocks.Block
if s.checkFirst {
Expand Down Expand Up @@ -261,6 +300,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
return nil, err
}

if blocker := grabBlockerFromBlockservice(bs); blocker != nil {
if err := blocker(c); err != nil {
return nil, err
}
}

blockstore := bs.Blockstore()

block, err := blockstore.Get(ctx, c)
Expand Down Expand Up @@ -320,25 +365,41 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
defer close(out)

allowlist := grabAllowlistFromBlockservice(blockservice)
blocker := grabBlockerFromBlockservice(blockservice)

allValid := true
for _, c := range ks {
if err := verifcid.ValidateCid(allowlist, c); err != nil {
allValid = false
break
}

if blocker != nil {
if err := blocker(c); err != nil {
allValid = false
break
}
}
}

if !allValid {
// can't shift in place because we don't want to clobber callers.
ks2 := make([]cid.Cid, 0, len(ks))
for _, c := range ks {
// hash security
if err := verifcid.ValidateCid(allowlist, c); err == nil {
ks2 = append(ks2, c)
} else {
if err := verifcid.ValidateCid(allowlist, c); err != nil {
logger.Errorf("unsafe CID (%s) passed to blockService.GetBlocks: %s", c, err)
continue

Check warning on line 392 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L392

Added line #L392 was not covered by tests
}

if blocker != nil {
if err := blocker(c); err != nil {
logger.Errorf("blocked CID (%s) passed to blockService.GetBlocks: %s", c, err)
continue
}
}

ks2 = append(ks2, c)
}
ks = ks2
}
Expand Down Expand Up @@ -525,3 +586,10 @@ func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
}
return verifcid.DefaultAllowlist

Check warning on line 587 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L587

Added line #L587 was not covered by tests
}

func grabBlockerFromBlockservice(bs BlockService) Blocker {
if bbs, ok := bs.(BlockedBlockService); ok {
return bbs.Blocker()
}
return nil

Check warning on line 594 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L594

Added line #L594 was not covered by tests
}
70 changes: 70 additions & 0 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockservice

import (
"context"
"errors"
"testing"

blockstore "github.com/ipfs/boxo/blockstore"
Expand Down Expand Up @@ -353,3 +354,72 @@ func TestContextSession(t *testing.T) {
"session must be deduped in all invocations on the same context",
)
}

func TestBlocker(t *testing.T) {
t.Parallel()
a := assert.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bgen := butil.NewBlockGenerator()
allowed := bgen.Next()
notAllowed := bgen.Next()

var disallowed = errors.New("disallowed")

bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
service := New(bs, nil, WithContentBlocker(func(c cid.Cid) error {
if c == notAllowed.Cid() {
return disallowed
}
return nil
}))

// try putting
a.NoError(service.AddBlock(ctx, allowed))
has, err := bs.Has(ctx, allowed.Cid())
a.NoError(err)
a.True(has, "block was not added even tho it is not blocked")
a.NoError(service.DeleteBlock(ctx, allowed.Cid()))

a.ErrorIs(service.AddBlock(ctx, notAllowed), disallowed)
has, err = bs.Has(ctx, notAllowed.Cid())
a.NoError(err)
a.False(has, "block was added even tho it is blocked")

a.NoError(service.AddBlocks(ctx, []blocks.Block{allowed}))
has, err = bs.Has(ctx, allowed.Cid())
a.NoError(err)
a.True(has, "block was not added even tho it is not blocked")
a.NoError(service.DeleteBlock(ctx, allowed.Cid()))

a.ErrorIs(service.AddBlocks(ctx, []blocks.Block{notAllowed}), disallowed)
has, err = bs.Has(ctx, notAllowed.Cid())
a.NoError(err)
a.False(has, "block was added even tho it is blocked")

// now try fetch
a.NoError(bs.Put(ctx, allowed))
a.NoError(bs.Put(ctx, notAllowed))

block, err := service.GetBlock(ctx, allowed.Cid())
a.NoError(err)
a.Equal(block.RawData(), allowed.RawData())

_, err = service.GetBlock(ctx, notAllowed.Cid())
a.ErrorIs(err, disallowed)

var gotAllowed bool
for block := range service.GetBlocks(ctx, []cid.Cid{allowed.Cid(), notAllowed.Cid()}) {
switch block.Cid() {
case allowed.Cid():
gotAllowed = true
case notAllowed.Cid():
t.Error("got disallowed block")
default:
t.Fatalf("got unrelated block: %s", block.Cid())
}
}
a.True(gotAllowed, "did not got allowed block")
}

0 comments on commit ec8bcb6

Please sign in to comment.