diff --git a/etcdserver/server.go b/etcdserver/server.go index f5807243ee6..f9bd1acabcc 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -622,8 +622,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { if srv.Cfg.EnableLeaseCheckpoint { // setting checkpointer enables lease checkpoint feature. - srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { + srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error { + if !srv.ensureLeadership() { + if lg := srv.getLogger(); lg != nil { + lg.Warn("Ignore the checkpoint request because current member isn't a leader", + zap.Uint64("local-member-id", uint64(srv.ID()))) + } else { + plog.Warningf("Ignore the checkpoint request because current member %d isn't a leader", uint64(srv.ID())) + } + return lease.ErrNotPrimary + } + srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp}) + return nil }) } @@ -1098,7 +1109,23 @@ func (s *EtcdServer) run() { func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) { s.goAttach(func() { + // We shouldn't revoke any leases if current member isn't a leader, + // because the operation should only be performed by the leader. When + // the leader gets blocked on the raft loop, such as writing WAL entries, + // it can't process any events or messages from raft. It may think it + // is still the leader even the leader has already changed. + // Refer to https://github.com/etcd-io/etcd/issues/15247 lg := s.Logger() + if !s.ensureLeadership() { + if lg != nil { + lg.Warn("Ignore the lease revoking request because current member isn't a leader", + zap.Uint64("local-member-id", uint64(s.ID()))) + } else { + plog.Warningf("Ignore the lease revoking request because current member %d isn't a leader", uint64(s.ID())) + } + return + } + // Increases throughput of expired leases deletion process through parallelization c := make(chan struct{}, maxPendingRevokes) for _, curLease := range leases { @@ -1135,6 +1162,38 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) { }) } +// ensureLeadership checks whether current member is still the leader. +func (s *EtcdServer) ensureLeadership() bool { + lg := s.Logger() + + ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout()) + defer cancel() + if err := s.linearizableReadNotify(ctx); err != nil { + if lg != nil { + lg.Warn("Failed to check current member's leadership", zap.Error(err)) + } else { + plog.Warningf("Failed to check current member's leadership: %s", err) + } + + return false + } + + newLeaderId := s.raftStatus().Lead + if newLeaderId != uint64(s.ID()) { + if lg != nil { + lg.Warn("Current member isn't a leader", + zap.Uint64("local-member-id", uint64(s.ID())), + zap.Uint64("new-lead", newLeaderId)) + } else { + plog.Warningf("Current member %d isn't a leader (new-lead=%d)", uint64(s.ID()), newLeaderId) + } + + return false + } + + return true +} + func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { s.applySnapshot(ep, apply) s.applyEntries(ep, apply) diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 955399fc8df..a1040aca63a 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -285,6 +285,17 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { if s.isLeader() { + // If s.isLeader() returns true, but we fail to ensure the current + // member's leadership, there are a couple of possibilities: + // 1. current member gets stuck on writing WAL entries; + // 2. current member is in network isolation status; + // 3. current member isn't a leader anymore (possibly due to #1 above). + // In such case, we just return error to client, so that the client can + // switch to another member to continue the lease keep-alive operation. + if !s.ensureLeadership() { + return -1, lease.ErrNotPrimary + } + if err := s.waitAppliedIndex(); err != nil { return 0, err } diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index a157f57b68e..7f9742ccda6 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -730,7 +730,7 @@ func TestV3LeaseFailover(t *testing.T) { // send keep alive to old leader until the old leader starts // to drop lease request. - var expectedExp time.Time + expectedExp := time.Now().Add(5 * time.Second) for { if err = lac.Send(lreq); err != nil { break diff --git a/lease/lessor.go b/lease/lessor.go index a60f4666ef4..02718b94e59 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -78,7 +78,7 @@ type RangeDeleter func() TxnDelete // Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to // avoid circular dependency with mvcc. -type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) +type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error type LeaseID int64 @@ -424,7 +424,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) { // By applying a RAFT entry only when the remainingTTL is already set, we limit the number // of RAFT entries written per lease to a max of 2 per checkpoint interval. if clearRemainingTTL { - le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}) + if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil { + return -1, err + } } le.mu.Lock() @@ -660,7 +662,9 @@ func (le *lessor) checkpointScheduledLeases() { le.mu.Unlock() if len(cps) != 0 { - le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}) + if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil { + return + } } if len(cps) < maxLeaseCheckpointBatchSize { return diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 264a1aa1819..5280be0751e 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -248,10 +248,11 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { defer os.RemoveAll(dir) le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) - fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { + fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error { for _, cp := range cp.GetCheckpoints() { le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) } + return nil } defer le.Stop() // Set checkpointer @@ -538,7 +539,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer le.Stop() le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) - le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { + le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error { close(checkpointedC) if len(lc.Checkpoints) != 1 { t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints)) @@ -547,6 +548,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { if c.Remaining_TTL != 1 { t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL) } + return nil }) _, err := le.Grant(1, 2) if err != nil { diff --git a/tests/e2e/v3_lease_no_proxy_test.go b/tests/e2e/v3_lease_no_proxy_test.go new file mode 100644 index 00000000000..776b63ce3d2 --- /dev/null +++ b/tests/e2e/v3_lease_no_proxy_test.go @@ -0,0 +1,156 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !cluster_proxy + +package e2e + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/pkg/testutil" +) + +// TestLeaseRevoke_IgnoreOldLeader verifies that leases shouldn't be revoked +// by old leader. +// See the case 1 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093. +func TestLeaseRevoke_IgnoreOldLeader(t *testing.T) { + testLeaseRevokeIssue(t, true) +} + +// TestLeaseRevoke_ClientSwitchToOtherMember verifies that leases shouldn't +// be revoked by new leader. +// See the case 2 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093. +func TestLeaseRevoke_ClientSwitchToOtherMember(t *testing.T) { + testLeaseRevokeIssue(t, false) +} + +func testLeaseRevokeIssue(t *testing.T, connectToOneFollower bool) { + defer testutil.AfterTest(t) + + ctx := context.Background() + + t.Log("Starting a new etcd cluster") + epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{ + clusterSize: 3, + goFailEnabled: true, + goFailClientTimeout: 40 * time.Second, + }) + require.NoError(t, err) + defer func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }() + + leaderIdx := epc.WaitLeader(t) + t.Logf("Leader index: %d", leaderIdx) + + epsForNormalOperations := epc.procs[(leaderIdx+2)%3].EndpointsGRPC() + t.Logf("Creating a client for normal operations: %v", epsForNormalOperations) + client, err := clientv3.New(clientv3.Config{Endpoints: epsForNormalOperations, DialTimeout: 3 * time.Second}) + require.NoError(t, err) + defer client.Close() + + var epsForLeaseKeepAlive []string + if connectToOneFollower { + epsForLeaseKeepAlive = epc.procs[(leaderIdx+1)%3].EndpointsGRPC() + } else { + epsForLeaseKeepAlive = epc.EndpointsGRPC() + } + t.Logf("Creating a client for the leaseKeepAlive operation: %v", epsForLeaseKeepAlive) + clientForKeepAlive, err := clientv3.New(clientv3.Config{Endpoints: epsForLeaseKeepAlive, DialTimeout: 3 * time.Second}) + require.NoError(t, err) + defer clientForKeepAlive.Close() + + resp, err := client.Status(ctx, epsForNormalOperations[0]) + require.NoError(t, err) + oldLeaderId := resp.Leader + + t.Log("Creating a new lease") + leaseRsp, err := client.Grant(ctx, 20) + require.NoError(t, err) + + t.Log("Starting a goroutine to keep alive the lease") + doneC := make(chan struct{}) + stopC := make(chan struct{}) + startC := make(chan struct{}, 1) + go func() { + defer close(doneC) + + respC, kerr := clientForKeepAlive.KeepAlive(ctx, leaseRsp.ID) + require.NoError(t, kerr) + // ensure we have received the first response from the server + <-respC + startC <- struct{}{} + + for { + select { + case <-stopC: + return + case <-respC: + } + } + }() + + t.Log("Wait for the keepAlive goroutine to get started") + <-startC + + t.Log("Trigger the failpoint to simulate stalled writing") + err = epc.procs[leaderIdx].Failpoints().SetupHTTP(ctx, "raftBeforeSave", `sleep("30s")`) + require.NoError(t, err) + + cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Logf("Waiting for a new leader to be elected, old leader index: %d, old leader ID: %d", leaderIdx, oldLeaderId) + executeUntil(cctx, t, func() { + for { + resp, err = client.Status(ctx, epsForNormalOperations[0]) + if err == nil && resp.Leader != oldLeaderId { + t.Logf("A new leader has already been elected, new leader index: %d", resp.Leader) + return + } + time.Sleep(100 * time.Millisecond) + } + }) + cancel() + + t.Log("Writing a key/value pair") + _, err = client.Put(ctx, "foo", "bar") + require.NoError(t, err) + + t.Log("Sleeping 30 seconds") + time.Sleep(30 * time.Second) + + t.Log("Remove the failpoint 'raftBeforeSave'") + err = epc.procs[leaderIdx].Failpoints().DeactivateHTTP(ctx, "raftBeforeSave") + require.NoError(t, err) + + // By default, etcd tries to revoke leases every 7 seconds. + t.Log("Sleeping 10 seconds") + time.Sleep(10 * time.Second) + + t.Log("Confirming the lease isn't revoked") + leases, err := client.Leases(ctx) + require.NoError(t, err) + require.Equal(t, 1, len(leases.Leases)) + + t.Log("Waiting for the keepAlive goroutine to exit") + close(stopC) + <-doneC +}