Memberlist: support for debouncing notifications #592

Merged: 32 commits, Oct 11, 2024

Changes from 27 commits

Commits
11761b2
Ring: Add a buffering interval to observed WatchKey updates.
seizethedave Oct 3, 2024
be934fe
Don't just update once.
seizethedave Oct 3, 2024
c1c8d20
Tuck periodic updater into loop method.
seizethedave Oct 3, 2024
76dd98d
Rm newline.
seizethedave Oct 3, 2024
f8aaa8a
Fix existing tests.
seizethedave Oct 3, 2024
6ae2251
Changelog
seizethedave Oct 3, 2024
7fd4584
Add some tests around update debouncing.
seizethedave Oct 4, 2024
acc3169
Adjust changelog.
seizethedave Oct 4, 2024
4aca6fc
Update CHANGELOG.md
seizethedave Oct 4, 2024
f583a42
Fix tests.
seizethedave Oct 4, 2024
f37aaf8
Validate cfg.UpdateInterval.
seizethedave Oct 4, 2024
e1c121a
Don't mutate the config.
seizethedave Oct 4, 2024
5bee925
Merge remote-tracking branch 'origin/main' into davidgrant/ring-updat…
seizethedave Oct 4, 2024
58dbc8e
merge changelog
seizethedave Oct 7, 2024
56702dd
Move delay bits to updateObserver, default to noDelay.
seizethedave Oct 8, 2024
3f0abaf
Don't need a noDelayObserver.
seizethedave Oct 8, 2024
2840d10
Don't need an interface.
seizethedave Oct 8, 2024
7f73429
Rename observer files.
seizethedave Oct 8, 2024
7ddb44b
observeUpdate -> put; add a basic test for the ticker loop.
seizethedave Oct 8, 2024
7c4de1b
Merge remote-tracking branch 'origin/main' into davidgrant/ring-updat…
seizethedave Oct 8, 2024
12a5324
Default no delay.
seizethedave Oct 8, 2024
8ce282f
merge
seizethedave Oct 10, 2024
b9ca62b
Add debounce to memberlist notification code.
seizethedave Oct 11, 2024
a75be3a
Rm debounce from ring.
seizethedave Oct 11, 2024
4e145cf
undo servicediscovery changes
seizethedave Oct 11, 2024
082a1d0
revise changelog
seizethedave Oct 11, 2024
8e2a4a0
Wait up to 3*casInterval like other tests.
seizethedave Oct 11, 2024
b5df789
Remove CAS/WatchKey layers from notification delay tests.
seizethedave Oct 11, 2024
91d4db9
Exercise monitor method. Remove all unnecessary stuff from test.
seizethedave Oct 11, 2024
ad78368
Doc comment.
seizethedave Oct 11, 2024
3cbd462
didn't mean to commit that bit.
seizethedave Oct 11, 2024
73b2a73
Merge remote-tracking branch 'origin/main' into davidgrant/ring-updat…
seizethedave Oct 11, 2024
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -80,6 +80,7 @@
* [CHANGE] Changed `ShouldLog()` function signature in `middleware.OptionalLogging` interface to `ShouldLog(context.Context) (bool, string)`: the returned `string` contains an optional reason. When reason is valued, `GRPCServerLog` adds `(<reason>)` suffix to the error. #514
* [CHANGE] Cache: Remove superfluous `cache.RemoteCacheClient` interface and unify all caches using the `cache.Cache` interface. #520
* [CHANGE] Updated the minimum required Go version to 1.21. #540
+ * [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into `queue="local"` and `queue="gossip"` values. #539
* [CHANGE] memberlist: Failure to fast-join a cluster via contacting a node is now logged at `info` instead of `debug`. #585
* [CHANGE] `Service.AddListener` and `Manager.AddListener` now return function for stopping the listener. #564
@@ -234,7 +235,7 @@
* [ENHANCEMENT] Cache: Add `.Add()` and `.Set()` methods to cache clients. #591
* [ENHANCEMENT] Cache: Add `.Advance()` methods to mock cache clients for easier testing of TTLs. #601
* [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525
- * [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
+ * [ENHANCEMENT] Memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storms in large clusters. #592
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
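For context, a hedged sketch of how a deployment might enable the new debounce. The flag name and help text come from this PR's RegisterFlagsWithPrefix change below; the standalone flag set and the 5s value are illustrative, not dskit API.

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	// Stand-in for memberlist.KVConfig.NotifyInterval (yaml: notify_interval,
	// flag: -memberlist.notify-interval). The default of 0 notifies without delay.
	var notifyInterval time.Duration
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.DurationVar(&notifyInterval, "memberlist.notify-interval", 0,
		"How frequently to notify watchers when a key changes. 0 to notify without delay.")

	// A large cluster opting into debouncing:
	if err := fs.Parse([]string{"-memberlist.notify-interval=5s"}); err != nil {
		panic(err)
	}
	fmt.Println("notify interval:", notifyInterval) // prints: notify interval: 5s
}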
82 changes: 71 additions & 11 deletions kv/memberlist/memberlist_client.go
Expand Up @@ -137,6 +137,7 @@ type KVConfig struct {
GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"`
DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"`
EnableCompression bool `yaml:"compression_enabled" category:"advanced"`
NotifyInterval time.Duration `yaml:"notify_interval" category:"advanced"`

// ip:port to advertise other cluster members. Used for NAT traversal
AdvertiseAddr string `yaml:"advertise_addr"`
@@ -195,6 +196,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.DurationVar(&cfg.DeadNodeReclaimTime, prefix+"memberlist.dead-node-reclaim-time", mlDefaults.DeadNodeReclaimTime, "How soon can dead node's name be reclaimed with new address. 0 to disable.")
f.IntVar(&cfg.MessageHistoryBufferBytes, prefix+"memberlist.message-history-buffer-bytes", 0, "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.")
f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.")
f.DurationVar(&cfg.NotifyInterval, prefix+"memberlist.notify-interval", 0, "How frequently to notify watchers when a key changes. Can reduce CPU activity in large memberlist deployments. 0 to notify without delay.")
f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.")
f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.")
f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.")
@@ -251,6 +253,10 @@ type KV struct {
watchers map[string][]chan string
prefixWatchers map[string][]chan string

// Delayed notifications for watchers
notifMu sync.Mutex
keyNotifications map[string]struct{}

// Buffers with sent and received messages. Used for troubleshooting only.
// New messages are appended, old messages (based on configured size limit) removed from the front.
messagesMu sync.Mutex
@@ -359,17 +365,18 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer
cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace

mlkv := &KV{
- cfg: cfg,
- logger: logger,
- registerer: registerer,
- provider: dnsProvider,
- store: make(map[string]ValueDesc),
- codecs: make(map[string]codec.Codec),
- watchers: make(map[string][]chan string),
- prefixWatchers: make(map[string][]chan string),
- workersChannels: make(map[string]chan valueUpdate),
- shutdown: make(chan struct{}),
- maxCasRetries: maxCasRetries,
+ cfg: cfg,
+ logger: logger,
+ registerer: registerer,
+ provider: dnsProvider,
+ store: make(map[string]ValueDesc),
+ codecs: make(map[string]codec.Codec),
+ watchers: make(map[string][]chan string),
+ keyNotifications: make(map[string]struct{}),
+ prefixWatchers: make(map[string][]chan string),
+ workersChannels: make(map[string]chan valueUpdate),
+ shutdown: make(chan struct{}),
+ maxCasRetries: maxCasRetries,
}

mlkv.createAndRegisterMetrics()
@@ -486,6 +493,13 @@ func (m *KV) running(ctx context.Context) error {
return errFailedToJoinCluster
}

if m.cfg.NotifyInterval > 0 {
// Start delayed key notifications.
notifTicker := time.NewTicker(m.cfg.NotifyInterval)
defer notifTicker.Stop()
go m.sendKeyNotifications(ctx, notifTicker.C)
}

var tickerChan <-chan time.Time
if m.cfg.RejoinInterval > 0 && len(m.cfg.JoinMembers) > 0 {
t := time.NewTicker(m.cfg.RejoinInterval)
@@ -905,7 +919,53 @@ func removeWatcherChannel(k string, w chan string, watchers map[string][]chan st
}
}

// notifyWatchers sends notification to all watchers of given key. If delay is
// enabled, it accumulates them for later sending.
func (m *KV) notifyWatchers(key string) {
if m.cfg.NotifyInterval <= 0 {
m.notifyWatchersSync(key)
return
}

m.notifMu.Lock()
defer m.notifMu.Unlock()
m.keyNotifications[key] = struct{}{}
}

// sendKeyNotifications sends accumulated notifications to all watchers of
// respective keys when the given channel ticks.
func (m *KV) sendKeyNotifications(ctx context.Context, tickChan <-chan time.Time) {
if m.cfg.NotifyInterval <= 0 {
panic("sendNotifications called with NotifyInterval <= 0")
}

newNotifs := func() map[string]struct{} {
m.notifMu.Lock()
defer m.notifMu.Unlock()

if len(m.keyNotifications) == 0 {
return nil
}
newMap := make(map[string]struct{})
notifs := m.keyNotifications
m.keyNotifications = newMap
return notifs
}

for {
select {
case <-tickChan:
for key := range newNotifs() {
m.notifyWatchersSync(key)
}
case <-ctx.Done():
return
}
}
}

// notifyWatchersSync immediately sends notification to all watchers of given key.
func (m *KV) notifyWatchersSync(key string) {
m.watchersMu.Lock()
defer m.watchersMu.Unlock()

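To see the debounce mechanism outside the diff, here is a self-contained sketch of the same accumulate-and-flush pattern used by notifyWatchers and sendKeyNotifications above. All names here (debouncer, put, swap, run) are illustrative, not part of the dskit API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// debouncer coalesces per-key notifications: writers mark keys dirty
// under a mutex, and a single goroutine flushes the set on each tick.
type debouncer struct {
	mu   sync.Mutex
	keys map[string]struct{}
}

// put records a key as dirty; repeated updates between ticks coalesce to one.
func (d *debouncer) put(key string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.keys[key] = struct{}{}
}

// swap takes the accumulated set and installs an empty one, so writers
// never block on the (possibly slow) notification fan-out.
func (d *debouncer) swap() map[string]struct{} {
	d.mu.Lock()
	defer d.mu.Unlock()
	if len(d.keys) == 0 {
		return nil
	}
	out := d.keys
	d.keys = make(map[string]struct{})
	return out
}

// run flushes dirty keys on every tick until the context is canceled.
func (d *debouncer) run(ctx context.Context, tick <-chan time.Time, notify func(string)) {
	for {
		select {
		case <-tick:
			for k := range d.swap() {
				notify(k)
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	d := &debouncer{keys: make(map[string]struct{})}
	tick := make(chan time.Time) // manual ticks, as in the tests below
	done := make(chan string, 1)
	go d.run(ctx, tick, func(k string) { done <- k })

	// Three rapid updates to one key yield a single notification per flush.
	d.put("ring")
	d.put("ring")
	d.put("ring")
	tick <- time.Now()
	fmt.Println("notified:", <-done) // notified: ring
}

The map swap keeps the critical section tiny: the flush goroutine fans out notifications without holding the mutex that writers contend on.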
133 changes: 132 additions & 1 deletion kv/memberlist/memberlist_client_test.go
@@ -255,7 +255,6 @@ func getLocalhostAddrs() []string {
func TestBasicGetAndCas(t *testing.T) {
c := dataCodec{}

name := "Ing 1"
var cfg KVConfig
flagext.DefaultValues(&cfg)
cfg.TCPTransport = TCPTransportConfig{
@@ -278,6 +277,7 @@ func TestBasicGetAndCas(t *testing.T) {
}

// Create member in PENDING state, with some tokens
name := "Ing 1"
err = cas(kv, key, updateFn(name))
require.NoError(t, err)

@@ -1783,3 +1783,134 @@ func marshalState(t *testing.T, kvps ...*KeyValuePair) []byte {

return buf.Bytes()
}

func TestNotificationDelay(t *testing.T) {
codec := dataCodec{}
cfg := KVConfig{}
cfg.Codecs = append(cfg.Codecs, codec)
cfg.TCPTransport = TCPTransportConfig{
BindAddrs: getLocalhostAddrs(),
}
// We're going to trigger sends manually, so effectively disable the automatic send interval.
const hundredYears = 100 * 365 * 24 * time.Hour
cfg.NotifyInterval = hundredYears
kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())

require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), kv))
})

cli, err := NewClient(kv, codec)
require.NoError(t, err)

casInterval := 1 * time.Second

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Arrange to do our own ticking.
tick := make(chan time.Time)
go kv.sendKeyNotifications(ctx, tick)

// Mirror any WatchKey updates to our own map and verify that they
// eventually arrive.

dbMu := sync.Mutex{}
db := make(map[string]*data)
keyCalls := make(map[string]int)

setData := func(key string, d *data) {
dbMu.Lock()
defer dbMu.Unlock()
db[key] = d
keyCalls[key]++
}
getData := func(key string) *data {
dbMu.Lock()
defer dbMu.Unlock()
return db[key]
}
callsForKey := func(key string) int {
dbMu.Lock()
defer dbMu.Unlock()
return keyCalls[key]
}
verifyVal := func(k, v string) bool {
d := getData(k)
if d == nil {
return false
}
_, ok := d.Members[v]
return ok
}

watchData := func(key string) {
go cli.WatchKey(ctx, key, func(in any) bool {
setData(key, in.(*data))
return true
})
}

watchData("foo_123")
watchData("foo_124")

assert.Equal(t, 0, callsForKey("foo_123"))
assert.Equal(t, 0, callsForKey("foo_124"))

err = cas(cli, "foo_123", updateFn("val1"))
require.NoError(t, err)

tick <- time.Now()

require.Eventually(t, func() bool {
return verifyVal("foo_123", "val1")
}, 3*casInterval, 25*time.Millisecond)

assert.Equal(t, 1, callsForKey("foo_123"))
assert.Equal(t, 0, callsForKey("foo_124"))

// Test coalescing of updates.
err = cas(cli, "foo_123", updateFn("val2"))
require.NoError(t, err)
err = cas(cli, "foo_123", updateFn("val3"))
require.NoError(t, err)
err = cas(cli, "foo_123", updateFn("val4"))
require.NoError(t, err)

assert.Equal(t, 1, callsForKey("foo_123"), "no flush -> no callback")
assert.Equal(t, 0, callsForKey("foo_124"), "no flush -> no callback")

tick <- time.Now()

require.Eventually(t, func() bool {
return verifyVal("foo_123", "val4")
}, 3*casInterval, 25*time.Millisecond, "multiple updates should be coalesced into the last one")

assert.Equal(t, 2, callsForKey("foo_123"))
assert.Equal(t, 0, callsForKey("foo_124"))

err = cas(cli, "foo_123", updateFn("val100"))
require.NoError(t, err)
err = cas(cli, "foo_124", updateFn("val101"))
require.NoError(t, err)

tick <- time.Now()

require.Eventually(t, func() bool {
return verifyVal("foo_123", "val100") && verifyVal("foo_124", "val101")
}, 3*casInterval, 25*time.Millisecond)

assert.Equal(t, 3, callsForKey("foo_123"))
assert.Equal(t, 1, callsForKey("foo_124"))

require.NotPanics(t, func() {
tick <- time.Now()
tick <- time.Now()
tick <- time.Now()
}, "shouldn't panic or anything like that when ticked without updates")

time.Sleep(100 * time.Millisecond)
assert.Equal(t, 3, callsForKey("foo_123"))
assert.Equal(t, 1, callsForKey("foo_124"))
}
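A note on the test's design: it injects a plain chan time.Time in place of a real ticker, so flushes happen exactly when the test sends a tick. A minimal, generic version of that idiom follows; the names are illustrative, not dskit API.

package main

import (
	"fmt"
	"time"
)

// flushOnTick runs flush once per value received on tick. Passing a plain
// channel instead of time.Ticker.C lets a test control flushes precisely.
func flushOnTick(tick <-chan time.Time, flush func()) {
	for range tick {
		flush()
	}
}

func main() {
	tick := make(chan time.Time)
	done := make(chan struct{})
	go func() {
		defer close(done)
		flushOnTick(tick, func() { fmt.Println("flush") })
	}()

	tick <- time.Now() // exactly one deterministic flush
	close(tick)        // ends the loop
	<-done
}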
4 changes: 2 additions & 2 deletions ring/ring.go
@@ -215,13 +215,13 @@ type Ring struct {
// Number of registered instances per zone.
instancesCountPerZone map[string]int

- // Nubmber of registered instances with tokens per zone.
+ // Number of registered instances with tokens per zone.
instancesWithTokensCountPerZone map[string]int

// Number of registered instances are writable and have tokens.
writableInstancesWithTokensCount int

- // Nubmber of registered instances with tokens per zone that are writable.
+ // Number of registered instances with tokens per zone that are writable.
writableInstancesWithTokensCountPerZone map[string]int

// Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes.