From 0faea5a6c1f41ed17fe249ac671669a923c6f4c0 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 12 Sep 2024 23:09:09 +0100 Subject: [PATCH] kvserver: enable admitted vector protocol This commit enables AdmittedState annotations in RaftMessageRequest and RaftMessageRequestBatch protocol. The leader starts receiving admitted vector updates from all replicas. Epic: none Release note: none --- pkg/kv/kvserver/raft_transport.go | 4 ---- pkg/kv/kvserver/replica_raft.go | 8 +++++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 96e36310f5cd..eea4cdc54ac6 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -749,10 +749,6 @@ func (t *RaftTransport) processQueue( maybeAnnotateWithAdmittedStates := func( batch *kvserverpb.RaftMessageRequestBatch, admitted []kvflowcontrolpb.PiggybackedAdmittedState, ) { - // TODO(pav-kv): send these protos once they are populated correctly. - if true { - return - } batch.AdmittedStates = append(batch.AdmittedStates, admitted...) } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 0694fe40fbd1..67cf2a2f8dff 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1943,11 +1943,13 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { // For RACv2, annotate successful MsgAppResp messages with the vector of // admitted log indices, by priority. if msg.Type == raftpb.MsgAppResp && !msg.Reject { + // TODO(pav-kv): Should we make it conditional to leader using RACv2? There + // is no harm in attaching these annotations, but the can end up being a + // no-op. OTOH, transition to V2 should be quick, so maybe it's fine not to + // bother. admitted := r.flowControlV2.AdmittedState() // The admitted state must be in the coordinate system of the leader's log. - if admitted.Term == msg.Term && false { - // TODO(pav-kv): enable this annotation when it is covered with tests, and - // the leader knows how to handle it. + if admitted.Term == msg.Term { req.AdmittedState = kvflowcontrolpb.AdmittedState{ Term: admitted.Term, Admitted: admitted.Admitted[:],