Skip to content

Commit

Permalink
Skip leadership check if the etcd instance is active processing heart…
Browse files Browse the repository at this point in the history
…beats

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Aug 14, 2024
1 parent b71ae9b commit 9dee9b7
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 2 deletions.
14 changes: 12 additions & 2 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ type apply struct {
type raftNode struct {
lg *zap.Logger

tickMu *sync.Mutex
tickMu *sync.RWMutex
// timestamp of the latest tick
latestTickTs time.Time
raftNodeConfig

// a chan to send/receive snapshot
Expand Down Expand Up @@ -131,8 +133,9 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
raft.SetLogger(lg)
r := &raftNode{
lg: cfg.lg,
tickMu: new(sync.Mutex),
tickMu: new(sync.RWMutex),
raftNodeConfig: cfg,
latestTickTs: time.Now(),
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
td: contention.NewTimeoutDetector(2 * cfg.heartbeat),
Expand All @@ -154,9 +157,16 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
// tick calls Tick on the raft node and then stamps latestTickTs with the
// current wall-clock time. Both steps run while holding the write lock of
// tickMu, so readers of latestTickTs never observe a half-finished tick.
func (r *raftNode) tick() {
	r.tickMu.Lock()
	defer r.tickMu.Unlock()

	r.Tick()
	r.latestTickTs = time.Now()
}

// getLatestTickTs returns the timestamp stored in latestTickTs. The value is
// read under the read lock of tickMu, so it may be called concurrently with
// tick.
func (r *raftNode) getLatestTickTs() time.Time {
	r.tickMu.RLock()
	ts := r.latestTickTs
	r.tickMu.RUnlock()

	return ts
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
Expand Down
22 changes: 22 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,10 +1174,32 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
})
}

// isActive reports whether the raft node has ticked recently. The member is
// considered active while the current time is still before the latest
// recorded tick timestamp plus 3 * TickMs milliseconds; once that deadline
// has been reached it returns false.
func (s *EtcdServer) isActive() bool {
	lastTick := s.r.getLatestTickTs()
	window := 3 * time.Duration(s.Cfg.TickMs) * time.Millisecond

	// The deadline is the latest instant at which the last tick still counts
	// as recent.
	deadline := lastTick.Add(window)
	return time.Now().Before(deadline)
}

// ensureLeadership checks whether current member is still the leader.
func (s *EtcdServer) ensureLeadership() bool {
lg := s.Logger()

if s.isActive() {
if lg != nil {
lg.Debug("The member is active, skip checking leadership",
zap.Time("latestTickTs", s.r.getLatestTickTs()),
zap.Time("now", time.Now()))
} else {
plog.Debugf("The member is active, skip checking leadership, latestTickTs: %s, now: %s",
s.r.getLatestTickTs(), time.Now())
}

return true
}

ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
defer cancel()
if err := s.linearizableReadNotify(ctx); err != nil {
Expand Down
43 changes: 43 additions & 0 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
"go.etcd.io/etcd/etcdserver/api/snap"
Expand Down Expand Up @@ -1995,3 +1996,45 @@ func TestWaitAppliedIndex(t *testing.T) {
})
}
}

// TestIsActive verifies that EtcdServer.isActive reports a member as active
// only while the time elapsed since the latest tick is below 3 * TickMs.
// Each case backdates latestTickTs by durationSinceLastTick and runs as its
// own named subtest so that a failure identifies the offending case.
func TestIsActive(t *testing.T) {
	cases := []struct {
		name                  string
		tickMs                uint
		durationSinceLastTick time.Duration
		expectActive          bool
	}{
		{
			name:                  "1.5*tickMs,active",
			tickMs:                100,
			durationSinceLastTick: 150 * time.Millisecond,
			expectActive:          true,
		},
		{
			name:                  "2*tickMs,active",
			tickMs:                200,
			durationSinceLastTick: 400 * time.Millisecond,
			expectActive:          true,
		},
		{
			name:                  "4*tickMs,not active",
			tickMs:                150,
			durationSinceLastTick: 600 * time.Millisecond,
			expectActive:          false,
		},
	}

	for _, tc := range cases {
		// The subtest runs synchronously (no t.Parallel), so capturing tc
		// directly is safe regardless of the Go version's loop semantics.
		t.Run(tc.name, func(t *testing.T) {
			s := EtcdServer{
				Cfg: ServerConfig{
					TickMs: tc.tickMs,
				},
				r: raftNode{
					tickMu:       new(sync.RWMutex),
					latestTickTs: time.Now().Add(-tc.durationSinceLastTick),
				},
			}

			require.Equal(t, tc.expectActive, s.isActive())
		})
	}
}

0 comments on commit 9dee9b7

Please sign in to comment.