storeliveness: limit the max size of the SupportManager receive queue
Previously, the SupportManager's receive queue could grow unbounded if
incoming messages were not processed fast enough. This can happen in
the case of a disk stall, when the SupportManager's processing goroutine
is blocked on a write and not handling any new incoming messages.

This commit adds a maximum size to the receive queue. After the maximum
size is reached, handling further incoming messages returns an error,
which is interpreted as a dropped message in the transport.

Part of: #125063

Release note: None
miraradeva committed Sep 20, 2024
1 parent df958c6 commit 5e3ed64
Showing 5 changed files with 69 additions and 11 deletions.
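
As context for the diffs that follow, here is a minimal standalone sketch of the pattern this commit applies: a mutex-protected queue that rejects appends past a fixed cap and wakes its consumer through a non-blocking, one-slot signal channel. The message and boundedQueue types and the tiny maxQueueSize constant are stand-ins invented for this sketch, not the actual CockroachDB types.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// message stands in for slpb.Message in this sketch.
type message struct{ payload string }

// maxQueueSize is kept tiny so the drop is easy to see; the commit uses 10000.
const maxQueueSize = 3

// boundedQueue mirrors the shape of the receiveQueue in the diff below: a
// mutex-protected slice plus a one-slot signal channel that wakes the
// consumer without ever blocking the producer.
type boundedQueue struct {
	mu   sync.Mutex
	msgs []*message
	sig  chan struct{}
}

func newBoundedQueue() *boundedQueue {
	return &boundedQueue{sig: make(chan struct{}, 1)}
}

// Append rejects messages once the cap is reached instead of growing without
// bound; the caller treats the error as a dropped message.
func (q *boundedQueue) Append(m *message) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.msgs) >= maxQueueSize {
		return errors.New("queue is full")
	}
	q.msgs = append(q.msgs, m)
	select {
	case q.sig <- struct{}{}: // wake the consumer if it is idle
	default: // a wakeup is already pending; do not block
	}
	return nil
}

func main() {
	q := newBoundedQueue()
	for i := 0; i <= maxQueueSize; i++ {
		if err := q.Append(&message{payload: fmt.Sprint(i)}); err != nil {
			fmt.Println("dropped message", i, ":", err) // the 4th append is rejected
		}
	}
}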
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
@@ -2191,7 +2191,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
 	supportGracePeriod := s.cfg.RPCContext.StoreLivenessWithdrawalGracePeriod()
 	options := storeliveness.NewOptions(heartbeatInterval, livenessInterval, supportGracePeriod)
 	sm := storeliveness.NewSupportManager(
-		slpb.StoreIdent{NodeID: s.nodeDesc.NodeID}, s.StateEngine(), options,
+		slpb.StoreIdent{NodeID: s.nodeDesc.NodeID, StoreID: s.StoreID()}, s.StateEngine(), options,
 		s.cfg.Settings, s.stopper, s.cfg.Clock, s.cfg.StoreLivenessTransport,
 	)
 	s.cfg.StoreLivenessTransport.ListenMessages(s.StoreID(), sm)
17 changes: 14 additions & 3 deletions pkg/kv/kvserver/storeliveness/support_manager.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/errors"
 	"golang.org/x/exp/maps"
 )

@@ -85,8 +86,8 @@ func NewSupportManager(

 // HandleMessage implements the MessageHandler interface. It appends incoming
 // messages to a queue and does not block on processing the messages.
-func (sm *SupportManager) HandleMessage(msg *slpb.Message) {
-	sm.receiveQueue.Append(msg)
+func (sm *SupportManager) HandleMessage(msg *slpb.Message) error {
+	return sm.receiveQueue.Append(msg)
 }

 var _ MessageHandler = (*SupportManager)(nil)
@@ -342,6 +343,11 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Message) {
 	log.VInfof(ctx, 2, "store %+v sent %d responses", sm.storeID, len(responses))
 }

+// maxReceiveQueueSize is the maximum number of messages the receive queue can
+// store. If message consumption is slow (e.g. due to a disk stall) and the
+// queue reaches maxReceiveQueueSize, incoming messages will be dropped.
+const maxReceiveQueueSize = 10000
+
 // receiveQueue stores all received messages from the MessageHandler and allows
 // them to be processed async and in batch.
 type receiveQueue struct {
@@ -358,14 +364,19 @@ func newReceiveQueue() receiveQueue {
 	}
 }

-func (q *receiveQueue) Append(msg *slpb.Message) {
+func (q *receiveQueue) Append(msg *slpb.Message) error {
 	q.mu.Lock()
 	defer q.mu.Unlock()
+	// Drop messages if maxReceiveQueueSize is reached.
+	if len(q.mu.msgs) >= maxReceiveQueueSize {
+		return errors.Errorf("store liveness receive queue is full")
+	}
 	q.mu.msgs = append(q.mu.msgs, msg)
 	select {
 	case q.sig <- struct{}{}:
 	default:
 	}
+	return nil
 }

 func (q *receiveQueue) Sig() <-chan struct{} {
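
Continuing the sketch from the top of the page, this is roughly how a consumer pairs with such a queue: one wakeup on the signal channel is enough to drain however many messages accumulated since the last drain, which is why Append can use a buffered channel of size one and never block. Drain and consume are hypothetical helpers for illustration, not the SupportManager's actual processing loop.

// Drain hands the entire accumulated batch to the caller under the lock.
func (q *boundedQueue) Drain() []*message {
	q.mu.Lock()
	defer q.mu.Unlock()
	msgs := q.msgs
	q.msgs = nil
	return msgs
}

// consume processes messages in batches: a single pending signal suffices
// no matter how many appends happened while the consumer was busy.
func consume(q *boundedQueue, stop <-chan struct{}) {
	for {
		select {
		case <-q.sig:
			for _, m := range q.Drain() {
				_ = m // process the message here
			}
		case <-stop:
			return
		}
	}
}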
42 changes: 40 additions & 2 deletions pkg/kv/kvserver/storeliveness/support_manager_test.go
@@ -79,7 +79,7 @@ func TestSupportManagerRequestsSupport(t *testing.T) {
 		Epoch:      slpb.Epoch(1),
 		Expiration: requestedExpiration,
 	}
-	sm.HandleMessage(heartbeatResp)
+	require.NoError(t, sm.HandleMessage(heartbeatResp))

 	// Ensure support is provided as seen by SupportFrom.
 	testutils.SucceedsSoon(
@@ -122,7 +122,7 @@ func TestSupportManagerProvidesSupport(t *testing.T) {
 		Epoch:      slpb.Epoch(1),
 		Expiration: sm.clock.Now().AddDuration(time.Second),
 	}
-	sm.HandleMessage(heartbeat)
+	require.NoError(t, sm.HandleMessage(heartbeat))

 	// Ensure a response is sent.
 	testutils.SucceedsSoon(
@@ -257,6 +257,8 @@ func TestSupportManagerRestart(t *testing.T) {
 	require.True(t, withdrawalTime.Less(now))
 }

+// TestSupportManagerDiskStall tests that the SupportManager continues to
+// respond to SupportFrom and SupportFor calls when its disk is stalled.
 func TestSupportManagerDiskStall(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -328,6 +330,42 @@ func TestSupportManagerDiskStall(t *testing.T) {
 	ensureHeartbeats(t, sender, 1)
 }

+// TestSupportManagerReceiveQueueLimit tests that the receive queue returns
+// errors when the queue size limit is reached.
+func TestSupportManagerReceiveQueueLimit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	engine := storage.NewDefaultInMemForTesting()
+	defer engine.Close()
+	settings := clustersettings.MakeTestingClusterSettings()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	manual := hlc.NewHybridManualClock()
+	clock := hlc.NewClockForTesting(manual)
+	sender := &testMessageSender{}
+	sm := NewSupportManager(store, engine, options, settings, stopper, clock, sender)
+	// Initialize the SupportManager without starting the main goroutine.
+	require.NoError(t, sm.onRestart(ctx))
+
+	heartbeat := &slpb.Message{
+		Type:       slpb.MsgHeartbeat,
+		From:       remoteStore,
+		To:         sm.storeID,
+		Epoch:      slpb.Epoch(1),
+		Expiration: clock.Now().AddDuration(sm.options.LivenessInterval),
+	}
+
+	for i := 0; i < maxReceiveQueueSize; i++ {
+		require.NoError(t, sm.HandleMessage(heartbeat))
+	}
+
+	// Nothing is consuming messages from the queue, so the next HandleMessage
+	// should result in an error.
+	require.Regexp(t, "store liveness receive queue is full", sm.HandleMessage(heartbeat))
+}

 func ensureHeartbeats(t *testing.T, sender *testMessageSender, expectedNum int) []slpb.Message {
 	var msgs []slpb.Message
 	testutils.SucceedsSoon(
16 changes: 12 additions & 4 deletions pkg/kv/kvserver/storeliveness/transport.go
@@ -44,7 +44,7 @@ const (
 	connClass = rpc.SystemClass
 )

-var logSendQueueFullEvery = log.Every(1 * time.Second)
+var logQueueFullEvery = log.Every(1 * time.Second)

 // MessageHandler is the interface that must be implemented by
 // arguments to Transport.ListenMessages.
@@ -53,7 +53,7 @@ type MessageHandler interface {
 	// block (e.g. do a synchronous disk write) to prevent a single store with a
 	// problem (e.g. a stalled disk) from affecting message receipt by other
 	// stores on the same node.
-	HandleMessage(msg *slpb.Message)
+	HandleMessage(msg *slpb.Message) error
 }

 // sendQueue is a queue of outgoing Messages.
@@ -178,7 +178,15 @@ func (t *Transport) handleMessage(ctx context.Context, msg *slpb.Message) {
 		)
 		return
 	}
-	(*handler).HandleMessage(msg)
+	if err := (*handler).HandleMessage(msg); err != nil {
+		if logQueueFullEvery.ShouldLog() {
+			log.Warningf(
+				t.AnnotateCtx(context.Background()),
+				"error handling message to store %v: %v", msg.To, err,
+			)
+		}
+		t.metrics.MessagesDropped.Inc(1)
+	}
 	t.metrics.MessagesReceived.Inc(1)
 }

@@ -223,7 +231,7 @@ func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (enqueued bool) {
 		t.metrics.SendQueueBytes.Inc(int64(msg.Size()))
 		return true
 	default:
-		if logSendQueueFullEvery.ShouldLog() {
+		if logQueueFullEvery.ShouldLog() {
 			log.Warningf(
 				t.AnnotateCtx(context.Background()),
 				"store liveness send queue to n%d is full", toNodeID,
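
The rename from logSendQueueFullEvery to logQueueFullEvery reflects that a single rate limiter now guards both the send-queue-full warning and the new receive-queue-full warning. As a rough illustration of the one-log-per-interval behavior that log.Every provides, here is a hedged stand-in; everyN is a sketch written for this page, not CockroachDB's util/log implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// everyN allows at most one event per interval, so a persistently full
// queue yields a steady trickle of warnings rather than a flood.
type everyN struct {
	mu       sync.Mutex
	interval time.Duration
	last     time.Time
}

func (e *everyN) ShouldLog() bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	if now := time.Now(); now.Sub(e.last) >= e.interval {
		e.last = now
		return true
	}
	return false
}

func main() {
	limiter := everyN{interval: time.Second}
	for i := 0; i < 5; i++ {
		if limiter.ShouldLog() {
			fmt.Println("queue is full") // logs once; the next four calls are suppressed
		}
	}
}

A caller wraps each warning in an if limiter.ShouldLog() check, exactly as the diff above does with the real helper.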
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/storeliveness/transport_test.go
@@ -49,10 +49,11 @@ func newMessageHandler(size int) testMessageHandler {
 	}
 }

-func (tmh *testMessageHandler) HandleMessage(msg *slpb.Message) {
+func (tmh *testMessageHandler) HandleMessage(msg *slpb.Message) error {
 	// Simulate a message handling delay.
 	time.Sleep(time.Duration(rand.Int63n(int64(maxDelay))))
 	tmh.messages <- msg
+	return nil
 }

 var _ MessageHandler = (*testMessageHandler)(nil)
