From 3e1e0d36716c74df05a239d8e452094c6b3ef0da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 3 Jul 2024 10:37:15 +0200 Subject: [PATCH 1/5] Add message queue for CAS messages only. Make wait timeout on shutdown configurable. --- kv/memberlist/memberlist_client.go | 109 ++++++++++++++---------- kv/memberlist/memberlist_client_test.go | 49 +++++++++++ kv/memberlist/metrics.go | 4 +- 3 files changed, 117 insertions(+), 45 deletions(-) diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index a1b659d40..af486903b 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -157,7 +157,8 @@ type KVConfig struct { LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. - LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` + LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` + BroadcastTimeoutForCasUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout__for_cas_updates_on_shutdown" category:"advanced"` // How much space to use to keep received and sent messages in memory (for troubleshooting). MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"` @@ -198,6 +199,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. 
Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.") + f.DurationVar(&cfg.BroadcastTimeoutForCasUpdatesOnShutdown, prefix+"memberlist.broadcast-timeout-for-cas-updates-on-shutdown", 10*time.Second, "Timeout for broadcasting all remaining CAS updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to CAS updates, not other kind of broadcast messages. 0 = no timeout, wait until all CAS updates are sent.") cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix) } @@ -231,10 +233,11 @@ type KV struct { // dns discovery provider provider DNSProvider - // Protects access to memberlist and broadcasts fields. - delegateReady atomic.Bool - memberlist *memberlist.Memberlist - broadcasts *memberlist.TransmitLimitedQueue + // Protects access to memberlist and gossipBroadcasts fields. + delegateReady atomic.Bool + memberlist *memberlist.Memberlist + casBroadcasts *memberlist.TransmitLimitedQueue // queue for messages generated locally + gossipBroadcasts *memberlist.TransmitLimitedQueue // queue for messages that we forward from other nodes // KV Store. storeMu sync.Mutex @@ -456,7 +459,11 @@ func (m *KV) starting(ctx context.Context) error { } // Finish delegate initialization. 
m.memberlist = list - m.broadcasts = &memberlist.TransmitLimitedQueue{ + m.casBroadcasts = &memberlist.TransmitLimitedQueue{ + NumNodes: list.NumMembers, + RetransmitMult: mlCfg.RetransmitMult, + } + m.gossipBroadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: list.NumMembers, RetransmitMult: mlCfg.RetransmitMult, } @@ -719,20 +726,24 @@ func (m *KV) discoverMembers(ctx context.Context, members []string) []string { func (m *KV) stopping(_ error) error { level.Info(m.logger).Log("msg", "leaving memberlist cluster") - // Wait until broadcast queue is empty, but don't wait for too long. + // Wait until CAS broadcast queue is empty, but don't wait for too long. // Also don't wait if there is just one node left. - // Problem is that broadcast queue is also filled up by state changes received from other nodes, - // so it may never be empty in a busy cluster. However, we generally only care about messages - // generated on this node via CAS, and those are disabled now (via casBroadcastsEnabled), and should be able - // to get out in this timeout. + // Once we enter Stopping state, we don't queue any more CAS messages. 
- waitTimeout := time.Now().Add(10 * time.Second) - for m.broadcasts.NumQueued() > 0 && m.memberlist.NumMembers() > 1 && time.Now().Before(waitTimeout) { + deadline := time.Now().Add(m.cfg.BroadcastTimeoutForCasUpdatesOnShutdown) + + msgs := m.casBroadcasts.NumQueued() + nodes := m.memberlist.NumMembers() + for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForCasUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) { + level.Info(m.logger).Log("msg", "waiting for CAS broadcast messages to be sent out", "count", msgs, "nodes", nodes) time.Sleep(250 * time.Millisecond) + + msgs = m.casBroadcasts.NumQueued() + nodes = m.memberlist.NumMembers() } - if cnt := m.broadcasts.NumQueued(); cnt > 0 { - level.Warn(m.logger).Log("msg", "broadcast messages left in queue", "count", cnt, "nodes", m.memberlist.NumMembers()) + if msgs > 0 { + level.Warn(m.logger).Log("msg", "broadcast messages left in CAS queue", "count", msgs, "nodes", nodes) } err := m.memberlist.Leave(m.cfg.LeaveTimeout) @@ -972,11 +983,7 @@ outer: m.casSuccesses.Inc() m.notifyWatchers(key) - if m.State() == services.Running { - m.broadcastNewValue(key, change, newver, codec) - } else { - level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key) - } + m.broadcastNewValue(key, change, newver, codec, true) } return nil @@ -1034,7 +1041,12 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) return change, newver, retry, nil } -func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec) { +func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, isCas bool) { + if isCas && m.State() != services.Running { + level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key) + return + } + data, err := codec.Encode(change) if err != nil { level.Error(m.logger).Log("msg", "failed to encode change", "key", 
key, "version", version, "err", err) @@ -1058,7 +1070,25 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec Changes: change.MergeContent(), }) - m.queueBroadcast(key, change.MergeContent(), version, pairData) + l := len(pairData) + b := ringBroadcast{ + key: key, + content: change.MergeContent(), + version: version, + msg: pairData, + finished: func(ringBroadcast) { + m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) + }, + logger: m.logger, + } + + m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) + + if isCas { + m.casBroadcasts.QueueBroadcast(b) + } else { + m.gossipBroadcasts.QueueBroadcast(b) + } } // NodeMeta is method from Memberlist Delegate interface @@ -1153,7 +1183,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { m.notifyWatchers(key) // Don't resend original message, but only changes. - m.broadcastNewValue(key, mod, version, update.codec) + m.broadcastNewValue(key, mod, version, update.codec, false) } case <-m.shutdown: @@ -1163,24 +1193,6 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { } } -func (m *KV) queueBroadcast(key string, content []string, version uint, message []byte) { - l := len(message) - - b := ringBroadcast{ - key: key, - content: content, - version: version, - msg: message, - finished: func(ringBroadcast) { - m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) - }, - logger: m.logger, - } - - m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) - m.broadcasts.QueueBroadcast(b) -} - // GetBroadcasts is method from Memberlist Delegate interface // It returns all pending broadcasts (within the size limit) func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { @@ -1188,7 +1200,18 @@ func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { return nil } - return m.broadcasts.GetBroadcasts(overhead, limit) + // Prioritize CAS queue + msgs := m.casBroadcasts.GetBroadcasts(overhead, limit) + + // Decrease limit for each message we got 
from CAS broadcasts. + for _, m := range msgs { + limit -= overhead + len(m) + } + + if limit > 0 { + msgs = append(msgs, m.gossipBroadcasts.GetBroadcasts(overhead, limit)...) + } + return msgs } // LocalState is method from Memberlist Delegate interface @@ -1335,7 +1358,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) } else if newver > 0 { m.notifyWatchers(kvPair.Key) - m.broadcastNewValue(kvPair.Key, change, newver, codec) + m.broadcastNewValue(kvPair.Key, change, newver, codec, false) } } diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 383471c5b..729cfb7ce 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -1647,3 +1647,52 @@ func (p *delayedDNSProviderMock) Resolve(_ context.Context, addrs []string) erro func (p delayedDNSProviderMock) Addresses() []string { return p.resolved } + +func TestGetBroadcastsPrefersCASUpdates(t *testing.T) { + codec := dataCodec{} + + cfg := KVConfig{ + TCPTransport: TCPTransportConfig{ + BindAddrs: getLocalhostAddrs(), + }, + } + + // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. + cfg.RetransmitMult = 1 + cfg.Codecs = append(cfg.Codecs, codec) + + kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) + defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck + + now := time.Now() + + smallUpdate := &data{Members: map[string]member{"a": {Timestamp: now.Unix(), State: JOINING}}} + bigUpdate := &data{Members: map[string]member{"b": {Timestamp: now.Unix(), State: JOINING}, "c": {Timestamp: now.Unix(), State: JOINING}, "d": {Timestamp: now.Unix(), State: JOINING}}} + + // No broadcast messages from KV at the beginning. 
+ require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) + + // Check that CAS broadcast messages will be prioritized and sent out first, even if they are enqueued later or are smaller than other messages in the queue. + kv.broadcastNewValue("notcas", smallUpdate, 1, codec, false) + kv.broadcastNewValue("notcas", bigUpdate, 2, codec, false) + kv.broadcastNewValue("cas", smallUpdate, 1, codec, true) + kv.broadcastNewValue("cas", bigUpdate, 2, codec, true) + + msgs := kv.GetBroadcasts(0, 10000) + require.Len(t, msgs, 4) // we get all 4 messages + require.Equal(t, "cas", getKey(t, msgs[0])) + require.Equal(t, "cas", getKey(t, msgs[1])) + require.Equal(t, "notcas", getKey(t, msgs[2])) + require.Equal(t, "notcas", getKey(t, msgs[3])) + // Check that TransmitLimitedQueue.GetBroadcasts preferred larger messages (it does that). + require.True(t, len(msgs[0]) > len(msgs[1])) // Bigger CAS message is returned before smaller one + require.True(t, len(msgs[2]) > len(msgs[3])) // Bigger non-CAS message is returned before smaller one +} + +func getKey(t *testing.T, msg []byte) string { + kvPair := KeyValuePair{} + err := kvPair.Unmarshal(msg) + require.NoError(t, err) + return kvPair.Key +} diff --git a/kv/memberlist/metrics.go b/kv/memberlist/metrics.go index 75a6b2324..ad4d70fd9 100644 --- a/kv/memberlist/metrics.go +++ b/kv/memberlist/metrics.go @@ -77,9 +77,9 @@ func (m *KV) createAndRegisterMetrics() { Name: "messages_in_broadcast_queue", Help: "Number of user messages in the broadcast queue", }, func() float64 { - // m.broadcasts is not set before Starting state + // Queues are not set before Starting state if m.State() == services.Running || m.State() == services.Stopping { - return float64(m.broadcasts.NumQueued()) + return float64(m.gossipBroadcasts.NumQueued()) + float64(m.casBroadcasts.NumQueued()) } return 0 }) From 6924d2fd47293065980faef73bd40bd2319861e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 3 Jul 2024 10:50:24 +0200 
Subject: [PATCH 2/5] CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 79485a0b2..56809974a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -213,6 +213,8 @@ * [ENHANCEMENT] SpanProfiler: do less work on unsampled traces. #528 * [ENHANCEMENT] Log Middleware: if the trace is not sampled, log its ID as `trace_id_unsampled` instead of `trace_id`. #529 * [EHNANCEMENT] httpgrpc: httpgrpc Server can now use error message from special HTTP header when converting HTTP response to an error. This is useful when HTTP response body contains binary data that doesn't form valid utf-8 string, otherwise grpc would fail to marshal returned error. #531 +* [ENHANCEMENT] memberlist: use separate queue for broadcast messages that are result of CAS updates, and prioritize CAS update messages when sending broadcasts. On stopping, only wait for CAS updates queue to be empty. #539 +* [ENHANCEMENT] memberlist: Added `-memberlist.broadcast-timeout-for-cas-updates-on-shutdown` option to set timeout for sending CAS updates on shutdown, instead of previously hardcoded 10s (which is still the default). #539 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. 
#109 From 1c1f876e6158b773cd9ee8d611a66834939dedc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 3 Jul 2024 12:00:07 +0200 Subject: [PATCH 3/5] Observe ring for longer in testMultipleClientsWithConfigGenerator test, to make it less flaky --- kv/memberlist/memberlist_client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 729cfb7ce..f998adb2a 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -718,7 +718,7 @@ func testMultipleClientsWithConfigGenerator(t *testing.T, members int, configGen startTime := time.Now() firstKv := clients[0] - ctx, cancel := context.WithTimeout(context.Background(), casInterval*3/2) // Watch for 1.5 cas intervals. + ctx, cancel := context.WithTimeout(context.Background(), casInterval*3) // Watch for 3x cas intervals. updates := 0 firstKv.WatchKey(ctx, key, func(in interface{}) bool { updates++ From 979c0311f20e86684374328c29dd386be86e4b30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 17 Jul 2024 10:05:59 +0200 Subject: [PATCH 4/5] Rename "casQueue" to "localQueue". Split messages_in_broadcast_queue metric into two values, one for each queue. --- CHANGELOG.md | 5 +-- kv/memberlist/memberlist_client.go | 47 +++++++++++++------------ kv/memberlist/memberlist_client_test.go | 36 ++++++++++++------- kv/memberlist/metrics.go | 30 ++++++++++++---- 4 files changed, 74 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f9c0acb7..0b57114a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,7 @@ * [CHANGE] Changed `ShouldLog()` function signature in `middleware.OptionalLogging` interface to `ShouldLog(context.Context) (bool, string)`: the returned `string` contains an optional reason. When reason is valued, `GRPCServerLog` adds `()` suffix to the error. 
#514 * [CHANGE] Cache: Remove superfluous `cache.RemoteCacheClient` interface and unify all caches using the `cache.Cache` interface. #520 * [CHANGE] Updated the minimum required Go version to 1.21. #540 +* [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into `queue="local"` and `queue="gossip"` values. #539 * [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276 * [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279 * [FEATURE] Add `log.BufferedLogger` type. #338 @@ -214,8 +215,8 @@ * [ENHANCEMENT] SpanProfiler: do less work on unsampled traces. #528 * [ENHANCEMENT] Log Middleware: if the trace is not sampled, log its ID as `trace_id_unsampled` instead of `trace_id`. #529 * [EHNANCEMENT] httpgrpc: httpgrpc Server can now use error message from special HTTP header when converting HTTP response to an error. This is useful when HTTP response body contains binary data that doesn't form valid utf-8 string, otherwise grpc would fail to marshal returned error. #531 -* [ENHANCEMENT] memberlist: use separate queue for broadcast messages that are result of CAS updates, and prioritize CAS update messages when sending broadcasts. On stopping, only wait for CAS updates queue to be empty. #539 -* [ENHANCEMENT] memberlist: Added `-memberlist.broadcast-timeout-for-cas-updates-on-shutdown` option to set timeout for sending CAS updates on shutdown, instead of previously hardcoded 10s (which is still the default). #539 +* [ENHANCEMENT] memberlist: use separate queue for broadcast messages that are result of local updates, and prioritize locally-generated messages when sending broadcasts. On stopping, only wait for queue with locally-generated messages to be empty. 
#539 +* [ENHANCEMENT] memberlist: Added `-memberlist.broadcast-timeout-for-local-updates-on-shutdown` option to set timeout for sending locally-generated updates on shutdown, instead of previously hardcoded 10s (which is still the default). #539 * [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index af486903b..a7eefe92f 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -157,8 +157,8 @@ type KVConfig struct { LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. - LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` - BroadcastTimeoutForCasUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout__for_cas_updates_on_shutdown" category:"advanced"` + LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` + BroadcastTimeoutForLocalUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout_for_local_updates_on_shutdown" category:"advanced"` // How much space to use to keep received and sent messages in memory (for troubleshooting). MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"` @@ -199,7 +199,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. 
Used for NAT traversal.") f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.") - f.DurationVar(&cfg.BroadcastTimeoutForCasUpdatesOnShutdown, prefix+"memberlist.broadcast-timeout-for-cas-updates-on-shutdown", 10*time.Second, "Timeout for broadcasting all remaining CAS updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to CAS updates, not other kind of broadcast messages. 0 = no timeout, wait until all CAS updates are sent.") + f.DurationVar(&cfg.BroadcastTimeoutForLocalUpdatesOnShutdown, prefix+"memberlist.broadcast-timeout-for-local-updates-on-shutdown", 10*time.Second, "Timeout for broadcasting all remaining locally-generated updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to locally-generated updates, not to broadcast messages that are result of incoming gossip updates. 0 = no timeout, wait until all locally-generated updates are sent.") cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix) } @@ -233,10 +233,10 @@ type KV struct { // dns discovery provider provider DNSProvider - // Protects access to memberlist and gossipBroadcasts fields. + // Protects access to memberlist and broadcast queues. 
delegateReady atomic.Bool memberlist *memberlist.Memberlist - casBroadcasts *memberlist.TransmitLimitedQueue // queue for messages generated locally + localBroadcasts *memberlist.TransmitLimitedQueue // queue for messages generated locally gossipBroadcasts *memberlist.TransmitLimitedQueue // queue for messages that we forward from other nodes // KV Store. @@ -276,7 +276,8 @@ type KV struct { numberOfPushes prometheus.Counter totalSizeOfPulls prometheus.Counter totalSizeOfPushes prometheus.Counter - numberOfBroadcastMessagesInQueue prometheus.GaugeFunc + numberOfGossipMessagesInQueue prometheus.GaugeFunc + numberOfLocalMessagesInQueue prometheus.GaugeFunc totalSizeOfBroadcastMessagesInQueue prometheus.Gauge numberOfBroadcastMessagesDropped prometheus.Counter casAttempts prometheus.Counter @@ -459,7 +460,7 @@ func (m *KV) starting(ctx context.Context) error { } // Finish delegate initialization. m.memberlist = list - m.casBroadcasts = &memberlist.TransmitLimitedQueue{ + m.localBroadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: list.NumMembers, RetransmitMult: mlCfg.RetransmitMult, } @@ -726,24 +727,24 @@ func (m *KV) discoverMembers(ctx context.Context, members []string) []string { func (m *KV) stopping(_ error) error { level.Info(m.logger).Log("msg", "leaving memberlist cluster") - // Wait until CAS broadcast queue is empty, but don't wait for too long. + // Wait until queue with locally-generated messages is empty, but don't wait for too long. // Also don't wait if there is just one node left. - // Once we enter Stopping state, we don't queue any more CAS messages. + // Note: Once we enter Stopping state, we don't queue more locally-generated messages. 
- deadline := time.Now().Add(m.cfg.BroadcastTimeoutForCasUpdatesOnShutdown) + deadline := time.Now().Add(m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown) - msgs := m.casBroadcasts.NumQueued() + msgs := m.localBroadcasts.NumQueued() nodes := m.memberlist.NumMembers() - for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForCasUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) { - level.Info(m.logger).Log("msg", "waiting for CAS broadcast messages to be sent out", "count", msgs, "nodes", nodes) + for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) { + level.Info(m.logger).Log("msg", "waiting for locally-generated broadcast messages to be sent out", "count", msgs, "nodes", nodes) time.Sleep(250 * time.Millisecond) - msgs = m.casBroadcasts.NumQueued() + msgs = m.localBroadcasts.NumQueued() nodes = m.memberlist.NumMembers() } if msgs > 0 { - level.Warn(m.logger).Log("msg", "broadcast messages left in CAS queue", "count", msgs, "nodes", nodes) + level.Warn(m.logger).Log("msg", "locally-generated broadcast messages left the queue", "count", msgs, "nodes", nodes) } err := m.memberlist.Leave(m.cfg.LeaveTimeout) @@ -1041,9 +1042,9 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) return change, newver, retry, nil } -func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, isCas bool) { - if isCas && m.State() != services.Running { - level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key) +func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool) { + if locallyGenerated && m.State() != services.Running { + level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key) return } @@ -1084,8 +1085,8 @@ func (m *KV) broadcastNewValue(key string, change 
Mergeable, version uint, codec m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) - if isCas { - m.casBroadcasts.QueueBroadcast(b) + if locallyGenerated { + m.localBroadcasts.QueueBroadcast(b) } else { m.gossipBroadcasts.QueueBroadcast(b) } @@ -1200,10 +1201,10 @@ func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { return nil } - // Prioritize CAS queue - msgs := m.casBroadcasts.GetBroadcasts(overhead, limit) + // Prioritize locally-generated messages + msgs := m.localBroadcasts.GetBroadcasts(overhead, limit) - // Decrease limit for each message we got from CAS broadcasts. + // Decrease limit for each message we got from locally-generated broadcasts. for _, m := range msgs { limit -= overhead + len(m) } diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index f998adb2a..d2648e49a 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -1648,7 +1648,7 @@ func (p delayedDNSProviderMock) Addresses() []string { return p.resolved } -func TestGetBroadcastsPrefersCASUpdates(t *testing.T) { +func TestGetBroadcastsPrefersLocalUpdates(t *testing.T) { codec := dataCodec{} cfg := KVConfig{ @@ -1661,7 +1661,8 @@ func TestGetBroadcastsPrefersCASUpdates(t *testing.T) { cfg.RetransmitMult = 1 cfg.Codecs = append(cfg.Codecs, codec) - kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + reg := prometheus.NewRegistry() + kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, reg) require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck @@ -1673,21 +1674,30 @@ func TestGetBroadcastsPrefersCASUpdates(t *testing.T) { // No broadcast messages from KV at the beginning. 
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) - // Check that CAS broadcast messages will be prioritized and sent out first, even if they are enqueued later or are smaller than other messages in the queue. - kv.broadcastNewValue("notcas", smallUpdate, 1, codec, false) - kv.broadcastNewValue("notcas", bigUpdate, 2, codec, false) - kv.broadcastNewValue("cas", smallUpdate, 1, codec, true) - kv.broadcastNewValue("cas", bigUpdate, 2, codec, true) + // Check that locally-generated broadcast messages will be prioritized and sent out first, even if they are enqueued later or are smaller than other messages in the queue. + kv.broadcastNewValue("non-local", smallUpdate, 1, codec, false) + kv.broadcastNewValue("non-local", bigUpdate, 2, codec, false) + kv.broadcastNewValue("local", smallUpdate, 1, codec, true) + kv.broadcastNewValue("local", bigUpdate, 2, codec, true) + + err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP memberlist_client_messages_in_broadcast_queue Number of user messages in the broadcast queue + # TYPE memberlist_client_messages_in_broadcast_queue gauge + memberlist_client_messages_in_broadcast_queue{queue="gossip"} 2 + memberlist_client_messages_in_broadcast_queue{queue="local"} 2 + `), "memberlist_client_messages_in_broadcast_queue") + require.NoError(t, err) msgs := kv.GetBroadcasts(0, 10000) require.Len(t, msgs, 4) // we get all 4 messages - require.Equal(t, "cas", getKey(t, msgs[0])) - require.Equal(t, "cas", getKey(t, msgs[1])) - require.Equal(t, "notcas", getKey(t, msgs[2])) - require.Equal(t, "notcas", getKey(t, msgs[3])) + require.Equal(t, "local", getKey(t, msgs[0])) + require.Equal(t, "local", getKey(t, msgs[1])) + require.Equal(t, "non-local", getKey(t, msgs[2])) + require.Equal(t, "non-local", getKey(t, msgs[3])) + // Check that TransmitLimitedQueue.GetBroadcasts preferred larger messages (it does that). 
- require.True(t, len(msgs[0]) > len(msgs[1])) // Bigger CAS message is returned before smaller one - require.True(t, len(msgs[2]) > len(msgs[3])) // Bigger non-CAS message is returned before smaller one + require.True(t, len(msgs[0]) > len(msgs[1])) // Bigger local message is returned before smaller one + require.True(t, len(msgs[2]) > len(msgs[3])) // Bigger non-local message is returned before smaller one } func getKey(t *testing.T, msg []byte) string { diff --git a/kv/memberlist/metrics.go b/kv/memberlist/metrics.go index ad4d70fd9..0f09c5d71 100644 --- a/kv/memberlist/metrics.go +++ b/kv/memberlist/metrics.go @@ -71,15 +71,33 @@ func (m *KV) createAndRegisterMetrics() { Help: "Total size of pulled state", }) - m.numberOfBroadcastMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: m.cfg.MetricsNamespace, - Subsystem: subsystem, - Name: "messages_in_broadcast_queue", - Help: "Number of user messages in the broadcast queue", + const queueMetricName = "messages_in_broadcast_queue" + const queueMetricHelp = "Number of user messages in the broadcast queue" + + m.numberOfGossipMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: m.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: queueMetricName, + Help: queueMetricHelp, + ConstLabels: map[string]string{"queue": "gossip"}, + }, func() float64 { + // Queues are not set before Starting state + if m.State() == services.Running || m.State() == services.Stopping { + return float64(m.gossipBroadcasts.NumQueued()) + } + return 0 + }) + + m.numberOfLocalMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: m.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: queueMetricName, + Help: queueMetricHelp, + ConstLabels: map[string]string{"queue": "local"}, }, func() float64 { // Queues are not set before Starting state if m.State() == services.Running || m.State() == services.Stopping { - return 
float64(m.gossipBroadcasts.NumQueued()) + float64(m.casBroadcasts.NumQueued()) + return float64(m.localBroadcasts.NumQueued()) } return 0 }) From 82bb78a0d42a65126407afdb00fcc507af8e28c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 17 Jul 2024 10:20:36 +0200 Subject: [PATCH 5/5] Add 3 messages to local queue to test metric. --- kv/memberlist/memberlist_client_test.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index d2648e49a..1f6d60b7d 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -1670,6 +1670,7 @@ func TestGetBroadcastsPrefersLocalUpdates(t *testing.T) { smallUpdate := &data{Members: map[string]member{"a": {Timestamp: now.Unix(), State: JOINING}}} bigUpdate := &data{Members: map[string]member{"b": {Timestamp: now.Unix(), State: JOINING}, "c": {Timestamp: now.Unix(), State: JOINING}, "d": {Timestamp: now.Unix(), State: JOINING}}} + mediumUpdate := &data{Members: map[string]member{"d": {Timestamp: now.Unix(), State: JOINING}, "e": {Timestamp: now.Unix(), State: JOINING}}} // No broadcast messages from KV at the beginning. 
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) @@ -1679,25 +1680,28 @@ func TestGetBroadcastsPrefersLocalUpdates(t *testing.T) { kv.broadcastNewValue("non-local", bigUpdate, 2, codec, false) kv.broadcastNewValue("local", smallUpdate, 1, codec, true) kv.broadcastNewValue("local", bigUpdate, 2, codec, true) + kv.broadcastNewValue("local", mediumUpdate, 3, codec, true) err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` # HELP memberlist_client_messages_in_broadcast_queue Number of user messages in the broadcast queue # TYPE memberlist_client_messages_in_broadcast_queue gauge memberlist_client_messages_in_broadcast_queue{queue="gossip"} 2 - memberlist_client_messages_in_broadcast_queue{queue="local"} 2 + memberlist_client_messages_in_broadcast_queue{queue="local"} 3 `), "memberlist_client_messages_in_broadcast_queue") require.NoError(t, err) msgs := kv.GetBroadcasts(0, 10000) - require.Len(t, msgs, 4) // we get all 4 messages + require.Len(t, msgs, 5) // we get all 5 messages require.Equal(t, "local", getKey(t, msgs[0])) require.Equal(t, "local", getKey(t, msgs[1])) - require.Equal(t, "non-local", getKey(t, msgs[2])) + require.Equal(t, "local", getKey(t, msgs[2])) require.Equal(t, "non-local", getKey(t, msgs[3])) + require.Equal(t, "non-local", getKey(t, msgs[4])) // Check that TransmitLimitedQueue.GetBroadcasts preferred larger messages (it does that). - require.True(t, len(msgs[0]) > len(msgs[1])) // Bigger local message is returned before smaller one - require.True(t, len(msgs[2]) > len(msgs[3])) // Bigger non-local message is returned before smaller one + require.True(t, len(msgs[0]) > len(msgs[1])) // Bigger local message is returned before medium one. + require.True(t, len(msgs[1]) > len(msgs[2])) // Medium local message is returned before small one. + require.True(t, len(msgs[3]) > len(msgs[4])) // Bigger non-local message is returned before smaller one } func getKey(t *testing.T, msg []byte) string {