Skip to content

Commit

Permalink
make WriteTo non-blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
aldernero authored and julienduchesne committed Oct 2, 2024
1 parent b69ac1b commit 92c9075
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions kv/memberlist/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ type TCPTransportConfig struct {
// Timeout for writing packet data. Zero = no timeout.
PacketWriteTimeout time.Duration `yaml:"packet_write_timeout" category:"advanced"`

// Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on
// Maximum number of concurrent writes to other nodes.
MaxConcurrentWrites int `yaml:"max_concurrent_writes" category:"advanced"`

// Transport logs lots of messages at debug level, so it deserves an extra flag for turning it on
TransportDebug bool `yaml:"-" category:"advanced"`

// Where to put custom metrics. nil = don't register.
Expand All @@ -73,6 +76,7 @@ func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix s
f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.")
f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 2*time.Second, "Timeout used when connecting to other nodes to send packet.")
f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.")
f.IntVar(&cfg.MaxConcurrentWrites, prefix+"memberlist.max-concurrent-writes", 1, "Maximum number of concurrent writes to other nodes.")
f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.")

f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.")
Expand All @@ -88,6 +92,7 @@ type TCPTransport struct {
packetCh chan *memberlist.Packet
connCh chan net.Conn
wg sync.WaitGroup
writeCh chan struct{}
tcpListeners []net.Listener
tlsConfig *tls.Config

Expand Down Expand Up @@ -124,6 +129,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer pr
logger: log.With(logger, "component", "memberlist TCPTransport"),
packetCh: make(chan *memberlist.Packet),
connCh: make(chan net.Conn),
writeCh: make(chan struct{}, config.MaxConcurrentWrites),
}

var err error
Expand Down Expand Up @@ -426,7 +432,15 @@ func (t *TCPTransport) getAdvertisedAddr() string {
func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) {
t.sentPackets.Inc()
t.sentPacketsBytes.Add(float64(len(b)))
t.writeCh <- struct{}{}
go func() {
defer func() { <-t.writeCh }()
t.writeToAsync(b, addr)
}()
return time.Now(), nil
}

func (t *TCPTransport) writeToAsync(b []byte, addr string) {
err := t.writeTo(b, addr)
if err != nil {
t.sentPacketsErrors.Inc()
Expand All @@ -441,10 +455,7 @@ func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) {

// WriteTo is used to send "UDP" packets. Since we use TCP, we can detect more errors,
// but memberlist library doesn't seem to cope with that very well. That is why we return nil instead.
return time.Now(), nil
}

return time.Now(), nil
}

func (t *TCPTransport) writeTo(b []byte, addr string) error {
Expand Down

0 comments on commit 92c9075

Please sign in to comment.