-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kvserver/rangefeed: remove per-registration goroutines #125872
base: master
Are you sure you want to change the base?
Conversation
It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR? 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
ba85154
to
437fa42
Compare
0df767e
to
ecca22c
Compare
Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
5a5c6f6
to
45c7231
Compare
2902de8
to
2bbe823
Compare
Previously, UnbufferedSender maintained a list of cancel functions that were canceled when processing an error. The goal of this cancellation was to tear down running resource in the registration. On error, something like the following would happen: UnbufferedSender.SendBufferedError(err) would call cancel. This cancellation would cancel the context inherited by the registration's runOutputLoop, causing it to exit. The registration would then call disconnect, cancelling additional contexts, that also seem to be intended to control the output loop, which is no longer running. Disconnect would then send an error back to UnbufferedSender, which would ignore it to prevent a duplicate sending of errors. There are other error paths, where some of the components describe above, aren't unnecessary, but, hopefully you can see that this is ripe for some refactoring. Here, we take a small step that we believe will also pave the way for a clearer implementation of the BufferredSender. Now, all error paths, start by calling registration.Disconnect(err). The registration, cancels the context controlling the runOutputLoop and sends an error to the stream, which the stream then sends to the client. There are some paths were Disconnect may be called twice still. This simplification is made possible by one bit of indirection: The processor now adds the registration to a map that lives on the stream manager. This map will eventually also store any deferred cleanups that the process requires when that stream is shutting down. Note, we considered passing the registrations cleanup function back up the call stack from register and may still explore that in the future. However, it is not without challenge. Namely, the second we've added the registration to the processor, it is possible that an error is delivered from the processor that the stream manager would not yet know how to handle. We expect further naming and organizational improvements over the coming commits. Epic: none Release note: None Co-authored-by: Wenyi Hu [email protected]
After the output loop completes, 2 cleanup steps are performed: 1) The registration is removed from the processors registry 2) The process is potentially stopped and removed from the replica. Before this change, both of these happened syncronously after the output loop finished. With this change, step (1) happens asyncronously. To facilitate this, an overflow mechanism is provided. This overflow mechanism potentially allocates. Note that we expect that the number of requests is relatively small and should be O(rangefeeds_on_range) so hopefully this mechanism won't be used often. Step (2) is now handled by the processor itself. After processing an unregister request, if the set of registrations falls to zero, we enqueue a Stop event for ourselves. Then, when processing the Stop, we unregister ourselves from the replica. Note that this may look like a small semantics change since the previous unregister callback called Stop() which processes all events. However, note that a Stopped event is processed after all other events, so any events in the queue at the point of processing the unregistration that enqueued the stop will be processed. The motivation for this change is to eventually allow cleanup step (1) to be run as part of registration.disconnect(), which needs to be non-blocking. This is desired for a future change in which there is not a dedicated goroutine to perform this cleanup. Part of: cockroachdb#110432 Release note: note
This patch adds a node level BufferedSender which uses a queue to buffer events before forwarding events to underlying grpc stream. Part of: cockroachdb#110432 Release note: note Co-authored-by: Steven Danna [email protected]
26e037d
to
b5ba0df
Compare
This patch adds unbufferedRegistration. UnbufferedRegistration is like BufferedRegistration but uses BufferedSender to buffer live raft updates instead of a using fixed size channel and having a dedicated per-registration goroutine to volley events to underlying gRPC stream. Instead, there is only one BufferedSender for each incoming node.MuxRangefeed gRPC call. BufferedSender is responsible for buffering and sending its updates to the underlying gRPC stream in a dedicated goroutine O(node). Resolved: cockroachdb#110432 Release note: A new cluster setting `kv.rangefeed.buffered_stream_sender.enabled` can now be used to allow rangefeed to use buffered sender for all rangefeed feeds instead of buffering events separately per client per range. Co-authored-by: Steven Danna [email protected]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I gave this a first pass. Even though I understand the high-level picture, there is too much here, especially in the first commit, to really review in depth; entire files have moved and Reviewable is already extremely confused and this would only get worse. I would like to suggest breaking the first commit up into a sequence of PRs that we can then iterate and improve on, which should now be possible since we know how it should all shake out.
While doing so, I would try to look at ways to avoid the "obfuscation" that arises from sharing the sender
interface, and avoid code movement except in commits that don't make any semantic changes. My (perhaps naive) suggestion would be:
- extract the
StreamManager
. (There is no buffered stream yet, so it's used only for the unbuffered stream). (Extract it in place, then move it to new file). We merge that. - Do the disconnector refactor. (If this can be split up further, great).
- now unless I've missed something, all the cleanup has happened and we can introduce the unbuffered sender (i.e. we pick up the sequence of commits you have here). First commit we devise a way to avoid sharing the
sender
interface. Then introduce the interface for the buffered sender. Make sure StreamManager can route it. (buffered sender still unimplemented here, just have a stub). Then replace the stub with the real thing, add tests, etc. - add unbuffered registration.
WDYT?
Reviewed 7 of 24 files at r14, 20 of 20 files at r18, 7 of 7 files at r19, 8 of 8 files at r20, 12 of 12 files at r21, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @stevendanna and @wenyihu6)
pkg/kv/kvserver/rangefeed/buffered_sender.go
line 72 at r20 (raw file):
// sendUnbuffered sends the event directly to the underlying ServerStreamSender. // It bypasses the buffer.
But this doesn't seem to be called (PerRangeEventSink.SendUnbuffered
calls it, but this sink wouldn't wrap a buffered sender I think).
So we could drop this method from here and the sender
interface and instead make an interface-widening assertion only in the case of the unbuffered sender.
pkg/kv/kvserver/rangefeed/buffered_sender.go
line 96 at r20 (raw file):
for { e, success := bs.popFront() if success {
nit:
if !success {
break
}
// do the happy path
pkg/kv/kvserver/rangefeed/stream_manager.go
line 21 at r18 (raw file):
// +-----------------+ +-----------------+ +-----------------+ // | | Disconnect | | r.disconnect | | // | MuxRangeFeed +-------------->| SteamManager +----------------->| Registration |
StreamManager
pkg/kv/kvserver/rangefeed/stream_manager.go
line 49 at r18 (raw file):
errCh chan error // Note that lockedMuxStream wraps the underlying grpc server stream, ensuring
Patch up the comment to explain that sender is thread-safe (which I think is what it is now saying). I think it's helpful to also mention the possible implementation (as the comment does now).
pkg/kv/kvserver/rangefeed/stream_manager.go
line 108 at r18 (raw file):
// OnError is a callback that is called when a sender sends a rangefeed // completion error back to the client. Note that we check for the existence of // streamID to avoid metrics inaccuracy when the error is sent before the stream
What happens in this case when the stream is added? Why isn't a stream always added before it can generate errors?
pkg/kv/kvserver/rangefeed/stream_manager.go
line 123 at r18 (raw file):
if err == nil { log.Fatalf(context.Background(), "unexpected: DisconnectStream called with non-nil error")
nil, not non-nil.
pkg/kv/kvserver/rangefeed/unbuffered_sender.go
line 129 at r18 (raw file):
// needs to make sure this is called only with non-nil error events. Important // to be thread-safe. func (ubs *UnbufferedSender) sendBuffered(
it's surprising to me that this method is used only in one specific case and the sibling on BufferedSender
is the main workhorse. I think this is because one interface is used for two different systems: an UnbufferedSender
is used by a PerRangeEventSink
and a BufferedSender
is used by a BufferedPerRangeEventSink
.
I find that keeping legacy impls around tends to greatly obfuscate the code. I am looking at this code:
case *BufferedSender:
return &BufferedPerRangeEventSink{
PerRangeEventSink: NewPerRangeEventSink(rangeID, streamID, sender),
}
case *UnbufferedSender:
return NewPerRangeEventSink(rangeID, streamID, sender)
default:
and I wonder why we need to use the same sender
interface in both. Can't they take different interfaces? Or can we at least make the interface "canonical" for the non-legacy (buffered) case and then just type assert in the legacy case and use the underlying type (a non-buffering sender in the unbuffered case)?
This suggestion is not for this PR. But anything that helps make this code more accessible will help for the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @tbg and @wenyihu6)
pkg/kv/kvserver/rangefeed/stream_manager.go
line 108 at r18 (raw file):
Previously, tbg (Tobias Grieger) wrote…
What happens in this case when the stream is added? Why isn't a stream always added before it can generate errors?
Just as a heads up: This has been one of the main points of back and forth in some past review.
A registration can send us error once it is registered with a processor because of an error at the processor level or from the catch-up scan. This can happen before the Disconnector has been sent back up the stack to us.
Previous iterations solved this by having the registration process register the callback with us by sending something down the stack and then setting the callback at the point of registration construction. No one was particularly happy with the complexity this appeared to add.
In this iteration, we've opted to send this Disconnector back up the stack and then solve the race by checking whether the stream is Disconnected before adding it to the map. Because we only increment stats when we added the stream to the map, we only want to decrement them on streams actually deleted from the map.
TYFTR! I started working with Steven to break down the PRs into smaller chunks. I've already started a series here (#134953, #134954, #134955, #134956, #134957, #134958, #134959). The last one is still a bit large, but I'll see if I can split it further - it will require a bit more surgery. Just wanted to get the ball rolling on the first few ones. Let me know if this is along the lines you were thinking. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @stevendanna and @tbg)
pkg/kv/kvserver/rangefeed/buffered_sender.go
line 72 at r20 (raw file):
Previously, tbg (Tobias Grieger) wrote…
But this doesn't seem to be called (
PerRangeEventSink.SendUnbuffered
calls it, but this sink wouldn't wrap a buffered sender I think).So we could drop this method from here and the
sender
interface and instead make an interface-widening assertion only in the case of the unbuffered sender.
We can discuss the contracts more in #134957 and see if there is a cleaner way to do things. It is a bit confusing since we allow PerRangeEventSink
to wrap a BufferedSender
PerRangeEventSink: NewPerRangeEventSink(rangeID, streamID, sender), |
PerRangeEventSink
and BufferedPerRangeEventSink
is because we want to have both BufferedStream
and Stream
interfaces to tell p.Register
which registration type to initialize.
pkg/kv/kvserver/rangefeed/buffered_sender.go
line 96 at r20 (raw file):
Previously, tbg (Tobias Grieger) wrote…
nit:
if !success { break } // do the happy path
I will try to remember this when we are adding the BufferedSender PR.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 21 at r18 (raw file):
Previously, tbg (Tobias Grieger) wrote…
StreamManager
Done in
// | MuxRangeFeed +-------------->| StreamManager +----------------->| Registration | |
pkg/kv/kvserver/rangefeed/stream_manager.go
line 49 at r18 (raw file):
Previously, tbg (Tobias Grieger) wrote…
Patch up the comment to explain that sender is thread-safe (which I think is what it is now saying). I think it's helpful to also mention the possible implementation (as the comment does now).
Done in
cockroach/pkg/kv/kvserver/rangefeed/stream_manager.go
Lines 49 to 50 in 39afe4e
// Implemented by UnbufferedSender and BufferedSender. Implementations should | |
// ensure that sendUnbuffered and sendBuffered are thread-safe. |
pkg/kv/kvserver/rangefeed/stream_manager.go
line 123 at r18 (raw file):
Previously, tbg (Tobias Grieger) wrote…
nil, not non-nil.
Oops, done in
"unexpected: DisconnectStream called with nil error") |
pkg/kv/kvserver/rangefeed/unbuffered_sender.go
line 129 at r18 (raw file):
Previously, tbg (Tobias Grieger) wrote…
it's surprising to me that this method is used only in one specific case and the sibling on
BufferedSender
is the main workhorse. I think this is because one interface is used for two different systems: anUnbufferedSender
is used by aPerRangeEventSink
and aBufferedSender
is used by aBufferedPerRangeEventSink
.I find that keeping legacy impls around tends to greatly obfuscate the code. I am looking at this code:
case *BufferedSender: return &BufferedPerRangeEventSink{ PerRangeEventSink: NewPerRangeEventSink(rangeID, streamID, sender), } case *UnbufferedSender: return NewPerRangeEventSink(rangeID, streamID, sender) default:
and I wonder why we need to use the same
sender
interface in both. Can't they take different interfaces? Or can we at least make the interface "canonical" for the non-legacy (buffered) case and then just type assert in the legacy case and use the underlying type (a non-buffering sender in the unbuffered case)?This suggestion is not for this PR. But anything that helps make this code more accessible will help for the future.
We can discuss this more in #134957.
The goal of this PR is to remove the per registration goroutines that move
data from per registration buffers to the client and replace it with a per mux rangefeed goroutine
managing a per mux rangefeed buffer.
Since registrations are O(ranges) and since for large scale users, rangefeeds typically cover many
ranges on a node, this should reduce the number of goroutines needed to run a rangefeed in caught up mode.
In catch-up mode, the per-registration goroutine and buffered is still used.
This per-mux-rangefeed goroutine and buffer is managed by the new BufferedSender component.
Epic: None
Known follow-ups: