memberlist: Use separate queue for CAS messages and only wait for CAS messages queue to be empty when stopping #539

Merged
merged 7 commits into from Jul 17, 2024

Changes from 3 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -213,6 +213,8 @@
* [ENHANCEMENT] SpanProfiler: do less work on unsampled traces. #528
* [ENHANCEMENT] Log Middleware: if the trace is not sampled, log its ID as `trace_id_unsampled` instead of `trace_id`. #529
* [ENHANCEMENT] httpgrpc: httpgrpc Server can now use the error message from a special HTTP header when converting an HTTP response to an error. This is useful when the HTTP response body contains binary data that doesn't form a valid UTF-8 string, where grpc would otherwise fail to marshal the returned error. #531
* [ENHANCEMENT] memberlist: use a separate queue for broadcast messages that are the result of CAS updates, and prioritize CAS update messages when sending broadcasts. On stopping, only wait for the CAS updates queue to be empty. #539
* [ENHANCEMENT] memberlist: Added `-<prefix>memberlist.broadcast-timeout-for-cas-updates-on-shutdown` option to set the timeout for sending CAS updates on shutdown, instead of the previously hardcoded 10s (which is still the default). #539
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
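For illustration, the new flag (shown here without a `<prefix>`, and with hypothetical values) could be set as:

    -memberlist.broadcast-timeout-for-cas-updates-on-shutdown=30s   # allow up to 30s to flush CAS updates
    -memberlist.broadcast-timeout-for-cas-updates-on-shutdown=0     # no timeout, wait until all CAS updates are sent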
109 changes: 66 additions & 43 deletions kv/memberlist/memberlist_client.go
@@ -157,7 +157,8 @@ type KVConfig struct {
LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"`

// Timeout used when leaving the memberlist cluster.
LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"`
LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"`
BroadcastTimeoutForCasUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout_for_cas_updates_on_shutdown" category:"advanced"`

// How much space to use to keep received and sent messages in memory (for troubleshooting).
MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"`
@@ -198,6 +199,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.")
f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.")
f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.")
f.DurationVar(&cfg.BroadcastTimeoutForCasUpdatesOnShutdown, prefix+"memberlist.broadcast-timeout-for-cas-updates-on-shutdown", 10*time.Second, "Timeout for broadcasting all remaining CAS updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to CAS updates, not other kinds of broadcast messages. 0 = no timeout, wait until all CAS updates are sent.")

cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix)
}
@@ -231,10 +233,11 @@ type KV struct {
// dns discovery provider
provider DNSProvider

// Protects access to memberlist and broadcasts fields.
delegateReady atomic.Bool
memberlist *memberlist.Memberlist
broadcasts *memberlist.TransmitLimitedQueue
// Protects access to memberlist and gossipBroadcasts fields.
delegateReady atomic.Bool
memberlist *memberlist.Memberlist
casBroadcasts *memberlist.TransmitLimitedQueue // queue for messages generated locally
gossipBroadcasts *memberlist.TransmitLimitedQueue // queue for messages that we forward from other nodes

// KV Store.
storeMu sync.Mutex
@@ -456,7 +459,11 @@ func (m *KV) starting(ctx context.Context) error {
}
// Finish delegate initialization.
m.memberlist = list
m.broadcasts = &memberlist.TransmitLimitedQueue{
m.casBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: list.NumMembers,
RetransmitMult: mlCfg.RetransmitMult,
}
m.gossipBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: list.NumMembers,
RetransmitMult: mlCfg.RetransmitMult,
}
@@ -719,20 +726,24 @@ func (m *KV) discoverMembers(ctx context.Context, members []string) []string {
func (m *KV) stopping(_ error) error {
level.Info(m.logger).Log("msg", "leaving memberlist cluster")

// Wait until broadcast queue is empty, but don't wait for too long.
// Wait until CAS broadcast queue is empty, but don't wait for too long.
// Also don't wait if there is just one node left.
// Problem is that broadcast queue is also filled up by state changes received from other nodes,
// so it may never be empty in a busy cluster. However, we generally only care about messages
// generated on this node via CAS, and those are disabled now (via casBroadcastsEnabled), and should be able
// to get out in this timeout.
// Once we enter Stopping state, we don't queue any more CAS messages.

waitTimeout := time.Now().Add(10 * time.Second)
for m.broadcasts.NumQueued() > 0 && m.memberlist.NumMembers() > 1 && time.Now().Before(waitTimeout) {
deadline := time.Now().Add(m.cfg.BroadcastTimeoutForCasUpdatesOnShutdown)

msgs := m.casBroadcasts.NumQueued()
nodes := m.memberlist.NumMembers()
for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForCasUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) {
level.Info(m.logger).Log("msg", "waiting for CAS broadcast messages to be sent out", "count", msgs, "nodes", nodes)
time.Sleep(250 * time.Millisecond)

msgs = m.casBroadcasts.NumQueued()
nodes = m.memberlist.NumMembers()
}

if cnt := m.broadcasts.NumQueued(); cnt > 0 {
level.Warn(m.logger).Log("msg", "broadcast messages left in queue", "count", cnt, "nodes", m.memberlist.NumMembers())
if msgs > 0 {
level.Warn(m.logger).Log("msg", "broadcast messages left in CAS queue", "count", msgs, "nodes", nodes)
}

err := m.memberlist.Leave(m.cfg.LeaveTimeout)
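To make the timeout semantics above concrete, here is a minimal standalone sketch of the same bounded drain-wait pattern (waitUntilDrained and pending are illustrative names, not part of the diff; a non-positive timeout means wait indefinitely, matching the new flag):

package main

import (
	"fmt"
	"time"
)

// waitUntilDrained polls pending() until it reports zero, or until the
// optional timeout elapses. It returns true if the queue drained in time.
func waitUntilDrained(pending func() int, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for pending() > 0 {
		if timeout > 0 && !time.Now().Before(deadline) {
			return false // deadline hit with messages still queued
		}
		time.Sleep(250 * time.Millisecond)
	}
	return true
}

func main() {
	queued := 3
	// Simulate a queue that drains one message per poll.
	fmt.Println("drained:", waitUntilDrained(func() int { queued--; return queued }, 2*time.Second))
}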
Expand Down Expand Up @@ -972,11 +983,7 @@ outer:
m.casSuccesses.Inc()
m.notifyWatchers(key)

if m.State() == services.Running {
m.broadcastNewValue(key, change, newver, codec)
} else {
level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)
}
m.broadcastNewValue(key, change, newver, codec, true)
}

return nil
@@ -1034,7 +1041,12 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{})
return change, newver, retry, nil
}

func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec) {
func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, isCas bool) {
if isCas && m.State() != services.Running {
level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)
return
}

data, err := codec.Encode(change)
if err != nil {
level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err)
@@ -1058,7 +1070,25 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
Changes: change.MergeContent(),
})

m.queueBroadcast(key, change.MergeContent(), version, pairData)
l := len(pairData)
b := ringBroadcast{
key: key,
content: change.MergeContent(),
version: version,
msg: pairData,
finished: func(ringBroadcast) {
m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l))
},
logger: m.logger,
}

m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l))

if isCas {
m.casBroadcasts.QueueBroadcast(b)
} else {
m.gossipBroadcasts.QueueBroadcast(b)
}
}
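A side note on the accounting above: the gauge is increased when a broadcast is queued and decreased via the finished callback once memberlist is done with the message. A simplified standalone sketch of that pattern (all names here are illustrative, not part of the diff):

package main

import "fmt"

type broadcast struct {
	msg      []byte
	finished func()
}

type queue struct {
	items     []broadcast
	sizeBytes float64 // stands in for the totalSizeOfBroadcastMessagesInQueue gauge
}

func (q *queue) enqueue(msg []byte) {
	l := len(msg)
	q.sizeBytes += float64(l)
	q.items = append(q.items, broadcast{
		msg:      msg,
		finished: func() { q.sizeBytes -= float64(l) },
	})
}

// pop removes the oldest broadcast and runs its finished callback,
// mirroring what happens when a message leaves the real queue.
func (q *queue) pop() {
	b := q.items[0]
	q.items = q.items[1:]
	b.finished()
}

func main() {
	q := &queue{}
	q.enqueue(make([]byte, 100))
	q.enqueue(make([]byte, 50))
	fmt.Println(q.sizeBytes) // 150
	q.pop()
	fmt.Println(q.sizeBytes) // 50
}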

// NodeMeta is method from Memberlist Delegate interface
@@ -1153,7 +1183,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
m.notifyWatchers(key)

// Don't resend original message, but only changes.
m.broadcastNewValue(key, mod, version, update.codec)
m.broadcastNewValue(key, mod, version, update.codec, false)
}

case <-m.shutdown:
@@ -1163,32 +1193,25 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
}
}

func (m *KV) queueBroadcast(key string, content []string, version uint, message []byte) {
Member Author: inlined into broadcastNewValue

l := len(message)

b := ringBroadcast{
key: key,
content: content,
version: version,
msg: message,
finished: func(ringBroadcast) {
m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l))
},
logger: m.logger,
}

m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l))
m.broadcasts.QueueBroadcast(b)
}

// GetBroadcasts is method from Memberlist Delegate interface
// It returns all pending broadcasts (within the size limit)
func (m *KV) GetBroadcasts(overhead, limit int) [][]byte {
if !m.delegateReady.Load() {
return nil
}

return m.broadcasts.GetBroadcasts(overhead, limit)
// Prioritize CAS queue
msgs := m.casBroadcasts.GetBroadcasts(overhead, limit)

// Decrease limit for each message we got from CAS broadcasts.
for _, m := range msgs {
limit -= overhead + len(m)
}

if limit > 0 {
msgs = append(msgs, m.gossipBroadcasts.GetBroadcasts(overhead, limit)...)
}
return msgs
}
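The byte-budget split above can be illustrated with a small standalone sketch (not part of the diff): each message taken from the CAS queue consumes overhead + len(msg) of the limit, and only the remainder is offered to the gossip queue.

package main

import "fmt"

// remainingBudget mirrors the limit accounting in GetBroadcasts.
func remainingBudget(limit, overhead int, msgs [][]byte) int {
	for _, m := range msgs {
		limit -= overhead + len(m)
	}
	return limit
}

func main() {
	casMsgs := [][]byte{make([]byte, 300), make([]byte, 100)} // taken from the CAS queue first
	fmt.Println(remainingBudget(1400, 2, casMsgs))            // 1400 - (302 + 102) = 996 left for gossip
}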

// LocalState is method from Memberlist Delegate interface
Expand Down Expand Up @@ -1335,7 +1358,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) {
level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err)
} else if newver > 0 {
m.notifyWatchers(kvPair.Key)
m.broadcastNewValue(kvPair.Key, change, newver, codec)
m.broadcastNewValue(kvPair.Key, change, newver, codec, false)
Contributor: These messages we're persisting to the local KV that come from external broadcasts are non-CAS by definition? (Only the local mutations are called CAS?)

Member Author: Correct. Currently only the CAS operation can modify the KV store. There is also a Delete operation in the KV client interface, but the memberlist implementation doesn't support it yet. We should implement it eventually, so perhaps it would be better to call them "local updates" instead of "CAS updates". WDYT? I think I'll rename it.

Contributor: Sounds good! CAS is an implementation detail and doesn't really explain that it's a local modification.

Member Author: I've done this change in 979c031. I've also updated the memberlist_client_messages_in_broadcast_queue metric to report values for both queues individually.

}
}

51 changes: 50 additions & 1 deletion kv/memberlist/memberlist_client_test.go
@@ -718,7 +718,7 @@ func testMultipleClientsWithConfigGenerator(t *testing.T, members int, configGen

startTime := time.Now()
firstKv := clients[0]
ctx, cancel := context.WithTimeout(context.Background(), casInterval*3/2) // Watch for 1.5 cas intervals.
ctx, cancel := context.WithTimeout(context.Background(), casInterval*3) // Watch for 3x cas intervals.
Member Author: This change is unrelated, only done to avoid test flakiness.

updates := 0
firstKv.WatchKey(ctx, key, func(in interface{}) bool {
updates++
@@ -1647,3 +1647,52 @@ func (p *delayedDNSProviderMock) Resolve(_ context.Context, addrs []string) erro
func (p delayedDNSProviderMock) Addresses() []string {
return p.resolved
}

func TestGetBroadcastsPrefersCASUpdates(t *testing.T) {
codec := dataCodec{}

cfg := KVConfig{
TCPTransport: TCPTransportConfig{
BindAddrs: getLocalhostAddrs(),
},
}

// We will be checking the number of messages in the broadcast queue, so make sure to use a known retransmit factor.
cfg.RetransmitMult = 1
cfg.Codecs = append(cfg.Codecs, codec)

kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck

now := time.Now()

smallUpdate := &data{Members: map[string]member{"a": {Timestamp: now.Unix(), State: JOINING}}}
bigUpdate := &data{Members: map[string]member{"b": {Timestamp: now.Unix(), State: JOINING}, "c": {Timestamp: now.Unix(), State: JOINING}, "d": {Timestamp: now.Unix(), State: JOINING}}}

// No broadcast messages from KV at the beginning.
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))

// Check that CAS broadcast messages will be prioritized and sent out first, even if they are enqueued later or are smaller than other messages in the queue.
kv.broadcastNewValue("notcas", smallUpdate, 1, codec, false)
kv.broadcastNewValue("notcas", bigUpdate, 2, codec, false)
kv.broadcastNewValue("cas", smallUpdate, 1, codec, true)
kv.broadcastNewValue("cas", bigUpdate, 2, codec, true)

msgs := kv.GetBroadcasts(0, 10000)
require.Len(t, msgs, 4) // we get all 4 messages
require.Equal(t, "cas", getKey(t, msgs[0]))
require.Equal(t, "cas", getKey(t, msgs[1]))
require.Equal(t, "notcas", getKey(t, msgs[2]))
require.Equal(t, "notcas", getKey(t, msgs[3]))
// Check that TransmitLimitedQueue.GetBroadcasts preferred larger messages (it does that).
require.True(t, len(msgs[0]) > len(msgs[1])) // Bigger CAS message is returned before smaller one
require.True(t, len(msgs[2]) > len(msgs[3])) // Bigger non-CAS message is returned before smaller one
}

func getKey(t *testing.T, msg []byte) string {
kvPair := KeyValuePair{}
err := kvPair.Unmarshal(msg)
require.NoError(t, err)
return kvPair.Key
}
4 changes: 2 additions & 2 deletions kv/memberlist/metrics.go
@@ -77,9 +77,9 @@ func (m *KV) createAndRegisterMetrics() {
Name: "messages_in_broadcast_queue",
Help: "Number of user messages in the broadcast queue",
}, func() float64 {
// m.broadcasts is not set before Starting state
// Queues are not set before Starting state
if m.State() == services.Running || m.State() == services.Stopping {
return float64(m.broadcasts.NumQueued())
return float64(m.gossipBroadcasts.NumQueued()) + float64(m.casBroadcasts.NumQueued())
}
return 0
})