Skip to content

Commit

Permalink
use a map to accurately track dials in flight
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Aug 7, 2023
1 parent c21bf23 commit 6b39900
Showing 1 changed file with 38 additions and 35 deletions.
73 changes: 38 additions & 35 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ type dialWorker struct {
connected bool
// dq is used to pace dials to different addresses of the peer
dq *dialQueue
// dialsInFlight are the addresses with dials pending completion.
dialsInFlight int
// dialsInFlight are the addresses with dials pending completion. We use this to schedule new dials
// and to cleanup all pending dials when closing the loop
dialsInFlight map[string]bool
// totalDials is used to track number of dials made by this worker for metrics
totalDials int

Expand All @@ -112,6 +113,7 @@ func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dia
pendingRequests: make(map[*pendRequest]bool),
trackedDials: make(map[string]*addrDial),
resch: make(chan dialResult),
dialsInFlight: make(map[string]bool),
cl: cl,
}
}
Expand All @@ -136,7 +138,7 @@ func (w *dialWorker) loop() {
}
timerRunning = false
if w.dq.Len() > 0 {
if w.dialsInFlight == 0 && !w.connected {
if len(w.dialsInFlight) == 0 && !w.connected {
// if there are no dials in flight, trigger the next dials immediately
dialTimer.Reset(startTime)
} else {
Expand Down Expand Up @@ -172,25 +174,7 @@ loop:
continue loop
}

addrs, addrErrs, err := w.s.addrsForDial(req.ctx, w.peer)
if err != nil {
req.resch <- dialResponse{err: &DialError{Peer: w.peer, DialErrors: addrErrs, Cause: err}}
continue loop
}

// TODO(sukunrt): remove this check. This should never happen given the call
// to w.s.bestAcceptableConnToPeer above, but I think this happens for circuit v2 addresses
for _, addr := range addrs {
if ad, ok := w.trackedDials[string(addr.Bytes())]; ok {
if ad.conn != nil {
// dial to this addr was successful, complete the request
req.resch <- dialResponse{conn: ad.conn}
continue loop
}
}
}

w.addNewRequest(req, addrs, addrErrs)
w.addNewRequest(req)
scheduleNextDial()

case <-dialTimer.Ch():
Expand All @@ -214,7 +198,7 @@ loop:
// Errored without attempting a dial. This happens in case of backoff.
w.dispatchError(ad, err)
} else {
w.dialsInFlight++
w.dialsInFlight[string(ad.addr.Bytes())] = true
w.totalDials++
}
}
Expand All @@ -233,9 +217,7 @@ loop:
if res.Conn != nil {
res.Conn.Close()
}
// It is better to decrement the dials in flight and schedule one extra dial
// than risking not closing the worker loop on cleanup
w.dialsInFlight--
delete(w.dialsInFlight, string(res.Addr.Bytes()))
continue
}

Expand All @@ -245,7 +227,7 @@ loop:
continue
}

w.dialsInFlight--
delete(w.dialsInFlight, string(res.Addr.Bytes()))
// We're recording any error as a failure here.
// Notably, this also applies to cancelations (i.e. if another dial attempt was faster).
// This is ok since the black hole detector uses a very low threshold (5%).
Expand All @@ -261,14 +243,31 @@ loop:
}
}

// addNewRequest adds a new dial request to the worker loop. If the request has no pending dials, a response
// is sent immediately otherwise it is tracked in pendingRequests
func (w *dialWorker) addNewRequest(req dialRequest, addrs []ma.Multiaddr, addrErrs []TransportError) {
// addNewRequest adds a new dial request to the worker loop. If the request has a valid connection or all relevant
// dials have failed, the request is handled immediately, otherwise it is added to pendingRequests.
func (w *dialWorker) addNewRequest(req dialRequest) {
addrs, addrErrs, err := w.s.addrsForDial(req.ctx, w.peer)
if err != nil {
req.resch <- dialResponse{err: &DialError{Peer: w.peer, DialErrors: addrErrs, Cause: err}}
return
}

// TODO(sukunrt): remove this check. This should never happen given the call
// to w.s.bestAcceptableConnToPeer above, but I think this happens for circuit v2 addresses
for _, addr := range addrs {
if ad, ok := w.trackedDials[string(addr.Bytes())]; ok {
if ad.conn != nil {
// dial to this addr was successful, complete the request
req.resch <- dialResponse{conn: ad.conn}
return
}
}
}

// get the delays to dial these addrs from the swarms dialRanker
simConnect, _, _ := network.GetSimultaneousConnect(req.ctx)
addrRanking := w.rankAddrs(addrs, simConnect)

// create the pending request object
pr := &pendRequest{
req: req,
err: &DialError{Peer: w.peer, DialErrors: addrErrs},
Expand Down Expand Up @@ -300,8 +299,9 @@ func (w *dialWorker) addNewRequest(req dialRequest, addrs []ma.Multiaddr, addrEr
}

if !ad.dialed {
// we haven't dialed this address. update the ad.ctx to have simultaneous connect values
// set correctly
// We are tracking a dial to this address but we haven't dialled it already.
// If the new request is a holepunching request, update the context and the element in the
// dial queue
if isSimConnect, isClient, reason := network.GetSimultaneousConnect(req.ctx); isSimConnect {
if wasSimConnect, _, _ := network.GetSimultaneousConnect(ad.ctx); !wasSimConnect {
ad.ctx = network.WithSimultaneousConnect(ad.ctx, isClient, reason)
Expand Down Expand Up @@ -433,16 +433,19 @@ func (w *dialWorker) cleanup() {
if w.s.metricsTracer != nil {
w.s.metricsTracer.DialCompleted(w.connected, w.totalDials)
}
for w.dialsInFlight > 0 {
for len(w.dialsInFlight) > 0 {
res := <-w.resch
if res.Kind != DialFailed && res.Kind != DialSuccessful {
continue
}
// We're recording any error as a failure here.
// Notably, this also applies to cancelations (i.e. if another dial attempt was faster).
// This is ok since the black hole detector uses a very low threshold (5%).
w.s.bhd.RecordResult(res.Addr, res.Err == nil)
if res.Conn != nil {
res.Conn.Close()
}
w.dialsInFlight--
delete(w.dialsInFlight, string(res.Addr.Bytes()))
}
}

Expand Down

0 comments on commit 6b39900

Please sign in to comment.