diff --git a/README.md b/README.md index c413a2ad..6be9b7d9 100644 --- a/README.md +++ b/README.md @@ -151,9 +151,9 @@ Releases are marked with a version number formatted as MAJOR.MINOR.PATCH. Major Therefore, **noise** _mostly_ respects semantic versioning. -The rationale behind this is due to improper tagging of prior releases (v0.1.0, v1.0.0, and v1.1.0), which has caused for the improper caching of module information on _proxy.golang.org_ and _sum.golang.org_. +The rationale behind this is due to improper tagging of prior releases (v0.1.0, v1.0.0, v1.1.0, and v1.1.1), which has caused for the improper caching of module information on _proxy.golang.org_ and _sum.golang.org_. -As a result, _noise's initial development phase starts from v1.1.1_. Until Noise's API is stable, subsequent releases will only comprise of bumps in MINOR and PATCH. +As a result, _noise's initial development phase starts from v1.1.2_. Until Noise's API is stable, subsequent releases will only comprise of bumps in MINOR and PATCH. ## License diff --git a/kademlia/iterator.go b/kademlia/iterator.go index f175e5c4..9b0f3ca2 100644 --- a/kademlia/iterator.go +++ b/kademlia/iterator.go @@ -24,6 +24,8 @@ type Iterator struct { maxNumResults int numParallelLookups int numParallelRequestsPerLookup int + + lookupTimeout time.Duration } // NewIterator instantiates a new overlay network iterator bounded to a node and routing table that may be @@ -37,6 +39,8 @@ func NewIterator(node *noise.Node, table *Table, opts ...IteratorOption) *Iterat maxNumResults: BucketSize, numParallelLookups: 3, numParallelRequestsPerLookup: 8, + + lookupTimeout: 3 * time.Second, } for _, opt := range opts { @@ -155,7 +159,7 @@ func (it *Iterator) processLookupRequests(in <-chan noise.ID, out chan<- []noise } func (it *Iterator) lookupRequest(id noise.ID, out chan<- []noise.ID) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), it.lookupTimeout) defer cancel() obj, err := it.node.RequestMessage(ctx, id.Address, FindNodeRequest{Target: id.ID}) diff --git a/kademlia/iterator_options.go b/kademlia/iterator_options.go index f3a5d9ba..21f9528d 100644 --- a/kademlia/iterator_options.go +++ b/kademlia/iterator_options.go @@ -1,6 +1,9 @@ package kademlia -import "go.uber.org/zap" +import ( + "go.uber.org/zap" + "time" +) // IteratorOption represents a functional option which may be passed to NewIterator, or to (*Protocol).Find or // (*Protocol).Discover to configure Iterator. @@ -37,3 +40,11 @@ func WithIteratorNumParallelRequestsPerLookup(numParallelRequestsPerLookup int) it.numParallelRequestsPerLookup = numParallelRequestsPerLookup } } + +// WithIteratorLookupTimeout sets the max duration to wait until we declare a lookup request sent in amidst +// a single disjoint lookup to have timed out. By default, it is set to 3 seconds. +func WithIteratorLookupTimeout(lookupTimeout time.Duration) IteratorOption { + return func(it *Iterator) { + it.lookupTimeout = lookupTimeout + } +} diff --git a/kademlia/protocol.go b/kademlia/protocol.go index 35c04a41..24d12b33 100644 --- a/kademlia/protocol.go +++ b/kademlia/protocol.go @@ -26,11 +26,15 @@ type Protocol struct { table *Table events Events + + pingTimeout time.Duration } // New returns a new instance of the Kademlia protocol. func New(opts ...ProtocolOption) *Protocol { - p := &Protocol{} + p := &Protocol{ + pingTimeout: 3 * time.Second, + } for _, opt := range opts { opt(p) @@ -102,7 +106,7 @@ func (p *Protocol) Ack(id noise.ID) { bucket := p.table.Bucket(id.ID) last := bucket[len(bucket)-1] - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), p.pingTimeout) pong, err := p.node.RequestMessage(ctx, last.Address, Ping{}) cancel() diff --git a/kademlia/protocol_options.go b/kademlia/protocol_options.go index a3c98860..ae2ddc47 100644 --- a/kademlia/protocol_options.go +++ b/kademlia/protocol_options.go @@ -1,6 +1,9 @@ package kademlia -import "go.uber.org/zap" +import ( + "go.uber.org/zap" + "time" +) // ProtocolOption represents a functional option which may be passed to New to configure a Protocol. type ProtocolOption func(p *Protocol) @@ -19,3 +22,12 @@ func WithProtocolLogger(logger *zap.Logger) ProtocolOption { p.logger = logger } } + +// WithProtocolPingTimeout configures the amount of time to wait for until we declare a ping to have failed. Peers +// typically either are pinged through a call of (*Protocol).Ping, or in amidst the execution of Kademlia's peer +// eviction policy. By default, it is set to 3 seconds. +func WithProtocolPingTimeout(pingTimeout time.Duration) ProtocolOption { + return func(p *Protocol) { + p.pingTimeout = pingTimeout + } +} diff --git a/node.go b/node.go index b7aa7aa9..fd8207c8 100644 --- a/node.go +++ b/node.go @@ -127,8 +127,8 @@ func (n *Node) Listen() error { addr, ok := n.listener.Addr().(*net.TCPAddr) if !ok { - err = fmt.Errorf("did not listen for tcp: %w", n.listener.Close()) - return err + n.listener.Close() + return errors.New("did not bind to a tcp addr") } n.host = addr.IP @@ -136,9 +136,34 @@ func (n *Node) Listen() error { if n.addr == "" { n.addr = net.JoinHostPort(normalizeIP(n.host), strconv.FormatUint(uint64(n.port), 10)) - } + n.id = NewID(n.publicKey, n.host, n.port) + } else { + resolved, err := ResolveAddress(n.addr) + if err != nil { + n.listener.Close() + return err + } + + hostStr, portStr, err := net.SplitHostPort(resolved) + if err != nil { + n.listener.Close() + return err + } - n.id = NewID(n.publicKey, n.host, n.port) + host := net.ParseIP(hostStr) + if host == nil { + n.listener.Close() + return errors.New("host in provided public address is invalid (must be IPv4/IPv6)") + } + + port, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + n.listener.Close() + return err + } + + n.id = NewID(n.publicKey, host, uint16(port)) + } for _, protocol := range n.protocols { if protocol.Bind == nil { @@ -146,6 +171,7 @@ func (n *Node) Listen() error { } if err = protocol.Bind(n); err != nil { + n.listener.Close() return err } } @@ -383,6 +409,14 @@ func (n *Node) dialIfNotExists(ctx context.Context, addr string) (*Client, error client.waitUntilClosed() if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + for _, protocol := range n.protocols { + if protocol.OnPingFailed == nil { + continue + } + + protocol.OnPingFailed(addr, err) + } + return nil, err } }