Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver/rangefeed: remove per-registration goroutines #125872

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1451,7 +1451,8 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) {
}
eventC := make(chan *kvpb.RangeFeedEvent)
sink := newChannelSink(ctx, eventC)
require.NoError(t, s3.RangeFeed(sink.ctx, &req, sink)) // check if we've errored yet
_, rErr := s3.RangeFeed(sink.ctx, &req, sink)
require.NoError(t, rErr) // check if we've errored yet
require.NoError(t, sink.Error())
t.Logf("started rangefeed on %s", repl3)

Expand Down Expand Up @@ -1645,9 +1646,8 @@ func (c *channelSink) Error() error {
}
}

// Disconnect implements the Stream interface. It mocks the disconnect behavior
// by sending the error to the done channel.
func (c *channelSink) Disconnect(err *kvpb.Error) {
// SendError implements the Stream interface.
func (c *channelSink) SendError(err *kvpb.Error) {
c.done <- err
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,8 @@ func (s *dummyStream) SendUnbuffered(ev *kvpb.RangeFeedEvent) error {
}
}

// Disconnect implements the Stream interface. It mocks the disconnect behavior
// by sending the error to the done channel.
func (s *dummyStream) Disconnect(err *kvpb.Error) {
// SendError implements the Stream interface.
func (s *dummyStream) SendError(err *kvpb.Error) {
s.done <- err
}

Expand All @@ -489,7 +488,7 @@ func waitReplicaRangeFeed(
return stream.SendUnbuffered(&event)
}

err := r.RangeFeed(stream.ctx, req, stream, nil /* pacer */)
_, err := r.RangeFeed(stream.ctx, req, stream, nil /* pacer */)
if err != nil {
return sendErrToStream(kvpb.NewError(err))
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ go_library(
"budget.go",
"buffered_registration.go",
"buffered_sender.go",
"buffered_stream.go",
"catchup_scan.go",
"event_queue.go",
"event_size.go",
Expand All @@ -18,8 +17,10 @@ go_library(
"scheduled_processor.go",
"scheduler.go",
"stream.go",
"stream_manager.go",
"task.go",
"test_helpers.go",
"unbuffered_registration.go",
"unbuffered_sender.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed",
Expand Down Expand Up @@ -63,6 +64,7 @@ go_test(
srcs = [
"bench_test.go",
"budget_test.go",
"buffered_sender_test.go",
"catchup_scan_bench_test.go",
"catchup_scan_test.go",
"event_queue_test.go",
Expand All @@ -75,6 +77,7 @@ go_test(
"scheduler_test.go",
"sender_helper_test.go",
"task_test.go",
"unbuffered_registration_test.go",
"unbuffered_sender_test.go",
],
embed = [":rangefeed"],
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
// extra data.
const withFiltering = false
streams[i] = &noopStream{ctx: ctx, done: make(chan *kvpb.Error, 1)}
ok, _ := p.Register(ctx, span, hlc.MinTimestamp, nil,
ok, _, _ := p.Register(ctx, span, hlc.MinTimestamp, nil,
withDiff, withFiltering, false, /* withOmitRemote */
streams[i], nil)
require.True(b, ok)
Expand Down Expand Up @@ -202,9 +202,8 @@ func (s *noopStream) SendUnbuffered(*kvpb.RangeFeedEvent) error {
// thread-safety.
func (s *noopStream) SendUnbufferedIsThreadSafe() {}

// Disconnect implements the Stream interface. It mocks the disconnect behavior
// by sending the error to the done channel.
func (s *noopStream) Disconnect(error *kvpb.Error) {
// SendError implements the Stream interface.
func (s *noopStream) SendError(error *kvpb.Error) {
s.done <- error
}

Expand Down
33 changes: 27 additions & 6 deletions pkg/kv/kvserver/rangefeed/buffered_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func newBufferedRegistration(
metrics *Metrics,
stream Stream,
unregisterFn func(),
cleanup func(registration),
) *bufferedRegistration {
br := &bufferedRegistration{
baseRegistration: baseRegistration{
Expand All @@ -91,6 +92,7 @@ func newBufferedRegistration(
withDiff: withDiff,
withFiltering: withFiltering,
withOmitRemote: withOmitRemote,
cleanup: cleanup,
unreg: unregisterFn,
},
metrics: metrics,
Expand Down Expand Up @@ -148,10 +150,16 @@ func (br *bufferedRegistration) publish(
}
}

// disconnect cancels the output loop context for the registration and passes an
// IsDisconnected reports whether the registration has been marked as
// disconnected. The flag is read while holding br.mu.
func (br *bufferedRegistration) IsDisconnected() bool {
	br.mu.Lock()
	disconnected := br.mu.disconnected
	br.mu.Unlock()
	return disconnected
}

// Disconnect cancels the output loop context for the registration and passes an
// error to the output error stream for the registration.
// Safe to run multiple times, but subsequent errors would be discarded.
func (br *bufferedRegistration) disconnect(pErr *kvpb.Error) {
func (br *bufferedRegistration) Disconnect(pErr *kvpb.Error) {
br.mu.Lock()
defer br.mu.Unlock()
if !br.mu.disconnected {
Expand All @@ -163,7 +171,8 @@ func (br *bufferedRegistration) disconnect(pErr *kvpb.Error) {
br.mu.outputLoopCancelFn()
}
br.mu.disconnected = true
br.stream.Disconnect(pErr)
br.cleanup(br)
br.stream.SendError(pErr)
}
}

Expand Down Expand Up @@ -212,10 +221,10 @@ func (br *bufferedRegistration) outputLoop(ctx context.Context) error {
if err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-br.streamCtx.Done():
return br.streamCtx.Err()
case <-ctx.Done():
return ctx.Err()
}
}
}
Expand All @@ -230,7 +239,7 @@ func (br *bufferedRegistration) runOutputLoop(ctx context.Context, _forStacks ro
ctx, br.mu.outputLoopCancelFn = context.WithCancel(ctx)
br.mu.Unlock()
err := br.outputLoop(ctx)
br.disconnect(kvpb.NewError(err))
br.Disconnect(kvpb.NewError(err))
}

// drainAllocations should be done after registration is disconnected from
Expand Down Expand Up @@ -301,3 +310,15 @@ func (br *bufferedRegistration) detachCatchUpIter() *CatchUpIterator {
br.mu.catchUpIter = nil
return catchUpIter
}

// getBuf returns the registration's buf channel without taking any lock.
// Used for testing only.
func (br *bufferedRegistration) getBuf() chan *sharedEvent {
	return br.buf
}

// getOverflowed returns the current value of the registration's overflowed
// flag, read while holding br.mu. Used for testing only.
func (br *bufferedRegistration) getOverflowed() bool {
	br.mu.Lock()
	overflowed := br.mu.overflowed
	br.mu.Unlock()
	return overflowed
}
180 changes: 117 additions & 63 deletions pkg/kv/kvserver/rangefeed/buffered_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,98 +7,152 @@ package rangefeed

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// ┌─────────────────────────────────────────┐ MuxRangefeedEvent
// │ Node.MuxRangeFeed │◄──────────────────────────────────────────────────┐
// └─────────────────┬───▲───────────────────┘ ▲ │
// Sender.AddStream │ │LockedMuxStream.Send │ │
// ┌────────────▼───┴──────────┐ │ │
// │ Buffered/Unbuffered Sender├───────────┐ │ │
// └────────────┬──────────────┘ │ │ │
// │ │ │ │
// ┌────────▼─────────┐ │ │ │
// │ Stores.Rangefeed │ │ │ │
// └────────┬─────────┘ │ │ │
// │ │ │ │
// ┌───────▼─────────┐ BufferedSender BufferedSender │
// │ Store.Rangefeed │ SendUnbuffered/SendBuffered SendBufferedError ─────► BufferedSender.run
// └───────┬─────────┘ (catch-up scan)(live raft) ▲
// │ ▲ │
// ┌────────▼──────────┐ │ │
// │ Replica.Rangefeed │ │ │
// └────────┬──────────┘ │ │
// │ │ │
// ┌───────▼──────┐ │ │
// │ Registration │ │ │
// └──────┬───────┘ │ │
// │ │ │
// │ │ │
// └─────────────────────────┘─────────────────┘
// BufferedPerRangeEventSink.Send BufferedPerRangeEventSink.Disconnect
//
// BufferedSender is embedded in every rangefeed.BufferedPerRangeEventSink,
// serving as a helper that buffers events before forwarding them to the
// underlying gRPC stream.
//
// Refer to the comments above UnbufferedSender for more details on the role of
// senders in the entire rangefeed architecture.
type BufferedSender struct {
// Note that lockedMuxStream wraps the underlying grpc server stream, ensuring
// thread safety.
sender ServerStreamSender

// metrics is used to record rangefeed metrics for the node.
metrics RangefeedMetricsRecorder
// queueMu protects the buffer queue.
queueMu struct {
syncutil.Mutex
stopped bool
buffer *eventQueue
}

// notifyDataC is used to notify the BufferedSender.run goroutine that there
// are events to send. The channel is initialized with a buffer of 1, and all
// writes to it are non-blocking.
notifyDataC chan struct{}
}

func NewBufferedSender(
sender ServerStreamSender, metrics RangefeedMetricsRecorder,
) *BufferedSender {
return &BufferedSender{
sender: sender,
metrics: metrics,
func NewBufferedSender(sender ServerStreamSender) *BufferedSender {
bs := &BufferedSender{
sender: sender,
}
bs.queueMu.buffer = newEventQueue()
bs.notifyDataC = make(chan struct{}, 1)
return bs
}

// SendBuffered buffers the event before sending them to the underlying
// ServerStreamSender.
func (bs *BufferedSender) SendBuffered(
event *kvpb.MuxRangeFeedEvent, alloc *SharedBudgetAllocation,
// sendBuffered buffers the event before sending it to the underlying gRPC
// stream. It should not block since errors are sent directly here. sendBuffered
// will take the ownership of the alloc and release it if the returned error is
// non-nil.
func (bs *BufferedSender) sendBuffered(
ev *kvpb.MuxRangeFeedEvent, alloc *SharedBudgetAllocation,
) error {
panic("unimplemented: buffered sender for rangefeed #126560")
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
if bs.queueMu.stopped {
return errors.New("stream sender is stopped")
}
// TODO(wenyihu6): pass an actual context here
alloc.Use(context.Background())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are OK with context.Background() here for now as all it is used for is a potential log.Fatal. Eventually, it would be nice to just pass contexts everywhere, but that would be better as a discrete change.

Copy link
Contributor Author

@wenyihu6 wenyihu6 Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sg, I will file an issue about this after it's been merged.

bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
select {
case bs.notifyDataC <- struct{}{}:
default:
}
return nil
}

// SendUnbuffered bypasses the buffer and sends the event to the underlying
// ServerStreamSender directly. Note that this can cause event re-ordering.
// Caller is responsible for ensuring that events are sent in order.
func (bs *BufferedSender) SendUnbuffered(
event *kvpb.MuxRangeFeedEvent, alloc *SharedBudgetAllocation,
) error {
panic("unimplemented: buffered sender for rangefeed #126560")
// sendUnbuffered sends the event directly to the underlying ServerStreamSender.
// It bypasses the buffer.
func (bs *BufferedSender) sendUnbuffered(ev *kvpb.MuxRangeFeedEvent) error {
return bs.sender.Send(ev)
}

func (bs *BufferedSender) SendBufferedError(ev *kvpb.MuxRangeFeedEvent) {
// Disconnect stream and cancel context. Then call SendBuffered with the error
// event.
panic("unimplemented: buffered sender for rangefeed #126560")
// run forwards buffered events to the underlying ServerStreamSender. It is
// expected to be called asynchronously in its own goroutine; BufferedSender
// stops forwarding buffered events once run returns.
//
// Each time notifyDataC fires, run pops events off the buffer queue until it
// is empty. For every popped event it sends the event, releases the event's
// allocation, and, if the event carries an error, invokes onError with the
// event's stream ID. The allocation release and the onError callback happen
// even when the send itself failed.
//
// run returns nil when ctx is done or the stopper is quiescing, and returns
// the send error as soon as a send fails.
func (bs *BufferedSender) run(
	ctx context.Context, stopper *stop.Stopper, onError func(streamID int64),
) error {
	for {
		select {
		case <-ctx.Done():
			// Top level goroutine will receive the context cancellation and
			// handle ctx.Err().
			return nil
		case <-stopper.ShouldQuiesce():
			// Top level goroutine will receive the stopper quiesce signal and
			// handle error.
			return nil
		case <-bs.notifyDataC:
			// Drain everything that is currently queued before waiting for the
			// next notification.
			for next, ok := bs.popFront(); ok; next, ok = bs.popFront() {
				sendErr := bs.sender.Send(next.ev)
				next.alloc.Release(ctx)
				if next.ev.Error != nil {
					onError(next.ev.StreamID)
				}
				if sendErr != nil {
					return sendErr
				}
			}
		}
	}
}

func (bs *BufferedSender) AddStream(streamID int64, cancel context.CancelFunc) {
panic("unimplemented: buffered sender for rangefeed #126560")
// popFront removes the event at the front of the buffer queue while holding
// queueMu. It returns the popped event together with a boolean reporting
// whether an event was actually popped.
func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) {
	bs.queueMu.Lock()
	defer bs.queueMu.Unlock()
	return bs.queueMu.buffer.popFront()
}

func (bs *BufferedSender) Start(ctx context.Context, stopper *stop.Stopper) error {
panic("unimplemented: buffered sender for rangefeed #126560")
// cleanup is called when the sender is stopped. While holding queueMu, it
// marks the sender as stopped and drains the buffer queue. Once stopped is
// set, sendBuffered rejects new events with an error, so no new events are
// buffered after cleanup returns.
func (bs *BufferedSender) cleanup(ctx context.Context) {
	bs.queueMu.Lock()
	defer bs.queueMu.Unlock()
	// Set stopped before draining; both happen under the same lock, so
	// sendBuffered cannot enqueue in between.
	bs.queueMu.stopped = true
	bs.queueMu.buffer.drain(ctx)
}

func (bs *BufferedSender) Stop() {
panic("unimplemented: buffered sender for rangefeed #126560")
// len returns the number of events currently held in the buffer queue, read
// while holding queueMu. Used for testing only.
func (bs *BufferedSender) len() int {
	bs.queueMu.Lock()
	defer bs.queueMu.Unlock()
	queued := bs.queueMu.buffer.len()
	return int(queued)
}

func (bs *BufferedSender) Error() chan error {
panic("unimplemented: buffered sender for rangefeed #126560")
// waitForEmptyBuffer polls the buffer queue under a retry loop with backoff
// (5ms initial backoff, multiplier 2, capped at 10s, MaxRetries 50) until the
// queue is empty. It returns nil as soon as the queue is observed empty. If
// the retry loop ends first, it returns ctx.Err() when the context has an
// error, and a "failed to send in time" error otherwise.
//
// Used for testing only.
func (bs *BufferedSender) waitForEmptyBuffer(ctx context.Context) error {
	retryOpts := retry.Options{
		InitialBackoff: 5 * time.Millisecond,
		Multiplier:     2,
		MaxBackoff:     10 * time.Second,
		MaxRetries:     50,
	}
	r := retry.StartWithCtx(ctx, retryOpts)
	for r.Next() {
		// bs.len takes queueMu itself, so the queue length is read under the
		// lock on every attempt.
		if bs.len() == 0 {
			return nil
		}
	}
	if ctxErr := ctx.Err(); ctxErr != nil {
		return ctxErr
	}
	return errors.New("buffered sender failed to send in time")
}
Loading
Loading