Merge #131025
131025: storeliveness: add transport metrics r=nvanbenschoten a=miraradeva

This commit adds seven new metrics for the Store Liveness Transport:

- `storeliveness.transport.send-queue-size`: Number of pending outgoing messages in all Store Liveness Transport per-store send queues.
- `storeliveness.transport.send-queue-bytes`: Total byte size of pending outgoing messages in all Store Liveness Transport per-store send queues.
- `storeliveness.transport.send-queue-idle`: Number of Store Liveness Transport per-store send queues that have become idle due to no recently-sent messages.
- `storeliveness.transport.sent`: Number of Store Liveness messages sent by the Store Liveness Transport.
- `storeliveness.transport.received`: Number of Store Liveness messages received by the Store Liveness Transport.
- `storeliveness.transport.send_dropped`: Number of Store Liveness messages dropped by the Store Liveness Transport on the sender side.
- `storeliveness.transport.receive_dropped`: Number of Store Liveness messages dropped by the Store Liveness Transport on the receiver side.
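
As a rough sketch of how the sender-side metrics above fit together: a message either enters a per-store send queue (growing the size and bytes gauges) or is dropped and counted in `send_dropped`. The `sendQueue` type below is illustrative only, not part of this change:

```go
import "github.com/cockroachdb/cockroach/pkg/util/metric"

// Illustrative toy send queue (not the actual Transport code), showing
// where each sender-side metric would plausibly be updated.
type sendQueue struct {
	ch      chan []byte     // stand-in for *slpb.Message
	size    *metric.Gauge   // storeliveness.transport.send-queue-size
	bytes   *metric.Gauge   // storeliveness.transport.send-queue-bytes
	dropped *metric.Counter // storeliveness.transport.send_dropped
}

func (q *sendQueue) enqueue(msg []byte) bool {
	select {
	case q.ch <- msg:
		q.size.Inc(1)
		q.bytes.Inc(int64(len(msg)))
		return true
	default:
		// Queue full: drop on the sender side rather than block.
		q.dropped.Inc(1)
		return false
	}
}
```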

Part of: #125067

Release note: None

----

![Screenshot 2024-09-20 at 1 20 18 PM](https://github.com/user-attachments/assets/bf9bcce7-4055-44fd-a0b1-afeb3fc65268)


Co-authored-by: Mira Radeva <[email protected]>
craig[bot] and miraradeva committed Sep 25, 2024
2 parents b92a8ea + 69ac6a4 commit 611d44d
Showing 11 changed files with 426 additions and 40 deletions.
7 changes: 7 additions & 0 deletions docs/generated/metrics/metrics.html
@@ -771,6 +771,13 @@
<tr><td>STORAGE</td><td>storage.wal.fsync.latency</td><td>The write ahead log fsync latency</td><td>Fsync Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.write-stall-nanos</td><td>Total write stall duration in nanos</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.write-stalls</td><td>Number of instances of intentional write stalls to backpressure incoming writes</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>STORAGE</td><td>storeliveness.transport.receive_dropped</td><td>Number of Store Liveness messages dropped by the Store Liveness Transport on the receiver side</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>STORAGE</td><td>storeliveness.transport.received</td><td>Number of Store Liveness messages received by the Store Liveness Transport</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>STORAGE</td><td>storeliveness.transport.send-queue-bytes</td><td>Total byte size of pending outgoing messages in all Store Liveness Transport per-store send queues</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>STORAGE</td><td>storeliveness.transport.send-queue-idle</td><td>Number of Store Liveness Transport per-store send queues that have become idle due to no recently-sent messages</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>STORAGE</td><td>storeliveness.transport.send-queue-size</td><td>Number of pending outgoing messages in all Store Liveness Transport per-store send queues</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>STORAGE</td><td>storeliveness.transport.send_dropped</td><td>Number of Store Liveness messages dropped by the Store Liveness Transport on the sender side</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>STORAGE</td><td>storeliveness.transport.sent</td><td>Number of Store Liveness messages sent by the Store Liveness Transport</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>sysbytes</td><td>Number of bytes in system KV pairs</td><td>Storage</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>syscount</td><td>Count of system KV pairs</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>tenant.consumption.cross_region_network_ru</td><td>Total number of RUs charged for cross-region network traffic</td><td>Request Units</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
@@ -2191,7 +2191,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
supportGracePeriod := s.cfg.RPCContext.StoreLivenessWithdrawalGracePeriod()
options := storeliveness.NewOptions(heartbeatInterval, livenessInterval, supportGracePeriod)
sm := storeliveness.NewSupportManager(
-slpb.StoreIdent{NodeID: s.nodeDesc.NodeID}, s.StateEngine(), options,
+slpb.StoreIdent{NodeID: s.nodeDesc.NodeID, StoreID: s.StoreID()}, s.StateEngine(), options,
s.cfg.Settings, s.stopper, s.cfg.Clock, s.cfg.StoreLivenessTransport,
)
s.cfg.StoreLivenessTransport.ListenMessages(s.StoreID(), sm)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
@@ -250,7 +250,7 @@ func createTestStoreWithoutStart(
nil, /* knobs */
)
cfg.StoreLivenessTransport = storeliveness.NewTransport(
-cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, server,
+cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, server, nil, /* knobs */
)

stores := NewStores(cfg.AmbientCtx, cfg.Clock)
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/storeliveness/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
srcs = [
"config.go",
"fabric.go",
"metrics.go",
"persist.go",
"requester_state.go",
"support_manager.go",
@@ -25,6 +26,7 @@
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
89 changes: 89 additions & 0 deletions pkg/kv/kvserver/storeliveness/metrics.go
@@ -0,0 +1,89 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storeliveness

import "github.com/cockroachdb/cockroach/pkg/util/metric"

// TransportMetrics includes all Store Liveness Transport metrics.
type TransportMetrics struct {
SendQueueSize *metric.Gauge
SendQueueBytes *metric.Gauge
SendQueueIdle *metric.Counter

MessagesSent *metric.Counter
MessagesReceived *metric.Counter
MessagesSendDropped *metric.Counter
MessagesReceiveDropped *metric.Counter
}

func newTransportMetrics() *TransportMetrics {
return &TransportMetrics{
SendQueueSize: metric.NewGauge(metaSendQueueSize),
SendQueueBytes: metric.NewGauge(metaSendQueueBytes),
SendQueueIdle: metric.NewCounter(metaSendQueueIdle),
MessagesSent: metric.NewCounter(metaMessagesSent),
MessagesReceived: metric.NewCounter(metaMessagesReceived),
MessagesSendDropped: metric.NewCounter(metaMessagesSendDropped),
MessagesReceiveDropped: metric.NewCounter(metaMessagesReceiveDropped),
}
}

var (
metaSendQueueSize = metric.Metadata{
Name: "storeliveness.transport.send-queue-size",
Help: "Number of pending outgoing messages in all " +
"Store Liveness Transport per-store send queues",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaSendQueueBytes = metric.Metadata{
Name: "storeliveness.transport.send-queue-bytes",
Help: "Total byte size of pending outgoing messages in all " +
"Store Liveness Transport per-store send queues",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaSendQueueIdle = metric.Metadata{
Name: "storeliveness.transport.send-queue-idle",
Help: "Number of Store Liveness Transport per-store send queues " +
"that have become idle due to no recently-sent messages",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaMessagesSent = metric.Metadata{
Name: "storeliveness.transport.sent",
Help: "Number of Store Liveness messages sent by the " +
"Store Liveness Transport",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaMessagesReceived = metric.Metadata{
Name: "storeliveness.transport.received",
Help: "Number of Store Liveness messages received by the " +
"Store Liveness Transport",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaMessagesSendDropped = metric.Metadata{
Name: "storeliveness.transport.send_dropped",
Help: "Number of Store Liveness messages dropped by the " +
"Store Liveness Transport on the sender side",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaMessagesReceiveDropped = metric.Metadata{
Name: "storeliveness.transport.receive_dropped",
Help: "Number of Store Liveness messages dropped by the " +
"Store Liveness Transport on the receiver side",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
)
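
These metrics follow the usual CockroachDB struct-of-metrics pattern, so the whole set can be registered in one call. A minimal sketch; where the Transport actually registers its metrics is not shown in this diff:

```go
// metric.Registry.AddMetricStruct reflects over the struct's fields and
// registers each gauge and counter it finds, so adding a transport metric
// only requires a new field plus its metadata.
registry := metric.NewRegistry()
registry.AddMetricStruct(newTransportMetrics())
```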
19 changes: 16 additions & 3 deletions pkg/kv/kvserver/storeliveness/support_manager.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"golang.org/x/exp/maps"
)

@@ -85,8 +86,8 @@ func NewSupportManager(

// HandleMessage implements the MessageHandler interface. It appends incoming
// messages to a queue and does not block on processing the messages.
-func (sm *SupportManager) HandleMessage(msg *slpb.Message) {
-sm.receiveQueue.Append(msg)
+func (sm *SupportManager) HandleMessage(msg *slpb.Message) error {
+return sm.receiveQueue.Append(msg)
}

var _ MessageHandler = (*SupportManager)(nil)
@@ -342,6 +343,13 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Messa
log.VInfof(ctx, 2, "store %+v sent %d responses", sm.storeID, len(responses))
}

+// maxReceiveQueueSize is the maximum number of messages the receive queue can
+// store. If message consumption is slow (e.g. due to a disk stall) and the
+// queue reaches maxReceiveQueueSize, incoming messages will be dropped.
+const maxReceiveQueueSize = 10000
+
+var receiveQueueSizeLimitReachedErr = errors.Errorf("store liveness receive queue is full")

// receiveQueue stores all received messages from the MessageHandler and allows
// them to be processed async and in batch.
type receiveQueue struct {
@@ -358,14 +366,19 @@ func newReceiveQueue() receiveQueue {
}
}

-func (q *receiveQueue) Append(msg *slpb.Message) {
+func (q *receiveQueue) Append(msg *slpb.Message) error {
q.mu.Lock()
defer q.mu.Unlock()
+// Drop messages if maxReceiveQueueSize is reached.
+if len(q.mu.msgs) >= maxReceiveQueueSize {
+return receiveQueueSizeLimitReachedErr
+}
q.mu.msgs = append(q.mu.msgs, msg)
select {
case q.sig <- struct{}{}:
default:
}
+return nil
}

func (q *receiveQueue) Sig() <-chan struct{} {
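
Since `HandleMessage` now returns an error when the receive queue is full, the Transport's receive path can count drops instead of blocking. A hedged sketch of such a caller; the `handlers` map and `metrics` field assumed here do not appear in the excerpt above:

```go
// Hypothetical receive-side dispatch (illustrative, not the actual
// Transport code): a full receive queue surfaces as an error and is
// recorded as a receive-side drop rather than stalling the stream.
func (t *Transport) handleMessage(ctx context.Context, msg *slpb.Message) {
	handler, ok := t.handlers[msg.To.StoreID]
	if !ok {
		t.metrics.MessagesReceiveDropped.Inc(1)
		return
	}
	if err := handler.HandleMessage(msg); err != nil {
		log.VInfof(ctx, 2, "dropping message %+v: %v", msg, err)
		t.metrics.MessagesReceiveDropped.Inc(1)
		return
	}
	t.metrics.MessagesReceived.Inc(1)
}
```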
42 changes: 40 additions & 2 deletions pkg/kv/kvserver/storeliveness/support_manager_test.go
@@ -79,7 +79,7 @@ func TestSupportManagerRequestsSupport(t *testing.T) {
Epoch: slpb.Epoch(1),
Expiration: requestedExpiration,
}
-sm.HandleMessage(heartbeatResp)
+require.NoError(t, sm.HandleMessage(heartbeatResp))

// Ensure support is provided as seen by SupportFrom.
testutils.SucceedsSoon(
@@ -122,7 +122,7 @@ func TestSupportManagerProvidesSupport(t *testing.T) {
Epoch: slpb.Epoch(1),
Expiration: sm.clock.Now().AddDuration(time.Second),
}
-sm.HandleMessage(heartbeat)
+require.NoError(t, sm.HandleMessage(heartbeat))

// Ensure a response is sent.
testutils.SucceedsSoon(
@@ -257,6 +257,8 @@ func TestSupportManagerRestart(t *testing.T) {
require.True(t, withdrawalTime.Less(now))
}

+// TestSupportManagerDiskStall tests that the SupportManager continues to
+// respond to SupportFrom and SupportFor calls when its disk is stalled.
func TestSupportManagerDiskStall(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -328,6 +330,42 @@ func TestSupportManagerDiskStall(t *testing.T) {
ensureHeartbeats(t, sender, 1)
}

+// TestSupportManagerReceiveQueueLimit tests that the receive queue returns
+// errors when the queue size limit is reached.
+func TestSupportManagerReceiveQueueLimit(t *testing.T) {
+defer leaktest.AfterTest(t)()
+defer log.Scope(t).Close(t)
+
+ctx := context.Background()
+engine := storage.NewDefaultInMemForTesting()
+defer engine.Close()
+settings := clustersettings.MakeTestingClusterSettings()
+stopper := stop.NewStopper()
+defer stopper.Stop(ctx)
+manual := hlc.NewHybridManualClock()
+clock := hlc.NewClockForTesting(manual)
+sender := &testMessageSender{}
+sm := NewSupportManager(store, engine, options, settings, stopper, clock, sender)
+// Initialize the SupportManager without starting the main goroutine.
+require.NoError(t, sm.onRestart(ctx))
+
+heartbeat := &slpb.Message{
+Type: slpb.MsgHeartbeat,
+From: remoteStore,
+To: sm.storeID,
+Epoch: slpb.Epoch(1),
+Expiration: clock.Now().AddDuration(sm.options.LivenessInterval),
+}
+
+for i := 0; i < maxReceiveQueueSize; i++ {
+require.NoError(t, sm.HandleMessage(heartbeat))
+}
+
+// Nothing is consuming messages from the queue, so the next HandleMessage
+// should result in an error.
+require.Regexp(t, "store liveness receive queue is full", sm.HandleMessage(heartbeat))
+}

func ensureHeartbeats(t *testing.T, sender *testMessageSender, expectedNum int) []slpb.Message {
var msgs []slpb.Message
testutils.SucceedsSoon(