diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 2a315ea58656..5c3f6234d243 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -99,6 +99,9 @@ type raftNode struct { stopped chan struct{} done chan struct{} + + // used by liveness probe to check whether the raftloop is blocked. + dummyc chan struct{} } type raftNodeConfig struct { @@ -142,6 +145,7 @@ func newRaftNode(cfg raftNodeConfig) *raftNode { applyc: make(chan toApply), stopped: make(chan struct{}), done: make(chan struct{}), + dummyc: make(chan struct{}), } if r.heartbeat == 0 { r.ticker = &time.Ticker{} @@ -322,6 +326,8 @@ func (r *raftNode) start(rh *raftReadyHandler) { // notify etcdserver that raft has already been notified or advanced. raftAdvancedC <- struct{}{} } + case <-r.dummyc: + r.lg.Debug("Received dummy event") case <-r.stopped: return } @@ -413,6 +419,17 @@ func (r *raftNode) onStop() { close(r.done) } +func (r *raftNode) trySendDummyEvent(timeout time.Duration) error { + select { + case r.dummyc <- struct{}{}: + case <-r.done: + case <-time.After(timeout): + return fmt.Errorf("failed to send dummy event in %s", timeout.String()) + } + + return nil +} + // for testing func (r *raftNode) pauseSending() { p := r.transport.(rafthttp.Pausable) diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go index 079aa13e2f74..ef29a701d99c 100644 --- a/server/etcdserver/raft_test.go +++ b/server/etcdserver/raft_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/client/pkg/v3/types" @@ -322,3 +323,70 @@ func TestStopRaftNodeMoreThanOnce(t *testing.T) { } } } + +func TestTrySendDummyEvent(t *testing.T) { + testCases := []struct { + name string + drainApply bool + stopped bool + expectBlocked bool + }{ + { + name: "normal case", + drainApply: true, + stopped: false, + expectBlocked: false, + }, + { + name: "blocked on apply", + drainApply: false, + stopped: false, + expectBlocked: true, + }, + { + name: "not blocked due to stopped", + drainApply: false, + stopped: true, + expectBlocked: false, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + n := newNopReadyNode() + + r := newRaftNode(raftNodeConfig{ + lg: zaptest.NewLogger(t), + Node: n, + storage: mockstorage.NewStorageRecorder(""), + raftStorage: raft.NewMemoryStorage(), + transport: newNopTransporter(), + }) + srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zaptest.NewLogger(t), r: *r} + + srv.r.start(&raftReadyHandler{ + getLead: func() uint64 { return 0 }, + updateLead: func(uint64) {}, + updateLeadership: func(bool) {}, + }) + defer srv.r.Stop() + + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{RaftState: raft.StateFollower}, + Entries: []raftpb.Entry{{Type: raftpb.EntryConfChange}}, + } + + if tc.drainApply { + _ = <-srv.r.applyc + } + + if tc.stopped { + close(r.done) + } + + err := r.trySendDummyEvent(2 * time.Second) + assert.Equal(t, tc.expectBlocked, err != nil, err) + }) + } +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 1d48fa6732cb..5ce6a58b1632 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1262,6 +1262,17 @@ func (s *EtcdServer) Stop() { s.HardStop() } +// IsRaftLoopBlocked checks whether the raft loop has blocked for at least +// the duration specified by `timeout`, and it defaults to 2*ElectionTimeout, +// which is the maximum time to trigger a new leader election. +// If the returned error isn't nil, then it's blocked; otherwise not. +func (s *EtcdServer) IsRaftLoopBlocked(timeout time.Duration) error { + if timeout == 0 { + timeout = 2 * s.Cfg.ElectionTimeout() + } + return s.r.trySendDummyEvent(timeout) +} + // ReadyNotify returns a channel that will be closed when the server // is ready to serve client requests func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }