Skip to content

Commit

Permalink
etcdserver: wait for raft to be notified on confChange before responding…
Browse files Browse the repository at this point in the history
… to client

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jun 28, 2023
1 parent bda68d8 commit cf1b668
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 13 deletions.
31 changes: 21 additions & 10 deletions server/etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type toApply struct {
snapshot raftpb.Snapshot
// notifyc synchronizes etcd server applies with the raft node
notifyc chan struct{}
// confChangeCh synchronizes etcd server applies confChange with raft node
confChangeCh chan struct{}
}

type raftNode struct {
Expand Down Expand Up @@ -203,10 +205,12 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}

notifyc := make(chan struct{}, 1)
confChangeCh := make(chan struct{}, 1)
ap := toApply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
confChangeCh: confChangeCh,
}

updateCommittedIndex(&ap, rh)
Expand Down Expand Up @@ -269,6 +273,14 @@ func (r *raftNode) start(rh *raftReadyHandler) {

r.raftStorage.Append(rd.Entries)

waitApply := false
for _, ent := range rd.CommittedEntries {
if ent.Type == raftpb.EntryConfChange {
waitApply = true
break
}
}

if !islead {
// finish processing incoming messages before we signal notifyc chan
msgs := r.processMessages(rd.Messages)
Expand All @@ -283,13 +295,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// on its own single-node cluster, before toApply-layer applies the config change.
// We simply wait for ALL pending entries to be applied for now.
// We might improve this later on if it causes unnecessary long blocking issues.
waitApply := false
for _, ent := range rd.CommittedEntries {
if ent.Type == raftpb.EntryConfChange {
waitApply = true
break
}
}

if waitApply {
// blocks until 'applyAll' calls 'applyWait.Trigger'
// to be in sync with scheduled config-change job
Expand All @@ -310,6 +316,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {

// gofail: var raftBeforeAdvance struct{}
r.Advance()

if waitApply {
// notify etcdserver that raft has already been notified or advanced.
confChangeCh <- struct{}{}
}
case <-r.stopped:
return
}
Expand Down
15 changes: 14 additions & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
return
}
var shouldstop bool
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.confChangeCh); shouldstop {
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
}
}
Expand Down Expand Up @@ -1810,6 +1810,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
func (s *EtcdServer) apply(
es []raftpb.Entry,
confState *raftpb.ConfState,
confChangeCh chan struct{},
) (appliedt uint64, appliedi uint64, shouldStop bool) {
s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
for i := range es {
Expand Down Expand Up @@ -1841,6 +1842,18 @@ func (s *EtcdServer) apply(
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
shouldStop = shouldStop || removedSelf

// etcdserver needs to ensure that raft has already been notified
// or advanced before it responds to the client. Otherwise, a
// subsequent config change request may be rejected.
// See https://github.com/etcd-io/etcd/issues/15528.
select {
case <-time.After(500 * time.Millisecond):
lg := s.Logger()
lg.Warn("timed out waiting for configChange notification")
case <-confChangeCh:
}

s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})

default:
Expand Down
6 changes: 4 additions & 2 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
Data: pbutil.MustMarshal(cc),
}}

_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
confChangeCh := make(chan struct{}, 1)
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, confChangeCh)
consistIndex := srv.consistIndex.ConsistentIndex()
assert.Equal(t, uint64(2), appliedi)

Expand Down Expand Up @@ -763,7 +764,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
ents = append(ents, ent)
}

_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
confChangeCh := make(chan struct{}, 1)
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, confChangeCh)
if !shouldStop {
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
}
Expand Down

0 comments on commit cf1b668

Please sign in to comment.