Skip to content

Commit

Permalink
Attempt to connect to leader via 10 servers simultaneously
Browse files Browse the repository at this point in the history
  • Loading branch information
cnnrznn committed Mar 17, 2024
1 parent dfa1d3a commit 20a5e49
Showing 1 changed file with 87 additions and 41 deletions.
128 changes: 87 additions & 41 deletions internal/protocol/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"io"
"net"
"sort"
"sync"
"time"

"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/canonical/go-dqlite/logging"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)

// DialFunc is a function that can be used to establish a network connection.
Expand Down Expand Up @@ -126,56 +128,100 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P
return servers[i].Role < servers[j].Role
})

ctx, cancel := context.WithCancel(ctx)
defer cancel()

sem := semaphore.NewWeighted(10)

protocolChan := make(chan *Protocol)

wg := &sync.WaitGroup{}
wg.Add(len(servers))

go func() {
wg.Wait()
close(protocolChan)
}()

// Make an attempt for each address until we find the leader.
for _, server := range servers {
log := func(l logging.Level, format string, a ...interface{}) {
format = fmt.Sprintf("server %s: ", server.Address) + format
log(l, format, a...)
}
go func(server NodeInfo, pc chan<- *Protocol) {
defer wg.Done()

ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
return
}
defer sem.Release(1)

protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log)
if err != nil {
// This server is unavailable, try with the next target.
log(logging.Warn, err.Error())
continue
}
if protocol != nil {
// We found the leader
log(logging.Debug, "connected")
return protocol, nil
}
if leader == "" {
// This server does not know who the current leader is,
// try with the next target.
log(logging.Warn, "no known leader")
continue
}
if ctx.Err() != nil {
return
}

// If we get here, it means this server reported that another
// server is the leader, let's close the connection to this
// server and try with the suggested one.
log(logging.Debug, "connect to reported leader %s", leader)
log := func(l logging.Level, format string, a ...interface{}) {
format = fmt.Sprintf("server %s: ", server.Address) + format
log(l, format, a...)
}

ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()
ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()

protocol, _, err = c.connectAttemptOne(ctx, leader, log)
if err != nil {
// The leader reported by the previous server is
// unavailable, try with the next target.
log(logging.Warn, "reported leader unavailable err=%v", err)
continue
}
if protocol == nil {
// The leader reported by the target server does not consider itself
// the leader, try with the next target.
log(logging.Warn, "reported leader server is not the leader")
protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log)
if err != nil {
// This server is unavailable, try with the next target.
log(logging.Warn, err.Error())
return
}
if protocol != nil {
// We found the leader
log(logging.Debug, "connected")
pc <- protocol
return
}
if leader == "" {
// This server does not know who the current leader is,
// try with the next target.
log(logging.Warn, "no known leader")
return
}

// If we get here, it means this server reported that another
// server is the leader, let's close the connection to this
// server and try with the suggested one.
log(logging.Debug, "connect to reported leader %s", leader)

ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()

protocol, _, err = c.connectAttemptOne(ctx, leader, log)
if err != nil {
// The leader reported by the previous server is
// unavailable, try with the next target.
log(logging.Warn, "reported leader unavailable err=%v", err)
return
}
if protocol == nil {
// The leader reported by the target server does not consider itself
// the leader, try with the next target.
log(logging.Warn, "reported leader server is not the leader")
return
}
log(logging.Debug, "connected")
pc <- protocol
}(server, protocolChan)
}

// Read from protocol chan, cancel context\
var protocol *Protocol
for pc := range protocolChan {
if protocol != nil {
pc.Close()
continue
}
log(logging.Debug, "connected")
protocol = pc
cancel()
}

if protocol != nil {
return protocol, nil
}

Expand Down

0 comments on commit 20a5e49

Please sign in to comment.