Skip to content

Commit

Permalink
Merge #134957
Browse files Browse the repository at this point in the history
134957: kvserver/rangefeed: wrap PerRangeEventSink inside BufferedPerRangeEventSink r=stevendanna,tbg a=wenyihu6

This patch simplifies the BufferedStream and Stream interfaces by wrapping
*PerRangeEventSink inside BufferedPerRangeEventSink.

Epic: none
Release note: none

Co-authored-by: Steven Danna [email protected]

Co-authored-by: Wenyi Hu <[email protected]>
  • Loading branch information
craig[bot] and wenyihu6 committed Nov 15, 2024
2 parents 20463ae + 4f1c256 commit d06e762
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 104 deletions.
1 change: 0 additions & 1 deletion pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ go_library(
"budget.go",
"buffered_registration.go",
"buffered_sender.go",
"buffered_stream.go",
"catchup_scan.go",
"event_queue.go",
"event_size.go",
Expand Down
97 changes: 0 additions & 97 deletions pkg/kv/kvserver/rangefeed/buffered_stream.go

This file was deleted.

45 changes: 41 additions & 4 deletions pkg/kv/kvserver/rangefeed/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ type Stream interface {
SendError(err *kvpb.Error)
}

// BufferedStream is a Stream that can buffer events before sending them to the
// underlying Stream. Note that the caller may still choose to bypass the buffer
// and send to the underlying Stream directly by calling Send directly. Doing so
// can cause event re-ordering. Caller is responsible for ensuring that events
// are sent in order.
type BufferedStream interface {
Stream
// SendBuffered buffers the event before sending it to the underlying Stream.
// It should not block if ev.Error != nil.
SendBuffered(*kvpb.RangeFeedEvent, *SharedBudgetAllocation) error
}

// PerRangeEventSink is an implementation of Stream which annotates each
// response with rangeID and streamID. It is used by MuxRangeFeed.
type PerRangeEventSink struct {
Expand All @@ -47,6 +59,8 @@ var _ Stream = (*PerRangeEventSink)(nil)
// UnbufferedSender.SendUnbuffered is thread-safe.
func (s *PerRangeEventSink) SendUnbufferedIsThreadSafe() {}

// SendUnbuffered implements the Stream interface. It sends a RangeFeedEvent to
// the underlying grpc stream directly.
func (s *PerRangeEventSink) SendUnbuffered(event *kvpb.RangeFeedEvent) error {
response := &kvpb.MuxRangeFeedEvent{
RangeFeedEvent: *event,
Expand All @@ -56,10 +70,8 @@ func (s *PerRangeEventSink) SendUnbuffered(event *kvpb.RangeFeedEvent) error {
return s.wrapped.SendUnbuffered(response)
}

// SendError implements the Stream interface. It requests the UnbufferedSender
// to detach the stream. The UnbufferedSender is then responsible for handling
// the actual disconnection and additional cleanup. Note that Caller should not
// rely on immediate disconnection as cleanup takes place async.
// SendError implements the Stream interface. It sends an error to the stream.
// It should not block.
func (s *PerRangeEventSink) SendError(err *kvpb.Error) {
ev := &kvpb.MuxRangeFeedEvent{
RangeID: s.rangeID,
Expand All @@ -86,3 +98,28 @@ func transformRangefeedErrToClientError(err *kvpb.Error) *kvpb.Error {
}
return err
}

// BufferedPerRangeEventSink is an implementation of BufferedStream which is
// similar to PerRangeEventSink but buffers events in BufferedSender before
// forwarding events to the underlying grpc stream.
type BufferedPerRangeEventSink struct {
*PerRangeEventSink
}

var _ kvpb.RangeFeedEventSink = (*BufferedPerRangeEventSink)(nil)
var _ Stream = (*BufferedPerRangeEventSink)(nil)
var _ BufferedStream = (*BufferedPerRangeEventSink)(nil)

// SendBuffered buffers the event in BufferedSender and transfers the ownership
// of SharedBudgetAllocation to BufferedSender. BufferedSender is responsible
// for properly using and releasing it when an error occurs or when the event is
// sent. The event is guaranteed to be sent unless BufferedSender terminates
// before sending (such as due to broken grpc stream).
//
// If the function returns an error, it is safe to disconnect the stream and
// assume that all future SendBuffered on this stream will return an error.
func (s *BufferedPerRangeEventSink) SendBuffered(
event *kvpb.RangeFeedEvent, alloc *SharedBudgetAllocation,
) error {
panic("unimplemented: buffered sender for rangefeed #126560")
}
4 changes: 2 additions & 2 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2208,8 +2208,8 @@ func (n *Node) MuxRangeFeed(muxStream kvpb.Internal_MuxRangeFeedServer) error {
var streamSink rangefeed.Stream
if ubs, ok := sm.(*rangefeed.UnbufferedSender); ok {
streamSink = rangefeed.NewPerRangeEventSink(req.RangeID, req.StreamID, ubs)
} else if bs, ok := sm.(*rangefeed.BufferedSender); ok {
streamSink = rangefeed.NewBufferedPerRangeEventSink(req.RangeID, req.StreamID, bs)
} else if _, ok := sm.(*rangefeed.BufferedSender); ok {
log.Fatalf(streamCtx, "unimplemented: buffered sender for rangefeed #126560")
} else {
log.Fatalf(streamCtx, "unknown sender type %T", sm)
}
Expand Down

0 comments on commit d06e762

Please sign in to comment.