From 20a5e49a343b5ae00b6e28fe724f393b41c1624e Mon Sep 17 00:00:00 2001 From: Connor Zanin Date: Sat, 16 Mar 2024 20:58:01 -0600 Subject: [PATCH 1/2] Attempt to connect to leader via 10 servers simultaneously --- internal/protocol/connector.go | 128 ++++++++++++++++++++++----------- 1 file changed, 87 insertions(+), 41 deletions(-) diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index 3195008f..7fa23700 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -7,6 +7,7 @@ import ( "io" "net" "sort" + "sync" "time" "github.com/Rican7/retry" @@ -14,6 +15,7 @@ import ( "github.com/Rican7/retry/strategy" "github.com/canonical/go-dqlite/logging" "github.com/pkg/errors" + "golang.org/x/sync/semaphore" ) // DialFunc is a function that can be used to establish a network connection. @@ -126,56 +128,100 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P return servers[i].Role < servers[j].Role }) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + sem := semaphore.NewWeighted(10) + + protocolChan := make(chan *Protocol) + + wg := &sync.WaitGroup{} + wg.Add(len(servers)) + + go func() { + wg.Wait() + close(protocolChan) + }() + // Make an attempt for each address until we find the leader. for _, server := range servers { - log := func(l logging.Level, format string, a ...interface{}) { - format = fmt.Sprintf("server %s: ", server.Address) + format - log(l, format, a...) - } + go func(server NodeInfo, pc chan<- *Protocol) { + defer wg.Done() - ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) - defer cancel() + if err := sem.Acquire(ctx, 1); err != nil { + return + } + defer sem.Release(1) - protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log) - if err != nil { - // This server is unavailable, try with the next target. - log(logging.Warn, err.Error()) - continue - } - if protocol != nil { - // We found the leader - log(logging.Debug, "connected") - return protocol, nil - } - if leader == "" { - // This server does not know who the current leader is, - // try with the next target. - log(logging.Warn, "no known leader") - continue - } + if ctx.Err() != nil { + return + } - // If we get here, it means this server reported that another - // server is the leader, let's close the connection to this - // server and try with the suggested one. - log(logging.Debug, "connect to reported leader %s", leader) + log := func(l logging.Level, format string, a ...interface{}) { + format = fmt.Sprintf("server %s: ", server.Address) + format + log(l, format, a...) + } - ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout) - defer cancel() + ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout) + defer cancel() - protocol, _, err = c.connectAttemptOne(ctx, leader, log) - if err != nil { - // The leader reported by the previous server is - // unavailable, try with the next target. - log(logging.Warn, "reported leader unavailable err=%v", err) - continue - } - if protocol == nil { - // The leader reported by the target server does not consider itself - // the leader, try with the next target. - log(logging.Warn, "reported leader server is not the leader") + protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log) + if err != nil { + // This server is unavailable, try with the next target. + log(logging.Warn, err.Error()) + return + } + if protocol != nil { + // We found the leader + log(logging.Debug, "connected") + pc <- protocol + return + } + if leader == "" { + // This server does not know who the current leader is, + // try with the next target. + log(logging.Warn, "no known leader") + return + } + + // If we get here, it means this server reported that another + // server is the leader, let's close the connection to this + // server and try with the suggested one. + log(logging.Debug, "connect to reported leader %s", leader) + + ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout) + defer cancel() + + protocol, _, err = c.connectAttemptOne(ctx, leader, log) + if err != nil { + // The leader reported by the previous server is + // unavailable, try with the next target. + log(logging.Warn, "reported leader unavailable err=%v", err) + return + } + if protocol == nil { + // The leader reported by the target server does not consider itself + // the leader, try with the next target. + log(logging.Warn, "reported leader server is not the leader") + return + } + log(logging.Debug, "connected") + pc <- protocol + }(server, protocolChan) + } + + // Read from protocol chan, cancel context\ + var protocol *Protocol + for pc := range protocolChan { + if protocol != nil { + pc.Close() continue } - log(logging.Debug, "connected") + protocol = pc + cancel() + } + + if protocol != nil { return protocol, nil } From 018812d81a2f336cf9f6cf40a547fd16e68f86af Mon Sep 17 00:00:00 2001 From: Connor Zanin Date: Thu, 21 Mar 2024 11:29:41 -0400 Subject: [PATCH 2/2] More readable connection collection --- internal/protocol/connector.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/internal/protocol/connector.go b/internal/protocol/connector.go index 7fa23700..c8e3ed7a 100644 --- a/internal/protocol/connector.go +++ b/internal/protocol/connector.go @@ -210,22 +210,19 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P }(server, protocolChan) } - // Read from protocol chan, cancel context\ - var protocol *Protocol - for pc := range protocolChan { - if protocol != nil { - pc.Close() - continue - } - protocol = pc - cancel() + // Read from protocol chan, cancel context + protocol, ok := <-protocolChan + if !ok { + return nil, ErrNoAvailableLeader } - if protocol != nil { - return protocol, nil + cancel() + + for extra := range protocolChan { + extra.Close() } - return nil, ErrNoAvailableLeader + return protocol, nil } // Perform the initial handshake using the given protocol version.