Memberlist: support for debouncing notifications (#592)
* Adds a buffering layer to Memberlist's notification handling so that watcher notifications fire at most once per NotifyInterval, delivering the most recently observed data at each flush (a simplified sketch of this pattern appears below the motivation).
* Adds a new config flag to control this interval: -memberlist.notify-interval, which defaults to 0 (off).
Motivation for this change:

In clusters where the memberlist KVStore watched by Ring has many replicas, redeploying those replicas can cause WatchKey and updateRingState to be called hundreds of times per second. When there are many concurrent goroutines calling ring.ShuffleShard, the high rate of updateRingState calls (which take locks and clear caches) can create heavy lock contention and latency as ShuffleShard attempts to take locks in order to repopulate those caches.
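For readers skimming the diff, here is a minimal standalone sketch of the coalescing pattern described above. It is an illustration only, not the code in this commit: the debouncer type, its methods, the key name, and the sleep stand in for the real notifyWatchers, monitorKeyNotifications, and sendKeyNotifications shown in the memberlist_client.go diff below.

package main

import (
	"fmt"
	"sync"
	"time"
)

// debouncer coalesces per-key change notifications and delivers them in batches.
type debouncer struct {
	mu   sync.Mutex
	keys map[string]struct{}
}

// notify records that key changed; repeated calls before a flush collapse into one entry.
func (d *debouncer) notify(key string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.keys[key] = struct{}{}
}

// flush delivers one notification per changed key and resets the pending set.
func (d *debouncer) flush(deliver func(key string)) {
	d.mu.Lock()
	pending := d.keys
	d.keys = make(map[string]struct{})
	d.mu.Unlock()

	for k := range pending {
		deliver(k)
	}
}

func main() {
	d := &debouncer{keys: make(map[string]struct{})}
	for i := 0; i < 500; i++ {
		d.notify("ring") // many rapid updates to the same key...
	}
	time.Sleep(100 * time.Millisecond) // stand-in for one NotifyInterval tick
	d.flush(func(k string) { fmt.Println("notify watchers of", k) }) // ...one delivery
}

In the actual change, the flush is driven by a ticker started in running() when NotifyInterval > 0, as shown in the diff below.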
seizethedave authored Oct 11, 2024
1 parent a36b0cc commit 8e7752e
Showing 4 changed files with 198 additions and 15 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -80,6 +80,7 @@
* [CHANGE] Changed `ShouldLog()` function signature in `middleware.OptionalLogging` interface to `ShouldLog(context.Context) (bool, string)`: the returned `string` contains an optional reason. When reason is valued, `GRPCServerLog` adds `(<reason>)` suffix to the error. #514
* [CHANGE] Cache: Remove superfluous `cache.RemoteCacheClient` interface and unify all caches using the `cache.Cache` interface. #520
* [CHANGE] Updated the minimum required Go version to 1.21. #540
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into `queue="local"` and `queue="gossip"` values. #539
* [CHANGE] memberlist: Failure to fast-join a cluster via contacting a node is now logged at `info` instead of `debug`. #585
* [CHANGE] `Service.AddListener` and `Manager.AddListener` now return function for stopping the listener. #564
@@ -234,7 +235,7 @@
* [ENHANCEMENT] Cache: Add `.Add()` and `.Set()` methods to cache clients. #591
* [ENHANCEMENT] Cache: Add `.Advance()` methods to mock cache clients for easier testing of TTLs. #601
* [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [ENHANCEMENT] Memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storms in large clusters. #592
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
88 changes: 77 additions & 11 deletions kv/memberlist/memberlist_client.go
@@ -137,6 +137,7 @@ type KVConfig struct {
GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"`
DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"`
EnableCompression bool `yaml:"compression_enabled" category:"advanced"`
NotifyInterval time.Duration `yaml:"notify_interval" category:"advanced"`

// ip:port to advertise other cluster members. Used for NAT traversal
AdvertiseAddr string `yaml:"advertise_addr"`
@@ -195,6 +196,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.")
f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.")
f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.")
f.DurationVar(&cfg.NotifyInterval, prefix+"memberlist.notify-interval", 0, "How frequently to notify watchers when a key changes. Can reduce CPU activity in large memberlist deployments. 0 to notify without delay.")
f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.")
f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.")
f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.")
@@ -251,6 +253,10 @@ type KV struct {
watchers map[string][]chan string
prefixWatchers map[string][]chan string

// Delayed notifications for watchers
notifMu sync.Mutex
keyNotifications map[string]struct{}

// Buffers with sent and received messages. Used for troubleshooting only.
// New messages are appended, old messages (based on configured size limit) removed from the front.
messagesMu sync.Mutex
@@ -359,17 +365,18 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer
cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace

mlkv := &KV{
cfg: cfg,
logger: logger,
registerer: registerer,
provider: dnsProvider,
store: make(map[string]ValueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
prefixWatchers: make(map[string][]chan string),
workersChannels: make(map[string]chan valueUpdate),
shutdown: make(chan struct{}),
maxCasRetries: maxCasRetries,
cfg: cfg,
logger: logger,
registerer: registerer,
provider: dnsProvider,
store: make(map[string]ValueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
keyNotifications: make(map[string]struct{}),
prefixWatchers: make(map[string][]chan string),
workersChannels: make(map[string]chan valueUpdate),
shutdown: make(chan struct{}),
maxCasRetries: maxCasRetries,
}

mlkv.createAndRegisterMetrics()
@@ -486,6 +493,13 @@ func (m *KV) running(ctx context.Context) error {
return errFailedToJoinCluster
}

if m.cfg.NotifyInterval > 0 {
// Start delayed key notifications.
notifTicker := time.NewTicker(m.cfg.NotifyInterval)
defer notifTicker.Stop()
go m.monitorKeyNotifications(ctx, notifTicker.C)
}

var tickerChan <-chan time.Time
if m.cfg.RejoinInterval > 0 && len(m.cfg.JoinMembers) > 0 {
t := time.NewTicker(m.cfg.RejoinInterval)
@@ -905,7 +919,59 @@ func removeWatcherChannel(k string, w chan string, watchers map[string][]chan st
}
}

// notifyWatchers sends notification to all watchers of given key. If delay is
// enabled, it accumulates them for later sending.
func (m *KV) notifyWatchers(key string) {
if m.cfg.NotifyInterval <= 0 {
m.notifyWatchersSync(key)
return
}

m.notifMu.Lock()
defer m.notifMu.Unlock()
m.keyNotifications[key] = struct{}{}
}

// monitorKeyNotifications sends accumulated notifications to all watchers of
// respective keys when the given channel ticks.
func (m *KV) monitorKeyNotifications(ctx context.Context, tickChan <-chan time.Time) {
if m.cfg.NotifyInterval <= 0 {
panic("sendNotifications called with NotifyInterval <= 0")
}

for {
select {
case <-tickChan:
m.sendKeyNotifications()
case <-ctx.Done():
return
}
}
}

// sendKeyNotifications sends accumulated notifications to watchers of respective keys.
func (m *KV) sendKeyNotifications() {
newNotifs := func() map[string]struct{} {
// Grab and clear accumulated notifications.
m.notifMu.Lock()
defer m.notifMu.Unlock()

if len(m.keyNotifications) == 0 {
return nil
}
newMap := make(map[string]struct{})
notifs := m.keyNotifications
m.keyNotifications = newMap
return notifs
}

for key := range newNotifs() {
m.notifyWatchersSync(key)
}
}

// notifyWatchersSync immediately sends notification to all watchers of given key.
func (m *KV) notifyWatchersSync(key string) {
m.watchersMu.Lock()
defer m.watchersMu.Unlock()

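For completeness, a hedged sketch of how an embedding application might turn the new knob on. Only KVConfig, NotifyInterval, RegisterFlagsWithPrefix, and the -memberlist.notify-interval flag name come from this commit; the import path and the surrounding wiring are assumptions about the embedding application.

package main

import (
	"flag"
	"time"

	"github.com/grafana/dskit/kv/memberlist" // assumed import path for this repository
)

func main() {
	var cfg memberlist.KVConfig
	// Registers -memberlist.notify-interval (among others); an empty prefix matches the flag names in the diff.
	cfg.RegisterFlagsWithPrefix(flag.CommandLine, "")
	flag.Parse()

	// Alternatively, set the field directly: deliver watcher notifications at most once per second.
	cfg.NotifyInterval = time.Second // then pass cfg to memberlist.NewKV(...) as usual
}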
118 changes: 117 additions & 1 deletion kv/memberlist/memberlist_client_test.go
@@ -255,7 +255,6 @@ func getLocalhostAddrs() []string {
func TestBasicGetAndCas(t *testing.T) {
c := dataCodec{}

name := "Ing 1"
var cfg KVConfig
flagext.DefaultValues(&cfg)
cfg.TCPTransport = TCPTransportConfig{
@@ -278,6 +277,7 @@ func TestBasicGetAndCas(t *testing.T) {
}

// Create member in PENDING state, with some tokens
name := "Ing 1"
err = cas(kv, key, updateFn(name))
require.NoError(t, err)

@@ -1803,3 +1803,119 @@ func marshalState(t *testing.T, kvps ...*KeyValuePair) []byte {

return buf.Bytes()
}

func TestNotificationDelay(t *testing.T) {
cfg := KVConfig{}
// We're going to trigger sends manually, so effectively disable the automatic send interval.
const hundredYears = 100 * 365 * 24 * time.Hour
cfg.NotifyInterval = hundredYears
kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())

watchChan := make(chan string, 16)

// Add ourselves as a watcher.
kv.watchersMu.Lock()
kv.watchers["foo_123"] = append(kv.watchers["foo_123"], watchChan)
kv.watchers["foo_124"] = append(kv.watchers["foo_124"], watchChan)
kv.watchersMu.Unlock()

defer func() {
kv.watchersMu.Lock()
removeWatcherChannel("foo_123", watchChan, kv.watchers)
removeWatcherChannel("foo_124", watchChan, kv.watchers)
kv.watchersMu.Unlock()
}()

verifyNotifs := func(expected map[string]int, comment string) {
observed := make(map[string]int, len(expected))
for kk := range expected {
observed[kk] = 0
}
loop:
for {
select {
case k := <-watchChan:
observed[k]++
default:
break loop
}
}
require.Equal(t, expected, observed, comment)
}

drainChan := func() {
for {
select {
case <-watchChan:
default:
return
}
}
}

kv.notifyWatchers("foo_123")
kv.sendKeyNotifications()
verifyNotifs(map[string]int{"foo_123": 1}, "1 change 1 notification")

// Test coalescing of updates.
drainChan()
verifyNotifs(map[string]int{"foo_123": 0}, "chan drained")
kv.notifyWatchers("foo_123")
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
kv.notifyWatchers("foo_123")
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
kv.notifyWatchers("foo_123")
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
kv.notifyWatchers("foo_123")
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
kv.notifyWatchers("foo_123")
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
kv.notifyWatchers("foo_123")
verifyNotifs(map[string]int{"foo_123": 0}, "no flush -> no watcher notification")
kv.sendKeyNotifications()
verifyNotifs(map[string]int{"foo_123": 1}, "flush should coalesce updates")

// multiple buffered updates
drainChan()
verifyNotifs(map[string]int{"foo_123": 0}, "chan drained")
kv.notifyWatchers("foo_123")
kv.sendKeyNotifications()
kv.notifyWatchers("foo_123")
kv.sendKeyNotifications()
verifyNotifs(map[string]int{"foo_123": 2}, "two buffered updates")

// multiple keys
drainChan()
kv.notifyWatchers("foo_123")
kv.notifyWatchers("foo_124")
kv.sendKeyNotifications()
verifyNotifs(map[string]int{"foo_123": 1, "foo_124": 1}, "2 changes 2 notifications")
kv.sendKeyNotifications()
verifyNotifs(map[string]int{"foo_123": 0, "foo_124": 0}, "no new notifications")

// sendKeyNotifications can be called repeatedly without new updates.
kv.sendKeyNotifications()
kv.sendKeyNotifications()
kv.sendKeyNotifications()
kv.sendKeyNotifications()

// Finally, exercise the monitor method.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tick := make(chan time.Time)
go kv.monitorKeyNotifications(ctx, tick)
kv.notifyWatchers("foo_123")
tick <- time.Now()

require.Eventually(t, func() bool {
select {
case k := <-watchChan:
if k != "foo_123" {
panic(fmt.Sprintf("unexpected key: %s", k))
}
return true
default: // nothing yet.
return false
}
}, 20*time.Second, 100*time.Millisecond)
}
4 changes: 2 additions & 2 deletions ring/ring.go
@@ -215,13 +215,13 @@ type Ring struct {
// Number of registered instances per zone.
instancesCountPerZone map[string]int

// Nubmber of registered instances with tokens per zone.
// Number of registered instances with tokens per zone.
instancesWithTokensCountPerZone map[string]int

// Number of registered instances are writable and have tokens.
writableInstancesWithTokensCount int

// Nubmber of registered instances with tokens per zone that are writable.
// Number of registered instances with tokens per zone that are writable.
writableInstancesWithTokensCountPerZone map[string]int

// Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes.
