Skip to content

Commit

Permalink
feat(share)!: add functional options to share pkg (#1798)
Browse files Browse the repository at this point in the history
## Overview

This PR introduces functional options to the share package and
deprecates usage of default values directly in code, in favor of using
parameterized values.

## Breaking

This PR breaks the configuration. The on-disk configuration becomes
(_note that `Share.Availability` is only available for light nodes_):
```
[Share]
  [Share.Availability]
    SampleAmount = 16
  [Share.Discovery]
    PeersLimit = 5
    AdvertiseInterval = "8h0m0s"
```
## Checklist

- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [x] Visual proof for any user facing features like CLI or
documentation updates
- [x] Linked issues closed with keywords
  • Loading branch information
derrandz authored May 8, 2023

Verified

This commit was signed with the committer’s verified signature.
navidkpr Navid Pour
1 parent ff65dff commit 7f556f0
Showing 16 changed files with 235 additions and 117 deletions.
2 changes: 1 addition & 1 deletion nodebuilder/config.go
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ func DefaultConfig(tp node.Type) *Config {
P2P: p2p.DefaultConfig(tp),
RPC: rpc.DefaultConfig(),
Gateway: gateway.DefaultConfig(),
Share: share.DefaultConfig(),
Share: share.DefaultConfig(tp),
Header: header.DefaultConfig(tp),
}

51 changes: 39 additions & 12 deletions nodebuilder/share/config.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,69 @@
package share

import (
disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
"fmt"

"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
)

// TODO: some params are pointers and other are not, Let's fix this.
type Config struct {
// UseShareExchange is a flag toggling the usage of shrex protocols for blocksync.
UseShareExchange bool
// ShrExEDSParams sets shrexeds client and server configuration parameters
ShrExEDSParams *shrexeds.Parameters
// ShrExNDParams sets shrexnd client and server configuration parameters
ShrExNDParams *shrexnd.Parameters
// PeerManagerParams sets peer-manager configuration parameters
PeerManagerParams peers.Parameters
// Discovery sets peer discovery configuration parameters.
Discovery disc.Parameters

LightAvailability light.Parameters `toml:",omitempty"`
Discovery discovery.Parameters
}

func DefaultConfig() Config {
return Config{
UseShareExchange: true,
func DefaultConfig(tp node.Type) Config {
cfg := Config{
Discovery: discovery.DefaultParameters(),
ShrExEDSParams: shrexeds.DefaultParameters(),
ShrExNDParams: shrexnd.DefaultParameters(),
UseShareExchange: true,
PeerManagerParams: peers.DefaultParameters(),
Discovery: disc.DefaultParameters(),
}

if tp == node.Light {
cfg.LightAvailability = light.DefaultParameters()
}

return cfg
}

// Validate performs basic validation of the config.
func (cfg *Config) Validate() error {
func (cfg *Config) Validate(tp node.Type) error {
if tp == node.Light {
if err := cfg.LightAvailability.Validate(); err != nil {
return fmt.Errorf("nodebuilder/share: %w", err)
}
}

if err := cfg.Discovery.Validate(); err != nil {
return fmt.Errorf("nodebuilder/share: %w", err)
}

if err := cfg.ShrExNDParams.Validate(); err != nil {
return err
return fmt.Errorf("nodebuilder/share: %w", err)
}

if err := cfg.ShrExEDSParams.Validate(); err != nil {
return err
return fmt.Errorf("nodebuilder/share: %w", err)
}
return cfg.PeerManagerParams.Validate()

if err := cfg.PeerManagerParams.Validate(); err != nil {
return fmt.Errorf("nodebuilder/share: %w", err)
}

return nil
}
5 changes: 3 additions & 2 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
@@ -21,15 +21,16 @@ import (
"github.com/celestiaorg/celestia-node/share/getters"
)

func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery {
func newDiscovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery {
return func(
r routing.ContentRouting,
h host.Host,
) *disc.Discovery {
return disc.NewDiscovery(
h,
routingdisc.NewRoutingDiscovery(r),
cfg.Discovery,
disc.WithPeersLimit(cfg.Discovery.PeersLimit),
disc.WithAdvertiseInterval(cfg.Discovery.AdvertiseInterval),
)
}
}
9 changes: 7 additions & 2 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ import (

func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option {
// sanitize config values before constructing module
cfgErr := cfg.Validate()
cfgErr := cfg.Validate(tp)

baseComponents := fx.Options(
fx.Supply(*cfg),
@@ -33,7 +33,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Provide(newModule),
fx.Invoke(func(disc *disc.Discovery) {}),
fx.Provide(fx.Annotate(
discovery(*cfg),
newDiscovery(*cfg),
fx.OnStart(func(ctx context.Context, d *disc.Discovery) error {
return d.Start(ctx)
}),
@@ -170,6 +170,11 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return fx.Module(
"share",
baseComponents,
fx.Provide(func() []light.Option {
return []light.Option{
light.WithSampleAmount(cfg.LightAvailability.SampleAmount),
}
}),
shrexGetterComponents,
fx.Invoke(share.EnsureEmptySquareExists),
fx.Provide(getters.NewIPLDGetter),
6 changes: 2 additions & 4 deletions share/availability.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ import (
"context"
"errors"

"github.com/celestiaorg/celestia-app/pkg/da"
da "github.com/celestiaorg/celestia-app/pkg/da"
)

// ErrNotAvailable is returned whenever DA sampling fails.
@@ -15,11 +15,9 @@ var ErrNotAvailable = errors.New("share: data not available")
type Root = da.DataAvailabilityHeader

// Availability defines interface for validation of Shares' availability.
//
//go:generate mockgen -destination=availability/mocks/availability.go -package=mocks . Availability
type Availability interface {
// SharesAvailable subjectively validates if Shares committed to the given Root are available on
// the Network by requesting the EDS from the provided peers.
// the Network.
SharesAvailable(context.Context, *Root) error
// ProbabilityOfAvailability calculates the probability of the data square
// being available based on the number of samples collected.
20 changes: 9 additions & 11 deletions share/availability/cache/availability.go
Original file line number Diff line number Diff line change
@@ -15,17 +15,12 @@ import (
"github.com/celestiaorg/celestia-node/share"
)

var log = logging.Logger("share/cache")

var (
// DefaultWriteBatchSize defines the size of the batched header write.
// Headers are written in batches not to thrash the underlying Datastore with writes.
// TODO(@Wondertan, @renaynay): Those values must be configurable and proper defaults should be set
// for specific node type. (#709)
DefaultWriteBatchSize = 2048
cacheAvailabilityPrefix = datastore.NewKey("sampling_result")

log = logging.Logger("share/cache")
minRoot = da.MinDataAvailabilityHeader()

cacheAvailabilityPrefix = datastore.NewKey("sampling_result")
writeBatchSize = 2048
)

// ShareAvailability wraps a given share.Availability (whether it's light or full)
@@ -42,9 +37,12 @@ type ShareAvailability struct {

// NewShareAvailability wraps the given share.Availability with an additional datastore
// for sampling result caching.
func NewShareAvailability(avail share.Availability, ds datastore.Batching) *ShareAvailability {
func NewShareAvailability(
avail share.Availability,
ds datastore.Batching,
) *ShareAvailability {
ds = namespace.Wrap(ds, cacheAvailabilityPrefix)
autoDS := autobatch.NewAutoBatching(ds, DefaultWriteBatchSize)
autoDS := autobatch.NewAutoBatching(ds, writeBatchSize)

return &ShareAvailability{
avail: avail,
59 changes: 18 additions & 41 deletions share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
@@ -35,53 +35,24 @@ const (
defaultRetryTimeout = time.Second
)

type Parameters struct {
// PeersLimit defines the soft limit of FNs to connect to via discovery.
// Set 0 to disable.
PeersLimit uint
// AdvertiseInterval is a interval between advertising sessions.
// Set -1 to disable.
// NOTE: only full and bridge can advertise themselves.
AdvertiseInterval time.Duration
// discoveryRetryTimeout is an interval between discovery attempts
// when we discovered lower than PeersLimit peers.
// Set -1 to disable.
discoveryRetryTimeout time.Duration
}

func (p Parameters) withDefaults() Parameters {
def := DefaultParameters()
if p.AdvertiseInterval == 0 {
p.AdvertiseInterval = def.AdvertiseInterval
}
if p.discoveryRetryTimeout == 0 {
p.discoveryRetryTimeout = defaultRetryTimeout
}
return p
}

func DefaultParameters() Parameters {
return Parameters{
PeersLimit: 5,
// based on https://github.com/libp2p/go-libp2p-kad-dht/pull/793
AdvertiseInterval: time.Hour * 22,
}
}
// defaultRetryTimeout defines time interval between discovery attempts.
var discoveryRetryTimeout = defaultRetryTimeout

// Discovery combines advertise and discover services and allows to store discovered nodes.
// TODO: The code here gets horribly hairy, so we should refactor this at some point
type Discovery struct {
params Parameters

set *limitedSet
host host.Host
disc discovery.Discovery
connector *backoffConnector
set *limitedSet
host host.Host
disc discovery.Discovery
connector *backoffConnector
// onUpdatedPeers will be called on peer set changes
onUpdatedPeers OnUpdatedPeers

triggerDisc chan struct{}

cancel context.CancelFunc

params Parameters
}

type OnUpdatedPeers func(peerID peer.ID, isAdded bool)
@@ -90,15 +61,21 @@ type OnUpdatedPeers func(peerID peer.ID, isAdded bool)
func NewDiscovery(
h host.Host,
d discovery.Discovery,
params Parameters,
opts ...Option,
) *Discovery {
params := DefaultParameters()

for _, opt := range opts {
opt(&params)
}

return &Discovery{
params: params.withDefaults(),
set: newLimitedSet(params.PeersLimit),
host: h,
disc: d,
connector: newBackoffConnector(h, defaultBackoffFactory),
onUpdatedPeers: func(peer.ID, bool) {},
params: params,
triggerDisc: make(chan struct{}),
}
}
@@ -191,7 +168,7 @@ func (d *Discovery) Advertise(ctx context.Context) {
// discoveryLoop ensures we always have '~peerLimit' connected peers.
// It starts peer discovery per request and restarts the process until the soft limit reached.
func (d *Discovery) discoveryLoop(ctx context.Context) {
t := time.NewTicker(d.params.discoveryRetryTimeout)
t := time.NewTicker(discoveryRetryTimeout)
defer t.Stop()
for {
// drain all previous ticks from channel
21 changes: 9 additions & 12 deletions share/availability/discovery/discovery_test.go
Original file line number Diff line number Diff line change
@@ -24,11 +24,11 @@ func TestDiscovery(t *testing.T) {

tn := newTestnet(ctx, t)

peerA := tn.discovery(Parameters{
PeersLimit: nodes,
discoveryRetryTimeout: time.Millisecond * 100,
AdvertiseInterval: -1, // we don't want to be found but only find
})
peerA := tn.discovery(
WithPeersLimit(nodes),
WithAdvertiseInterval(-1),
)
discoveryRetryTimeout = time.Millisecond * 100 // defined in discovery.go

type peerUpdate struct {
peerID peer.ID
@@ -41,11 +41,8 @@ func TestDiscovery(t *testing.T) {

discs := make([]*Discovery, nodes)
for i := range discs {
discs[i] = tn.discovery(Parameters{
PeersLimit: 0,
discoveryRetryTimeout: -1,
AdvertiseInterval: time.Millisecond * 100,
})
discs[i] = tn.discovery(WithPeersLimit(0), WithAdvertiseInterval(time.Millisecond*100))
discoveryRetryTimeout = -1 // defined in discovery.go

select {
case res := <-updateCh:
@@ -98,9 +95,9 @@ func newTestnet(ctx context.Context, t *testing.T) *testnet {
return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)}
}

func (t *testnet) discovery(params Parameters) *Discovery {
func (t *testnet) discovery(opts ...Option) *Discovery {
hst, routingDisc := t.peer()
disc := NewDiscovery(hst, routingDisc, params)
disc := NewDiscovery(hst, routingDisc, opts...)
err := disc.Start(t.ctx)
require.NoError(t.T, err)
t.T.Cleanup(func() {
59 changes: 59 additions & 0 deletions share/availability/discovery/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package discovery

import (
"fmt"
"time"
)

// Parameters is the set of Parameters that must be configured for the Discovery module
type Parameters struct {
// PeersLimit defines the soft limit of FNs to connect to via discovery.
// Set 0 to disable.
PeersLimit uint
// AdvertiseInterval is a interval between advertising sessions.
// Set -1 to disable.
// NOTE: only full and bridge can advertise themselves.
AdvertiseInterval time.Duration
}

// Option is a function that configures Discovery Parameters
type Option func(*Parameters)

// DefaultParameters returns the default Parameters' configuration values
// for the Discovery module
func DefaultParameters() Parameters {
return Parameters{
PeersLimit: 5,
// based on https://github.com/libp2p/go-libp2p-kad-dht/pull/793
AdvertiseInterval: time.Hour * 22,
}
}

// Validate validates the values in Parameters
func (p *Parameters) Validate() error {
if p.AdvertiseInterval <= 0 {
return fmt.Errorf(
"discovery: invalid option: value AdvertiseInterval %s, %s",
"is 0 or negative.",
"value must be positive",
)
}

return nil
}

// WithPeersLimit is a functional option that Discovery
// uses to set the PeersLimit configuration param
func WithPeersLimit(peersLimit uint) Option {
return func(p *Parameters) {
p.PeersLimit = peersLimit
}
}

// WithAdvertiseInterval is a functional option that Discovery
// uses to set the AdvertiseInterval configuration param
func WithAdvertiseInterval(advInterval time.Duration) Option {
return func(p *Parameters) {
p.AdvertiseInterval = advInterval
}
}
Loading

0 comments on commit 7f556f0

Please sign in to comment.