Skip to content

Commit

Permalink
chore: update boxo for bitswap providing refactor
Browse files Browse the repository at this point in the history
Keeps in sync with:
- ipfs/boxo#534
- ipfs/boxo#535
- ipfs/boxo#536
  • Loading branch information
Jorropo committed Jan 5, 2024
1 parent b215d73 commit 732d56c
Show file tree
Hide file tree
Showing 20 changed files with 108 additions and 69 deletions.
1 change: 0 additions & 1 deletion core/commands/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ var bitswapStatCmd = &cmds.Command{
human, _ := req.Options[bitswapHumanOptionName].(bool)

fmt.Fprintln(w, "bitswap status")
fmt.Fprintf(w, "\tprovides buffer: %d / %d\n", s.ProvideBufLen, bitswap.HasBlockBufferSize)
fmt.Fprintf(w, "\tblocks received: %d\n", s.BlocksReceived)
fmt.Fprintf(w, "\tblocks sent: %d\n", s.BlocksSent)
if human {
Expand Down
8 changes: 1 addition & 7 deletions core/commands/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/ipfs/kubo/core"
"github.com/ipfs/kubo/core/commands/cmdenv"

bservice "github.com/ipfs/boxo/blockservice"
offline "github.com/ipfs/boxo/exchange/offline"
dag "github.com/ipfs/boxo/ipld/merkledag"
ft "github.com/ipfs/boxo/ipld/unixfs"
mfs "github.com/ipfs/boxo/mfs"
Expand Down Expand Up @@ -162,11 +160,7 @@ var filesStatCmd = &cmds.Command{

var dagserv ipld.DAGService
if withLocal {
// an offline DAGService will not fetch from the network
dagserv = dag.NewDAGService(bservice.New(
node.Blockstore,
offline.Exchange(node.Blockstore),
))
dagserv = node.OfflineDAG

Check warning on line 163 in core/commands/files.go

View check run for this annotation

Codecov / codecov/patch

core/commands/files.go#L163

Added line #L163 was not covered by tests
} else {
dagserv = node.DAG
}
Expand Down
5 changes: 1 addition & 4 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"os"
"time"

bserv "github.com/ipfs/boxo/blockservice"
offline "github.com/ipfs/boxo/exchange/offline"
dag "github.com/ipfs/boxo/ipld/merkledag"
verifcid "github.com/ipfs/boxo/verifcid"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -740,8 +738,7 @@ type pinVerifyOpts struct {
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan any, error) {
visited := make(map[cid.Cid]PinStatus)

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
DAG := n.OfflineDAG

Check warning on line 741 in core/commands/pin/pin.go

View check run for this annotation

Codecov / codecov/patch

core/commands/pin/pin.go#L741

Added line #L741 was not covered by tests
getLinks := dag.GetLinksWithDAG(DAG)

var checkPin func(root cid.Cid) PinStatus
Expand Down
2 changes: 2 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ type IpfsNode struct {
BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.
OfflineBlocks bserv.BlockService `name:"offlineBlockService"` // blockservice which doesn't try to fetch from the network
DAG ipld.DAGService // the merkle dag service, get/add objects.
OfflineDAG ipld.DAGService `name:"offlineDagService"` // merkle dag service which doesn't try to fetch from the network
IPLDFetcherFactory fetcher.Factory `name:"ipldFetcher"` // fetcher that paths over the IPLD data model
UnixFSFetcherFactory fetcher.Factory `name:"unixfsFetcher"` // fetcher that interprets UnixFS data
OfflineIPLDFetcherFactory fetcher.Factory `name:"offlineIpldFetcher"` // fetcher that paths over the IPLD data model without fetching new blocks
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e

if settings.Offline || !settings.FetchBlocks {
subAPI.exchange = offlinexch.Exchange(subAPI.blockstore)
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange)
subAPI.blocks = bserv.New(subAPI.blockstore, nil, bserv.WithProvider(subAPI.provider))
subAPI.dag = dag.NewDAGService(subAPI.blocks)
}

Expand Down
9 changes: 5 additions & 4 deletions core/coreapi/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

blockservice "github.com/ipfs/boxo/blockservice"
blockstore "github.com/ipfs/boxo/blockstore"
offline "github.com/ipfs/boxo/exchange/offline"
dag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/path"
"github.com/ipfs/boxo/provider"
cid "github.com/ipfs/go-cid"
cidutil "github.com/ipfs/go-cidutil"
coreiface "github.com/ipfs/kubo/core/coreiface"
Expand Down Expand Up @@ -99,7 +99,7 @@ func (api *DhtAPI) Provide(ctx context.Context, path path.Path, opts ...caopts.D
}

if settings.Recursive {
err = provideKeysRec(ctx, api.routing, api.blockstore, []cid.Cid{c})
err = provideKeysRec(ctx, api.routing, api.blockstore, api.provider, []cid.Cid{c})
} else {
err = provideKeys(ctx, api.routing, []cid.Cid{c})
}
Expand All @@ -120,12 +120,13 @@ func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error {
return nil
}

func provideKeysRec(ctx context.Context, r routing.Routing, bs blockstore.Blockstore, cids []cid.Cid) error {
func provideKeysRec(ctx context.Context, r routing.Routing, bs blockstore.Blockstore, prov provider.Provider, cids []cid.Cid) error {
provided := cidutil.NewStreamingSet()

errCh := make(chan error)
go func() {
dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
// FIXME: we are recreating a dag and blockservice, maybe offline varients should be shared ?
dserv := dag.NewDAGService(blockservice.New(bs, nil, blockservice.WithProvider(prov)))
for _, c := range cids {
err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

bserv "github.com/ipfs/boxo/blockservice"
offline "github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/path"
pin "github.com/ipfs/boxo/pinning/pinner"
Expand Down Expand Up @@ -195,7 +194,8 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro

visited := make(map[cid.Cid]*pinStatus)
bs := api.blockstore
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
// FIXME: we are recreating a dag and blockservice, maybe offline varients should be shared ?
DAG := merkledag.NewDAGService(bserv.New(bs, nil, bserv.WithProvider(api.provider)))
getLinks := merkledag.GetLinksWithDAG(DAG)

var checkPin func(root cid.Cid) *pinStatus
Expand Down
4 changes: 3 additions & 1 deletion core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
}
exch := api.exchange
pinning := api.pinning
prov := api.provider

if settings.OnlyHash {
node, err := getOrCreateNilNode()
Expand All @@ -115,9 +116,10 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
addblockstore = node.Blockstore
exch = node.Exchange
pinning = node.Pinning
prov = nil
}

bserv := blockservice.New(addblockstore, exch) // hash security 001
bserv := blockservice.New(addblockstore, exch, blockservice.WithProvider(prov)) // hash security 001
dserv := merkledag.NewDAGService(bserv)

// add a sync call to the DagService
Expand Down
6 changes: 2 additions & 4 deletions core/corehttp/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"net/http"
"time"

"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/gateway"
"github.com/ipfs/boxo/namesys"
Expand Down Expand Up @@ -79,7 +77,7 @@ func VersionOption() ServeOption {

func Libp2pGatewayOption() ServeOption {
return func(n *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
bserv := blockservice.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
bserv := n.OfflineBlocks

Check warning on line 80 in core/corehttp/gateway.go

View check run for this annotation

Codecov / codecov/patch

core/corehttp/gateway.go#L80

Added line #L80 was not covered by tests

backend, err := gateway.NewBlocksBackend(bserv,
// GatewayOverLibp2p only returns things that are in local blockstore
Expand Down Expand Up @@ -118,7 +116,7 @@ func newGatewayBackend(n *core.IpfsNode) (gateway.IPFSBackend, error) {
pathResolver := n.UnixFSPathResolver

if cfg.Gateway.NoFetch {
bserv = blockservice.New(bserv.Blockstore(), offline.Exchange(bserv.Blockstore()))
bserv = n.OfflineBlocks

Check warning on line 119 in core/corehttp/gateway.go

View check run for this annotation

Codecov / codecov/patch

core/corehttp/gateway.go#L119

Added line #L119 was not covered by tests

cs := cfg.Ipns.ResolveCacheSize
if cs == 0 {
Expand Down
20 changes: 9 additions & 11 deletions core/node/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,16 @@ type bitswapOptionsOut struct {
BitswapOpts []bitswap.Option `group:"bitswap-options,flatten"`
}

// BitswapOptions creates configuration options for Bitswap from the config file
// and whether to provide data.
func BitswapOptions(cfg *config.Config, provide bool) interface{} {
return func() bitswapOptionsOut {
// BitswapOptions creates configuration options for Bitswap from the config file.
func BitswapOptions(cfg *config.Config) fx.Option {
return fx.Provide(func(routing irouting.ProvideManyRouter) bitswapOptionsOut {
var internalBsCfg config.InternalBitswap
if cfg.Internal.Bitswap != nil {
internalBsCfg = *cfg.Internal.Bitswap
}

opts := []bitswap.Option{
bitswap.ProvideEnabled(provide),
bitswap.WithContentSearch(routing),
bitswap.ProviderSearchDelay(internalBsCfg.ProviderSearchDelay.WithDefault(DefaultProviderSearchDelay)), // See https://github.com/ipfs/go-ipfs/issues/8807 for rationale
bitswap.EngineBlockstoreWorkerCount(int(internalBsCfg.EngineBlockstoreWorkerCount.WithDefault(DefaultEngineBlockstoreWorkerCount))),
bitswap.TaskWorkerCount(int(internalBsCfg.TaskWorkerCount.WithDefault(DefaultTaskWorkerCount))),
Expand All @@ -50,25 +49,24 @@ func BitswapOptions(cfg *config.Config, provide bool) interface{} {
}

return bitswapOptionsOut{BitswapOpts: opts}
}
})
}

type onlineExchangeIn struct {
fx.In

Mctx helpers.MetricsCtx
Host host.Host
Rt irouting.ProvideManyRouter
Bs blockstore.GCBlockstore
BitswapOpts []bitswap.Option `group:"bitswap-options"`
}

// OnlineExchange creates new LibP2P backed block exchange (BitSwap).
// Additional options to bitswap.New can be provided via the "bitswap-options"
// group.
func OnlineExchange() interface{} {
return func(in onlineExchangeIn, lc fx.Lifecycle) exchange.Interface {
bitswapNetwork := network.NewFromIpfsHost(in.Host, in.Rt)
func OnlineExchange() fx.Option {
return fx.Provide(func(in onlineExchangeIn, lc fx.Lifecycle) exchange.Interface {
bitswapNetwork := network.NewFromIpfsHost(in.Host)

exch := bitswap.New(helpers.LifecycleCtx(in.Mctx, lc), bitswapNetwork, in.Bs, in.BitswapOpts...)
lc.Append(fx.Hook{
Expand All @@ -77,5 +75,5 @@ func OnlineExchange() interface{} {
},
})
return exch
}
})
}
82 changes: 65 additions & 17 deletions core/node/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/ipfs/boxo/blockservice"
blockstore "github.com/ipfs/boxo/blockstore"
exchange "github.com/ipfs/boxo/exchange"
offline "github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/fetcher"
bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
"github.com/ipfs/boxo/filestore"
Expand All @@ -17,6 +16,7 @@ import (
pathresolver "github.com/ipfs/boxo/path/resolver"
pin "github.com/ipfs/boxo/pinning/pinner"
"github.com/ipfs/boxo/pinning/pinner/dspinner"
"github.com/ipfs/boxo/provider"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
format "github.com/ipfs/go-ipld-format"
Expand All @@ -29,8 +29,8 @@ import (
)

// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
bsvc := blockservice.New(bs, rem)
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface, prov provider.System) blockservice.BlockService {
bsvc := blockservice.New(bs, rem, blockservice.WithProvider(prov))

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
Expand All @@ -41,6 +41,32 @@ func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interf
return bsvc
}

type offlineIn struct {
fx.In

Bs blockstore.Blockstore
Prov provider.System `optional:"true"`
}

type offlineOut struct {
fx.Out

Bs blockservice.BlockService `name:"offlineBlockService"`
}

// OfflineBlockservice is like [BlockService] but it makes an offline version.
func OfflineBlockservice(lc fx.Lifecycle, in offlineIn) offlineOut {
bsvc := blockservice.New(in.Bs, nil, blockservice.WithProvider(in.Prov))

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bsvc.Close()
},
})

return offlineOut{Bs: bsvc}
}

// Pinning creates new pinner which tells GC which blocks should be kept
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
rootDS := repo.Datastore()
Expand Down Expand Up @@ -82,38 +108,34 @@ func (s *syncDagService) Session(ctx context.Context) format.NodeGetter {
return merkledag.NewSession(ctx, s.DAGService)
}

// FetchersOut allows injection of fetchers.
type FetchersOut struct {
// fetchersOut allows injection of fetchers.
type fetchersOut struct {
fx.Out
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"`
OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"`
OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
}

// FetchersIn allows using fetchers for other dependencies.
type FetchersIn struct {
type fetcherIn struct {
fx.In
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"`
OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"`
OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
Online blockservice.BlockService
Offline blockservice.BlockService `name:"offlineBlockService"`
}

// FetcherConfig returns a fetcher config that can build new fetcher instances
func FetcherConfig(bs blockservice.BlockService) FetchersOut {
ipldFetcher := bsfetcher.NewFetcherConfig(bs)
func FetcherConfig(in fetcherIn) fetchersOut {
ipldFetcher := bsfetcher.NewFetcherConfig(in.Online)
ipldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
unixFSFetcher := ipldFetcher.WithReifier(unixfsnode.Reify)

// Construct offline versions which we can safely use in contexts where
// path resolution should not fetch new blocks via exchange.
offlineBs := blockservice.New(bs.Blockstore(), offline.Exchange(bs.Blockstore()))
offlineIpldFetcher := bsfetcher.NewFetcherConfig(offlineBs)
offlineIpldFetcher := bsfetcher.NewFetcherConfig(in.Offline)
offlineIpldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
offlineUnixFSFetcher := offlineIpldFetcher.WithReifier(unixfsnode.Reify)

return FetchersOut{
return fetchersOut{
IPLDFetcher: ipldFetcher,
UnixfsFetcher: unixFSFetcher,
OfflineIPLDFetcher: offlineIpldFetcher,
Expand All @@ -130,8 +152,17 @@ type PathResolversOut struct {
OfflineUnixFSPathResolver pathresolver.Resolver `name:"offlineUnixFSPathResolver"`
}

// PathResolverIn allows using fetchers for other dependencies.
type PathResolverIn struct {
fx.In
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"`
OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"`
OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
}

// PathResolverConfig creates path resolvers with the given fetchers.
func PathResolverConfig(fetchers FetchersIn) PathResolversOut {
func PathResolverConfig(fetchers PathResolverIn) PathResolversOut {
return PathResolversOut{
IPLDPathResolver: pathresolver.NewBasicResolver(fetchers.IPLDFetcher),
UnixFSPathResolver: pathresolver.NewBasicResolver(fetchers.UnixfsFetcher),
Expand All @@ -145,6 +176,23 @@ func Dag(bs blockservice.BlockService) format.DAGService {
return merkledag.NewDAGService(bs)
}

type offlineDagIn struct {
fx.In

Bs blockservice.BlockService `name:"offlineBlockService"`
}

type offlineDagOut struct {
fx.Out

DAG format.DAGService `name:"offlineDagService"`
}

// OfflineDag is like [Dag] but it makes an offline version.
func OfflineDag(lc fx.Lifecycle, in offlineDagIn) offlineDagOut {
return offlineDagOut{DAG: merkledag.NewDAGService(in.Bs)}
}

// Files loads persisted MFS root
func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
dsk := datastore.NewKey("/local/filesroot")
Expand Down
Loading

0 comments on commit 732d56c

Please sign in to comment.