Memberlist: support for debouncing notifications #592

Merged on Oct 11, 2024 (32 commits)
Changes from 8 commits
Commits
11761b2
Ring: Add a buffering interval to observed WatchKey updates.
seizethedave Oct 3, 2024
be934fe
Don't just update once.
seizethedave Oct 3, 2024
c1c8d20
Tuck periodic updater into loop method.
seizethedave Oct 3, 2024
76dd98d
Rm newline.
seizethedave Oct 3, 2024
f8aaa8a
Fix existing tests.
seizethedave Oct 3, 2024
6ae2251
Changelog
seizethedave Oct 3, 2024
7fd4584
Add some tests around update debouncing.
seizethedave Oct 4, 2024
acc3169
Adjust changelog.
seizethedave Oct 4, 2024
4aca6fc
Update CHANGELOG.md
seizethedave Oct 4, 2024
f583a42
Fix tests.
seizethedave Oct 4, 2024
f37aaf8
Validate cfg.UpdateInterval.
seizethedave Oct 4, 2024
e1c121a
Don't mutate the config.
seizethedave Oct 4, 2024
5bee925
Merge remote-tracking branch 'origin/main' into davidgrant/ring-updat…
seizethedave Oct 4, 2024
58dbc8e
merge changelog
seizethedave Oct 7, 2024
56702dd
Move delay bits to updateObserver, default to noDelay.
seizethedave Oct 8, 2024
3f0abaf
Don't need a noDelayObserver.
seizethedave Oct 8, 2024
2840d10
Don't need an interface.
seizethedave Oct 8, 2024
7f73429
Rename observer files.
seizethedave Oct 8, 2024
7ddb44b
observeUpdate -> put; add a basic test for the ticker loop.
seizethedave Oct 8, 2024
7c4de1b
Merge remote-tracking branch 'origin/main' into davidgrant/ring-updat…
seizethedave Oct 8, 2024
12a5324
Default no delay.
seizethedave Oct 8, 2024
8ce282f
merge
seizethedave Oct 10, 2024
b9ca62b
Add debounce to memberlist notification code.
seizethedave Oct 11, 2024
a75be3a
Rm debounce from ring.
seizethedave Oct 11, 2024
4e145cf
undo servicediscovery changes
seizethedave Oct 11, 2024
082a1d0
revise changelog
seizethedave Oct 11, 2024
8e2a4a0
Wait up to 3*casInterval like other tests.
seizethedave Oct 11, 2024
b5df789
Remove CAS/WatchKey layers from notification delay tests.
seizethedave Oct 11, 2024
91d4db9
Exercise monitor method. Remove all unnecessary stuff from test.
seizethedave Oct 11, 2024
ad78368
Doc comment.
seizethedave Oct 11, 2024
3cbd462
didn't mean to commit that bit.
seizethedave Oct 11, 2024
73b2a73
Merge remote-tracking branch 'origin/main' into davidgrant/ring-updat…
seizethedave Oct 11, 2024
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -80,9 +80,11 @@
* [CHANGE] Changed `ShouldLog()` function signature in `middleware.OptionalLogging` interface to `ShouldLog(context.Context) (bool, string)`: the returned `string` contains an optional reason. When reason is valued, `GRPCServerLog` adds `(<reason>)` suffix to the error. #514
* [CHANGE] Cache: Remove superfluous `cache.RemoteCacheClient` interface and unify all caches using the `cache.Cache` interface. #520
* [CHANGE] Updated the minimum required Go version to 1.21. #540
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into `queue="local"` and `queue="gossip"` values. #539
* [CHANGE] memberlist: Failure to fast-join a cluster via contacting a node is now logged at `info` instead of `debug`. #585
* [CHANGE] `Service.AddListener` and `Manager.AddListener` now return function for stopping the listener. #564
* [CHANGE] ring: KVStore updates are now processed at most once per `-ring.update-interval`. #592
* [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276
* [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279
* [FEATURE] Add `log.BufferedLogger` type. #338
@@ -230,7 +232,6 @@
* [ENHANCEMENT] Runtimeconfig: support gzip-compressed files with `.gz` extension. #571
* [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583
* [ENHANCEMENT] Adapt `metrics.SendSumOfGaugesPerTenant` to use `metrics.MetricOption`. #584
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
42 changes: 38 additions & 4 deletions ring/ring.go
@@ -18,6 +18,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"

"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/internal/slices"
@@ -143,6 +144,7 @@ var (
type Config struct {
KVStore kv.Config `yaml:"kvstore"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"`
UpdateInterval time.Duration `yaml:"update_interval" category:"advanced"`
ReplicationFactor int `yaml:"replication_factor"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
ExcludedZones flagext.StringSliceCSV `yaml:"excluded_zones" category:"advanced"`
@@ -162,6 +164,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.KVStore.RegisterFlagsWithPrefix(prefix, "collectors/", f)

f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes. 0 = never (timeout disabled).")
f.DurationVar(&cfg.UpdateInterval, prefix+"ring.update-interval", 250*time.Millisecond, "How often to recompute ring state when a change is detected from the KVStore.")
f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.")
f.Var(&cfg.ExcludedZones, prefix+"distributor.excluded-zones", "Comma-separated list of zones to exclude from the ring. Instances in excluded zones will be filtered out from the ring.")
@@ -215,20 +218,23 @@ type Ring struct {
// Number of registered instances per zone.
instancesCountPerZone map[string]int

// Nubmber of registered instances with tokens per zone.
// Number of registered instances with tokens per zone.
instancesWithTokensCountPerZone map[string]int

// Number of registered instances that are writable and have tokens.
writableInstancesWithTokensCount int

// Nubmber of registered instances with tokens per zone that are writable.
// Number of registered instances with tokens per zone that are writable.
writableInstancesWithTokensCountPerZone map[string]int

// Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes.
// If set to nil, no caching is done (used by tests, and subrings).
shuffledSubringCache map[subringCacheKey]*Ring
shuffledSubringWithLookbackCache map[subringCacheKey]cachedSubringWithLookback[*Ring]

// The last observed update from the KV store.
watchKeyUpdate atomic.Pointer[Desc]

numMembersGaugeVec *prometheus.GaugeVec
totalTokensGauge prometheus.Gauge
oldestTimestampGaugeVec *prometheus.GaugeVec
@@ -324,18 +330,46 @@ func (r *Ring) loop(ctx context.Context) error {
r.updateRingMetrics()
r.mtx.Unlock()

// Debounce WatchKey updates, as they can be frequent enough to cause lock
// contention. The most recent update is the one we'll use when we
// periodically update the ring.

go func() {
t := time.NewTicker(r.cfg.UpdateInterval)
defer t.Stop()
for {
select {
case <-t.C:
r.observeKeyUpdate()
case <-ctx.Done():
return
}
}
}()

r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool {
if value == nil {
level.Info(r.logger).Log("msg", "ring doesn't exist in KV store yet")
return true
}

r.updateRingState(value.(*Desc))
r.storeKeyUpdate(value.(*Desc))
return true
})
return nil
}

// storeKeyUpdate records the latest watch key update so that observeKeyUpdate can apply it to the ring later.
func (r *Ring) storeKeyUpdate(value *Desc) {
r.watchKeyUpdate.Store(value)
}

// observeKeyUpdate is called periodically to update the ring state if a new watch key update is available.
func (r *Ring) observeKeyUpdate() {
if value := r.watchKeyUpdate.Swap(nil); value != nil {
r.updateRingState(value)
}
}

func (r *Ring) updateRingState(ringDesc *Desc) {
r.mtx.RLock()
prevRing := r.ringDesc
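The debouncing approach in the ring.go diff above (keep only the most recent update, then apply it on a timer) can be sketched in isolation. This is a minimal illustration, not the dskit implementation: the `latest` type and its `put`, `take`, and `run` methods are hypothetical names, and it assumes the standard library's `sync/atomic` (Go 1.19+) rather than `go.uber.org/atomic`, which the diff imports.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// latest holds only the most recent value passed to put.
// Hypothetical type for illustration; it is not part of dskit.
type latest[T any] struct {
	pending atomic.Pointer[T]
}

// put records v as the pending update, replacing any update not yet applied.
func (l *latest[T]) put(v *T) { l.pending.Store(v) }

// take returns the pending update and clears it, or nil when nothing is pending.
func (l *latest[T]) take() *T { return l.pending.Swap(nil) }

// run calls apply at most once per interval, and only when an update is pending.
// interval must be positive because time.NewTicker panics otherwise.
func (l *latest[T]) run(ctx context.Context, interval time.Duration, apply func(*T)) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			if v := l.take(); v != nil {
				apply(v)
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	var l latest[string]
	a, b := "first", "second"
	l.put(&a)
	l.put(&b)                    // overwrites "first" before it was observed
	fmt.Println(*l.take())       // prints "second"
	fmt.Println(l.take() == nil) // prints "true": nothing pending after a take
}
```

Because `Swap(nil)` both reads and clears the pending value in one atomic step, an update stored between two ticks is applied exactly once, and intermediate updates that were overwritten are never applied.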
43 changes: 43 additions & 0 deletions ring/ring_test.go
@@ -139,6 +139,7 @@ func benchmarkUpdateRingState(b *testing.B, numInstances, numTokens int, updateT
cfg := Config{
KVStore: kv.Config{},
HeartbeatTimeout: 0, // get healthy stats
UpdateInterval: 250 * time.Millisecond,
ReplicationFactor: 3,
ZoneAwarenessEnabled: true,
}
@@ -3570,6 +3571,46 @@ func compareReplicationSets(first, second ReplicationSet) (added, removed []stri
return
}

// Verify that WatchKey updates make it to the ring, and the latest update is observed.
func TestRingWatchKeyUpdates(t *testing.T) {
cfg := Config{ReplicationFactor: 3}
ring, err := NewWithStoreClientAndStrategy(cfg, testRingName, testRingKey, nil, NewDefaultReplicationStrategy(), prometheus.NewRegistry(), log.NewNopLogger())
require.NoError(t, err)

desc1 := NewDesc()
desc1.Ingesters = map[string]InstanceDesc{
"instance-1": generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{1}, time.Now()),
}
desc2 := NewDesc()
desc2.Ingesters = map[string]InstanceDesc{
"instance-1": generateRingInstanceWithInfo("instance-1", "zone-a", []uint32{1}, time.Now()),
"instance-2": generateRingInstanceWithInfo("instance-2", "zone-a", []uint32{2}, time.Now()),
}

// processUpdate is a stand-in for the ring's timer goroutine that normally calls observeKeyUpdate.
processUpdate := func() *Desc {
ring.observeKeyUpdate()
ring.mtx.RLock()
defer ring.mtx.RUnlock()
return ring.ringDesc
}

assert.Empty(t, processUpdate().Ingesters, "no desc initially")
assert.Empty(t, processUpdate().Ingesters, "can update multiple times without any key update stored")

ring.storeKeyUpdate(desc1)
assert.Same(t, desc1, processUpdate())
assert.Same(t, desc1, processUpdate(), "no change if no new update")

ring.storeKeyUpdate(desc2)
assert.Same(t, desc2, processUpdate())
assert.Same(t, desc2, processUpdate(), "no change if no new update")

ring.storeKeyUpdate(desc1)
ring.storeKeyUpdate(desc2)
assert.Same(t, desc2, processUpdate(), "should observe last update")
}

// This test verifies that ring is getting updates, even after extending check in the loop method.
func TestRingUpdates(t *testing.T) {
const (
@@ -3608,6 +3649,7 @@ func TestRingUpdates(t *testing.T) {
cfg := Config{
KVStore: kv.Config{Mock: inmem},
HeartbeatTimeout: 1 * time.Minute,
UpdateInterval: 20 * time.Millisecond,
ReplicationFactor: 3,
ExcludedZones: flagext.StringSliceCSV(testData.excludedZones),
}
@@ -3716,6 +3758,7 @@ func TestRing_ShuffleShard_Caching(t *testing.T) {
cfg := Config{
KVStore: kv.Config{Mock: inmem},
HeartbeatTimeout: 1 * time.Minute,
UpdateInterval: 20 * time.Millisecond,
ReplicationFactor: 3,
ZoneAwarenessEnabled: true,
}