[3.5] backport ignore old leaders leases revoking requests #17425

Merged
44 changes: 43 additions & 1 deletion server/etcdserver/server.go
@@ -660,8 +660,15 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {

if srv.Cfg.EnableLeaseCheckpoint {
// setting checkpointer enables lease checkpoint feature.
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
if !srv.ensureLeadership() {
srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader",
zap.Uint64("local-member-id", uint64(srv.ID())))
return lease.ErrNotPrimary
}

srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
return nil
})
}

@@ -1145,7 +1152,19 @@ func (s *EtcdServer) run() {

func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
s.GoAttach(func() {
// We shouldn't revoke any leases if current member isn't a leader,
// because the operation should only be performed by the leader. When
// the leader gets blocked on the raft loop, such as writing WAL entries,
// it can't process any events or messages from raft. It may think it
// is still the leader even though the leader has already changed.
// Refer to https://github.com/etcd-io/etcd/issues/15247
lg := s.Logger()
if !s.ensureLeadership() {
lg.Warn("Ignore the lease revoking request because current member isn't a leader",
zap.Uint64("local-member-id", uint64(s.ID())))
return
}

// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, curLease := range leases {
@@ -1178,6 +1197,29 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
})
}
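
The revocation loop above bounds its fan-out with the buffered channel c := make(chan struct{}, maxPendingRevokes), which acts as a semaphore so that at most maxPendingRevokes revocations are proposed at once. A minimal, self-contained sketch of that bounded-parallelism pattern (the lease IDs, the limit, and the simulated revoke below are illustrative, not etcd's actual values):

package main

import (
	"fmt"
	"sync"
	"time"
)

const maxPendingRevokes = 16 // assumed limit for this sketch

func main() {
	leaseIDs := []int64{100, 101, 102, 103, 104}
	sem := make(chan struct{}, maxPendingRevokes)
	var wg sync.WaitGroup
	for _, id := range leaseIDs {
		sem <- struct{}{} // blocks once maxPendingRevokes revocations are in flight
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			defer func() { <-sem }()
			time.Sleep(10 * time.Millisecond) // stand-in for the raft revoke proposal
			fmt.Printf("revoked lease %d\n", id)
		}(id)
	}
	wg.Wait()
}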

// ensureLeadership checks whether current member is still the leader.
func (s *EtcdServer) ensureLeadership() bool {
lg := s.Logger()

ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
defer cancel()
if err := s.linearizableReadNotify(ctx); err != nil {
lg.Warn("Failed to check current member's leadership",
zap.Error(err))
return false
}

newLeaderId := s.raftStatus().Lead
if newLeaderId != uint64(s.ID()) {
lg.Warn("Current member isn't a leader",
zap.Uint64("local-member-id", uint64(s.ID())),
zap.Uint64("new-lead", newLeaderId))
return false
}

return true
}
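
ensureLeadership combines two signals before allowing a lease operation: a linearizable read, which can only complete if this member can still reach a quorum, and the leader ID reported by raft. A self-contained sketch of the same guard pattern, where confirmQuorum and leaderID are stand-ins for s.linearizableReadNotify and s.raftStatus().Lead (everything below is illustrative, not etcd's actual types):

package main

import (
	"context"
	"fmt"
	"time"
)

type server struct {
	localID       uint64
	leaderID      func() uint64               // stand-in for s.raftStatus().Lead
	confirmQuorum func(context.Context) error // stand-in for s.linearizableReadNotify
	reqTimeout    time.Duration
}

// ensureLeadership returns true only if a quorum-confirmed read succeeds and
// raft still reports this member as the leader.
func (s *server) ensureLeadership(ctx context.Context) bool {
	ctx, cancel := context.WithTimeout(ctx, s.reqTimeout)
	defer cancel()
	if err := s.confirmQuorum(ctx); err != nil {
		fmt.Println("leadership check failed:", err)
		return false
	}
	return s.leaderID() == s.localID
}

func main() {
	s := &server{
		localID:       1,
		leaderID:      func() uint64 { return 2 }, // leadership has moved to member 2
		confirmQuorum: func(context.Context) error { return nil },
		reqTimeout:    time.Second,
	}
	if !s.ensureLeadership(context.Background()) {
		fmt.Println("skip revoking leases: this member is no longer the leader")
	}
}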

// Cleanup removes allocated objects by EtcdServer.NewServer in
// situation that EtcdServer::Start was not called (that takes care of cleanup).
func (s *EtcdServer) Cleanup() {
11 changes: 11 additions & 0 deletions server/etcdserver/v3_server.go
@@ -297,6 +297,17 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
// If s.isLeader() returns true, but we fail to ensure the current
// member's leadership, there are a few possibilities:
// 1. current member gets stuck on writing WAL entries;
// 2. current member is isolated from the rest of the cluster (network partition);
// 3. current member isn't a leader anymore (possibly due to #1 above).
// In such cases, we just return an error to the client, so that the client can
// switch to another member to continue the lease keep-alive operation.
if !s.ensureLeadership() {
return -1, lease.ErrNotPrimary
}

if err := s.waitAppliedIndex(); err != nil {
return 0, err
}
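
Once the stale leader starts rejecting renewals with lease.ErrNotPrimary, the client is expected to carry on against the other members. A hedged sketch of that client-side view using clientv3 (endpoints, TTL, and ports are illustrative; the authoritative coverage is the e2e test added below):

package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Give the client every member endpoint, not only the (possibly stale) leader.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx := context.Background()
	grant, err := cli.Grant(ctx, 20) // 20-second TTL, illustrative
	if err != nil {
		log.Fatal(err)
	}

	// KeepAlive renews the lease in the background; when one member rejects
	// the renewal, the client can retry against the remaining endpoints.
	ch, err := cli.KeepAlive(ctx, grant.ID)
	if err != nil {
		log.Fatal(err)
	}
	for resp := range ch {
		log.Printf("lease %x renewed, TTL=%d", resp.ID, resp.TTL)
	}
}
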
10 changes: 7 additions & 3 deletions server/lease/lessor.go
@@ -77,7 +77,7 @@ type RangeDeleter func() TxnDelete

// Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
// avoid circular dependency with mvcc.
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest)
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error

type LeaseID int64

@@ -423,7 +423,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
// of RAFT entries written per lease to a max of 2 per checkpoint interval.
if clearRemainingTTL {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil {
return -1, err
}
}

le.mu.Lock()
@@ -659,7 +661,9 @@ func (le *lessor) checkpointScheduledLeases() {
le.mu.Unlock()

if len(cps) != 0 {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil {
return
}
}
if len(cps) < maxLeaseCheckpointBatchSize {
return
6 changes: 4 additions & 2 deletions server/lease/lessor_test.go
@@ -250,10 +250,11 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
defer os.RemoveAll(dir)

le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
for _, cp := range cp.GetCheckpoints() {
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
}
return nil
}
defer le.Stop()
// Set checkpointer
@@ -540,7 +541,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer le.Stop()
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error {
close(checkpointedC)
if len(lc.Checkpoints) != 1 {
t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints))
@@ -549,6 +550,7 @@
if c.Remaining_TTL != 1 {
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
}
return nil
})
_, err := le.Grant(1, 2)
if err != nil {
157 changes: 157 additions & 0 deletions tests/e2e/v3_lease_no_proxy_test.go
@@ -0,0 +1,157 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)

// TestLeaseRevoke_IgnoreOldLeader verifies that leases shouldn't be revoked
// by an old leader.
// See the case 1 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093.
func TestLeaseRevoke_IgnoreOldLeader(t *testing.T) {
testLeaseRevokeIssue(t, true)
}

// TestLeaseRevoke_ClientSwitchToOtherMember verifies that leases shouldn't
// be revoked by the new leader.
// See the case 2 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093.
func TestLeaseRevoke_ClientSwitchToOtherMember(t *testing.T) {
testLeaseRevokeIssue(t, false)
}

func testLeaseRevokeIssue(t *testing.T, connectToOneFollower bool) {
e2e.BeforeTest(t)

ctx := context.Background()

t.Log("Starting a new etcd cluster")
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
GoFailEnabled: true,
GoFailClientTimeout: 40 * time.Second,
})
require.NoError(t, err)
defer func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
}()

leaderIdx := epc.WaitLeader(t)
t.Logf("Leader index: %d", leaderIdx)

epsForNormalOperations := epc.Procs[(leaderIdx+2)%3].EndpointsGRPC()
t.Logf("Creating a client for normal operations: %v", epsForNormalOperations)
client, err := clientv3.New(clientv3.Config{Endpoints: epsForNormalOperations, DialTimeout: 3 * time.Second})
require.NoError(t, err)
defer client.Close()

var epsForLeaseKeepAlive []string
if connectToOneFollower {
epsForLeaseKeepAlive = epc.Procs[(leaderIdx+1)%3].EndpointsGRPC()
} else {
epsForLeaseKeepAlive = epc.EndpointsGRPC()
}
t.Logf("Creating a client for the leaseKeepAlive operation: %v", epsForLeaseKeepAlive)
clientForKeepAlive, err := clientv3.New(clientv3.Config{Endpoints: epsForLeaseKeepAlive, DialTimeout: 3 * time.Second})
require.NoError(t, err)
defer clientForKeepAlive.Close()

resp, err := client.Status(ctx, epsForNormalOperations[0])
require.NoError(t, err)
oldLeaderId := resp.Leader

t.Log("Creating a new lease")
leaseRsp, err := client.Grant(ctx, 20)
require.NoError(t, err)

t.Log("Starting a goroutine to keep alive the lease")
doneC := make(chan struct{})
stopC := make(chan struct{})
startC := make(chan struct{}, 1)
go func() {
defer close(doneC)

respC, kerr := clientForKeepAlive.KeepAlive(ctx, leaseRsp.ID)
require.NoError(t, kerr)
// ensure we have received the first response from the server
<-respC
startC <- struct{}{}

for {
select {
case <-stopC:
return
case <-respC:
}
}
}()

t.Log("Wait for the keepAlive goroutine to get started")
<-startC

t.Log("Trigger the failpoint to simulate stalled writing")
err = epc.Procs[leaderIdx].Failpoints().SetupHTTP(ctx, "raftBeforeSave", `sleep("30s")`)
require.NoError(t, err)

cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Logf("Waiting for a new leader to be elected, old leader index: %d, old leader ID: %d", leaderIdx, oldLeaderId)
testutils.ExecuteUntil(cctx, t, func() {
for {
resp, err = client.Status(ctx, epsForNormalOperations[0])
if err == nil && resp.Leader != oldLeaderId {
t.Logf("A new leader has already been elected, new leader index: %d", resp.Leader)
return
}
time.Sleep(100 * time.Millisecond)
}
})
cancel()

t.Log("Writing a key/value pair")
_, err = client.Put(ctx, "foo", "bar")
require.NoError(t, err)

t.Log("Sleeping 30 seconds")
time.Sleep(30 * time.Second)

t.Log("Remove the failpoint 'raftBeforeSave'")
err = epc.Procs[leaderIdx].Failpoints().DeactivateHTTP(ctx, "raftBeforeSave")
require.NoError(t, err)

// By default, etcd tries to revoke leases every 7 seconds.
t.Log("Sleeping 10 seconds")
time.Sleep(10 * time.Second)

t.Log("Confirming the lease isn't revoked")
leases, err := client.Leases(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(leases.Leases))

t.Log("Waiting for the keepAlive goroutine to exit")
close(stopC)
<-doneC
}
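
The test stalls the old leader with the raftBeforeSave gofail failpoint (sleep("30s")), so the member keeps believing it is the leader while it cannot persist WAL entries. Assuming the process was started with gofail's HTTP endpoint enabled (which is what GoFailEnabled arranges above), the same failpoint can be toggled directly over HTTP; the port below is an assumption for illustration:

package main

import (
	"log"
	"net/http"
	"strings"
)

func main() {
	// Assumed gofail HTTP endpoint of the leader process (illustrative port).
	base := "http://127.0.0.1:22381"

	// Activate: make every raftBeforeSave hit sleep for 30 seconds.
	req, err := http.NewRequest(http.MethodPut, base+"/raftBeforeSave", strings.NewReader(`sleep("30s")`))
	if err != nil {
		log.Fatal(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	resp.Body.Close()

	// Deactivate: remove the failpoint again, mirroring DeactivateHTTP above.
	req, err = http.NewRequest(http.MethodDelete, base+"/raftBeforeSave", nil)
	if err != nil {
		log.Fatal(err)
	}
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	resp.Body.Close()
}
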
8 changes: 8 additions & 0 deletions tests/framework/e2e/cluster.go
@@ -491,6 +491,14 @@ func (epc *EtcdProcessCluster) EndpointsV3() []string {
return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsV3() })
}

func (epc *EtcdProcessCluster) EndpointsGRPC() []string {
return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsGRPC() })
}

func (epc *EtcdProcessCluster) EndpointsHTTP() []string {
return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsHTTP() })
}

func (epc *EtcdProcessCluster) Endpoints(f func(ep EtcdProcess) []string) (ret []string) {
for _, p := range epc.Procs {
ret = append(ret, f(p)...)
2 changes: 1 addition & 1 deletion tests/integration/v3_lease_test.go
@@ -757,7 +757,7 @@ func TestV3LeaseFailover(t *testing.T) {

// send keep alive to old leader until the old leader starts
// to drop lease request.
var expectedExp time.Time
expectedExp := time.Now().Add(5 * time.Second)
for {
if err = lac.Send(lreq); err != nil {
break