Skip to content

Commit

Permalink
kademlia/iterator: allow for the configuration of a lookup request ti…
Browse files Browse the repository at this point in the history
…meout

kademlia/protocol: allow for the configuration of a ping request timeout
node: resolve public address if passed in as an option, and pass its parameters towards NewID on Listen
noise: bump to v1.1.2
  • Loading branch information
iwasaki-kenta committed Jan 30, 2020
1 parent 8a67549 commit 64da665
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 11 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ Releases are marked with a version number formatted as MAJOR.MINOR.PATCH. Major

Therefore, **noise** _mostly_ respects semantic versioning.

The rationale behind this is due to improper tagging of prior releases (v0.1.0, v1.0.0, and v1.1.0), which has caused for the improper caching of module information on _proxy.golang.org_ and _sum.golang.org_.
The rationale behind this is due to improper tagging of prior releases (v0.1.0, v1.0.0, v1.1.0, and v1.1.1), which has caused for the improper caching of module information on _proxy.golang.org_ and _sum.golang.org_.

As a result, _noise's initial development phase starts from v1.1.1_. Until Noise's API is stable, subsequent releases will only comprise of bumps in MINOR and PATCH.
As a result, _noise's initial development phase starts from v1.1.2_. Until Noise's API is stable, subsequent releases will only comprise of bumps in MINOR and PATCH.

## License

Expand Down
6 changes: 5 additions & 1 deletion kademlia/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Iterator struct {
maxNumResults int
numParallelLookups int
numParallelRequestsPerLookup int

lookupTimeout time.Duration
}

// NewIterator instantiates a new overlay network iterator bounded to a node and routing table that may be
Expand All @@ -37,6 +39,8 @@ func NewIterator(node *noise.Node, table *Table, opts ...IteratorOption) *Iterat
maxNumResults: BucketSize,
numParallelLookups: 3,
numParallelRequestsPerLookup: 8,

lookupTimeout: 3 * time.Second,
}

for _, opt := range opts {
Expand Down Expand Up @@ -155,7 +159,7 @@ func (it *Iterator) processLookupRequests(in <-chan noise.ID, out chan<- []noise
}

func (it *Iterator) lookupRequest(id noise.ID, out chan<- []noise.ID) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), it.lookupTimeout)
defer cancel()

obj, err := it.node.RequestMessage(ctx, id.Address, FindNodeRequest{Target: id.ID})
Expand Down
13 changes: 12 additions & 1 deletion kademlia/iterator_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kademlia

import "go.uber.org/zap"
import (
"go.uber.org/zap"
"time"
)

// IteratorOption represents a functional option which may be passed to NewIterator, or to (*Protocol).Find or
// (*Protocol).Discover to configure Iterator.
Expand Down Expand Up @@ -37,3 +40,11 @@ func WithIteratorNumParallelRequestsPerLookup(numParallelRequestsPerLookup int)
it.numParallelRequestsPerLookup = numParallelRequestsPerLookup
}
}

// WithIteratorLookupTimeout sets the max duration to wait until we declare a lookup request sent in amidst
// a single disjoint lookup to have timed out. By default, it is set to 3 seconds.
func WithIteratorLookupTimeout(lookupTimeout time.Duration) IteratorOption {
return func(it *Iterator) {
it.lookupTimeout = lookupTimeout
}
}
8 changes: 6 additions & 2 deletions kademlia/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ type Protocol struct {
table *Table

events Events

pingTimeout time.Duration
}

// New returns a new instance of the Kademlia protocol.
func New(opts ...ProtocolOption) *Protocol {
p := &Protocol{}
p := &Protocol{
pingTimeout: 3 * time.Second,
}

for _, opt := range opts {
opt(p)
Expand Down Expand Up @@ -102,7 +106,7 @@ func (p *Protocol) Ack(id noise.ID) {
bucket := p.table.Bucket(id.ID)
last := bucket[len(bucket)-1]

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), p.pingTimeout)
pong, err := p.node.RequestMessage(ctx, last.Address, Ping{})
cancel()

Expand Down
14 changes: 13 additions & 1 deletion kademlia/protocol_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kademlia

import "go.uber.org/zap"
import (
"go.uber.org/zap"
"time"
)

// ProtocolOption represents a functional option which may be passed to New to configure a Protocol.
type ProtocolOption func(p *Protocol)
Expand All @@ -19,3 +22,12 @@ func WithProtocolLogger(logger *zap.Logger) ProtocolOption {
p.logger = logger
}
}

// WithProtocolPingTimeout configures the amount of time to wait for until we declare a ping to have failed. Peers
// typically either are pinged through a call of (*Protocol).Ping, or in amidst the execution of Kademlia's peer
// eviction policy. By default, it is set to 3 seconds.
func WithProtocolPingTimeout(pingTimeout time.Duration) ProtocolOption {
return func(p *Protocol) {
p.pingTimeout = pingTimeout
}
}
42 changes: 38 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,25 +127,51 @@ func (n *Node) Listen() error {

addr, ok := n.listener.Addr().(*net.TCPAddr)
if !ok {
err = fmt.Errorf("did not listen for tcp: %w", n.listener.Close())
return err
n.listener.Close()
return errors.New("did not bind to a tcp addr")
}

n.host = addr.IP
n.port = uint16(addr.Port)

if n.addr == "" {
n.addr = net.JoinHostPort(normalizeIP(n.host), strconv.FormatUint(uint64(n.port), 10))
}
n.id = NewID(n.publicKey, n.host, n.port)
} else {
resolved, err := ResolveAddress(n.addr)
if err != nil {
n.listener.Close()
return err
}

hostStr, portStr, err := net.SplitHostPort(resolved)
if err != nil {
n.listener.Close()
return err
}

n.id = NewID(n.publicKey, n.host, n.port)
host := net.ParseIP(hostStr)
if host == nil {
n.listener.Close()
return errors.New("host in provided public address is invalid (must be IPv4/IPv6)")
}

port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
n.listener.Close()
return err
}

n.id = NewID(n.publicKey, host, uint16(port))
}

for _, protocol := range n.protocols {
if protocol.Bind == nil {
continue
}

if err = protocol.Bind(n); err != nil {
n.listener.Close()
return err
}
}
Expand Down Expand Up @@ -383,6 +409,14 @@ func (n *Node) dialIfNotExists(ctx context.Context, addr string) (*Client, error
client.waitUntilClosed()

if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
for _, protocol := range n.protocols {
if protocol.OnPingFailed == nil {
continue
}

protocol.OnPingFailed(addr, err)
}

return nil, err
}
}
Expand Down

0 comments on commit 64da665

Please sign in to comment.