Skip to content

Commit

Permalink
Merge #131113
Browse files Browse the repository at this point in the history
131113: storeliveness: check clock to determine support from r=nvanbenschoten a=miraradeva

Previously, in response to `SupportFrom` calls, the SupportManager could return `true` (support is provided) with an expiration timestamp in the past. It was the caller's responsibility to compare the returned timestamp to its clock. This logic is error-prone because the caller may only consider the boolean and conclude support is provided.

This commit moves the comparison to the current clock time to the `SupportFrom` implementations.

Part of: #125063

Release note: None

Co-authored-by: Mira Radeva <[email protected]>
  • Loading branch information
craig[bot] and miraradeva committed Sep 26, 2024
2 parents bec01ef + 72e0001 commit 343bd46
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 99 deletions.
15 changes: 7 additions & 8 deletions pkg/kv/kvserver/replica_store_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,13 @@ func (r *replicaRLockedStoreLiveness) SupportFor(replicaID raftpb.PeerID) (raftp
// SupportFrom implements the raftstoreliveness.StoreLiveness interface.
func (r *replicaRLockedStoreLiveness) SupportFrom(
replicaID raftpb.PeerID,
) (raftpb.Epoch, hlc.Timestamp, bool) {
) (raftpb.Epoch, hlc.Timestamp) {
storeID, ok := r.getStoreIdent(replicaID)
if !ok {
return 0, hlc.Timestamp{}, false
return 0, hlc.Timestamp{}
}
epoch, exp, ok := r.store.storeLiveness.SupportFrom(storeID)
if !ok {
return 0, hlc.Timestamp{}, false
}
return raftpb.Epoch(epoch), exp, true
epoch, exp := r.store.storeLiveness.SupportFrom(storeID)
return raftpb.Epoch(epoch), exp
}

// SupportFromEnabled implements the raftstoreliveness.StoreLiveness interface.
Expand Down Expand Up @@ -126,5 +123,7 @@ func raftFortificationEnabledForRangeID(fracEnabled float64, rangeID roachpb.Ran

// SupportExpired implements the raftstoreliveness.StoreLiveness interface.
func (r *replicaRLockedStoreLiveness) SupportExpired(ts hlc.Timestamp) bool {
return ts.Less(r.store.Clock().Now())
// A support expiration timestamp equal to the current time is considered
// expired, to be consistent with support withdrawal in Store Liveness.
return ts.LessEq(r.store.Clock().Now())
}
22 changes: 9 additions & 13 deletions pkg/kv/kvserver/storeliveness/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ type Fabric interface {
// SupportFor returns the epoch of the current uninterrupted period of Store
// Liveness support from the local store (S_local) for the store (S_remote)
// corresponding to the specified id, and a boolean indicating whether S_local
// is currently supporting S_remote.
//
// If S_local is not currently supporting S_remote, the epoch will be 0 and
// the boolean will be false.
// supports S_remote. The epoch is 0 if and only if the boolean is false.
//
// S_remote may not be aware of the full extent of support from S_local, as
// Store Liveness heartbeat response messages may be lost or delayed. However,
Expand All @@ -37,21 +34,20 @@ type Fabric interface {

// SupportFrom returns the epoch of the current uninterrupted period of Store
// Liveness support for the local store (S_local) from the store (S_remote)
// corresponding to the specified id, the timestamp until which the support is
// provided (an expiration), and a boolean indicating whether S_local is
// currently supported by S_remote.
// corresponding to the specified id, and the timestamp until which the
// support is provided (an expiration). The epoch is 0 if and only if the
// timestamp is the zero timestamp.
//
// If S_local is not currently supported by S_remote, the epoch will be 0, the
// timestamp will be the zero timestamp, and the boolean will be false.
// It is the caller's responsibility to infer whether support has expired, by
// comparing the returned timestamp to its local clock.
//
// S_local may not be aware of the full extent of support from S_remote, as
// Store Liveness heartbeat response messages may be lost or delayed. However,
// S_remote will never be unaware of support it is providing.
//
// If S_local is unaware of the remote store S_remote, false will be returned,
// and S_local will initiate a heartbeat loop to S_remote in order to
// request support so that future calls to SupportFrom may succeed.
SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Timestamp, bool)
// If S_local is unaware of the remote store S_remote, it will initiate a
// heartbeat loop to S_remote in order to request support.
SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Timestamp)

// SupportFromEnabled determines if Store Liveness requests support from
// other stores. If it returns true, then Store Liveness is sending
Expand Down
12 changes: 3 additions & 9 deletions pkg/kv/kvserver/storeliveness/store_liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,13 @@ func TestStoreLiveness(t *testing.T) {

case "support-from":
remoteID := parseStoreID(t, d, "node-id", "store-id")
epoch, timestamp, supported := sm.SupportFrom(remoteID)
return fmt.Sprintf(
"epoch: %+v, expiration: %+v, support provided: %v",
epoch, timestamp, supported,
)
epoch, timestamp := sm.SupportFrom(remoteID)
return fmt.Sprintf("epoch: %+v, expiration: %+v", epoch, timestamp)

case "support-for":
remoteID := parseStoreID(t, d, "node-id", "store-id")
epoch, supported := sm.SupportFor(remoteID)
return fmt.Sprintf(
"epoch: %+v, support provided: %v",
epoch, supported,
)
return fmt.Sprintf("epoch: %+v, support provided: %v", epoch, supported)

case "send-heartbeats":
now := parseTimestamp(t, d, "now")
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/storeliveness/support_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (sm *SupportManager) SupportFor(id slpb.StoreIdent) (slpb.Epoch, bool) {

// SupportFrom implements the Fabric interface. It delegates the response to the
// SupportManager's supporterStateHandler.
func (sm *SupportManager) SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Timestamp, bool) {
func (sm *SupportManager) SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Timestamp) {
ss, ok := sm.requesterStateHandler.getSupportFrom(id)
if !ok {
// If this is the first time SupportFrom has been called for this store,
Expand All @@ -119,13 +119,13 @@ func (sm *SupportManager) SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Times
context.Background(), 2,
"store %+v enqueued to add remote store %+v", sm.storeID, id,
)
return 0, hlc.Timestamp{}, false
return 0, hlc.Timestamp{}
}
// An empty expiration implies support has expired.
if ss.Expiration.IsEmpty() {
return 0, hlc.Timestamp{}, false
return 0, hlc.Timestamp{}
}
return ss.Epoch, ss.Expiration, true
return ss.Epoch, ss.Expiration
}

// SupportFromEnabled implements the Fabric interface and determines if Store
Expand Down
24 changes: 13 additions & 11 deletions pkg/kv/kvserver/storeliveness/support_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ func TestSupportManagerRequestsSupport(t *testing.T) {
require.NoError(t, sm.Start(ctx))

// Start sending heartbeats to the remote store by calling SupportFrom.
epoch, expiration, supported := sm.SupportFrom(remoteStore)
epoch, expiration := sm.SupportFrom(remoteStore)
require.Equal(t, slpb.Epoch(0), epoch)
require.Equal(t, hlc.Timestamp{}, expiration)
require.False(t, supported)

// Ensure heartbeats are sent.
msgs := ensureHeartbeats(t, sender, 10)
Expand All @@ -84,13 +83,12 @@ func TestSupportManagerRequestsSupport(t *testing.T) {
// Ensure support is provided as seen by SupportFrom.
testutils.SucceedsSoon(
t, func() error {
epoch, expiration, supported = sm.SupportFrom(remoteStore)
if !supported {
epoch, expiration = sm.SupportFrom(remoteStore)
if expiration.IsEmpty() {
return errors.New("support not provided yet")
}
require.Equal(t, slpb.Epoch(1), epoch)
require.Equal(t, requestedExpiration, expiration)
require.True(t, supported)
return nil
},
)
Expand All @@ -114,13 +112,16 @@ func TestSupportManagerProvidesSupport(t *testing.T) {
sm := NewSupportManager(store, engine, options, settings, stopper, clock, sender)
require.NoError(t, sm.Start(ctx))

// Pause the clock so support is not withdrawn before calling SupportFor.
manual.Pause()

// Process a heartbeat from the remote store.
heartbeat := &slpb.Message{
Type: slpb.MsgHeartbeat,
From: remoteStore,
To: sm.storeID,
Epoch: slpb.Epoch(1),
Expiration: sm.clock.Now().AddDuration(time.Second),
Expiration: sm.clock.Now().AddDuration(options.LivenessInterval),
}
require.NoError(t, sm.HandleMessage(heartbeat))

Expand All @@ -145,6 +146,9 @@ func TestSupportManagerProvidesSupport(t *testing.T) {
require.Equal(t, slpb.Epoch(1), epoch)
require.True(t, supported)

// Resume the clock, so support can be withdrawn.
manual.Resume()

// Wait for support to be withdrawn.
testutils.SucceedsSoon(
t, func() error {
Expand Down Expand Up @@ -178,8 +182,7 @@ func TestSupportManagerEnableDisable(t *testing.T) {
require.NoError(t, sm.Start(ctx))

// Start sending heartbeats by calling SupportFrom.
_, _, supported := sm.SupportFrom(remoteStore)
require.False(t, supported)
sm.SupportFrom(remoteStore)
ensureHeartbeats(t, sender, 10)

// Disable Store Liveness and make sure heartbeats stop.
Expand Down Expand Up @@ -314,11 +317,10 @@ func TestSupportManagerDiskStall(t *testing.T) {
ensureNoHeartbeats(t, sender, sm.options.HeartbeatInterval, 0)

// SupportFrom and SupportFor calls are still being answered.
epoch, _, supported := sm.SupportFrom(remoteStore)
epoch, _ := sm.SupportFrom(remoteStore)
require.Equal(t, slpb.Epoch(1), epoch)
require.True(t, supported)

epoch, supported = sm.SupportFor(remoteStore)
epoch, supported := sm.SupportFor(remoteStore)
require.Equal(t, slpb.Epoch(1), epoch)
require.True(t, supported)

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/storeliveness/testdata/basic
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

send-heartbeats now=100 liveness-interval=10s
----
Expand All @@ -24,7 +24,7 @@ responses:

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 110.000000000,0, support provided: true
epoch: 1, expiration: 110.000000000,0

support-for node-id=2 store-id=2
----
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/storeliveness/testdata/liveness_interval
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

# -------------------------------------------------------------
# Store (n1, s1) requests and receives support with
Expand All @@ -23,7 +23,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 110.000000000,0, support provided: true
epoch: 1, expiration: 110.000000000,0


# -------------------------------------------------------------
Expand All @@ -42,7 +42,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 121.000000000,0, support provided: true
epoch: 1, expiration: 121.000000000,0


# -------------------------------------------------------------
Expand All @@ -61,4 +61,4 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 121.000000000,0, support provided: true
epoch: 1, expiration: 121.000000000,0
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/storeliveness/testdata/multi-store
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@

support-from node-id=1 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

support-from node-id=2 store-id=3
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

support-from node-id=2 store-id=4
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

send-heartbeats now=100 liveness-interval=10s
----
Expand Down
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/storeliveness/testdata/requester_state
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

# -------------------------------------------------------------
# Store (n1, s1) successfully establishes support.
Expand All @@ -22,7 +22,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 110.000000000,0, support provided: true
epoch: 1, expiration: 110.000000000,0

debug-requester-state
----
Expand All @@ -47,7 +47,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 210.000000000,0, support provided: true
epoch: 1, expiration: 210.000000000,0


# -------------------------------------------------------------
Expand All @@ -65,7 +65,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

debug-requester-state
----
Expand All @@ -90,7 +90,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0


# -------------------------------------------------------------
Expand All @@ -103,27 +103,27 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0

handle-messages
msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=0
----

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0

handle-messages
msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=2 expiration=400
----

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0


# -------------------------------------------------------------
# Store (n1, s1) requests support but receives to response.
# Store (n1, s1) requests support but receives no response.
# -------------------------------------------------------------

send-heartbeats now=500 liveness-interval=10s
Expand All @@ -133,7 +133,7 @@ heartbeats:

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0

debug-requester-state
----
Expand Down Expand Up @@ -183,7 +183,7 @@ heartbeats:

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0

send-heartbeats now=700 liveness-interval=10s
----
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/storeliveness/testdata/restart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

# -------------------------------------------------------------
# Store (n1, s1) establishes support for and from (n2, s2).
Expand Down Expand Up @@ -129,7 +129,7 @@ heartbeats:

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

# -------------------------------------------------------------
# Store (n1, s1) sends heartbeats with an incremented epoch.
Expand Down
Loading

0 comments on commit 343bd46

Please sign in to comment.