diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index e885fdc65359..f60d9b73077c 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -771,6 +771,13 @@
STORAGE | storage.wal.fsync.latency | The write ahead log fsync latency | Fsync Latency | HISTOGRAM | NANOSECONDS | AVG | NONE |
STORAGE | storage.write-stall-nanos | Total write stall duration in nanos | Nanoseconds | GAUGE | NANOSECONDS | AVG | NONE |
STORAGE | storage.write-stalls | Number of instances of intentional write stalls to backpressure incoming writes | Events | GAUGE | COUNT | AVG | NONE |
+STORAGE | storeliveness.transport.receive_dropped | Number of Store Liveness messages dropped by the Store Liveness Transport on the receiver side | Messages | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+STORAGE | storeliveness.transport.received | Number of Store Liveness messages received by the Store Liveness Transport | Messages | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+STORAGE | storeliveness.transport.send-queue-bytes | Total byte size of pending outgoing messages in all Store Liveness Transport per-store send queues | Bytes | GAUGE | BYTES | AVG | NONE |
+STORAGE | storeliveness.transport.send-queue-idle | Number of Store Liveness Transport per-store send queues that have become idle due to no recently-sent messages | Messages | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+STORAGE | storeliveness.transport.send-queue-size | Number of pending outgoing messages in all Store Liveness Transport per-store send queues | Messages | GAUGE | COUNT | AVG | NONE |
+STORAGE | storeliveness.transport.send_dropped | Number of Store Liveness messages dropped by the Store Liveness Transport on the sender side | Messages | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+STORAGE | storeliveness.transport.sent | Number of Store Liveness messages sent by the Store Liveness Transport | Messages | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | sysbytes | Number of bytes in system KV pairs | Storage | GAUGE | BYTES | AVG | NONE |
STORAGE | syscount | Count of system KV pairs | Keys | GAUGE | COUNT | AVG | NONE |
STORAGE | tenant.consumption.cross_region_network_ru | Total number of RUs charged for cross-region network traffic | Request Units | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index a6dbeffa5d2c..06ff97060b91 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -2191,7 +2191,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
supportGracePeriod := s.cfg.RPCContext.StoreLivenessWithdrawalGracePeriod()
options := storeliveness.NewOptions(heartbeatInterval, livenessInterval, supportGracePeriod)
sm := storeliveness.NewSupportManager(
- slpb.StoreIdent{NodeID: s.nodeDesc.NodeID}, s.StateEngine(), options,
+ slpb.StoreIdent{NodeID: s.nodeDesc.NodeID, StoreID: s.StoreID()}, s.StateEngine(), options,
s.cfg.Settings, s.stopper, s.cfg.Clock, s.cfg.StoreLivenessTransport,
)
s.cfg.StoreLivenessTransport.ListenMessages(s.StoreID(), sm)
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index 1c4a7f475df7..63a5b7d44e97 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -250,7 +250,7 @@ func createTestStoreWithoutStart(
nil, /* knobs */
)
cfg.StoreLivenessTransport = storeliveness.NewTransport(
- cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, server,
+ cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, server, nil, /* knobs */
)
stores := NewStores(cfg.AmbientCtx, cfg.Clock)
diff --git a/pkg/kv/kvserver/storeliveness/BUILD.bazel b/pkg/kv/kvserver/storeliveness/BUILD.bazel
index 59c6b88c52d7..85569d64e197 100644
--- a/pkg/kv/kvserver/storeliveness/BUILD.bazel
+++ b/pkg/kv/kvserver/storeliveness/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
srcs = [
"config.go",
"fabric.go",
+ "metrics.go",
"persist.go",
"requester_state.go",
"support_manager.go",
@@ -25,6 +26,7 @@ go_library(
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/log",
+ "//pkg/util/metric",
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
diff --git a/pkg/kv/kvserver/storeliveness/metrics.go b/pkg/kv/kvserver/storeliveness/metrics.go
new file mode 100644
index 000000000000..990ca04f534d
--- /dev/null
+++ b/pkg/kv/kvserver/storeliveness/metrics.go
@@ -0,0 +1,89 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package storeliveness
+
+import "github.com/cockroachdb/cockroach/pkg/util/metric"
+
+// TransportMetrics includes all Store Liveness Transport metrics.
+type TransportMetrics struct {
+ SendQueueSize *metric.Gauge
+ SendQueueBytes *metric.Gauge
+ SendQueueIdle *metric.Counter
+
+ MessagesSent *metric.Counter
+ MessagesReceived *metric.Counter
+ MessagesSendDropped *metric.Counter
+ MessagesReceiveDropped *metric.Counter
+}
+
+func newTransportMetrics() *TransportMetrics {
+ return &TransportMetrics{
+ SendQueueSize: metric.NewGauge(metaSendQueueSize),
+ SendQueueBytes: metric.NewGauge(metaSendQueueBytes),
+ SendQueueIdle: metric.NewCounter(metaSendQueueIdle),
+ MessagesSent: metric.NewCounter(metaMessagesSent),
+ MessagesReceived: metric.NewCounter(metaMessagesReceived),
+ MessagesSendDropped: metric.NewCounter(metaMessagesSendDropped),
+ MessagesReceiveDropped: metric.NewCounter(metaMessagesReceiveDropped),
+ }
+}
+
+var (
+ metaSendQueueSize = metric.Metadata{
+ Name: "storeliveness.transport.send-queue-size",
+ Help: "Number of pending outgoing messages in all " +
+ "Store Liveness Transport per-store send queues",
+ Measurement: "Messages",
+ Unit: metric.Unit_COUNT,
+ }
+ metaSendQueueBytes = metric.Metadata{
+ Name: "storeliveness.transport.send-queue-bytes",
+ Help: "Total byte size of pending outgoing messages in all " +
+ "Store Liveness Transport per-store send queues",
+ Measurement: "Bytes",
+ Unit: metric.Unit_BYTES,
+ }
+ metaSendQueueIdle = metric.Metadata{
+ Name: "storeliveness.transport.send-queue-idle",
+ Help: "Number of Store Liveness Transport per-store send queues " +
+ "that have become idle due to no recently-sent messages",
+ Measurement: "Messages",
+ Unit: metric.Unit_COUNT,
+ }
+ metaMessagesSent = metric.Metadata{
+ Name: "storeliveness.transport.sent",
+ Help: "Number of Store Liveness messages sent by the " +
+ "Store Liveness Transport",
+ Measurement: "Messages",
+ Unit: metric.Unit_COUNT,
+ }
+ metaMessagesReceived = metric.Metadata{
+ Name: "storeliveness.transport.received",
+ Help: "Number of Store Liveness messages received by the " +
+ "Store Liveness Transport",
+ Measurement: "Messages",
+ Unit: metric.Unit_COUNT,
+ }
+ metaMessagesSendDropped = metric.Metadata{
+ Name: "storeliveness.transport.send_dropped",
+ Help: "Number of Store Liveness messages dropped by the " +
+ "Store Liveness Transport on the sender side",
+ Measurement: "Messages",
+ Unit: metric.Unit_COUNT,
+ }
+ metaMessagesReceiveDropped = metric.Metadata{
+ Name: "storeliveness.transport.receive_dropped",
+ Help: "Number of Store Liveness messages dropped by the " +
+ "Store Liveness Transport on the receiver side",
+ Measurement: "Messages",
+ Unit: metric.Unit_COUNT,
+ }
+)
diff --git a/pkg/kv/kvserver/storeliveness/support_manager.go b/pkg/kv/kvserver/storeliveness/support_manager.go
index 37eb0651a8f6..4d0bdb9b0dd4 100644
--- a/pkg/kv/kvserver/storeliveness/support_manager.go
+++ b/pkg/kv/kvserver/storeliveness/support_manager.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/errors"
"golang.org/x/exp/maps"
)
@@ -85,8 +86,8 @@ func NewSupportManager(
// HandleMessage implements the MessageHandler interface. It appends incoming
// messages to a queue and does not block on processing the messages.
-func (sm *SupportManager) HandleMessage(msg *slpb.Message) {
- sm.receiveQueue.Append(msg)
+func (sm *SupportManager) HandleMessage(msg *slpb.Message) error {
+ return sm.receiveQueue.Append(msg)
}
var _ MessageHandler = (*SupportManager)(nil)
@@ -342,6 +343,13 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Messa
log.VInfof(ctx, 2, "store %+v sent %d responses", sm.storeID, len(responses))
}
+// maxReceiveQueueSize is the maximum number of messages the receive queue can
+// store. If message consumption is slow (e.g. due to a disk stall) and the
+// queue reaches maxReceiveQueueSize, incoming messages will be dropped.
+const maxReceiveQueueSize = 10000
+
+var receiveQueueSizeLimitReachedErr = errors.New("store liveness receive queue is full")
+
// receiveQueue stores all received messages from the MessageHandler and allows
// them to be processed async and in batch.
type receiveQueue struct {
@@ -358,14 +366,19 @@ func newReceiveQueue() receiveQueue {
}
}
-func (q *receiveQueue) Append(msg *slpb.Message) {
+func (q *receiveQueue) Append(msg *slpb.Message) error {
q.mu.Lock()
defer q.mu.Unlock()
+ // Drop messages if maxReceiveQueueSize is reached.
+ if len(q.mu.msgs) >= maxReceiveQueueSize {
+ return receiveQueueSizeLimitReachedErr
+ }
q.mu.msgs = append(q.mu.msgs, msg)
select {
case q.sig <- struct{}{}:
default:
}
+ return nil
}
func (q *receiveQueue) Sig() <-chan struct{} {
diff --git a/pkg/kv/kvserver/storeliveness/support_manager_test.go b/pkg/kv/kvserver/storeliveness/support_manager_test.go
index d265d4a9c84f..1dac24d34af2 100644
--- a/pkg/kv/kvserver/storeliveness/support_manager_test.go
+++ b/pkg/kv/kvserver/storeliveness/support_manager_test.go
@@ -79,7 +79,7 @@ func TestSupportManagerRequestsSupport(t *testing.T) {
Epoch: slpb.Epoch(1),
Expiration: requestedExpiration,
}
- sm.HandleMessage(heartbeatResp)
+ require.NoError(t, sm.HandleMessage(heartbeatResp))
// Ensure support is provided as seen by SupportFrom.
testutils.SucceedsSoon(
@@ -122,7 +122,7 @@ func TestSupportManagerProvidesSupport(t *testing.T) {
Epoch: slpb.Epoch(1),
Expiration: sm.clock.Now().AddDuration(time.Second),
}
- sm.HandleMessage(heartbeat)
+ require.NoError(t, sm.HandleMessage(heartbeat))
// Ensure a response is sent.
testutils.SucceedsSoon(
@@ -257,6 +257,8 @@ func TestSupportManagerRestart(t *testing.T) {
require.True(t, withdrawalTime.Less(now))
}
+// TestSupportManagerDiskStall tests that the SupportManager continues to
+// respond to SupportFrom and SupportFor calls when its disk is stalled.
func TestSupportManagerDiskStall(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -328,6 +330,42 @@ func TestSupportManagerDiskStall(t *testing.T) {
ensureHeartbeats(t, sender, 1)
}
+// TestSupportManagerReceiveQueueLimit tests that the receive queue returns
+// errors when the queue size limit is reached.
+func TestSupportManagerReceiveQueueLimit(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+ engine := storage.NewDefaultInMemForTesting()
+ defer engine.Close()
+ settings := clustersettings.MakeTestingClusterSettings()
+ stopper := stop.NewStopper()
+ defer stopper.Stop(ctx)
+ manual := hlc.NewHybridManualClock()
+ clock := hlc.NewClockForTesting(manual)
+ sender := &testMessageSender{}
+ sm := NewSupportManager(store, engine, options, settings, stopper, clock, sender)
+ // Initialize the SupportManager without starting the main goroutine.
+ require.NoError(t, sm.onRestart(ctx))
+
+ heartbeat := &slpb.Message{
+ Type: slpb.MsgHeartbeat,
+ From: remoteStore,
+ To: sm.storeID,
+ Epoch: slpb.Epoch(1),
+ Expiration: clock.Now().AddDuration(sm.options.LivenessInterval),
+ }
+
+ for i := 0; i < maxReceiveQueueSize; i++ {
+ require.NoError(t, sm.HandleMessage(heartbeat))
+ }
+
+ // Nothing is consuming messages from the queue, so the next HandleMessage
+ // should result in an error.
+	require.Regexp(t, "store liveness receive queue is full", sm.HandleMessage(heartbeat))
+}
+
func ensureHeartbeats(t *testing.T, sender *testMessageSender, expectedNum int) []slpb.Message {
var msgs []slpb.Message
testutils.SucceedsSoon(
diff --git a/pkg/kv/kvserver/storeliveness/transport.go b/pkg/kv/kvserver/storeliveness/transport.go
index 22a731cbb2b0..1fbf72c33c24 100644
--- a/pkg/kv/kvserver/storeliveness/transport.go
+++ b/pkg/kv/kvserver/storeliveness/transport.go
@@ -44,7 +44,7 @@ const (
connClass = rpc.SystemClass
)
-var logSendQueueFullEvery = log.Every(1 * time.Second)
+var logQueueFullEvery = log.Every(1 * time.Second)
// MessageHandler is the interface that must be implemented by
// arguments to Transport.ListenMessages.
@@ -53,7 +53,7 @@ type MessageHandler interface {
// block (e.g. do a synchronous disk write) to prevent a single store with a
// problem (e.g. a stalled disk) from affecting message receipt by other
// stores on the same node.
- HandleMessage(msg *slpb.Message)
+ HandleMessage(msg *slpb.Message) error
}
// sendQueue is a queue of outgoing Messages.
@@ -74,11 +74,15 @@ type Transport struct {
stopper *stop.Stopper
clock *hlc.Clock
dialer *nodedialer.Dialer
+ metrics *TransportMetrics
// queues stores outgoing message queues keyed by the destination node ID.
queues syncutil.Map[roachpb.NodeID, sendQueue]
// handlers stores the MessageHandler for each store on the node.
handlers syncutil.Map[roachpb.StoreID, MessageHandler]
+
+	// knobs facilitate testing of the Transport.
+ knobs *TransportTestingKnobs
}
var _ MessageSender = (*Transport)(nil)
@@ -90,12 +94,18 @@ func NewTransport(
clock *hlc.Clock,
dialer *nodedialer.Dialer,
grpcServer *grpc.Server,
+ knobs *TransportTestingKnobs,
) *Transport {
+ if knobs == nil {
+ knobs = &TransportTestingKnobs{}
+ }
t := &Transport{
AmbientContext: ambient,
stopper: stopper,
clock: clock,
dialer: dialer,
+ metrics: newTransportMetrics(),
+ knobs: knobs,
}
if grpcServer != nil {
slpb.RegisterStoreLivenessServer(grpcServer, t)
@@ -103,6 +113,11 @@ func NewTransport(
return t
}
+// Metrics returns metrics tracking this transport.
+func (t *Transport) Metrics() *TransportMetrics {
+ return t.metrics
+}
+
// ListenMessages registers a MessageHandler to receive proxied messages.
func (t *Transport) ListenMessages(storeID roachpb.StoreID, handler MessageHandler) {
t.handlers.Store(storeID, &handler)
@@ -163,8 +178,17 @@ func (t *Transport) handleMessage(ctx context.Context, msg *slpb.Message) {
)
return
}
-
- (*handler).HandleMessage(msg)
+ if err := (*handler).HandleMessage(msg); err != nil {
+ if logQueueFullEvery.ShouldLog() {
+ log.Warningf(
+ t.AnnotateCtx(context.Background()),
+ "error handling message to store %v: %v", msg.To, err,
+ )
+ }
+ t.metrics.MessagesReceiveDropped.Inc(1)
+ return
+ }
+ t.metrics.MessagesReceived.Inc(1)
}
// SendAsync implements the MessageSender interface. It sends a message to the
@@ -174,13 +198,14 @@ func (t *Transport) handleMessage(ctx context.Context, msg *slpb.Message) {
// The returned bool may be a false positive but will never be a false negative;
// if sent is true the message may or may not actually be sent but if it's false
// the message definitely was not sent.
-func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (sent bool) {
+func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (enqueued bool) {
toNodeID := msg.To.NodeID
fromNodeID := msg.From.NodeID
// If this is a message from one local store to another local store, do not
// dial the node and go through GRPC; instead, handle the message directly
// using the corresponding message handler.
if toNodeID == fromNodeID {
+ t.metrics.MessagesSent.Inc(1)
// Make a copy of the message to avoid escaping the function argument
// msg to the heap.
msgCopy := msg
@@ -188,6 +213,7 @@ func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (sent bool)
return true
}
if b, ok := t.dialer.GetCircuitBreaker(toNodeID, connClass); ok && b.Signal().Err() != nil {
+ t.metrics.MessagesSendDropped.Inc(1)
return false
}
@@ -202,14 +228,17 @@ func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (sent bool)
select {
case q.messages <- msg:
+ t.metrics.SendQueueSize.Inc(1)
+ t.metrics.SendQueueBytes.Inc(int64(msg.Size()))
return true
default:
- if logSendQueueFullEvery.ShouldLog() {
+ if logQueueFullEvery.ShouldLog() {
log.Warningf(
t.AnnotateCtx(context.Background()),
"store liveness send queue to n%d is full", toNodeID,
)
}
+ t.metrics.MessagesSendDropped.Inc(1)
return false
}
}
@@ -234,14 +263,31 @@ func (t *Transport) getQueue(nodeID roachpb.NodeID) (*sendQueue, bool) {
func (t *Transport) startProcessNewQueue(
ctx context.Context, toNodeID roachpb.NodeID,
) (started bool) {
+ cleanup := func() {
+ q, ok := t.getQueue(toNodeID)
+ t.queues.Delete(toNodeID)
+ // Account for all remaining messages in the queue. SendAsync may be
+ // writing to the queue concurrently, so it's possible that we won't
+ // account for a few messages below.
+ if ok {
+ for {
+ select {
+ case m := <-q.messages:
+ t.metrics.MessagesSendDropped.Inc(1)
+ t.metrics.SendQueueSize.Dec(1)
+ t.metrics.SendQueueBytes.Dec(int64(m.Size()))
+ default:
+ return
+ }
+ }
+ }
+ }
worker := func(ctx context.Context) {
q, existingQueue := t.getQueue(toNodeID)
if !existingQueue {
log.Fatalf(ctx, "queue for n%d does not exist", toNodeID)
}
- defer func() {
- t.queues.Delete(toNodeID)
- }()
+ defer cleanup()
conn, err := t.dialer.Dial(ctx, toNodeID, connClass)
if err != nil {
// DialNode already logs sufficiently, so just return.
@@ -269,7 +315,7 @@ func (t *Transport) startProcessNewQueue(
},
)
if err != nil {
- t.queues.Delete(toNodeID)
+ cleanup()
return false
}
return true
@@ -285,25 +331,33 @@ func (t *Transport) processQueue(q *sendQueue, stream slpb.StoreLiveness_StreamC
err = errors.Join(err, closeErr)
}()
+	getIdleTimeout := func() time.Duration {
+		overrideFn := t.knobs.OverrideIdleTimeout
+		if overrideFn != nil {
+			return overrideFn()
+		}
+		return idleTimeout
+	}
var idleTimer timeutil.Timer
defer idleTimer.Stop()
var batchTimer timeutil.Timer
defer batchTimer.Stop()
batch := &slpb.MessageBatch{}
for {
- idleTimer.Reset(idleTimeout)
+ idleTimer.Reset(getIdleTimeout())
select {
case <-t.stopper.ShouldQuiesce():
return nil
- // TODO(mira): add a metric for idle queues (as part of #125067) and a
- // unit test.
case <-idleTimer.C:
idleTimer.Read = true
+ t.metrics.SendQueueIdle.Inc(1)
return nil
case msg := <-q.messages:
batch.Messages = append(batch.Messages, msg)
+ t.metrics.SendQueueSize.Dec(1)
+ t.metrics.SendQueueBytes.Dec(int64(msg.Size()))
// Pull off as many queued requests as possible within batchDuration.
batchTimer.Reset(batchDuration)
@@ -311,6 +365,8 @@ func (t *Transport) processQueue(q *sendQueue, stream slpb.StoreLiveness_StreamC
select {
case msg = <-q.messages:
batch.Messages = append(batch.Messages, msg)
+ t.metrics.SendQueueSize.Dec(1)
+ t.metrics.SendQueueBytes.Dec(int64(msg.Size()))
case <-batchTimer.C:
batchTimer.Read = true
}
@@ -318,8 +374,10 @@ func (t *Transport) processQueue(q *sendQueue, stream slpb.StoreLiveness_StreamC
batch.Now = t.clock.NowAsClockTimestamp()
if err = stream.Send(batch); err != nil {
+ t.metrics.MessagesSendDropped.Inc(int64(len(batch.Messages)))
return err
}
+ t.metrics.MessagesSent.Inc(int64(len(batch.Messages)))
// Reuse the Messages slice, but zero out the contents to avoid delaying
// GC of memory referenced from within.
@@ -331,3 +389,11 @@ func (t *Transport) processQueue(q *sendQueue, stream slpb.StoreLiveness_StreamC
}
}
}
+
+// TransportTestingKnobs includes all knobs that facilitate testing Transport.
+type TransportTestingKnobs struct {
+ // OverrideIdleTimeout overrides the idleTimeout, which controls how
+ // long until an instance of processQueue winds down after not observing any
+ // messages.
+ OverrideIdleTimeout func() time.Duration
+}
diff --git a/pkg/kv/kvserver/storeliveness/transport_test.go b/pkg/kv/kvserver/storeliveness/transport_test.go
index 543634224f5a..8ba46fcdf780 100644
--- a/pkg/kv/kvserver/storeliveness/transport_test.go
+++ b/pkg/kv/kvserver/storeliveness/transport_test.go
@@ -49,10 +49,15 @@ func newMessageHandler(size int) testMessageHandler {
}
}
-func (tmh *testMessageHandler) HandleMessage(msg *slpb.Message) {
+func (tmh *testMessageHandler) HandleMessage(msg *slpb.Message) error {
// Simulate a message handling delay.
time.Sleep(time.Duration(rand.Int63n(int64(maxDelay))))
- tmh.messages <- msg
+ select {
+ case tmh.messages <- msg:
+ return nil
+ default:
+ return receiveQueueSizeLimitReachedErr
+ }
}
var _ MessageHandler = (*testMessageHandler)(nil)
@@ -75,16 +80,18 @@ type transportTester struct {
nodeRPCContext *rpc.Context
clocks map[roachpb.NodeID]clockWithManualSource
transports map[roachpb.NodeID]*Transport
+ maxHandlerSize int
}
func newTransportTester(t testing.TB, st *cluster.Settings) *transportTester {
ctx := context.Background()
tt := &transportTester{
- t: t,
- st: st,
- stopper: stop.NewStopper(),
- clocks: map[roachpb.NodeID]clockWithManualSource{},
- transports: map[roachpb.NodeID]*Transport{},
+ t: t,
+ st: st,
+ stopper: stop.NewStopper(),
+ clocks: map[roachpb.NodeID]clockWithManualSource{},
+ transports: map[roachpb.NodeID]*Transport{},
+ maxHandlerSize: maxReceiveQueueSize,
}
opts := rpc.DefaultContextOptions()
@@ -123,6 +130,7 @@ func (tt *transportTester) AddNodeWithoutGossip(
clock,
nodedialer.New(tt.nodeRPCContext, gossip.AddressResolver(tt.gossip)),
grpcServer,
+ nil, /* knobs */
)
tt.transports[nodeID] = transport
@@ -154,7 +162,7 @@ func (tt *transportTester) UpdateGossip(nodeID roachpb.NodeID, address net.Addr)
// AddStore registers a store on a node and returns a message handler for
// messages sent to that store.
func (tt *transportTester) AddStore(id slpb.StoreIdent) testMessageHandler {
- handler := newMessageHandler(100)
+ handler := newMessageHandler(tt.maxHandlerSize)
tt.transports[id.NodeID].ListenMessages(id.StoreID, &handler)
return handler
}
@@ -215,6 +223,10 @@ func TestTransportSendAndReceive(t *testing.T) {
}
require.ElementsMatch(t, stores, senders)
}
+ // There are two stores per node, so we expect the number of messages sent and
+ // received by each node to be equal to twice the number of stores.
+ require.Equal(t, 2*int64(len(stores)), tt.transports[node1].metrics.MessagesSent.Count())
+ require.Equal(t, 2*int64(len(stores)), tt.transports[node2].metrics.MessagesReceived.Count())
}
// TestTransportRestartedNode simulates a node restart by stopping a node's
@@ -247,19 +259,49 @@ func TestTransportRestartedNode(t *testing.T) {
msg := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver}
- checkSend := func(expectedSuccess bool) {
+ checkEnqueued := func(expectedEnqueued bool) {
+ testutils.SucceedsSoon(
+ t, func() error {
+ enqueued := tt.transports[sender.NodeID].SendAsync(ctx, msg)
+ if enqueued != expectedEnqueued {
+ return errors.Newf("enqueue success is still %v", enqueued)
+ }
+ return nil
+ },
+ )
+ }
+
+ checkSent := func() {
+ initialSent := tt.transports[sender.NodeID].metrics.MessagesSent.Count()
+ testutils.SucceedsSoon(
+ t, func() error {
+ tt.transports[sender.NodeID].SendAsync(ctx, msg)
+ sent := tt.transports[sender.NodeID].metrics.MessagesSent.Count()
+ if initialSent >= sent {
+ return errors.Newf("message not sent yet; initial %d, current %d", initialSent, sent)
+ }
+ return nil
+ },
+ )
+ }
+
+ checkDropped := func() {
+ initialDropped := tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count()
testutils.SucceedsSoon(
t, func() error {
- sendSuccess := tt.transports[sender.NodeID].SendAsync(ctx, msg)
- if sendSuccess != expectedSuccess {
- return errors.Newf("send success is still %v", sendSuccess)
+ tt.transports[sender.NodeID].SendAsync(ctx, msg)
+ dropped := tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count()
+ if initialDropped >= dropped {
+ return errors.Newf(
+ "message not dropped yet; initial %d, current %d", initialDropped, dropped,
+ )
}
return nil
},
)
}
- checkReceive := func() {
+ checkReceived := func() {
testutils.SucceedsSoon(
t, func() error {
select {
@@ -279,25 +321,36 @@ func TestTransportRestartedNode(t *testing.T) {
// Part 1: send a message to the receiver whose address hasn't been gossiped yet.
// The message is sent out successfully.
- checkSend(true /* expectedSuccess */)
+ checkEnqueued(true /* expectedEnqueued */)
+	// The message sent as part of checkEnqueued above will likely be dropped.
+	// It's also possible that the SendAsync races with the deletion of the send
+	// queue (due to the failed node dial), in which case a dropped message will
+	// not be recorded.
+ checkDropped()
// Part 2: send messages to the receiver, whose address is now gossiped, and
// assert the messages are received.
tt.UpdateGossip(receiver.NodeID, addr)
- checkReceive()
+ checkEnqueued(true /* expectedEnqueued */)
+ checkSent()
+ checkReceived()
// Part 3: send messages to the crashed receiver and ensure the message send
// fails after the circuit breaker kicks in.
receiverStopper.Stop(context.Background())
- checkSend(false /* expectedSuccess */)
+ checkEnqueued(false /* expectedEnqueued */)
+ // Subsequent calls to SendAsync are expected to result in messages being
+ // dropped due to the tripped circuit breaker.
+ checkDropped()
// Part 4: send messages to the restarted/replaced receiver; ensure the
// message send succeeds (after the circuit breaker un-trips) and the messages
// are received.
tt.AddNode(receiver.NodeID)
tt.AddStore(receiver)
- checkSend(true /* expectedSuccess */)
- checkReceive()
+ checkEnqueued(true /* expectedEnqueued */)
+ checkSent()
+ checkReceived()
}
// TestTransportSendToMissingStore verifies that sending a message to a store
@@ -337,12 +390,16 @@ func TestTransportSendToMissingStore(t *testing.T) {
select {
case received := <-handler.messages:
require.Equal(t, existingMsg, *received)
+ require.Equal(
+ t, int64(1), tt.transports[existingRcv.NodeID].metrics.MessagesReceived.Count(),
+ )
return nil
default:
}
return errors.New("still waiting to receive message")
},
)
+ require.Equal(t, int64(2), tt.transports[sender.NodeID].metrics.MessagesSent.Count())
}
// TestTransportClockPropagation verifies that the HLC clock timestamps are
@@ -456,3 +513,115 @@ func TestTransportShortCircuit(t *testing.T) {
}, "sending message to a remote store with a nil dialer",
)
}
+
+// TestTransportIdleSendQueue tests that the send queue idles out.
+func TestTransportIdleSendQueue(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+ tt := newTransportTester(t, cluster.MakeTestingClusterSettings())
+ defer tt.Stop()
+
+ node1, node2 := roachpb.NodeID(1), roachpb.NodeID(2)
+ sender := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(1)}
+ receiver := slpb.StoreIdent{NodeID: node2, StoreID: roachpb.StoreID(2)}
+ msg := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver}
+
+ tt.AddNode(node1)
+ tt.AddNode(node2)
+ tt.AddStore(sender)
+ handler := tt.AddStore(receiver)
+
+ tt.transports[sender.NodeID].knobs.OverrideIdleTimeout = func() time.Duration {
+ return time.Millisecond
+ }
+
+ // Send and receive a message.
+ require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, msg))
+ testutils.SucceedsSoon(
+ t, func() error {
+ select {
+ case received := <-handler.messages:
+ require.Equal(t, msg, *received)
+ return nil
+ default:
+ }
+ return errors.New("still waiting to receive message")
+ },
+ )
+
+ testutils.SucceedsSoon(
+ t, func() error {
+ if tt.transports[sender.NodeID].metrics.SendQueueIdle.Count() != int64(1) {
+ return errors.New("idle queue metrics not incremented yet")
+ }
+ return nil
+ },
+ )
+}
+
+// TestTransportFullReceiveQueue tests that messages are dropped when the
+// receive queue reaches its max size.
+func TestTransportFullReceiveQueue(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+ ctx := context.Background()
+ tt := newTransportTester(t, cluster.MakeTestingClusterSettings())
+ tt.maxHandlerSize = 100
+ defer tt.Stop()
+
+ node1, node2 := roachpb.NodeID(1), roachpb.NodeID(2)
+ sender := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(1)}
+ receiver := slpb.StoreIdent{NodeID: node2, StoreID: roachpb.StoreID(2)}
+ msg := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver}
+
+ tt.AddNode(node1)
+ tt.AddNode(node2)
+ tt.AddStore(sender)
+ tt.AddStore(receiver)
+
+ // Fill up the receive queue of the receiver. Nothing is consuming from it.
+ sendDropped := 0
+ for i := 0; i < tt.maxHandlerSize; i++ {
+ testutils.SucceedsSoon(
+ t, func() error {
+ // The message enqueue can fail temporarily if the sender queue fills up.
+ if !tt.transports[sender.NodeID].SendAsync(ctx, msg) {
+ sendDropped++
+ return errors.New("still waiting to enqueue message")
+ }
+ return nil
+ },
+ )
+ }
+
+ require.Equal(
+ t, int64(sendDropped), tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count(),
+ )
+ testutils.SucceedsSoon(
+ t, func() error {
+ if tt.transports[sender.NodeID].metrics.MessagesSent.Count() != int64(tt.maxHandlerSize) {
+ return errors.New("not all messages are sent yet")
+ }
+ return nil
+ },
+ )
+ testutils.SucceedsSoon(
+ t, func() error {
+ if tt.transports[receiver.NodeID].metrics.MessagesReceived.Count() != int64(tt.maxHandlerSize) {
+ return errors.New("not all messages are received yet")
+ }
+ return nil
+ },
+ )
+ // The receiver queue is full but the enqueue to the sender queue succeeds.
+ require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, msg))
+ testutils.SucceedsSoon(
+ t, func() error {
+ if tt.transports[receiver.NodeID].metrics.MessagesReceiveDropped.Count() != int64(1) {
+ return errors.New("message not dropped yet")
+ }
+ return nil
+ },
+ )
+}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index fad3ad9922d1..14a6dd8ef334 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -656,8 +656,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
nodeRegistry.AddMetricStruct(raftTransport.Metrics())
storeLivenessTransport := storeliveness.NewTransport(
- cfg.AmbientCtx, stopper, clock, kvNodeDialer, grpcServer.Server,
+ cfg.AmbientCtx, stopper, clock, kvNodeDialer, grpcServer.Server, nil, /* knobs */
)
+ nodeRegistry.AddMetricStruct(storeLivenessTransport.Metrics())
ctSender := sidetransport.NewSender(stopper, st, clock, kvNodeDialer)
ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)
diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go
index 7e0a6d586d59..067215c4ccd8 100644
--- a/pkg/testutils/localtestcluster/local_test_cluster.go
+++ b/pkg/testutils/localtestcluster/local_test_cluster.go
@@ -185,7 +185,8 @@ func (ltc *LocalTestCluster) Start(t testing.TB, initFactory InitFactoryFn) {
ltc.DB = kv.NewDBWithContext(cfg.AmbientCtx, factory, ltc.Clock, *ltc.dbContext)
transport := kvserver.NewDummyRaftTransport(cfg.AmbientCtx, cfg.Settings, ltc.Clock)
storeLivenessTransport := storeliveness.NewTransport(
- cfg.AmbientCtx, ltc.stopper, ltc.Clock, nil, nil,
+ cfg.AmbientCtx, ltc.stopper, ltc.Clock,
+ nil /* dialer */, nil /* grpcServer */, nil, /* knobs */
)
// By default, disable the replica scanner and split queue, which
// confuse tests using LocalTestCluster.