Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storeliveness: check clock to determine support from #131113

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions pkg/kv/kvserver/replica_store_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,13 @@ func (r *replicaRLockedStoreLiveness) SupportFor(replicaID raftpb.PeerID) (raftp
// SupportFrom implements the raftstoreliveness.StoreLiveness interface.
func (r *replicaRLockedStoreLiveness) SupportFrom(
replicaID raftpb.PeerID,
) (raftpb.Epoch, hlc.Timestamp, bool) {
) (raftpb.Epoch, hlc.Timestamp) {
storeID, ok := r.getStoreIdent(replicaID)
if !ok {
return 0, hlc.Timestamp{}, false
return 0, hlc.Timestamp{}
}
epoch, exp, ok := r.store.storeLiveness.SupportFrom(storeID)
if !ok {
return 0, hlc.Timestamp{}, false
}
return raftpb.Epoch(epoch), exp, true
epoch, exp := r.store.storeLiveness.SupportFrom(storeID)
return raftpb.Epoch(epoch), exp
}

// SupportFromEnabled implements the raftstoreliveness.StoreLiveness interface.
Expand Down Expand Up @@ -124,5 +121,7 @@ func raftFortificationEnabledForRangeID(fracEnabled float64, rangeID roachpb.Ran

// SupportExpired implements the raftstoreliveness.StoreLiveness interface.
func (r *replicaRLockedStoreLiveness) SupportExpired(ts hlc.Timestamp) bool {
return ts.Less(r.store.Clock().Now())
// A support expiration timestamp equal to the current time is considered
// expired, to be consistent with support withdrawal in Store Liveness.
return ts.LessEq(r.store.Clock().Now())
}
22 changes: 9 additions & 13 deletions pkg/kv/kvserver/storeliveness/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ type Fabric interface {
// SupportFor returns the epoch of the current uninterrupted period of Store
// Liveness support from the local store (S_local) for the store (S_remote)
// corresponding to the specified id, and a boolean indicating whether S_local
// is currently supporting S_remote.
//
// If S_local is not currently supporting S_remote, the epoch will be 0 and
// the boolean will be false.
// supports S_remote. The epoch is 0 if and only if the boolean is false.
//
// S_remote may not be aware of the full extent of support from S_local, as
// Store Liveness heartbeat response messages may be lost or delayed. However,
Expand All @@ -37,21 +34,20 @@ type Fabric interface {

// SupportFrom returns the epoch of the current uninterrupted period of Store
// Liveness support for the local store (S_local) from the store (S_remote)
// corresponding to the specified id, the timestamp until which the support is
// provided (an expiration), and a boolean indicating whether S_local is
// currently supported by S_remote.
// corresponding to the specified id, and the timestamp until which the
// support is provided (an expiration). The epoch is 0 if and only if the
// timestamp is the zero timestamp.
//
// If S_local is not currently supported by S_remote, the epoch will be 0, the
// timestamp will be the zero timestamp, and the boolean will be false.
// It is the caller's responsibility to infer whether support has expired, by
// comparing the returned timestamp to its local clock.
//
// S_local may not be aware of the full extent of support from S_remote, as
// Store Liveness heartbeat response messages may be lost or delayed. However,
// S_remote will never be unaware of support it is providing.
//
// If S_local is unaware of the remote store S_remote, false will be returned,
// and S_local will initiate a heartbeat loop to S_remote in order to
// request support so that future calls to SupportFrom may succeed.
SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Timestamp, bool)
// If S_local is unaware of the remote store S_remote, it will initiate a
// heartbeat loop to S_remote in order to request support.
SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Timestamp)

// SupportFromEnabled determines if Store Liveness requests support from
// other stores. If it returns true, then Store Liveness is sending
Expand Down
12 changes: 3 additions & 9 deletions pkg/kv/kvserver/storeliveness/store_liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,13 @@ func TestStoreLiveness(t *testing.T) {

case "support-from":
remoteID := parseStoreID(t, d, "node-id", "store-id")
epoch, timestamp, supported := sm.SupportFrom(remoteID)
return fmt.Sprintf(
"epoch: %+v, expiration: %+v, support provided: %v",
epoch, timestamp, supported,
)
epoch, timestamp := sm.SupportFrom(remoteID)
return fmt.Sprintf("epoch: %+v, expiration: %+v", epoch, timestamp)

case "support-for":
remoteID := parseStoreID(t, d, "node-id", "store-id")
epoch, supported := sm.SupportFor(remoteID)
return fmt.Sprintf(
"epoch: %+v, support provided: %v",
epoch, supported,
)
return fmt.Sprintf("epoch: %+v, support provided: %v", epoch, supported)

case "send-heartbeats":
now := parseTimestamp(t, d, "now")
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/storeliveness/support_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (sm *SupportManager) SupportFor(id slpb.StoreIdent) (slpb.Epoch, bool) {

// SupportFrom implements the Fabric interface. It delegates the response to the
// SupportManager's supporterStateHandler.
func (sm *SupportManager) SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Timestamp, bool) {
func (sm *SupportManager) SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Timestamp) {
ss, ok := sm.requesterStateHandler.getSupportFrom(id)
if !ok {
// If this is the first time SupportFrom has been called for this store,
Expand All @@ -118,13 +118,13 @@ func (sm *SupportManager) SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Times
context.Background(), 2,
"store %+v enqueued to add remote store %+v", sm.storeID, id,
)
return 0, hlc.Timestamp{}, false
return 0, hlc.Timestamp{}
}
// An empty expiration implies support has expired.
if ss.Expiration.IsEmpty() {
return 0, hlc.Timestamp{}, false
return 0, hlc.Timestamp{}
}
return ss.Epoch, ss.Expiration, true
return ss.Epoch, ss.Expiration
}

// SupportFromEnabled implements the Fabric interface and determines if Store
Expand Down
24 changes: 13 additions & 11 deletions pkg/kv/kvserver/storeliveness/support_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ func TestSupportManagerRequestsSupport(t *testing.T) {
require.NoError(t, sm.Start(ctx))

// Start sending heartbeats to the remote store by calling SupportFrom.
epoch, expiration, supported := sm.SupportFrom(remoteStore)
epoch, expiration := sm.SupportFrom(remoteStore)
require.Equal(t, slpb.Epoch(0), epoch)
require.Equal(t, hlc.Timestamp{}, expiration)
require.False(t, supported)

// Ensure heartbeats are sent.
msgs := ensureHeartbeats(t, sender, 10)
Expand All @@ -84,13 +83,12 @@ func TestSupportManagerRequestsSupport(t *testing.T) {
// Ensure support is provided as seen by SupportFrom.
testutils.SucceedsSoon(
t, func() error {
epoch, expiration, supported = sm.SupportFrom(remoteStore)
if !supported {
epoch, expiration = sm.SupportFrom(remoteStore)
if expiration.IsEmpty() {
return errors.New("support not provided yet")
}
require.Equal(t, slpb.Epoch(1), epoch)
require.Equal(t, requestedExpiration, expiration)
require.True(t, supported)
return nil
},
)
Expand All @@ -114,13 +112,16 @@ func TestSupportManagerProvidesSupport(t *testing.T) {
sm := NewSupportManager(store, engine, options, settings, stopper, clock, sender)
require.NoError(t, sm.Start(ctx))

// Pause the clock so support is not withdrawn before calling SupportFor.
manual.Pause()

// Process a heartbeat from the remote store.
heartbeat := &slpb.Message{
Type: slpb.MsgHeartbeat,
From: remoteStore,
To: sm.storeID,
Epoch: slpb.Epoch(1),
Expiration: sm.clock.Now().AddDuration(time.Second),
Expiration: sm.clock.Now().AddDuration(options.LivenessInterval),
}
sm.HandleMessage(heartbeat)

Expand All @@ -145,6 +146,9 @@ func TestSupportManagerProvidesSupport(t *testing.T) {
require.Equal(t, slpb.Epoch(1), epoch)
require.True(t, supported)

// Resume the clock, so support can be withdrawn.
manual.Resume()

// Wait for support to be withdrawn.
testutils.SucceedsSoon(
t, func() error {
Expand Down Expand Up @@ -178,8 +182,7 @@ func TestSupportManagerEnableDisable(t *testing.T) {
require.NoError(t, sm.Start(ctx))

// Start sending heartbeats by calling SupportFrom.
_, _, supported := sm.SupportFrom(remoteStore)
require.False(t, supported)
sm.SupportFrom(remoteStore)
ensureHeartbeats(t, sender, 10)

// Disable Store Liveness and make sure heartbeats stop.
Expand Down Expand Up @@ -312,11 +315,10 @@ func TestSupportManagerDiskStall(t *testing.T) {
ensureNoHeartbeats(t, sender, sm.options.HeartbeatInterval, 0)

// SupportFrom and SupportFor calls are still being answered.
epoch, _, supported := sm.SupportFrom(remoteStore)
epoch, _ := sm.SupportFrom(remoteStore)
require.Equal(t, slpb.Epoch(1), epoch)
require.True(t, supported)

epoch, supported = sm.SupportFor(remoteStore)
epoch, supported := sm.SupportFor(remoteStore)
require.Equal(t, slpb.Epoch(1), epoch)
require.True(t, supported)

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/storeliveness/testdata/basic
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

send-heartbeats now=100 liveness-interval=10s
----
Expand All @@ -24,7 +24,7 @@ responses:

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 110.000000000,0, support provided: true
epoch: 1, expiration: 110.000000000,0

support-for node-id=2 store-id=2
----
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/storeliveness/testdata/liveness_interval
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

# -------------------------------------------------------------
# Store (n1, s1) requests and receives support with
Expand All @@ -23,7 +23,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 110.000000000,0, support provided: true
epoch: 1, expiration: 110.000000000,0


# -------------------------------------------------------------
Expand All @@ -42,7 +42,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 121.000000000,0, support provided: true
epoch: 1, expiration: 121.000000000,0


# -------------------------------------------------------------
Expand All @@ -61,4 +61,4 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 121.000000000,0, support provided: true
epoch: 1, expiration: 121.000000000,0
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/storeliveness/testdata/multi-store
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@

support-from node-id=1 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

support-from node-id=2 store-id=3
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

support-from node-id=2 store-id=4
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

send-heartbeats now=100 liveness-interval=10s
----
Expand Down
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/storeliveness/testdata/requester_state
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

# -------------------------------------------------------------
# Store (n1, s1) successfully establishes support.
Expand All @@ -22,7 +22,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 110.000000000,0, support provided: true
epoch: 1, expiration: 110.000000000,0

debug-requester-state
----
Expand All @@ -47,7 +47,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 1, expiration: 210.000000000,0, support provided: true
epoch: 1, expiration: 210.000000000,0


# -------------------------------------------------------------
Expand All @@ -65,7 +65,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

debug-requester-state
----
Expand All @@ -90,7 +90,7 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0


# -------------------------------------------------------------
Expand All @@ -103,27 +103,27 @@ handle-messages

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0

handle-messages
msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=1 expiration=0
----

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0

handle-messages
msg type=MsgHeartbeatResp from-node-id=2 from-store-id=2 epoch=2 expiration=400
----

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0


# -------------------------------------------------------------
# Store (n1, s1) requests support but receives to response.
# Store (n1, s1) requests support but receives no response.
# -------------------------------------------------------------

send-heartbeats now=500 liveness-interval=10s
Expand All @@ -133,7 +133,7 @@ heartbeats:

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0

debug-requester-state
----
Expand Down Expand Up @@ -183,7 +183,7 @@ heartbeats:

support-from node-id=2 store-id=2
----
epoch: 2, expiration: 410.000000000,0, support provided: true
epoch: 2, expiration: 410.000000000,0

send-heartbeats now=700 liveness-interval=10s
----
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/storeliveness/testdata/restart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

# -------------------------------------------------------------
# Store (n1, s1) establishes support for and from (n2, s2).
Expand Down Expand Up @@ -129,7 +129,7 @@ heartbeats:

support-from node-id=2 store-id=2
----
epoch: 0, expiration: 0,0, support provided: false
epoch: 0, expiration: 0,0

# -------------------------------------------------------------
# Store (n1, s1) sends heartbeats with an incremented epoch.
Expand Down
Loading
Loading