From b576c793ecdbf175c970639dbcffc2f9598a8156 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 18 Jul 2024 11:41:57 +0200 Subject: [PATCH] Update dskit (#8761) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Update dskit. Signed-off-by: Peter Štibraný * docs, help, changelog. Signed-off-by: Peter Štibraný * CHANGELOG.md PR. Signed-off-by: Peter Štibraný --------- Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + cmd/mimir/config-descriptor.json | 11 ++ cmd/mimir/help-all.txt.tmpl | 2 + .../configuration-parameters/index.md | 8 ++ go.mod | 2 +- go.sum | 4 +- .../grafana/dskit/kv/memberlist/broadcast.go | 4 - .../dskit/kv/memberlist/memberlist_client.go | 112 +++++++++++------- .../grafana/dskit/kv/memberlist/metrics.go | 32 +++-- .../grafana/dskit/tracing/tracing.go | 24 +++- vendor/modules.txt | 2 +- 11 files changed, 138 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35a15f44fc4..8902acb686f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ * [ENHANCEMENT] Query-frontend: Add `header_cache_control` to query stats. #8590 * [ENHANCEMENT] Query-scheduler: Introduce `query-scheduler.use-multi-algorithm-query-queue`, which allows use of an experimental queue structure, with no change in external queue behavior. #7873 * [ENHANCEMENT] Expose a new `s3.trace.enabled` configuration option to enable detailed logging of operations against S3-compatible object stores. #8690 +* [ENHANCEMENT] memberlist: locally-generated messages (e.g. ring updates) are sent to gossip network before forwarded messages. Introduced `-memberlist.broadcast-timeout-for-local-updates-on-shutdown` option to modify how long to wait until queue with locally-generated messages is empty when shutting down. Previously this was hard-coded to 10s, and wait included all messages (locally-generated and forwarded). Now it defaults to 10s, 0 means no timeout. Increasing this value may help to avoid problem when ring updates on shutdown are not propagated to other nodes, and ring entry is left in a wrong state. #8761 * [BUGFIX] Ruler: add support for draining any outstanding alert notifications before shutting down. This can be enabled with the `-ruler.drain-notification-queue-on-shutdown=true` CLI flag. #8346 * [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and viceversa. #8388 * [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. #8391 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 82f6e3edd7a..c007e200584 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -15863,6 +15863,17 @@ "fieldType": "duration", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "broadcast_timeout_for_local_updates_on_shutdown", + "required": false, + "desc": "Timeout for broadcasting all remaining locally-generated updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to locally-generated updates, not to broadcast messages that are result of incoming gossip updates. 0 = no timeout, wait until all locally-generated updates are sent.", + "fieldValue": null, + "fieldDefaultValue": 10000000000, + "fieldFlag": "memberlist.broadcast-timeout-for-local-updates-on-shutdown", + "fieldType": "duration", + "fieldCategory": "advanced" + }, { "kind": "field", "name": "message_history_buffer_bytes", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 91d5b661e23..63e0b3d761c 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1679,6 +1679,8 @@ Usage of ./cmd/mimir/mimir: IP address to listen on for gossip messages. Multiple addresses may be specified. Defaults to 0.0.0.0 -memberlist.bind-port int Port to listen on for gossip messages. (default 7946) + -memberlist.broadcast-timeout-for-local-updates-on-shutdown duration + Timeout for broadcasting all remaining locally-generated updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to locally-generated updates, not to broadcast messages that are result of incoming gossip updates. 0 = no timeout, wait until all locally-generated updates are sent. (default 10s) -memberlist.cluster-label string The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true. -memberlist.cluster-label-verification-disabled diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 8046477c869..d5f8e7e6f13 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3030,6 +3030,14 @@ The `memberlist` block configures the Gossip memberlist. # CLI flag: -memberlist.leave-timeout [leave_timeout: | default = 20s] +# (advanced) Timeout for broadcasting all remaining locally-generated updates to +# other nodes when shutting down. Only used if there are nodes left in the +# memberlist cluster, and only applies to locally-generated updates, not to +# broadcast messages that are result of incoming gossip updates. 0 = no timeout, +# wait until all locally-generated updates are sent. +# CLI flag: -memberlist.broadcast-timeout-for-local-updates-on-shutdown +[broadcast_timeout_for_local_updates_on_shutdown: | default = 10s] + # (advanced) How much space to use for keeping received and sent messages in # memory for troubleshooting (two buffers). 0 to disable. # CLI flag: -memberlist.message-history-buffer-bytes diff --git a/go.mod b/go.mod index 9b669ddc3d4..544906e6986 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20240712071108-b834d6b908f5 + github.com/grafana/dskit v0.0.0-20240718080635-f5bd38371e1c github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index 5684bdf678e..dcb94c41afe 100644 --- a/go.sum +++ b/go.sum @@ -515,8 +515,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20240708204730-284b8fe16cff h1:okOj0w7kyIfrENSFGVXTjn3aSiS2QUcqwDozQkoKdH0= github.com/grafana/alerting v0.0.0-20240708204730-284b8fe16cff/go.mod h1:DLj8frbtCaITljC2jc0L85JQViPF3mPfOSiYhm1osso= -github.com/grafana/dskit v0.0.0-20240712071108-b834d6b908f5 h1:lX7Wea15AnBS0QVwZm4dWdJmSgjF4HmNkHxyiGF3/qc= -github.com/grafana/dskit v0.0.0-20240712071108-b834d6b908f5/go.mod h1:UA1BG0yY/B7lTcdeqoud+3/TglKmPL88OM5qCeRs8BU= +github.com/grafana/dskit v0.0.0-20240718080635-f5bd38371e1c h1:rKnsl5RKCI7kFC3ORBZLYwS6AulFpJ3yv6c0pSH7aKQ= +github.com/grafana/dskit v0.0.0-20240718080635-f5bd38371e1c/go.mod h1:UA1BG0yY/B7lTcdeqoud+3/TglKmPL88OM5qCeRs8BU= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce h1:WI1olbgS+sEl77qxEYbmt9TgRUz7iLqmjh8lYPpGlKQ= diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go b/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go index 6657b73a51d..aefaa2f65e1 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go @@ -1,10 +1,7 @@ package memberlist import ( - "fmt" - "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/hashicorp/memberlist" ) @@ -45,7 +42,6 @@ func (r ringBroadcast) Invalidates(old memberlist.Broadcast) bool { // otherwise, we may be invalidating some older messages, which however covered different // ingesters if r.version >= oldb.version { - level.Debug(r.logger).Log("msg", "Invalidating forwarded broadcast", "key", r.key, "version", r.version, "oldVersion", oldb.version, "content", fmt.Sprintf("%v", r.content), "oldContent", fmt.Sprintf("%v", oldb.content)) return true } } diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index a1b659d4097..a7eefe92fc2 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -157,7 +157,8 @@ type KVConfig struct { LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. - LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` + LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` + BroadcastTimeoutForLocalUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout_for_local_updates_on_shutdown" category:"advanced"` // How much space to use to keep received and sent messages in memory (for troubleshooting). MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"` @@ -198,6 +199,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.") + f.DurationVar(&cfg.BroadcastTimeoutForLocalUpdatesOnShutdown, prefix+"memberlist.broadcast-timeout-for-local-updates-on-shutdown", 10*time.Second, "Timeout for broadcasting all remaining locally-generated updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to locally-generated updates, not to broadcast messages that are result of incoming gossip updates. 0 = no timeout, wait until all locally-generated updates are sent.") cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix) } @@ -231,10 +233,11 @@ type KV struct { // dns discovery provider provider DNSProvider - // Protects access to memberlist and broadcasts fields. - delegateReady atomic.Bool - memberlist *memberlist.Memberlist - broadcasts *memberlist.TransmitLimitedQueue + // Protects access to memberlist and broadcast queues. + delegateReady atomic.Bool + memberlist *memberlist.Memberlist + localBroadcasts *memberlist.TransmitLimitedQueue // queue for messages generated locally + gossipBroadcasts *memberlist.TransmitLimitedQueue // queue for messages that we forward from other nodes // KV Store. storeMu sync.Mutex @@ -273,7 +276,8 @@ type KV struct { numberOfPushes prometheus.Counter totalSizeOfPulls prometheus.Counter totalSizeOfPushes prometheus.Counter - numberOfBroadcastMessagesInQueue prometheus.GaugeFunc + numberOfGossipMessagesInQueue prometheus.GaugeFunc + numberOfLocalMessagesInQueue prometheus.GaugeFunc totalSizeOfBroadcastMessagesInQueue prometheus.Gauge numberOfBroadcastMessagesDropped prometheus.Counter casAttempts prometheus.Counter @@ -456,7 +460,11 @@ func (m *KV) starting(ctx context.Context) error { } // Finish delegate initialization. m.memberlist = list - m.broadcasts = &memberlist.TransmitLimitedQueue{ + m.localBroadcasts = &memberlist.TransmitLimitedQueue{ + NumNodes: list.NumMembers, + RetransmitMult: mlCfg.RetransmitMult, + } + m.gossipBroadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: list.NumMembers, RetransmitMult: mlCfg.RetransmitMult, } @@ -719,20 +727,24 @@ func (m *KV) discoverMembers(ctx context.Context, members []string) []string { func (m *KV) stopping(_ error) error { level.Info(m.logger).Log("msg", "leaving memberlist cluster") - // Wait until broadcast queue is empty, but don't wait for too long. + // Wait until queue with locally-generated messages is empty, but don't wait for too long. // Also don't wait if there is just one node left. - // Problem is that broadcast queue is also filled up by state changes received from other nodes, - // so it may never be empty in a busy cluster. However, we generally only care about messages - // generated on this node via CAS, and those are disabled now (via casBroadcastsEnabled), and should be able - // to get out in this timeout. + // Note: Once we enter Stopping state, we don't queue more locally-generated messages. - waitTimeout := time.Now().Add(10 * time.Second) - for m.broadcasts.NumQueued() > 0 && m.memberlist.NumMembers() > 1 && time.Now().Before(waitTimeout) { + deadline := time.Now().Add(m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown) + + msgs := m.localBroadcasts.NumQueued() + nodes := m.memberlist.NumMembers() + for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) { + level.Info(m.logger).Log("msg", "waiting for locally-generated broadcast messages to be sent out", "count", msgs, "nodes", nodes) time.Sleep(250 * time.Millisecond) + + msgs = m.localBroadcasts.NumQueued() + nodes = m.memberlist.NumMembers() } - if cnt := m.broadcasts.NumQueued(); cnt > 0 { - level.Warn(m.logger).Log("msg", "broadcast messages left in queue", "count", cnt, "nodes", m.memberlist.NumMembers()) + if msgs > 0 { + level.Warn(m.logger).Log("msg", "locally-generated broadcast messages left the queue", "count", msgs, "nodes", nodes) } err := m.memberlist.Leave(m.cfg.LeaveTimeout) @@ -972,11 +984,7 @@ outer: m.casSuccesses.Inc() m.notifyWatchers(key) - if m.State() == services.Running { - m.broadcastNewValue(key, change, newver, codec) - } else { - level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key) - } + m.broadcastNewValue(key, change, newver, codec, true) } return nil @@ -1034,7 +1042,12 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) return change, newver, retry, nil } -func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec) { +func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool) { + if locallyGenerated && m.State() != services.Running { + level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key) + return + } + data, err := codec.Encode(change) if err != nil { level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err) @@ -1058,7 +1071,25 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec Changes: change.MergeContent(), }) - m.queueBroadcast(key, change.MergeContent(), version, pairData) + l := len(pairData) + b := ringBroadcast{ + key: key, + content: change.MergeContent(), + version: version, + msg: pairData, + finished: func(ringBroadcast) { + m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) + }, + logger: m.logger, + } + + m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) + + if locallyGenerated { + m.localBroadcasts.QueueBroadcast(b) + } else { + m.gossipBroadcasts.QueueBroadcast(b) + } } // NodeMeta is method from Memberlist Delegate interface @@ -1153,7 +1184,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { m.notifyWatchers(key) // Don't resend original message, but only changes. - m.broadcastNewValue(key, mod, version, update.codec) + m.broadcastNewValue(key, mod, version, update.codec, false) } case <-m.shutdown: @@ -1163,24 +1194,6 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { } } -func (m *KV) queueBroadcast(key string, content []string, version uint, message []byte) { - l := len(message) - - b := ringBroadcast{ - key: key, - content: content, - version: version, - msg: message, - finished: func(ringBroadcast) { - m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l)) - }, - logger: m.logger, - } - - m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l)) - m.broadcasts.QueueBroadcast(b) -} - // GetBroadcasts is method from Memberlist Delegate interface // It returns all pending broadcasts (within the size limit) func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { @@ -1188,7 +1201,18 @@ func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { return nil } - return m.broadcasts.GetBroadcasts(overhead, limit) + // Prioritize locally-generated messages + msgs := m.localBroadcasts.GetBroadcasts(overhead, limit) + + // Decrease limit for each message we got from locally-generated broadcasts. + for _, m := range msgs { + limit -= overhead + len(m) + } + + if limit > 0 { + msgs = append(msgs, m.gossipBroadcasts.GetBroadcasts(overhead, limit)...) + } + return msgs } // LocalState is method from Memberlist Delegate interface @@ -1335,7 +1359,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) } else if newver > 0 { m.notifyWatchers(kvPair.Key) - m.broadcastNewValue(kvPair.Key, change, newver, codec) + m.broadcastNewValue(kvPair.Key, change, newver, codec, false) } } diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go b/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go index 75a6b232476..0f09c5d71fb 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go @@ -71,15 +71,33 @@ func (m *KV) createAndRegisterMetrics() { Help: "Total size of pulled state", }) - m.numberOfBroadcastMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: m.cfg.MetricsNamespace, - Subsystem: subsystem, - Name: "messages_in_broadcast_queue", - Help: "Number of user messages in the broadcast queue", + const queueMetricName = "messages_in_broadcast_queue" + const queueMetricHelp = "Number of user messages in the broadcast queue" + + m.numberOfGossipMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: m.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: queueMetricName, + Help: queueMetricHelp, + ConstLabels: map[string]string{"queue": "gossip"}, + }, func() float64 { + // Queues are not set before Starting state + if m.State() == services.Running || m.State() == services.Stopping { + return float64(m.gossipBroadcasts.NumQueued()) + } + return 0 + }) + + m.numberOfLocalMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: m.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: queueMetricName, + Help: queueMetricHelp, + ConstLabels: map[string]string{"queue": "local"}, }, func() float64 { - // m.broadcasts is not set before Starting state + // Queues are not set before Starting state if m.State() == services.Running || m.State() == services.Stopping { - return float64(m.broadcasts.NumQueued()) + return float64(m.localBroadcasts.NumQueued()) } return 0 }) diff --git a/vendor/github.com/grafana/dskit/tracing/tracing.go b/vendor/github.com/grafana/dskit/tracing/tracing.go index 66b3a3cef4c..1882a081dfd 100644 --- a/vendor/github.com/grafana/dskit/tracing/tracing.go +++ b/vendor/github.com/grafana/dskit/tracing/tracing.go @@ -55,16 +55,30 @@ func NewFromEnv(serviceName string, options ...jaegercfg.Option) (io.Closer, err // ExtractTraceID extracts the trace id, if any from the context. func ExtractTraceID(ctx context.Context) (string, bool) { + if tid, _, ok := extractJaegerContext(ctx); ok { + return tid.String(), true + } + return "", false +} + +// ExtractTraceSpanID extracts the trace id, span id if any from the context. +func ExtractTraceSpanID(ctx context.Context) (string, string, bool) { + if tid, sid, ok := extractJaegerContext(ctx); ok { + return tid.String(), sid.String(), true + } + return "", "", false +} + +func extractJaegerContext(ctx context.Context) (tid jaeger.TraceID, sid jaeger.SpanID, success bool) { sp := opentracing.SpanFromContext(ctx) if sp == nil { - return "", false + return } - sctx, ok := sp.Context().(jaeger.SpanContext) + jsp, ok := sp.Context().(jaeger.SpanContext) if !ok { - return "", false + return } - - return sctx.TraceID().String(), true + return jsp.TraceID(), jsp.SpanID(), true } // ExtractSampledTraceID works like ExtractTraceID but the returned bool is only diff --git a/vendor/modules.txt b/vendor/modules.txt index 5fadad4c95b..229bedf7c2e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -610,7 +610,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20240712071108-b834d6b908f5 +# github.com/grafana/dskit v0.0.0-20240718080635-f5bd38371e1c ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast