diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index e2f26117d98..aee855c79cd 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -72,6 +72,8 @@ type toApply struct { snapshot raftpb.Snapshot // notifyc synchronizes etcd server applies with the raft node notifyc chan struct{} + // confChangeCh synchronizes etcd server applies confChange with raft node + confChangeCh chan struct{} } type raftNode struct { @@ -203,10 +205,12 @@ func (r *raftNode) start(rh *raftReadyHandler) { } notifyc := make(chan struct{}, 1) + confChangeCh := make(chan struct{}, 1) ap := toApply{ - entries: rd.CommittedEntries, - snapshot: rd.Snapshot, - notifyc: notifyc, + entries: rd.CommittedEntries, + snapshot: rd.Snapshot, + notifyc: notifyc, + confChangeCh: confChangeCh, } updateCommittedIndex(&ap, rh) @@ -269,6 +273,14 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.raftStorage.Append(rd.Entries) + waitApply := false + for _, ent := range rd.CommittedEntries { + if ent.Type == raftpb.EntryConfChange { + waitApply = true + break + } + } + if !islead { // finish processing incoming messages before we signal notifyc chan msgs := r.processMessages(rd.Messages) @@ -283,13 +295,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { // on its own single-node cluster, before toApply-layer applies the config change. // We simply wait for ALL pending entries to be applied for now. // We might improve this later on if it causes unnecessary long blocking issues. - waitApply := false - for _, ent := range rd.CommittedEntries { - if ent.Type == raftpb.EntryConfChange { - waitApply = true - break - } - } + if waitApply { // blocks until 'applyAll' calls 'applyWait.Trigger' // to be in sync with scheduled config-change job @@ -310,6 +316,11 @@ func (r *raftNode) start(rh *raftReadyHandler) { // gofail: var raftBeforeAdvance struct{} r.Advance() + + if waitApply { + // notify etcdserver that raft has already been notified or advanced. 
+ confChangeCh <- struct{}{} + } case <-r.stopped: return } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 636c4aaedcc..dc81c81764a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1133,7 +1133,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) { return } var shouldstop bool - if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop { + if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.confChangeCh); shouldstop { go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster")) } } @@ -1810,6 +1810,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) { func (s *EtcdServer) apply( es []raftpb.Entry, confState *raftpb.ConfState, + confChangeCh chan struct{}, ) (appliedt uint64, appliedi uint64, shouldStop bool) { s.lg.Debug("Applying entries", zap.Int("num-entries", len(es))) for i := range es { @@ -1841,6 +1842,18 @@ func (s *EtcdServer) apply( s.setAppliedIndex(e.Index) s.setTerm(e.Term) shouldStop = shouldStop || removedSelf + + // etcdserver needs to ensure that raft has already been notified + // or advanced before it responds to the client. Otherwise, the + // following config change request may be rejected. + // See https://github.com/etcd-io/etcd/issues/15528. 
+ select { + case <-time.After(500 * time.Millisecond): + lg := s.Logger() + lg.Warn("timed out waiting for configChange notification") + case <-confChangeCh: + } + s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err}) default: diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index f5414e188fc..7ca28c0347f 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -689,7 +689,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { Data: pbutil.MustMarshal(cc), }} - _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) + confChangeCh := make(chan struct{}, 1) + _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, confChangeCh) consistIndex := srv.consistIndex.ConsistentIndex() assert.Equal(t, uint64(2), appliedi) @@ -763,7 +764,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { ents = append(ents, ent) } - _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}) + confChangeCh := make(chan struct{}, 1) + _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, confChangeCh) if !shouldStop { t.Errorf("shouldStop = %t, want %t", shouldStop, true) }