Skip to content

Commit

Permalink
feat(shwap): Integrate shrex into shwap (#3554)
Browse files Browse the repository at this point in the history
This PR add shrex integration with shwap. Key changes:

Shrex uses shwap-id to request data
Added shwap-id verification for bot client and server
bumped shrex version to prevent old clients from hitting new protocol servers
move shrex into shwap protocols folder (closer to bitswap)
move shrex getter into shwap/shrex
update shrex getter implementation and tests
  • Loading branch information
walldiss authored Jul 30, 2024
1 parent 062f499 commit c4e602a
Show file tree
Hide file tree
Showing 69 changed files with 1,354 additions and 2,286 deletions.
15 changes: 15 additions & 0 deletions libs/utils/close.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package utils

import (
"io"

logging "github.com/ipfs/go-log/v2"
)

// CloseAndLog closes the closer and logs any error that occurs. The function is handy wrapping
// to group closing and logging in one call for defer statements.
func CloseAndLog(log logging.StandardLogger, name string, closer io.Closer) {
if err := closer.Close(); err != nil {
log.Warnf("closing %s: %s", name, err)
}
}
35 changes: 10 additions & 25 deletions share/getters/utils.go → libs/utils/ctx.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
package getters
package utils

import (
"context"
"errors"
"time"

logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
)

var (
tracer = otel.Tracer("share/getters")
log = logging.Logger("share/getters")
// ResetContextOnError returns a fresh context if the given context has an error.
func ResetContextOnError(ctx context.Context) context.Context {
if ctx.Err() != nil {
ctx = context.Background()
}

errOperationNotSupported = errors.New("operation is not supported")
)
return ctx
}

// ctxWithSplitTimeout will split timeout stored in context by splitFactor and return the result if
// CtxWithSplitTimeout will split timeout stored in context by splitFactor and return the result if
// it is greater than minTimeout. minTimeout == 0 will be ignored, splitFactor <= 0 will be ignored
func ctxWithSplitTimeout(
func CtxWithSplitTimeout(
ctx context.Context,
splitFactor int,
minTimeout time.Duration,
Expand All @@ -42,16 +40,3 @@ func ctxWithSplitTimeout(
}
return context.WithTimeout(ctx, splitTimeout)
}

// ErrorContains reports whether any error in err's tree matches any error in targets tree.
func ErrorContains(err, target error) bool {
if errors.Is(err, target) || target == nil {
return true
}

target = errors.Unwrap(target)
if target == nil {
return false
}
return ErrorContains(err, target)
}
110 changes: 2 additions & 108 deletions share/getters/utils_test.go → libs/utils/ctx_test.go
Original file line number Diff line number Diff line change
@@ -1,119 +1,13 @@
package getters
package utils

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_ErrorContains(t *testing.T) {
err1 := errors.New("1")
err2 := errors.New("2")

w1 := func(err error) error {
return fmt.Errorf("wrap1: %w", err)
}
w2 := func(err error) error {
return fmt.Errorf("wrap1: %w", err)
}

type args struct {
err error
target error
}
tests := []struct {
name string
args args
want bool
}{
{
"nil err",
args{
err: nil,
target: err1,
},
false,
},
{
"nil target",
args{
err: err1,
target: nil,
},
true,
},
{
"errors.Is true",
args{
err: w1(err1),
target: err1,
},
true,
},
{
"errors.Is false",
args{
err: w1(err1),
target: err2,
},
false,
},
{
"same wrap but different base error",
args{
err: w1(err1),
target: w1(err2),
},
false,
},
{
"both wrapped true",
args{
err: w1(err1),
target: w2(err1),
},
true,
},
{
"both wrapped false",
args{
err: w1(err1),
target: w2(err2),
},
false,
},
{
"multierr first in slice",
args{
err: errors.Join(w1(err1), w2(err2)),
target: w2(err1),
},
true,
},
{
"multierr second in slice",
args{
err: errors.Join(w1(err1), w2(err2)),
target: w1(err2),
},
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t,
tt.want,
ErrorContains(tt.args.err, tt.args.target),
"ErrorContains(%v, %v)", tt.args.err, tt.args.target)
})
}
}

func Test_ctxWithSplitTimeout(t *testing.T) {
type args struct {
ctxTimeout time.Duration
Expand Down Expand Up @@ -216,7 +110,7 @@ func Test_ctxWithSplitTimeout(t *testing.T) {
ctx, cancel = context.WithTimeout(ctx, tt.args.ctxTimeout)
}
t.Cleanup(cancel)
got, _ := ctxWithSplitTimeout(ctx, sf, tt.args.minTimeout)
got, _ := CtxWithSplitTimeout(ctx, sf, tt.args.minTimeout)
dl, ok := got.Deadline()
// in case no deadline is found in ctx or not expected to be found, check both cases apply at the
// same time
Expand Down
14 changes: 0 additions & 14 deletions libs/utils/resetctx.go

This file was deleted.

9 changes: 5 additions & 4 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
)

func newShareModule(getter share.Getter, avail share.Availability) Module {
Expand Down Expand Up @@ -42,7 +43,7 @@ func ensureEmptyEDSInBS(ctx context.Context, bServ blockservice.BlockService) er
}

func lightGetter(
shrexGetter *getters.ShrexGetter,
shrexGetter *shrex_getter.Getter,
ipldGetter *getters.IPLDGetter,
cfg Config,
) share.Getter {
Expand All @@ -54,13 +55,13 @@ func lightGetter(
return getters.NewCascadeGetter(cascade)
}

// ShrexGetter is added to bridge nodes for the case that a shard is removed
// Getter is added to bridge nodes for the case that a shard is removed
// after detected shard corruption. This ensures the block is fetched and stored
// by shrex the next time the data is retrieved (meaning shard recovery is
// manual after corruption is detected).
func bridgeGetter(
storeGetter *getters.StoreGetter,
shrexGetter *getters.ShrexGetter,
shrexGetter *shrex_getter.Getter,
cfg Config,
) share.Getter {
var cascade []share.Getter
Expand All @@ -73,7 +74,7 @@ func bridgeGetter(

func fullGetter(
storeGetter *getters.StoreGetter,
shrexGetter *getters.ShrexGetter,
shrexGetter *shrex_getter.Getter,
ipldGetter *getters.IPLDGetter,
cfg Config,
) share.Getter {
Expand Down
9 changes: 5 additions & 4 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
)

func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option {
Expand Down Expand Up @@ -92,19 +93,19 @@ func shrexComponents(tp node.Type, cfg *Config) fx.Option {
edsClient *shrexeds.Client,
ndClient *shrexnd.Client,
managers map[string]*peers.Manager,
) *getters.ShrexGetter {
return getters.NewShrexGetter(
) *shrex_getter.Getter {
return shrex_getter.NewGetter(
edsClient,
ndClient,
managers[fullNodesTag],
managers[archivalNodesTag],
lightprune.Window,
)
},
fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error {
fx.OnStart(func(ctx context.Context, getter *shrex_getter.Getter) error {
return getter.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, getter *getters.ShrexGetter) error {
fx.OnStop(func(ctx context.Context, getter *shrex_getter.Getter) error {
return getter.Stop(ctx)
}),
)),
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/share/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"errors"

"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
)

// WithPeerManagerMetrics is a utility function to turn on peer manager metrics and that is
Expand Down Expand Up @@ -49,7 +49,7 @@ func WithShrexServerMetrics(edsServer *shrexeds.Server, ndServer *shrexnd.Server
return ndServer.WithMetrics()
}

func WithShrexGetterMetrics(sg *getters.ShrexGetter) error {
func WithShrexGetterMetrics(sg *shrex_getter.Getter) error {
return sg.WithMetrics()
}

Expand Down
3 changes: 2 additions & 1 deletion nodebuilder/tests/nd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
)

func TestShrexNDFromLights(t *testing.T) {
Expand Down Expand Up @@ -200,7 +201,7 @@ func replaceShareGetter() fx.Option {
host host.Host,
store *eds.Store,
storeGetter *getters.StoreGetter,
shrexGetter *getters.ShrexGetter,
shrexGetter *shrex_getter.Getter,
network p2p.Network,
) share.Getter {
cascade := make([]share.Getter, 0, 2)
Expand Down
8 changes: 4 additions & 4 deletions nodebuilder/tests/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
)

// TestArchivalBlobSync tests whether a LN is able to sync historical blobs from
Expand Down Expand Up @@ -71,15 +71,15 @@ func TestArchivalBlobSync(t *testing.T) {
edsClient *shrexeds.Client,
ndClient *shrexnd.Client,
managers map[string]*peers.Manager,
) *getters.ShrexGetter {
return getters.NewShrexGetter(
) *shrex_getter.Getter {
return shrex_getter.NewGetter(
edsClient,
ndClient,
managers["full"],
managers["archival"],
testAvailWindow,
)
}, new(getters.ShrexGetter)),
}, new(shrex_getter.Getter)),
)

// stop the archival BN to force LN to have to discover
Expand Down
11 changes: 9 additions & 2 deletions share/getters/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"

logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand All @@ -15,6 +17,11 @@ import (
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
)

var (
tracer = otel.Tracer("share/getters")
log = logging.Logger("share/getters")
)

var _ share.Getter = (*CascadeGetter)(nil)

// CascadeGetter implements custom share.Getter that composes multiple Getter implementations in
Expand Down Expand Up @@ -125,14 +132,14 @@ func cascadeGetters[V any](

// we split the timeout between left getters
// once async cascadegetter is implemented, we can remove this
getCtx, cancel := ctxWithSplitTimeout(ctx, len(getters)-i, 0)
getCtx, cancel := utils.CtxWithSplitTimeout(ctx, len(getters)-i, 0)
val, getErr := get(getCtx, getter)
cancel()
if getErr == nil {
return val, nil
}

if errors.Is(getErr, errOperationNotSupported) {
if errors.Is(getErr, ErrOperationNotSupported) {
continue
}

Expand Down
Loading

0 comments on commit c4e602a

Please sign in to comment.