kvserver/rangefeed: introduce StreamManager #134958
base: master
1879446 to 16b6eb5
d289433 to 6138a54
It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR? 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.
7a5bdda to 39afe4e
I added more tests for StreamManager in #135075. I found it a bit challenging to test StreamManager without a real UnbufferedSender implementation. Creating a mock sender started to feel like reinventing the wheel of an UnbufferedSender. That said, we can add more simple tests for StreamManager without relying on actual senders. Wdyt?
re: https://reviewable.io/reviews/cockroachdb/cockroach/125872#-OBPhJgK60IsNNQ1MGIz
This is a good point, and we have also debated the design here quite a bit. If we want to ensure the stream is always added before errors occur, we could either: 1) pass the stream manager as part of the
cockroach/pkg/kv/kvserver/rangefeed/stream_manager.go Lines 137 to 140 in 39afe4e
cockroach/pkg/kv/kvserver/rangefeed/stream_manager_test.go Lines 217 to 231 in 763cff3
Reviewed 4 of 4 files at r1, 2 of 2 files at r2, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @stevendanna and @wenyihu6)
pkg/kv/kvserver/rangefeed/stream_manager.go
line 76 at r2 (raw file):
// run is the main loop for the sender. It is expected to run in the
// background until a node level error is encountered which would shut down
// all streams in StreamManager.
Speaking from the point of view of this being an interface, is run() supposed to launch a goroutine and then return? And should the lifetime of that goroutine be governed by the lifetime of ctx? Or is it supposed to die with cleanup?
Does run even need to be in the interface? IMO it's a bit of an oddity to have a "launcher" method. Could whatever gives us the sender (NewUnbufferedSender() or whatever) kick it off instead?
A similar comment could be made for cleanup. Can the sender clean itself up when it is tearing down its run? In other words, can we start and stop the sender before making the surrounding StreamManager?
Btw I'm happy for all of this to be revisited down the road. These are not blocking comments.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 83 at r2 (raw file):
// sendIsThreadSafe is a no-op declaration method. It is a contract that
// sendUnbuffered and sendBuffered should be thread-safe.
sendIsThreadSafe()
I've never seen a marker method like this before. Does this add anything over adding comments to send{UnB,unb}uffered?
pkg/kv/kvserver/rangefeed/stream_manager.go
line 113 at r2 (raw file):
}
// DisconnectStream disconnects the stream with the given streamID.
Can you add some words here that explain who calls this (the mux, right?) and why? And why we are leaving the stream in the map. Is the idea that when we call Disconnect the registration will bubble up an error and we get a subsequent call to OnError which then will remove the stream?
pkg/kv/kvserver/rangefeed/stream_manager.go
line 193 at r2 (raw file):
sm.metrics.UpdateMetricsOnRangefeedDisconnect()
}
sm.streams.m = make(map[int64]Disconnector)
I assume you're worried about someone calling AddStream after Stop? Can we make it a part of the contract that this does not happen?
pkg/kv/kvserver/rangefeed/stream_manager.go
line 198 at r2 (raw file):
// Error returns a channel that can be used to receive errors from sender.run.
// Only non-nil errors are sent on this channel.
func (sm *StreamManager) Error() chan error {
I think you want to return <-chan error (caller can't write to it).
Also, I think only one error is ever received, and only on true error (nothing may ever be sent on this channel). It helps to be specific about the semantics.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @tbg and @wenyihu6)
pkg/kv/kvserver/rangefeed/stream_manager.go
line 76 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
Speaking from the point of view of this being an interface, is run() supposed to launch a goroutine and then return? And should the lifetime of that goroutine be governed by the lifetime of ctx? Or is it supposed to die with cleanup?
Does run even need to be in the interface? IMO it's a bit of an oddity to have a "launcher" method. Could whatever gives us the sender (NewUnbufferedSender() or whatever) kick it off instead?
A similar comment could be made for cleanup. Can the sender clean itself up when it is tearing down its run? In other words, can we start and stop the sender before making the surrounding StreamManager?
Btw I'm happy for all of this to be revisited down the road. These are not blocking comments.
👍 Let's definitely follow up here.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 83 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
I've never seen a marker method like this before. Does this add anything over adding comments to send{UnB,unb}uffered?
I think this has gotten copy/pasta'd around between a few implementations. I suppose one benefit is that when making a new implementation, you have to type out this method name and perhaps then you think about whether your Send method is in fact thread safe.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 113 at r2 (raw file):
And why we are leaving the stream in the map. Is the idea that when we call Disconnect the registration will bubble up an error and we get a subsequent call to OnError which then will remove the stream?
Yes. We could remove it here as well without much harm if we wanted some defense in depth against the registration somehow never sending us the event.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 143 at r2 (raw file):
}
if _, ok := sm.streams.m[streamID]; ok {
	log.Fatalf(context.Background(), "stream %d already exists", streamID)
I kinda wonder if some of these are really halt-the-process level bugs vs having this return an error and changing this to an AssertionFailedf.
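A sketch of what the non-fatal variant might look like, under the assumption that callers can handle a returned error. The streamManager type below is a simplified, hypothetical stand-in, and fmt.Errorf is used only to keep the example standard-library-only; the comment above suggests an AssertionFailedf-style error instead.

```go
package main

import "fmt"

// streamManager is a simplified, hypothetical stand-in for StreamManager.
type streamManager struct {
	streams map[int64]struct{}
}

// addStream returns an error on a duplicate stream ID instead of halting
// the process. fmt.Errorf is a placeholder for an assertion-failure error.
func (sm *streamManager) addStream(streamID int64) error {
	if _, ok := sm.streams[streamID]; ok {
		return fmt.Errorf("assertion failed: stream %d already exists", streamID)
	}
	sm.streams[streamID] = struct{}{}
	return nil
}

func main() {
	sm := &streamManager{streams: make(map[int64]struct{})}
	fmt.Println(sm.addStream(1)) // first add succeeds
	fmt.Println(sm.addStream(1)) // duplicate is reported, process keeps running
}
```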
pkg/kv/kvserver/rangefeed/stream_manager.go
line 163 at r2 (raw file):
sm.wg.Add(1)
ctx, sm.taskCancel = context.WithCancel(ctx)
if err := stopper.RunAsyncTask(ctx, "buffered stream output", func(ctx context.Context) {
s/buffered stream out/stream manager SOMETHING/
But, this goes to Tobi's point that perhaps the sender should be responsible for the actual launching of this task.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 190 at r2 (raw file):
// terminate.
disconnector.Disconnect(
	kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)))
One would hope the compiler would do this for you, but we could move this error construction out of the loop.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 191 at r2 (raw file):
disconnector.Disconnect(
	kvpb.NewError(kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)))
sm.metrics.UpdateMetricsOnRangefeedDisconnect()
If we gave this an argument, we could call it once with len(sm.streams.m)
d9968ef to dfae2eb
This patch updates RangefeedMetricsRecorder to include UpdateMetricsOnRangefeedDisconnectBy, which decrements rangefeed metrics by the given argument. While the benefits remain unclear, future commits will use this method to clean up metrics during multiple rangefeed disconnects.
Epic: none
Release note: none
This patch introduces StreamManager, which should be created and persisted for the lifetime of each node.MuxRangefeed call. It will handle starting and stopping the underlying wrapped sender, as well as managing individual streams/rangefeeds. While currently unused, future commits will refactor the unbuffered sender to use it.
Part of: cockroachdb#110432
Release note: none
Co-authored-by: Steven Danna [email protected]
dfae2eb to b3d4560
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @stevendanna and @tbg)
pkg/kv/kvserver/rangefeed/stream_manager.go
line 76 at r2 (raw file):
Previously, stevendanna (Steven Danna) wrote…
👍 Let's definitely follow up here.
Agreed, I recall Steven has mentioned something similar before. Definitely looks like something we should revisit - filed an issue to track these targeted follow ups #135332.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 83 at r2 (raw file):
Previously, stevendanna (Steven Danna) wrote…
I think this has gotten copy/pasta'd around between a few implementations. I suppose one benefit is that when making a new implementation, you have to type out this method name and perhaps then you think about whether your Send method is in fact thread safe.
I added them when I was reading this review https://reviewable.io/reviews/cockroachdb/cockroach/125872#-OBPgnLn2srEopeTC53V and thought this method would give us a nice comment/place to discuss thread-safety for the implementations. I've removed this from the interface and added two comments for the send{UnB,B} instead.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 113 at r2 (raw file):
Previously, stevendanna (Steven Danna) wrote…
And why we are leaving the stream in the map. Is the idea that when we call Disconnect the registration will bubble up an error and we get a subsequent call to OnError which then will remove the stream?
Yes. We could remove it here as well without much harm if we wanted some defense in depth against the registration somehow never sending us the event.
+1, we're relying on the r.Disconnect method to handle de-dup and ensure this is idempotent. It checks whether r.mu.disconnected is set.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 143 at r2 (raw file):
Previously, stevendanna (Steven Danna) wrote…
I kinda wonder if some of these are really halt-the-process level bugs vs having this return an error and changing this to an AssertionFailedf.
Agreed, we have a few log.Fatal calls here which may be worth a second thought. Tracked in #135332 as well.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 163 at r2 (raw file):
Previously, stevendanna (Steven Danna) wrote…
s/buffered stream out/stream manager SOMETHING/
But, this goes to Tobi's point that perhaps the sender should be responsible for the actual launching of this task.
Renamed to stream-manager-sender. I will address the second part of the comment in #135332.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 190 at r2 (raw file):
Previously, stevendanna (Steven Danna) wrote…
One would hope the compiler would do this for you, but we could move this error construction out of the loop.
Good point, done.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 191 at r2 (raw file):
Previously, stevendanna (Steven Danna) wrote…
If we gave this an argument, we could call it once with len(sm.streams.m)
Done.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 193 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
I assume you're worried about someone calling AddStream after Stop? Can we make it a part of the contract that this does not happen?
It should be impossible to call AddStream or DisconnectStream after Stop, as this could lead to incomplete cleanup of metrics and rangefeeds. Stop is only called during the defer of node.MuxRangefeed. Once Stop is called, no additional calls to AddStream or DisconnectStream should occur. https://github.com/wenyihu6/cockroach/blob/6db61edc4668f63a396ff1e9e7cd001e20b13c28/pkg/server/node.go#L2164
I only cleaned up the map because it seemed like the right thing to do, ensuring the map is cleaned up after the rangefeeds have been disconnected.
pkg/kv/kvserver/rangefeed/stream_manager.go
line 198 at r2 (raw file):
Previously, tbg (Tobias Grieger) wrote…
I think you want to return <-chan error (caller can't write to it).
Also, I think only one error is ever received, and only on true error (nothing may ever be sent on this channel). It helps to be specific about the semantics.
Clarified the semantics. How does the comment look now?