diff --git a/pkg/kv/kvserver/raft_log_truncator.go b/pkg/kv/kvserver/raft_log_truncator.go index 2f7b8818291c..0d89d1ce5c52 100644 --- a/pkg/kv/kvserver/raft_log_truncator.go +++ b/pkg/kv/kvserver/raft_log_truncator.go @@ -251,13 +251,17 @@ type replicaForTruncator interface { getRangeID() roachpb.RangeID // Returns the current truncated state. getTruncatedState() kvserverpb.RaftTruncatedState - // Updates the replica state after the truncation is enacted. - setTruncatedStateAndSideEffects( - _ context.Context, _ *kvserverpb.RaftTruncatedState, expectedFirstIndexPreTruncation kvpb.RaftIndex, - ) (expectedFirstIndexWasAccurate bool) - // Updates the stats related to the raft log size after the truncation is + // Updates the replica truncated state before the truncation is enacted. + setTruncatedState( + _ kvserverpb.RaftTruncatedState, + expectedFirstIndexPreTruncation kvpb.RaftIndex, + isDeltaTrusted bool, + ) + // Updates the stats related to the raft log size before the truncation is // enacted. - setTruncationDeltaAndTrusted(deltaBytes int64, isDeltaTrusted bool) + setTruncationDelta(deltaBytes int64) + // Updates the replica state after the truncation is enacted. + applySideEffects(_ context.Context, _ *kvserverpb.RaftTruncatedState) // Returns the pending truncations queue. The caller is allowed to mutate // the return value by additionally acquiring pendingLogTruncations.mu. getPendingTruncs() *pendingLogTruncations @@ -574,6 +578,19 @@ func (t *raftLogTruncator) tryEnactTruncations( pendingTruncs.reset() return } + // Update the Replica's in-memory TruncatedState before applying the write + // batch to storage. Readers of the raft log storage who synchronize with it + // via r.mu, and read TruncatedState, will then expect to find entries at + // indices > TruncatedState.Index in the log. If we write the batch first, and + // only then update TruncatedState, there is a time window during which the + // log storage appears to have a gap. + pendingTruncs.iterateLocked(func(index int, trunc pendingTruncation) { + if index > enactIndex { + return + } + r.setTruncatedState(trunc.RaftTruncatedState, trunc.expectedFirstIndex, trunc.isDeltaTrusted) + r.setTruncationDelta(trunc.logDeltaBytes) + }) // sync=false since we don't need a guarantee that the truncation is // durable. Loss of a truncation means we have more of the suffix of the // raft log, which does not affect correctness. @@ -588,13 +605,7 @@ func (t *raftLogTruncator) tryEnactTruncations( if index > enactIndex { return } - isDeltaTrusted := true - expectedFirstIndexWasAccurate := r.setTruncatedStateAndSideEffects( - ctx, &trunc.RaftTruncatedState, trunc.expectedFirstIndex) - if !expectedFirstIndexWasAccurate || !trunc.isDeltaTrusted { - isDeltaTrusted = false - } - r.setTruncationDeltaAndTrusted(trunc.logDeltaBytes, isDeltaTrusted) + r.applySideEffects(ctx, &trunc.RaftTruncatedState) }) // Now remove the enacted truncations. It is the same iteration as the // previous one, but we do it while holding pendingTruncs.mu. Note that diff --git a/pkg/kv/kvserver/raft_log_truncator_test.go b/pkg/kv/kvserver/raft_log_truncator_test.go index 7466d5c209f7..08c2df974ff2 100644 --- a/pkg/kv/kvserver/raft_log_truncator_test.go +++ b/pkg/kv/kvserver/raft_log_truncator_test.go @@ -145,9 +145,24 @@ func (r *replicaTruncatorTest) getPendingTruncs() *pendingLogTruncations { return &r.pendingTruncs } -func (r *replicaTruncatorTest) setTruncationDeltaAndTrusted(deltaBytes int64, isDeltaTrusted bool) { - fmt.Fprintf(r.buf, "r%d.setTruncationDeltaAndTrusted(delta:%d, trusted:%t)\n", - r.rangeID, deltaBytes, isDeltaTrusted) +func (r *replicaTruncatorTest) setTruncatedState( + truncState kvserverpb.RaftTruncatedState, + expectedFirstIndexPreTruncation kvpb.RaftIndex, + isDeltaTrusted bool, +) { + expectedFirstIndexWasAccurate := r.truncState.Index+1 == expectedFirstIndexPreTruncation + r.truncState = truncState + fmt.Fprintf(r.buf, "r%d.setTruncatedState(exp:%d, trusted:%t) => accurate:%t\n", + r.rangeID, expectedFirstIndexPreTruncation, isDeltaTrusted, expectedFirstIndexWasAccurate) +} +func (r *replicaTruncatorTest) setTruncationDelta(deltaBytes int64) { + fmt.Fprintf(r.buf, "r%d.setTruncationDelta(delta:%d)\n", r.rangeID, deltaBytes) +} + +func (r *replicaTruncatorTest) applySideEffects( + _ context.Context, ts *kvserverpb.RaftTruncatedState, +) { + fmt.Fprintf(r.buf, "r%d.applySideEffects(index:%d)\n", r.rangeID, ts.Index) } func (r *replicaTruncatorTest) sideloadedBytesIfTruncatedFromTo( @@ -162,19 +177,6 @@ func (r *replicaTruncatorTest) getStateLoader() stateloader.StateLoader { return r.stateLoader } -func (r *replicaTruncatorTest) setTruncatedStateAndSideEffects( - _ context.Context, - truncState *kvserverpb.RaftTruncatedState, - expectedFirstIndexPreTruncation kvpb.RaftIndex, -) (expectedFirstIndexWasAccurate bool) { - expectedFirstIndexWasAccurate = r.truncState.Index+1 == expectedFirstIndexPreTruncation - r.truncState = *truncState - fmt.Fprintf(r.buf, - "r%d.setTruncatedStateAndSideEffects(..., expectedFirstIndex:%d) => trusted:%t\n", - r.rangeID, expectedFirstIndexPreTruncation, expectedFirstIndexWasAccurate) - return expectedFirstIndexWasAccurate -} - func (r *replicaTruncatorTest) writeRaftStateToEngine( t *testing.T, eng storage.Engine, truncIndex kvpb.RaftIndex, lastLogEntry kvpb.RaftIndex, ) { diff --git a/pkg/kv/kvserver/raft_truncator_replica.go b/pkg/kv/kvserver/raft_truncator_replica.go index 6d7a5687ca89..8f6c895183d8 100644 --- a/pkg/kv/kvserver/raft_truncator_replica.go +++ b/pkg/kv/kvserver/raft_truncator_replica.go @@ -35,17 +35,21 @@ func (r *raftTruncatorReplica) getTruncatedState() kvserverpb.RaftTruncatedState return *r.mu.state.TruncatedState } -func (r *raftTruncatorReplica) setTruncatedStateAndSideEffects( - ctx context.Context, - trunc *kvserverpb.RaftTruncatedState, +func (r *raftTruncatorReplica) setTruncatedState( + rt kvserverpb.RaftTruncatedState, expectedFirstIndexPreTruncation kvpb.RaftIndex, -) (expectedFirstIndexWasAccurate bool) { - _, expectedFirstIndexAccurate := (*Replica)(r).handleTruncatedStateResult( - ctx, trunc, expectedFirstIndexPreTruncation) - return expectedFirstIndexAccurate + isDeltaTrusted bool, +) { + (*Replica)(r).setTruncatedStateMuLocked(&rt, expectedFirstIndexPreTruncation, isDeltaTrusted) } -func (r *raftTruncatorReplica) setTruncationDeltaAndTrusted(deltaBytes int64, isDeltaTrusted bool) { +func (r *raftTruncatorReplica) applySideEffects( + ctx context.Context, trunc *kvserverpb.RaftTruncatedState, +) { + (*Replica)(r).handleTruncatedStateResult(ctx, trunc) +} + +func (r *raftTruncatorReplica) setTruncationDelta(deltaBytes int64) { r.mu.Lock() defer r.mu.Unlock() r.mu.raftLogSize += deltaBytes @@ -58,9 +62,6 @@ func (r *raftTruncatorReplica) setTruncationDeltaAndTrusted(deltaBytes int64, is if r.mu.raftLogLastCheckSize < 0 { r.mu.raftLogLastCheckSize = 0 } - if !isDeltaTrusted { - r.mu.raftLogSizeTrusted = false - } } func (r *raftTruncatorReplica) getPendingTruncs() *pendingLogTruncations { diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index d513f75c4739..291c6f5623c1 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -437,6 +437,16 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly( res.RaftLogDelta) } if apply { + // Update the Replica's in-memory TruncatedState before applying the write + // batch to storage. Readers of the raft log storage who synchronize with + // it via r.mu, and read TruncatedState, will then expect to find entries + // at indices > TruncatedState.Index in the log. If we write the batch + // first, and only then update TruncatedState, there is a time window + // during which the log storage appears to have a gap. + isDeltaTrusted := res.RaftExpectedFirstIndex != 0 + b.r.setTruncatedStateMuLocked(res.State.TruncatedState, + res.RaftExpectedFirstIndex, isDeltaTrusted) + // This truncation command will apply synchronously in this batch. // Determine if there are any sideloaded entries that will be removed as a // side effect. diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 981ebc0535a1..6b739b15134e 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -486,17 +486,24 @@ func (r *Replica) handleLeaseResult( assertNoLeaseJump) } -func (r *Replica) handleTruncatedStateResult( - ctx context.Context, - t *kvserverpb.RaftTruncatedState, +func (r *Replica) setTruncatedStateMuLocked( + ts *kvserverpb.RaftTruncatedState, expectedFirstIndexPreTruncation kvpb.RaftIndex, -) (raftLogDelta int64, expectedFirstIndexWasAccurate bool) { + isDeltaTrusted bool, +) { r.mu.Lock() - expectedFirstIndexWasAccurate = + defer r.mu.Unlock() + expectedFirstIndexWasAccurate := r.mu.state.TruncatedState.Index+1 == expectedFirstIndexPreTruncation - r.mu.state.TruncatedState = t - r.mu.Unlock() + r.mu.state.TruncatedState = ts + if !expectedFirstIndexWasAccurate || !isDeltaTrusted { + r.mu.raftLogSizeTrusted = false + } +} +func (r *Replica) handleTruncatedStateResult( + ctx context.Context, t *kvserverpb.RaftTruncatedState, +) (raftLogDelta int64) { // Clear any entries in the Raft log entry cache for this range up // to and including the most recently truncated index. r.store.raftEntryCache.Clear(r.RangeID, t.Index+1) @@ -523,7 +530,7 @@ func (r *Replica) handleTruncatedStateResult( // crashes if the filesystem is quick enough to sync it for us. Add a test // that syncs the files removal here, and "crashes" right after, to help // reproduce and fix #113135. - return -size, expectedFirstIndexWasAccurate + return -size } func (r *Replica) handleGCThresholdResult(ctx context.Context, thresh *hlc.Timestamp) { @@ -607,6 +614,6 @@ func (r *Replica) handleChangeReplicasResult( } // TODO(sumeer): remove method when all truncation is loosely coupled. -func (r *Replica) handleRaftLogDeltaResult(ctx context.Context, delta int64, isDeltaTrusted bool) { - (*raftTruncatorReplica)(r).setTruncationDeltaAndTrusted(delta, isDeltaTrusted) +func (r *Replica) handleRaftLogDeltaResult(ctx context.Context, delta int64) { + (*raftTruncatorReplica)(r).setTruncationDelta(delta) } diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 05a627dde604..9f3e589df341 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -287,7 +287,6 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( log.Fatalf(ctx, "zero-value ReplicatedEvalResult passed to handleNonTrivialReplicatedEvalResult") } - isRaftLogTruncationDeltaTrusted := true if rResult.State != nil { if newLease := rResult.State.Lease; newLease != nil { sm.r.handleLeaseResult(ctx, newLease, rResult.PriorReadSummary) @@ -298,11 +297,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( // This strongly coupled truncation code will be removed in the release // following LooselyCoupledRaftLogTruncation. if newTruncState := rResult.State.TruncatedState; newTruncState != nil { - raftLogDelta, expectedFirstIndexWasAccurate := sm.r.handleTruncatedStateResult( - ctx, newTruncState, rResult.RaftExpectedFirstIndex) - if !expectedFirstIndexWasAccurate && rResult.RaftExpectedFirstIndex != 0 { - isRaftLogTruncationDeltaTrusted = false - } + raftLogDelta := sm.r.handleTruncatedStateResult(ctx, newTruncState) rResult.RaftLogDelta += raftLogDelta rResult.State.TruncatedState = nil rResult.RaftExpectedFirstIndex = 0 @@ -327,7 +322,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( // This code path will be taken exactly when the preceding block has // newTruncState != nil. It is needlessly confusing that these two are not // in the same place. - sm.r.handleRaftLogDeltaResult(ctx, rResult.RaftLogDelta, isRaftLogTruncationDeltaTrusted) + sm.r.handleRaftLogDeltaResult(ctx, rResult.RaftLogDelta) rResult.RaftLogDelta = 0 } diff --git a/pkg/kv/kvserver/testdata/raft_log_truncator b/pkg/kv/kvserver/testdata/raft_log_truncator index 81376d211f21..b88d74927583 100644 --- a/pkg/kv/kvserver/testdata/raft_log_truncator +++ b/pkg/kv/kvserver/testdata/raft_log_truncator @@ -91,8 +91,9 @@ acquireReplica(1) r1.getTruncatedState r1.getPendingTruncs r1.getStateLoader -r1.setTruncatedStateAndSideEffects(..., expectedFirstIndex:16) => trusted:false -r1.setTruncationDeltaAndTrusted(delta:-30, trusted:false) +r1.setTruncatedState(exp:16, trusted:true) => accurate:false +r1.setTruncationDelta(delta:-30) +r1.applySideEffects(index:22) releaseReplica(1) acquireReplica(2) r2.getTruncatedState @@ -167,8 +168,9 @@ acquireReplica(2) r2.getTruncatedState r2.getPendingTruncs r2.getStateLoader -r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:21) => trusted:true -r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true) +r2.setTruncatedState(exp:21, trusted:true) => accurate:true +r2.setTruncationDelta(delta:-30) +r2.applySideEffects(index:22) releaseReplica(2) truncator ranges: @@ -205,8 +207,9 @@ acquireReplica(2) r2.getTruncatedState r2.getPendingTruncs r2.getStateLoader -r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:21) => trusted:false -r2.setTruncationDeltaAndTrusted(delta:-30, trusted:false) +r2.setTruncatedState(exp:21, trusted:true) => accurate:false +r2.setTruncationDelta(delta:-30) +r2.applySideEffects(index:24) releaseReplica(2) truncator ranges: @@ -269,8 +272,9 @@ acquireReplica(2) r2.getTruncatedState r2.getPendingTruncs r2.getStateLoader -r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:25) => trusted:true -r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true) +r2.setTruncatedState(exp:25, trusted:true) => accurate:true +r2.setTruncationDelta(delta:-30) +r2.applySideEffects(index:26) releaseReplica(2) truncator ranges: 2 @@ -316,10 +320,12 @@ acquireReplica(2) r2.getTruncatedState r2.getPendingTruncs r2.getStateLoader -r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:27) => trusted:true -r2.setTruncationDeltaAndTrusted(delta:-60, trusted:false) -r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:30) => trusted:true -r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true) +r2.setTruncatedState(exp:27, trusted:false) => accurate:true +r2.setTruncationDelta(delta:-60) +r2.setTruncatedState(exp:30, trusted:true) => accurate:true +r2.setTruncationDelta(delta:-30) +r2.applySideEffects(index:29) +r2.applySideEffects(index:31) releaseReplica(2) truncator ranges: @@ -382,10 +388,12 @@ acquireReplica(2) r2.getTruncatedState r2.getPendingTruncs r2.getStateLoader -r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:32) => trusted:true -r2.setTruncationDeltaAndTrusted(delta:-30, trusted:true) -r2.setTruncatedStateAndSideEffects(..., expectedFirstIndex:33) => trusted:true -r2.setTruncationDeltaAndTrusted(delta:-60, trusted:false) +r2.setTruncatedState(exp:32, trusted:true) => accurate:true +r2.setTruncationDelta(delta:-30) +r2.setTruncatedState(exp:33, trusted:false) => accurate:true +r2.setTruncationDelta(delta:-60) +r2.applySideEffects(index:32) +r2.applySideEffects(index:34) releaseReplica(2) truncator ranges: @@ -472,8 +480,9 @@ acquireReplica(3) r3.getTruncatedState r3.getPendingTruncs r3.getStateLoader -r3.setTruncatedStateAndSideEffects(..., expectedFirstIndex:21) => trusted:true -r3.setTruncationDeltaAndTrusted(delta:-30, trusted:true) +r3.setTruncatedState(exp:21, trusted:true) => accurate:true +r3.setTruncationDelta(delta:-30) +r3.applySideEffects(index:22) releaseReplica(3) truncator ranges: 3