Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap): Integrate shrex into shwap #3554

Merged
merged 10 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions libs/utils/close.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package utils

import (
"io"

logging "github.com/ipfs/go-log/v2"
)

// CloseAndLog closes the closer and logs any error that occurs. The function is handy wrapping
// to group closing and logging in one call for defer statements.
func CloseAndLog(log logging.StandardLogger, name string, closer io.Closer) {
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
if err := closer.Close(); err != nil {
log.Warnf("closing %s: %s", name, err)
}
}
35 changes: 10 additions & 25 deletions share/getters/utils.go → libs/utils/ctx.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
package getters
package utils

import (
"context"
"errors"
"time"

logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
)

var (
tracer = otel.Tracer("share/getters")
log = logging.Logger("share/getters")
// ResetContextOnError returns a fresh context if the given context has an error.
func ResetContextOnError(ctx context.Context) context.Context {
if ctx.Err() != nil {
ctx = context.Background()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that this is an old func, but should we change to something 'safer' ? context.WithTimeout(time.Second) by example. WDYT ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense, but not in this PR. Perhaps time.Second should be passed to function in. Like

NewContextOnContextErr(ctx, newTimeout time.Time)


errOperationNotSupported = errors.New("operation is not supported")
)
return ctx
}

// ctxWithSplitTimeout will split timeout stored in context by splitFactor and return the result if
// CtxWithSplitTimeout will split timeout stored in context by splitFactor and return the result if
// it is greater than minTimeout. minTimeout == 0 will be ignored, splitFactor <= 0 will be ignored
func ctxWithSplitTimeout(
func CtxWithSplitTimeout(
ctx context.Context,
splitFactor int,
minTimeout time.Duration,
Expand All @@ -42,16 +40,3 @@ func ctxWithSplitTimeout(
}
return context.WithTimeout(ctx, splitTimeout)
}

// ErrorContains reports whether any error in err's tree matches any error in targets tree.
func ErrorContains(err, target error) bool {
if errors.Is(err, target) || target == nil {
return true
}

target = errors.Unwrap(target)
if target == nil {
return false
}
return ErrorContains(err, target)
}
110 changes: 2 additions & 108 deletions share/getters/utils_test.go → libs/utils/ctx_test.go
Original file line number Diff line number Diff line change
@@ -1,119 +1,13 @@
package getters
package utils

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_ErrorContains(t *testing.T) {
err1 := errors.New("1")
err2 := errors.New("2")

w1 := func(err error) error {
return fmt.Errorf("wrap1: %w", err)
}
w2 := func(err error) error {
return fmt.Errorf("wrap1: %w", err)
}

type args struct {
err error
target error
}
tests := []struct {
name string
args args
want bool
}{
{
"nil err",
args{
err: nil,
target: err1,
},
false,
},
{
"nil target",
args{
err: err1,
target: nil,
},
true,
},
{
"errors.Is true",
args{
err: w1(err1),
target: err1,
},
true,
},
{
"errors.Is false",
args{
err: w1(err1),
target: err2,
},
false,
},
{
"same wrap but different base error",
args{
err: w1(err1),
target: w1(err2),
},
false,
},
{
"both wrapped true",
args{
err: w1(err1),
target: w2(err1),
},
true,
},
{
"both wrapped false",
args{
err: w1(err1),
target: w2(err2),
},
false,
},
{
"multierr first in slice",
args{
err: errors.Join(w1(err1), w2(err2)),
target: w2(err1),
},
true,
},
{
"multierr second in slice",
args{
err: errors.Join(w1(err1), w2(err2)),
target: w1(err2),
},
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t,
tt.want,
ErrorContains(tt.args.err, tt.args.target),
"ErrorContains(%v, %v)", tt.args.err, tt.args.target)
})
}
}

func Test_ctxWithSplitTimeout(t *testing.T) {
type args struct {
ctxTimeout time.Duration
Expand Down Expand Up @@ -216,7 +110,7 @@ func Test_ctxWithSplitTimeout(t *testing.T) {
ctx, cancel = context.WithTimeout(ctx, tt.args.ctxTimeout)
}
t.Cleanup(cancel)
got, _ := ctxWithSplitTimeout(ctx, sf, tt.args.minTimeout)
got, _ := CtxWithSplitTimeout(ctx, sf, tt.args.minTimeout)
dl, ok := got.Deadline()
// in case no deadline is found in ctx or not expected to be found, check both cases apply at the
// same time
Expand Down
14 changes: 0 additions & 14 deletions libs/utils/resetctx.go

This file was deleted.

9 changes: 5 additions & 4 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"

Check failure on line 12 in nodebuilder/share/constructors.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

could not import github.com/celestiaorg/celestia-node/share/getters (-: # github.com/celestiaorg/celestia-node/share/getters
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
)

func newShareModule(getter share.Getter, avail share.Availability) Module {
Expand Down Expand Up @@ -42,45 +43,45 @@
}

func lightGetter(
shrexGetter *getters.ShrexGetter,
shrexGetter *shrex_getter.Getter,
ipldGetter *getters.IPLDGetter,
cfg Config,
) share.Getter {
var cascade []share.Getter
if cfg.UseShareExchange {
cascade = append(cascade, shrexGetter)

Check failure on line 52 in nodebuilder/share/constructors.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

cannot use shrexGetter (variable of type *shrex_getter.Getter) as "github.com/celestiaorg/celestia-node/share".Getter value in argument to append: *shrex_getter.Getter does not implement "github.com/celestiaorg/celestia-node/share".Getter (wrong type for method GetSharesByNamespace)
}
cascade = append(cascade, ipldGetter)
return getters.NewCascadeGetter(cascade)
}

// ShrexGetter is added to bridge nodes for the case that a shard is removed
// Getter is added to bridge nodes for the case that a shard is removed
// after detected shard corruption. This ensures the block is fetched and stored
// by shrex the next time the data is retrieved (meaning shard recovery is
// manual after corruption is detected).
func bridgeGetter(
storeGetter *getters.StoreGetter,
shrexGetter *getters.ShrexGetter,
shrexGetter *shrex_getter.Getter,
cfg Config,
) share.Getter {
var cascade []share.Getter
cascade = append(cascade, storeGetter)
if cfg.UseShareExchange {
cascade = append(cascade, shrexGetter)

Check failure on line 70 in nodebuilder/share/constructors.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

cannot use shrexGetter (variable of type *shrex_getter.Getter) as "github.com/celestiaorg/celestia-node/share".Getter value in argument to append: *shrex_getter.Getter does not implement "github.com/celestiaorg/celestia-node/share".Getter (wrong type for method GetSharesByNamespace)
}
return getters.NewCascadeGetter(cascade)
}

func fullGetter(
storeGetter *getters.StoreGetter,
shrexGetter *getters.ShrexGetter,
shrexGetter *shrex_getter.Getter,
ipldGetter *getters.IPLDGetter,
cfg Config,
) share.Getter {
var cascade []share.Getter
cascade = append(cascade, storeGetter)
if cfg.UseShareExchange {
cascade = append(cascade, shrexGetter)

Check failure on line 84 in nodebuilder/share/constructors.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

cannot use shrexGetter (variable of type *shrex_getter.Getter) as "github.com/celestiaorg/celestia-node/share".Getter value in argument to append: *shrex_getter.Getter does not implement "github.com/celestiaorg/celestia-node/share".Getter (wrong type for method GetSharesByNamespace)
}
cascade = append(cascade, ipldGetter)
return getters.NewCascadeGetter(cascade)
Expand Down
9 changes: 5 additions & 4 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
)

func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option {
Expand Down Expand Up @@ -92,19 +93,19 @@ func shrexComponents(tp node.Type, cfg *Config) fx.Option {
edsClient *shrexeds.Client,
ndClient *shrexnd.Client,
managers map[string]*peers.Manager,
) *getters.ShrexGetter {
return getters.NewShrexGetter(
) *shrex_getter.Getter {
return shrex_getter.NewGetter(
edsClient,
ndClient,
managers[fullNodesTag],
managers[archivalNodesTag],
lightprune.Window,
)
},
fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error {
fx.OnStart(func(ctx context.Context, getter *shrex_getter.Getter) error {
return getter.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, getter *getters.ShrexGetter) error {
fx.OnStop(func(ctx context.Context, getter *shrex_getter.Getter) error {
return getter.Stop(ctx)
}),
)),
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/share/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"errors"

"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
)

// WithPeerManagerMetrics is a utility function to turn on peer manager metrics and that is
Expand Down Expand Up @@ -49,7 +49,7 @@ func WithShrexServerMetrics(edsServer *shrexeds.Server, ndServer *shrexnd.Server
return ndServer.WithMetrics()
}

func WithShrexGetterMetrics(sg *getters.ShrexGetter) error {
func WithShrexGetterMetrics(sg *shrex_getter.Getter) error {
return sg.WithMetrics()
}

Expand Down
3 changes: 2 additions & 1 deletion nodebuilder/tests/nd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
)

func TestShrexNDFromLights(t *testing.T) {
Expand Down Expand Up @@ -200,7 +201,7 @@ func replaceShareGetter() fx.Option {
host host.Host,
store *eds.Store,
storeGetter *getters.StoreGetter,
shrexGetter *getters.ShrexGetter,
shrexGetter *shrex_getter.Getter,
network p2p.Network,
) share.Getter {
cascade := make([]share.Getter, 0, 2)
Expand Down
8 changes: 4 additions & 4 deletions nodebuilder/tests/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
)

// TestArchivalBlobSync tests whether a LN is able to sync historical blobs from
Expand Down Expand Up @@ -71,15 +71,15 @@ func TestArchivalBlobSync(t *testing.T) {
edsClient *shrexeds.Client,
ndClient *shrexnd.Client,
managers map[string]*peers.Manager,
) *getters.ShrexGetter {
return getters.NewShrexGetter(
) *shrex_getter.Getter {
return shrex_getter.NewGetter(
edsClient,
ndClient,
managers["full"],
managers["archival"],
testAvailWindow,
)
}, new(getters.ShrexGetter)),
}, new(shrex_getter.Getter)),
)

// stop the archival BN to force LN to have to discover
Expand Down
11 changes: 9 additions & 2 deletions share/getters/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"context"
"errors"

logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand All @@ -15,6 +17,11 @@
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
)

var (
tracer = otel.Tracer("share/getters")
log = logging.Logger("share/getters")
)

var _ share.Getter = (*CascadeGetter)(nil)

// CascadeGetter implements custom share.Getter that composes multiple Getter implementations in
Expand Down Expand Up @@ -125,14 +132,14 @@

// we split the timeout between left getters
// once async cascadegetter is implemented, we can remove this
getCtx, cancel := ctxWithSplitTimeout(ctx, len(getters)-i, 0)
getCtx, cancel := utils.CtxWithSplitTimeout(ctx, len(getters)-i, 0)
val, getErr := get(getCtx, getter)
cancel()
if getErr == nil {
return val, nil
}

if errors.Is(getErr, errOperationNotSupported) {
if errors.Is(getErr, ErrOperationNotSupported) {

Check failure on line 142 in share/getters/cascade.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

undefined: ErrOperationNotSupported) (typecheck)

Check failure on line 142 in share/getters/cascade.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

undefined: ErrOperationNotSupported) (typecheck)

Check failure on line 142 in share/getters/cascade.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

undefined: ErrOperationNotSupported) (typecheck)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
continue
}

Expand Down
Loading
Loading