Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver/rangefeed: introduce StreamManager #134958

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"scheduled_processor.go",
"scheduler.go",
"stream.go",
"stream_manager.go",
"task.go",
"test_helpers.go",
"unbuffered_sender.go",
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/rangefeed/sender_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ func (c *testRangefeedCounter) UpdateMetricsOnRangefeedConnect() {
}

func (c *testRangefeedCounter) UpdateMetricsOnRangefeedDisconnect() {
c.count.Add(-1)
c.UpdateMetricsOnRangefeedDisconnectBy(1)
}

func (c *testRangefeedCounter) UpdateMetricsOnRangefeedDisconnectBy(num int64) {
c.count.Add(int32(-num))
}

func (c *testRangefeedCounter) get() int {
Expand Down
203 changes: 203 additions & 0 deletions pkg/kv/kvserver/rangefeed/stream_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package rangefeed

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// +-----------------+ +-----------------+ +-----------------+
// | | Disconnect | | r.disconnect | |
// | MuxRangeFeed +-------------->| StreamManager +----------------->| Registration |
// | | | | Send/SendError | |
// | | | |<-----------------+ |
// +-----------------+ +-----------------+ +-------------+---+
// ^ |
// r.disconnect| | p.asyncUnregisterReq
// | v
// +-----------------+ +---+-------------+
// | | UnregFromReplica| |
// | Replica |<----------------+ Processor |
// | | | |
// +-----------------+ +-----------------+

// StreamManager manages one or more streams. It is responsible for starting and
// stopping the underlying sender, as well as managing the streams themselves.
type StreamManager struct {
// taskCancel cancels the context used by the sender.runspawn goroutine,
// causing it to exit. It is called in Stop().
taskCancel context.CancelFunc

// wg is used to coordinate goroutines spawned by StreamManager.
wg sync.WaitGroup

// errCh delivers errors from sender.run back to the caller. If non-empty, the
// sender.run is finished and error should be handled. Note that it is
// possible for sender.run to be finished without sending an error to errCh.
errCh chan error

// Implemented by UnbufferedSender and BufferedSender. Implementations should
// ensure that sendUnbuffered and sendBuffered are thread-safe.
sender sender

// streamID -> Disconnector
streams struct {
syncutil.Mutex
m map[int64]Disconnector
}

// metrics is used to record rangefeed metrics for the node. It tracks number
// of active rangefeeds.
metrics RangefeedMetricsRecorder
}

// sender is an interface that is implemented by BufferedSender and
// UnbufferedSender. It is wrapped under StreamManager, Stream, and
// BufferedStream.
type sender interface {
// sendUnbuffered sends a RangeFeedEvent to the underlying grpc stream
// directly. This call may block and must be thread-safe.
sendUnbuffered(ev *kvpb.MuxRangeFeedEvent) error
// sendBuffered buffers a RangeFeedEvent before sending to the underlying grpc
// stream. This call must be non-blocking and thread-safe.
sendBuffered(ev *kvpb.MuxRangeFeedEvent, alloc *SharedBudgetAllocation) error
// run is the main loop for the sender. It is expected to run in the
// background until a node level error is encountered which would shut down
// all streams in StreamManager.
run(ctx context.Context, stopper *stop.Stopper, onError func(int64)) error
// cleanup is called when the sender is stopped. It is expected to clean up
// any resources used by the sender.
cleanup(ctx context.Context)
}

func NewStreamManager(sender sender, metrics RangefeedMetricsRecorder) *StreamManager {
sm := &StreamManager{
sender: sender,
metrics: metrics,
}
sm.streams.m = make(map[int64]Disconnector)
return sm
}

func (sm *StreamManager) NewStream(streamID int64, rangeID roachpb.RangeID) (sink Stream) {
log.Fatalf(context.Background(), "unexpected sender type %T", sm)
return nil
}

// OnError is a callback that is called when a sender sends a rangefeed
// completion error back to the client. Note that we check for the existence of
// streamID to avoid metrics inaccuracy when the error is sent before the stream
// is added to the StreamManager.
func (sm *StreamManager) OnError(streamID int64) {
sm.streams.Lock()
defer sm.streams.Unlock()
if _, ok := sm.streams.m[streamID]; ok {
delete(sm.streams.m, streamID)
sm.metrics.UpdateMetricsOnRangefeedDisconnect()
}
}

// DisconnectStream disconnects the stream with the given streamID.
func (sm *StreamManager) DisconnectStream(streamID int64, err *kvpb.Error) {
if err == nil {
log.Fatalf(context.Background(),
"unexpected: DisconnectStream called with nil error")
return
}
sm.streams.Lock()
defer sm.streams.Unlock()
if disconnector, ok := sm.streams.m[streamID]; ok {
// Fine to skip nil checking here since that would be a programming error.
disconnector.Disconnect(err)
}
}

// AddStream adds a streamID with its disconnector to the StreamManager.
// StreamManager can use the disconnector to shut down the rangefeed stream
// later on.
func (sm *StreamManager) AddStream(streamID int64, d Disconnector) {
// At this point, the stream had been registered with the processor and
// started receiving events. We need to lock here to avoid race conditions
// with a disconnect error passing through before the stream is added.
sm.streams.Lock()
defer sm.streams.Unlock()
if d.IsDisconnected() {
// If the stream is already disconnected, we don't add it to streams. The
// registration will have already sent an error to the client.
return
}
if _, ok := sm.streams.m[streamID]; ok {
log.Fatalf(context.Background(), "stream %d already exists", streamID)
}
sm.streams.m[streamID] = d
sm.metrics.UpdateMetricsOnRangefeedConnect()
}

// Start launches sender.run in the background if no error is returned.
// sender.run continues running until it errors or StreamManager.Stop is called.
// Note that it is not valid to call Start multiple times or restart after Stop.
// Example usage:
//
// if err := StreamManager.Start(ctx, stopper); err != nil {
// return err
// }
//
// defer StreamManager.Stop()
func (sm *StreamManager) Start(ctx context.Context, stopper *stop.Stopper) error {
sm.errCh = make(chan error, 1)
sm.wg.Add(1)
ctx, sm.taskCancel = context.WithCancel(ctx)
if err := stopper.RunAsyncTask(ctx, "stream-manager-sender", func(ctx context.Context) {
defer sm.wg.Done()
if err := sm.sender.run(ctx, stopper, sm.OnError); err != nil {
sm.errCh <- err
}
}); err != nil {
sm.taskCancel()
sm.wg.Done()
return err
}
return nil
}

// Stop cancels the sender.run task and waits for it to complete. It does
// nothing if sender.run is already finished. It is expected to be called after
// StreamManager.Start.
func (sm *StreamManager) Stop(ctx context.Context) {
sm.taskCancel()
sm.wg.Wait()
sm.sender.cleanup(ctx)
sm.streams.Lock()
defer sm.streams.Unlock()
rangefeedClosedErr := kvpb.NewError(
kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED))
sm.metrics.UpdateMetricsOnRangefeedDisconnectBy(int64(len(sm.streams.m)))
for _, disconnector := range sm.streams.m {
// Disconnect all streams with a retry error. No rangefeed errors will be
// sent to the client after shutdown, but the gRPC stream will still
// terminate.
disconnector.Disconnect(rangefeedClosedErr)
}
sm.streams.m = make(map[int64]Disconnector)
}

// Error returns a channel for receiving errors from sender.run. Only non-nil
// errors are sent, and at most one error is delivered. If the channel is
// non-empty, sender.run has finished, and the error should be handled.
// sender.run may also finish without sending anything to the channel.
func (sm *StreamManager) Error() chan error {
if sm.errCh == nil {
log.Fatalf(context.Background(), "StreamManager.Error called before StreamManager.Start")
}
return sm.errCh
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/unbuffered_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
type RangefeedMetricsRecorder interface {
UpdateMetricsOnRangefeedConnect()
UpdateMetricsOnRangefeedDisconnect()
UpdateMetricsOnRangefeedDisconnectBy(num int64)
}

// ServerStreamSender forwards MuxRangefeedEvents from UnbufferedSender to the
Expand Down
12 changes: 9 additions & 3 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,16 @@ func (nm *nodeMetrics) UpdateMetricsOnRangefeedConnect() {
nm.ActiveMuxRangeFeed.Inc(1)
}

// UpdateOnRangefeedDisconnect decrements rangefeed metrics when a server
// rangefeed is disconnected.
// UpdateMetricsOnRangefeedDisconnect decrements rangefeed metrics when one
// server rangefeed is disconnected.
func (nm *nodeMetrics) UpdateMetricsOnRangefeedDisconnect() {
nm.ActiveMuxRangeFeed.Dec(1)
nm.UpdateMetricsOnRangefeedDisconnectBy(1)
}

// UpdateMetricsOnRangefeedDisconnectBy decrements rangefeed metrics by the
// given num argument when there are multiple rangefeed disconnects.
func (nm *nodeMetrics) UpdateMetricsOnRangefeedDisconnectBy(num int64) {
nm.ActiveMuxRangeFeed.Dec(num)
}

// A Node manages a map of stores (by store ID) for which it serves
Expand Down