Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: update TruncatedState before writing #131063

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions pkg/kv/kvserver/raft_log_truncator.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,17 @@ type replicaForTruncator interface {
getRangeID() roachpb.RangeID
// Returns the current truncated state.
getTruncatedState() kvserverpb.RaftTruncatedState
// Updates the replica state after the truncation is enacted.
setTruncatedStateAndSideEffects(
_ context.Context, _ *kvserverpb.RaftTruncatedState, expectedFirstIndexPreTruncation kvpb.RaftIndex,
) (expectedFirstIndexWasAccurate bool)
// Updates the stats related to the raft log size after the truncation is
// Updates the replica truncated state before the truncation is enacted.
setTruncatedState(
_ kvserverpb.RaftTruncatedState,
expectedFirstIndexPreTruncation kvpb.RaftIndex,
isDeltaTrusted bool,
)
// Updates the stats related to the raft log size before the truncation is
// enacted.
setTruncationDeltaAndTrusted(deltaBytes int64, isDeltaTrusted bool)
setTruncationDelta(deltaBytes int64)
// Updates the replica state after the truncation is enacted.
applySideEffects(_ context.Context, _ *kvserverpb.RaftTruncatedState)
// Returns the pending truncations queue. The caller is allowed to mutate
// the return value by additionally acquiring pendingLogTruncations.mu.
getPendingTruncs() *pendingLogTruncations
Expand Down Expand Up @@ -574,6 +578,19 @@ func (t *raftLogTruncator) tryEnactTruncations(
pendingTruncs.reset()
return
}
// Update the Replica's in-memory TruncatedState before applying the write
// batch to storage. Readers of the raft log storage who synchronize with it
// via r.mu, and read TruncatedState, will then expect to find entries at
// indices > TruncatedState.Index in the log. If we write the batch first, and
// only then update TruncatedState, there is a time window during which the
// log storage appears to have a gap.
pendingTruncs.iterateLocked(func(index int, trunc pendingTruncation) {
if index > enactIndex {
return
}
r.setTruncatedState(trunc.RaftTruncatedState, trunc.expectedFirstIndex, trunc.isDeltaTrusted)
r.setTruncationDelta(trunc.logDeltaBytes)
})
// sync=false since we don't need a guarantee that the truncation is
// durable. Loss of a truncation means we have more of the suffix of the
// raft log, which does not affect correctness.
Expand All @@ -588,13 +605,7 @@ func (t *raftLogTruncator) tryEnactTruncations(
if index > enactIndex {
return
}
isDeltaTrusted := true
expectedFirstIndexWasAccurate := r.setTruncatedStateAndSideEffects(
ctx, &trunc.RaftTruncatedState, trunc.expectedFirstIndex)
if !expectedFirstIndexWasAccurate || !trunc.isDeltaTrusted {
isDeltaTrusted = false
}
r.setTruncationDeltaAndTrusted(trunc.logDeltaBytes, isDeltaTrusted)
r.applySideEffects(ctx, &trunc.RaftTruncatedState)
})
// Now remove the enacted truncations. It is the same iteration as the
// previous one, but we do it while holding pendingTruncs.mu. Note that
Expand Down
34 changes: 18 additions & 16 deletions pkg/kv/kvserver/raft_log_truncator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,24 @@ func (r *replicaTruncatorTest) getPendingTruncs() *pendingLogTruncations {
return &r.pendingTruncs
}

func (r *replicaTruncatorTest) setTruncationDeltaAndTrusted(deltaBytes int64, isDeltaTrusted bool) {
fmt.Fprintf(r.buf, "r%d.setTruncationDeltaAndTrusted(delta:%d, trusted:%t)\n",
r.rangeID, deltaBytes, isDeltaTrusted)
func (r *replicaTruncatorTest) setTruncatedState(
truncState kvserverpb.RaftTruncatedState,
expectedFirstIndexPreTruncation kvpb.RaftIndex,
isDeltaTrusted bool,
) {
expectedFirstIndexWasAccurate := r.truncState.Index+1 == expectedFirstIndexPreTruncation
r.truncState = truncState
fmt.Fprintf(r.buf, "r%d.setTruncatedState(exp:%d, trusted:%t) => accurate:%t\n",
r.rangeID, expectedFirstIndexPreTruncation, isDeltaTrusted, expectedFirstIndexWasAccurate)
}
func (r *replicaTruncatorTest) setTruncationDelta(deltaBytes int64) {
fmt.Fprintf(r.buf, "r%d.setTruncationDelta(delta:%d)\n", r.rangeID, deltaBytes)
}

func (r *replicaTruncatorTest) applySideEffects(
_ context.Context, ts *kvserverpb.RaftTruncatedState,
) {
fmt.Fprintf(r.buf, "r%d.applySideEffects(index:%d)\n", r.rangeID, ts.Index)
}

func (r *replicaTruncatorTest) sideloadedBytesIfTruncatedFromTo(
Expand All @@ -162,19 +177,6 @@ func (r *replicaTruncatorTest) getStateLoader() stateloader.StateLoader {
return r.stateLoader
}

func (r *replicaTruncatorTest) setTruncatedStateAndSideEffects(
_ context.Context,
truncState *kvserverpb.RaftTruncatedState,
expectedFirstIndexPreTruncation kvpb.RaftIndex,
) (expectedFirstIndexWasAccurate bool) {
expectedFirstIndexWasAccurate = r.truncState.Index+1 == expectedFirstIndexPreTruncation
r.truncState = *truncState
fmt.Fprintf(r.buf,
"r%d.setTruncatedStateAndSideEffects(..., expectedFirstIndex:%d) => trusted:%t\n",
r.rangeID, expectedFirstIndexPreTruncation, expectedFirstIndexWasAccurate)
return expectedFirstIndexWasAccurate
}

func (r *replicaTruncatorTest) writeRaftStateToEngine(
t *testing.T, eng storage.Engine, truncIndex kvpb.RaftIndex, lastLogEntry kvpb.RaftIndex,
) {
Expand Down
23 changes: 12 additions & 11 deletions pkg/kv/kvserver/raft_truncator_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,21 @@ func (r *raftTruncatorReplica) getTruncatedState() kvserverpb.RaftTruncatedState
return *r.mu.state.TruncatedState
}

func (r *raftTruncatorReplica) setTruncatedStateAndSideEffects(
ctx context.Context,
trunc *kvserverpb.RaftTruncatedState,
func (r *raftTruncatorReplica) setTruncatedState(
rt kvserverpb.RaftTruncatedState,
expectedFirstIndexPreTruncation kvpb.RaftIndex,
) (expectedFirstIndexWasAccurate bool) {
_, expectedFirstIndexAccurate := (*Replica)(r).handleTruncatedStateResult(
ctx, trunc, expectedFirstIndexPreTruncation)
return expectedFirstIndexAccurate
isDeltaTrusted bool,
) {
(*Replica)(r).setTruncatedStateMuLocked(&rt, expectedFirstIndexPreTruncation, isDeltaTrusted)
}

func (r *raftTruncatorReplica) setTruncationDeltaAndTrusted(deltaBytes int64, isDeltaTrusted bool) {
func (r *raftTruncatorReplica) applySideEffects(
ctx context.Context, trunc *kvserverpb.RaftTruncatedState,
) {
(*Replica)(r).handleTruncatedStateResult(ctx, trunc)
}

func (r *raftTruncatorReplica) setTruncationDelta(deltaBytes int64) {
r.mu.Lock()
defer r.mu.Unlock()
r.mu.raftLogSize += deltaBytes
Expand All @@ -58,9 +62,6 @@ func (r *raftTruncatorReplica) setTruncationDeltaAndTrusted(deltaBytes int64, is
if r.mu.raftLogLastCheckSize < 0 {
r.mu.raftLogLastCheckSize = 0
}
if !isDeltaTrusted {
r.mu.raftLogSizeTrusted = false
}
}

func (r *raftTruncatorReplica) getPendingTruncs() *pendingLogTruncations {
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica_app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,16 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly(
res.RaftLogDelta)
}
if apply {
// Update the Replica's in-memory TruncatedState before applying the write
// batch to storage. Readers of the raft log storage who synchronize with
// it via r.mu, and read TruncatedState, will then expect to find entries
// at indices > TruncatedState.Index in the log. If we write the batch
// first, and only then update TruncatedState, there is a time window
// during which the log storage appears to have a gap.
isDeltaTrusted := res.RaftExpectedFirstIndex != 0
b.r.setTruncatedStateMuLocked(res.State.TruncatedState,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but I don't think we hold replicaMu here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we don't. It's locked inside the method though, just need to remove MuLocked from the name.

res.RaftExpectedFirstIndex, isDeltaTrusted)

// This truncation command will apply synchronously in this batch.
// Determine if there are any sideloaded entries that will be removed as a
// side effect.
Expand Down
27 changes: 17 additions & 10 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,17 +486,24 @@ func (r *Replica) handleLeaseResult(
assertNoLeaseJump)
}

func (r *Replica) handleTruncatedStateResult(
ctx context.Context,
t *kvserverpb.RaftTruncatedState,
func (r *Replica) setTruncatedStateMuLocked(
ts *kvserverpb.RaftTruncatedState,
expectedFirstIndexPreTruncation kvpb.RaftIndex,
) (raftLogDelta int64, expectedFirstIndexWasAccurate bool) {
isDeltaTrusted bool,
) {
r.mu.Lock()
expectedFirstIndexWasAccurate =
defer r.mu.Unlock()
expectedFirstIndexWasAccurate :=
r.mu.state.TruncatedState.Index+1 == expectedFirstIndexPreTruncation
r.mu.state.TruncatedState = t
r.mu.Unlock()
r.mu.state.TruncatedState = ts
if !expectedFirstIndexWasAccurate || !isDeltaTrusted {
r.mu.raftLogSizeTrusted = false
}
}

func (r *Replica) handleTruncatedStateResult(
ctx context.Context, t *kvserverpb.RaftTruncatedState,
) (raftLogDelta int64) {
// Clear any entries in the Raft log entry cache for this range up
// to and including the most recently truncated index.
r.store.raftEntryCache.Clear(r.RangeID, t.Index+1)
Expand All @@ -523,7 +530,7 @@ func (r *Replica) handleTruncatedStateResult(
// crashes if the filesystem is quick enough to sync it for us. Add a test
// that syncs the files removal here, and "crashes" right after, to help
// reproduce and fix #113135.
return -size, expectedFirstIndexWasAccurate
return -size
}

func (r *Replica) handleGCThresholdResult(ctx context.Context, thresh *hlc.Timestamp) {
Expand Down Expand Up @@ -607,6 +614,6 @@ func (r *Replica) handleChangeReplicasResult(
}

// TODO(sumeer): remove method when all truncation is loosely coupled.
func (r *Replica) handleRaftLogDeltaResult(ctx context.Context, delta int64, isDeltaTrusted bool) {
(*raftTruncatorReplica)(r).setTruncationDeltaAndTrusted(delta, isDeltaTrusted)
func (r *Replica) handleRaftLogDeltaResult(ctx context.Context, delta int64) {
(*raftTruncatorReplica)(r).setTruncationDelta(delta)
}
9 changes: 2 additions & 7 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult(
log.Fatalf(ctx, "zero-value ReplicatedEvalResult passed to handleNonTrivialReplicatedEvalResult")
}

isRaftLogTruncationDeltaTrusted := true
if rResult.State != nil {
if newLease := rResult.State.Lease; newLease != nil {
sm.r.handleLeaseResult(ctx, newLease, rResult.PriorReadSummary)
Expand All @@ -298,11 +297,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult(
// This strongly coupled truncation code will be removed in the release
// following LooselyCoupledRaftLogTruncation.
if newTruncState := rResult.State.TruncatedState; newTruncState != nil {
raftLogDelta, expectedFirstIndexWasAccurate := sm.r.handleTruncatedStateResult(
ctx, newTruncState, rResult.RaftExpectedFirstIndex)
if !expectedFirstIndexWasAccurate && rResult.RaftExpectedFirstIndex != 0 {
isRaftLogTruncationDeltaTrusted = false
}
raftLogDelta := sm.r.handleTruncatedStateResult(ctx, newTruncState)
rResult.RaftLogDelta += raftLogDelta
rResult.State.TruncatedState = nil
rResult.RaftExpectedFirstIndex = 0
Expand All @@ -327,7 +322,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult(
// This code path will be taken exactly when the preceding block has
// newTruncState != nil. It is needlessly confusing that these two are not
// in the same place.
sm.r.handleRaftLogDeltaResult(ctx, rResult.RaftLogDelta, isRaftLogTruncationDeltaTrusted)
sm.r.handleRaftLogDeltaResult(ctx, rResult.RaftLogDelta)
rResult.RaftLogDelta = 0
}

Expand Down
45 changes: 27 additions & 18 deletions pkg/kv/kvserver/testdata/raft_log_truncator
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ acquireReplica(1)
r1.getTruncatedState
r1.getPendingTruncs
r1.getStateLoader
r1.setTruncatedStateAndSideEffects(..., expectedFirstIndex:16) => trusted:false
r1.setTruncationDeltaAndTrusted(delta:-30, trusted:false)
r1.setTruncatedState(exp:16, trusted:true) => accurate:false
r1.setTruncationDelta(delta:-30)
r1.applySideEffects(index:22)
releaseReplica(1)
acquireReplica(2)
r2.getTruncatedState
Expand Down Expand Up @@ -167,8 +168,9 @@ acquireReplica(2)
r2.getTruncatedState
r2.getPendingTruncs
r2.getStateLoader
r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:21) => trusted:true
r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true)
r2.setTruncatedState(exp:21, trusted:true) => accurate:true
r2.setTruncationDelta(delta:-30)
r2.applySideEffects(index:22)
releaseReplica(2)
truncator ranges:

Expand Down Expand Up @@ -205,8 +207,9 @@ acquireReplica(2)
r2.getTruncatedState
r2.getPendingTruncs
r2.getStateLoader
r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:21) => trusted:false
r2.setTruncationDeltaAndTrusted(delta:-30, trusted:false)
r2.setTruncatedState(exp:21, trusted:true) => accurate:false
r2.setTruncationDelta(delta:-30)
r2.applySideEffects(index:24)
releaseReplica(2)
truncator ranges:

Expand Down Expand Up @@ -269,8 +272,9 @@ acquireReplica(2)
r2.getTruncatedState
r2.getPendingTruncs
r2.getStateLoader
r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:25) => trusted:true
r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true)
r2.setTruncatedState(exp:25, trusted:true) => accurate:true
r2.setTruncationDelta(delta:-30)
r2.applySideEffects(index:26)
releaseReplica(2)
truncator ranges: 2

Expand Down Expand Up @@ -316,10 +320,12 @@ acquireReplica(2)
r2.getTruncatedState
r2.getPendingTruncs
r2.getStateLoader
r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:27) => trusted:true
r2.setTruncationDeltaAndTrusted(delta:-60, trusted:false)
r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:30) => trusted:true
r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true)
r2.setTruncatedState(exp:27, trusted:false) => accurate:true
r2.setTruncationDelta(delta:-60)
r2.setTruncatedState(exp:30, trusted:true) => accurate:true
r2.setTruncationDelta(delta:-30)
r2.applySideEffects(index:29)
r2.applySideEffects(index:31)
releaseReplica(2)
truncator ranges:

Expand Down Expand Up @@ -382,10 +388,12 @@ acquireReplica(2)
r2.getTruncatedState
r2.getPendingTruncs
r2.getStateLoader
r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:32) => trusted:true
r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true)
r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:33) => trusted:true
r2.setTruncationDeltaAndTrusted(delta:-60, trusted:false)
r2.setTruncatedState(exp:32, trusted:true) => accurate:true
r2.setTruncationDelta(delta:-30)
r2.setTruncatedState(exp:33, trusted:false) => accurate:true
r2.setTruncationDelta(delta:-60)
r2.applySideEffects(index:32)
r2.applySideEffects(index:34)
releaseReplica(2)
truncator ranges:

Expand Down Expand Up @@ -472,8 +480,9 @@ acquireReplica(3)
r3.getTruncatedState
r3.getPendingTruncs
r3.getStateLoader
r3.setTruncatedStateAndSideEffects(..., expectedFirstIndex:21) => trusted:true
r3.setTruncationDeltaAndTrusted(delta:-30, trusted:true)
r3.setTruncatedState(exp:21, trusted:true) => accurate:true
r3.setTruncationDelta(delta:-30)
r3.applySideEffects(index:22)
releaseReplica(3)
truncator ranges: 3

Expand Down
Loading