Skip to content

Commit bb50229

Browse files
committed
rangefeed: defer unregistration until stream is removed from manager
Unregistering the registration from the processor has the side-effect of closing the underlying memory budget. We don't want that to happen until all of the events have been cleared from the queue. Here, we pass the unregistration callback up via the Disconnector and then the stream manager calls it when removing the stream. Epic: none Release note: None
1 parent 96930c7 commit bb50229

File tree

6 files changed

+135
-10
lines changed

6 files changed

+135
-10
lines changed

pkg/kv/kvserver/rangefeed/buffered_registration.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,13 @@ func (br *bufferedRegistration) IsDisconnected() bool {
158158
return br.mu.disconnected
159159
}
160160

161+
// Unregister implements Disconnector.
162+
//
163+
// The bufferedRegistration unregisters itself via Disconnect because it is
164+
// responsible for all of its buffered memory and thus there is no reason to
165+
// delay unregistration.
166+
func (br *bufferedRegistration) Unregister() {}
167+
161168
// Disconnect cancels the output loop context for the registration and passes an
162169
// error to the output error stream for the registration.
163170
// Safe to run multiple times, but subsequent errors would be discarded.

pkg/kv/kvserver/rangefeed/processor_test.go

Lines changed: 103 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1618,18 +1618,112 @@ func TestIntentScannerOnError(t *testing.T) {
16181618
}
16191619
err := s.Start(stopper, erroringScanConstructor)
16201620
require.ErrorContains(t, err, "scanner error")
1621+
}
1622+
1623+
// TestProcessorMemoryAccountingOnError tests that when a
1624+
// buffered sender disconnects because of an error, the memory budget continues
1625+
// to account for any previously buffered events until they are actually sent.
1626+
//
1627+
// Note, this tests the case where the error is a memory overflow, but any error
1628+
// that disconnects our registration could have been used.
1629+
func TestProcessorMemoryAccountingOnError(t *testing.T) {
1630+
defer leaktest.AfterTest(t)()
1631+
1632+
ctx := context.Background()
1633+
stopper := stop.NewStopper()
1634+
defer stopper.Stop(ctx)
1635+
1636+
queueCap := int64(10)
1637+
streamID := int64(1)
1638+
1639+
st := cluster.MakeTestingClusterSettings()
1640+
RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap)
1641+
1642+
fb := newTestBudget(math.MaxInt64)
1643+
testServerStream := newTestServerStream()
1644+
bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics())
1645+
1646+
smMetrics := NewStreamManagerMetrics()
1647+
sm := NewStreamManager(bs, smMetrics)
1648+
require.NoError(t, sm.Start(ctx, stopper))
1649+
defer sm.Stop(ctx)
1650+
1651+
// Create a processor with our budget.
1652+
p, h, pStopper := newTestProcessor(t,
1653+
withBudget(fb),
1654+
withRangefeedTestType(scheduledProcessorWithBufferedSender))
1655+
defer pStopper.Stop(ctx)
1656+
1657+
// Block the sender so the buffer will fill up.
1658+
unblock := testServerStream.BlockSend()
1659+
defer func() {
1660+
if unblock != nil {
1661+
unblock()
1662+
}
1663+
}()
1664+
1665+
startTime := hlc.Timestamp{WallTime: 1}
1666+
sm.RegisteringStream(streamID)
1667+
registered, d, _ := p.Register(
1668+
ctx,
1669+
roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")},
1670+
startTime,
1671+
nil, /* catchUpIter */
1672+
false, /* withDiff */
1673+
false, /* withFiltering */
1674+
false, /* withOmitRemote */
1675+
noBulkDelivery,
1676+
sm.NewStream(streamID, 1 /* rangeID */),
1677+
)
1678+
require.True(t, registered)
1679+
sm.AddStream(streamID, d)
16211680

1622-
// The processor should be stopped eventually.
1623-
p := (s).(*ScheduledProcessor)
1681+
// Overflow the queue.
1682+
for i := range queueCap + 1 {
1683+
v := writeValueOpWithKV(roachpb.Key("k"), hlc.Timestamp{WallTime: startTime.WallTime + i + 1}, []byte("val"))
1684+
require.True(t, p.ConsumeLogicalOps(ctx, v))
1685+
}
1686+
1687+
// Once all events have been sent to the registration, we should be overflowed
1688+
// and disconnection.
1689+
h.syncEventC()
16241690
testutils.SucceedsSoon(t, func() error {
1625-
select {
1626-
case <-p.stoppedC:
1627-
_, ok := sch.shards[shardIndex(p.ID(), len(sch.shards), p.Priority)].procs[p.ID()]
1628-
require.False(t, ok)
1629-
require.False(t, sch.priorityIDs.Contains(p.ID()))
1691+
if d.IsDisconnected() {
16301692
return nil
1631-
default:
1632-
return errors.New("processor not stopped")
16331693
}
1694+
return errors.New("waiting for registration to disconnect")
1695+
})
1696+
1697+
// At this point, the registration should be disconnected but the buffered
1698+
// sender still has events in its queue. Assert that the memory budget still
1699+
// accounts for the memory in that queue.
1700+
//
1701+
// NB: This could be racy if change the structure of the code in the future.
1702+
// Namely, perhaps it isn't zero, now, but perhaps it becomes zero at some
1703+
// time in the future. We try to defend against that here by sending 2 sync
1704+
// events to help ensure we've definitely processed any processor requests.
1705+
//
1706+
// At the time this test was written, this test caught the bug on every run.
1707+
h.syncEventC()
1708+
h.syncEventC()
1709+
1710+
fb.mu.Lock()
1711+
budgetUsed := fb.mu.memBudget.Used()
1712+
fb.mu.Unlock()
1713+
require.Greater(t, budgetUsed, int64(0),
1714+
"memory budget should still account for events in buffered sender after overflow")
1715+
1716+
// Unblocking the sender should drain the queue and free everything from the
1717+
// memory budget.
1718+
unblock()
1719+
unblock = nil
1720+
1721+
testutils.SucceedsSoon(t, func() error {
1722+
fb.mu.Lock()
1723+
defer fb.mu.Unlock()
1724+
if used := fb.mu.memBudget.Used(); used != 0 {
1725+
return errors.Errorf("budget still has %d bytes allocated", used)
1726+
}
1727+
return nil
16341728
})
16351729
}

pkg/kv/kvserver/rangefeed/registry.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type Disconnector interface {
2929
// Disconnected is a permanent state; once IsDisconnected returns true, it
3030
// always returns true
3131
IsDisconnected() bool
32+
// Unregister is called when an error has finally been delivered to the
33+
// underlying stream.
34+
Unregister()
3235
}
3336

3437
// registration defines an interface for registration that can be added to a

pkg/kv/kvserver/rangefeed/sender_helper_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,3 +208,5 @@ func (c *cancelCtxDisconnector) IsDisconnected() bool {
208208
defer c.mu.Unlock()
209209
return c.mu.disconnected
210210
}
211+
212+
func (c *cancelCtxDisconnector) Unregister() {}

pkg/kv/kvserver/rangefeed/stream_manager.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ func (sm *StreamManager) OnError(streamID int64) {
123123
defer sm.streams.Unlock()
124124
if d, ok := sm.streams.m[streamID]; ok {
125125
assertTrue(d.IsDisconnected(), "OnError called on connected registration")
126+
d.Unregister()
126127
delete(sm.streams.m, streamID)
127128
sm.metrics.ActiveMuxRangeFeed.Dec(1)
128129
}
@@ -163,6 +164,13 @@ func (sm *StreamManager) AddStream(streamID int64, d Disconnector) {
163164
if d.IsDisconnected() {
164165
// If the stream is already disconnected, we don't add it to streams. The
165166
// registration will have already sent an error to the client.
167+
//
168+
// TODO(ssd): Technically this error event might live in the buffer still
169+
// and unregistering now may close the underlying memory budget related to
170+
// that event. At the moment, there isn't a better place to do this however
171+
// because the whole point of IsDisconnected() is that the error event might have
172+
// raced us and already be at the client.
173+
d.Unregister()
166174
return
167175
}
168176
if _, ok := sm.streams.m[streamID]; ok {
@@ -218,6 +226,9 @@ func (sm *StreamManager) Stop(ctx context.Context) {
218226
// sent to the client after shutdown, but the gRPC stream will still
219227
// terminate.
220228
disconnector.Disconnect(rangefeedClosedErr)
229+
// At this point the sender has been cleaned up so any memory allocations it
230+
// had should already be gone.
231+
disconnector.Unregister()
221232
}
222233
sm.streams.m = nil
223234
}

pkg/kv/kvserver/rangefeed/unbuffered_registration.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,9 @@ func (ubr *unbufferedRegistration) disconnectLocked(pErr *kvpb.Error) {
181181
}
182182
ubr.mu.disconnected = true
183183
ubr.stream.SendError(pErr)
184-
ubr.removeRegFromProcessor(ubr)
184+
// NB: The unbuffered registration does not unregister itself on Disconnect
185+
// because it still has memory in the buffered sender and we do not want to
186+
// free any underlying memory budgets until that has been cleared.
185187
}
186188

187189
// IsDisconnected returns true if the registration is disconnected.
@@ -191,6 +193,12 @@ func (ubr *unbufferedRegistration) IsDisconnected() bool {
191193
return ubr.mu.disconnected
192194
}
193195

196+
// Unregister implements Disconnector.
197+
func (ubr *unbufferedRegistration) Unregister() {
198+
assertTrue(ubr.IsDisconnected(), "connected registration in Unregister")
199+
ubr.removeRegFromProcessor(ubr)
200+
}
201+
194202
// runOutputLoop is run in a goroutine. It is responsible for running the
195203
// catch-up scan, and then publishing any events buffered in catchUpBuf to the
196204
// sender (or discarding catch-up buffer in the case of an error).

0 commit comments

Comments
 (0)